Как вычислить суммарную сумму, используя Spark
У меня есть rdd (String, Int), который сортируется по клавише
val data = Array(("c1",6), ("c2",3),("c3",4))
val rdd = sc.parallelize(data).sortByKey
Теперь я хочу запустить значение для первого ключа с нулем и последующими ключами в качестве суммы предыдущих клавиш.
Например: c1 = 0, c2 = значение c1, c3 = (значение c1 + значение c2), c4 = (c1 +.. + c3 значение)
ожидаемый результат:
(c1,0), (c2,6), (c3,9)...
Можно ли это достичь?
Я попробовал его с картой, но сумма не сохраняется внутри карты.
var sum = 0 ;
val t = keycount.map{ x => { val temp = sum; sum = sum + x._2 ; (x._1,temp); }}
Ответы
Ответ 1
-
Вычислить частичные результаты для каждого раздела:
val partials = rdd.mapPartitionsWithIndex((i, iter) => {
val (keys, values) = iter.toSeq.unzip
val sums = values.scanLeft(0)(_ + _)
Iterator((keys.zip(sums.tail), sums.last))
})
-
Соберите частичные суммы
val partialSums = partials.values.collect
-
Вычислить суммарную сумму по разделам и передать ее:
val sumMap = sc.broadcast(
(0 until rdd.partitions.size)
.zip(partialSums.scanLeft(0)(_ + _))
.toMap
)
-
Вычислить конечные результаты:
val result = partials.keys.mapPartitionsWithIndex((i, iter) => {
val offset = sumMap.value(i)
if (iter.isEmpty) Iterator()
else iter.next.map{case (k, v) => (k, v + offset)}.toIterator
})
Ответ 2
Вот решение в PySpark. Внутренне это по существу то же самое, что и решение @zero323 Scala, но оно обеспечивает универсальную функцию с помощью Spark-подобного API.
import numpy as np
def cumsum(rdd, get_summand):
"""Given an ordered rdd of items, computes cumulative sum of
get_summand(row), where row is an item in the RDD.
"""
def cumsum_in_partition(iter_rows):
total = 0
for row in iter_rows:
total += get_summand(row)
yield (total, row)
rdd = rdd.mapPartitions(cumsum_in_partition)
def last_partition_value(iter_rows):
final = None
for cumsum, row in iter_rows:
final = cumsum
return (final,)
partition_sums = rdd.mapPartitions(last_partition_value).collect()
partition_cumsums = list(np.cumsum(partition_sums))
partition_cumsums = [0] + partition_cumsums
partition_cumsums = sc.broadcast(partition_cumsums)
def add_sums_of_previous_partitions(idx, iter_rows):
return ((cumsum + partition_cumsums.value[idx], row)
for cumsum, row in iter_rows)
rdd = rdd.mapPartitionsWithIndex(add_sums_of_previous_partitions)
return rdd
# test for correctness by summing numbers, with and without Spark
rdd = sc.range(10000,numSlices=10).sortBy(lambda x: x)
cumsums, values = zip(*cumsum(rdd,lambda x: x).collect())
assert all(cumsums == np.cumsum(values))
Ответ 3
Я столкнулся с подобной проблемой и реализовал решение @Paul. Я хотел сделать cumsum на целочисленной частотной таблице, отсортированной по ключу (целое число), и была небольшая проблема с np.cumsum(partition_sums)
, ошибка unsupported operand type(s) for +=: 'int' and 'NoneType'
.
Потому что, если диапазон достаточно велик, вероятность того, что каждый раздел имеет что-то, таким образом, достаточно велика (нет значений None). Однако, если диапазон намного меньше, чем количество, а количество разделов остается неизменным, некоторые разделы будут пустыми. Здесь идет измененное решение:
def cumsum(rdd, get_summand):
"""Given an ordered rdd of items, computes cumulative sum of
get_summand(row), where row is an item in the RDD.
"""
def cumsum_in_partition(iter_rows):
total = 0
for row in iter_rows:
total += get_summand(row)
yield (total, row)
rdd = rdd.mapPartitions(cumsum_in_partition)
def last_partition_value(iter_rows):
final = None
for cumsum, row in iter_rows:
final = cumsum
return (final,)
partition_sums = rdd.mapPartitions(last_partition_value).collect()
# partition_cumsums = list(np.cumsum(partition_sums))
#----from here are the changed lines
partition_sums = [x for x in partition_sums if x is not None]
temp = np.cumsum(partition_sums)
partition_cumsums = list(temp)
#----
partition_cumsums = [0] + partition_cumsums
partition_cumsums = sc.broadcast(partition_cumsums)
def add_sums_of_previous_partitions(idx, iter_rows):
return ((cumsum + partition_cumsums.value[idx], row)
for cumsum, row in iter_rows)
rdd = rdd.mapPartitionsWithIndex(add_sums_of_previous_partitions)
return rdd
#test on random integer frequency
x = np.random.randint(10, size=1000)
D = sqlCtx.createDataFrame(pd.DataFrame(x.tolist(),columns=['D']))
c = D.groupBy('D').count().orderBy('D')
c_rdd = c.rdd.map(lambda x:x['count'])
cumsums, values = zip(*cumsum(c_rdd,lambda x: x).collect())
Ответ 4
Spark имеет встроенные опоры для функций улья ANALYTICS/WINDOWING, и суммарная сумма может быть легко достигнута с использованием функций ANALYTICS.
Hive wiki функции ANALYTICS/WINDOWING.
Пример:
Предполагая, что у вас есть объект sqlContext -
val datardd = sqlContext.sparkContext.parallelize(Seq(("a",1),("b",2), ("c",3),("d",4),("d",5),("d",6)))
import sqlContext.implicits._
//Register as test table
datardd.toDF("id","val").createOrReplaceTempView("test")
//Calculate Cumulative sum
sqlContext.sql("select id,val, " +
"SUM(val) over ( order by id rows between unbounded preceding and current row ) cumulative_Sum " +
"from test").show()
Этот подход вызывает предупреждение ниже. В случае, если исполнитель запускаетOfMemory, настраивает параметры памяти заданий для работы с огромным набором данных.
WARN WindowExec: для работы с окном не определен раздел. перемещение все данные в один раздел, это может привести к серьезной производительности Деградация
Надеюсь, это поможет.
Ответ 5
вы можете попробовать с помощью окон с помощью rowsBetween. надеюсь, все еще полезно.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val data = Array(("c1",6), ("c2",3),("c3",4))
val df = sc.parallelize(data).sortByKey().toDF("c", "v")
val w = Window.orderBy("c")
val r = df.select( $"c", sum($"v").over(w.rowsBetween(-2, -1)).alias("cs"))
display(r)