Как добавить постоянный столбец идентификаторов строк в Spark DataFrame?
Этот вопрос не нова, однако я нахожу удивительное поведение в Spark. Мне нужно добавить столбец идентификаторов строк в DataFrame. Я использовал метод DataFrame monotonically_increasing_id(), и он дает мне дополнительный код уникальных идентификаторов строк (которые НЕ являются последовательными, но уникальны).
Проблема, с которой я столкнулась, заключается в том, что при фильтрации DataFrame идентификаторы строк в результирующем DataFrame повторно назначаются. Ниже приведены два DataFrames.
-
первый - это исходный DataFrame с идентификаторами строк, добавленными следующим образом:
df.withColumn("rowId", monotonically_increasing_id())
-
второй DataFrame - это тот, который был получен после фильтрации на col P через df.filter(col("P"))
.
Проблема проиллюстрирована rowId для custId 169, которая была 5 в исходном DataFrame, но после фильтрации эта строкаId (5) была повторно назначена custmId 773, когда custId 169 был отфильтрован! Я не знаю, почему это поведение по умолчанию.
Я хотел бы, чтобы rowIds
был "липким"; если я удаляю строки из DataFrame, я не хочу, чтобы их идентификаторы "повторно использовались", я хочу, чтобы они ушли вместе со своими строками. Можно ли это сделать? Я не вижу никаких флагов для запроса этого поведения из метода monotonically_increasing_id
.
+---------+--------------------+-------+
| custId | features| P |rowId|
+---------+--------------------+-------+
|806 |[50,5074,...| true| 0|
|832 |[45,120,1...| true| 1|
|216 |[6691,272...| true| 2|
|926 |[120,1788...| true| 3|
|875 |[54,120,1...| true| 4|
|169 |[19406,21...| false| 5|
after filtering on P:
+---------+--------------------+-------+
| custId| features| P |rowId|
+---------+--------------------+-------+
| 806|[50,5074,...| true| 0|
| 832|[45,120,1...| true| 1|
| 216|[6691,272...| true| 2|
| 926|[120,1788...| true| 3|
| 875|[54,120,1...| true| 4|
| 773|[3136,317...| true| 5|
Ответы
Ответ 1
Spark 2.0
Искра 1.x
Проблема, которую вы испытываете, довольно тонкая, но может быть сведена к простому факту monotonically_increasing_id
- чрезвычайно уродливая функция. Это явно не чисто, и его ценность зависит от того, что полностью не контролируется.
Он не принимает никаких параметров, поэтому с точки зрения оптимизатора это не имеет значения, когда он вызывается и может быть нажат после всех других операций. Отсюда и поведение, которое вы видите.
Если вы посмотрите на код, вы обнаружите, что это явно отмечено расширением выражения MonotonicallyIncreasingID
с помощью Nondeterministic
.
<ы > Я не думаю, что есть какое-то изящное решение, но один из способов справиться с этим - добавить искусственную зависимость от отфильтрованного значения. Например, с помощью UDF, например:
from pyspark.sql.types import LongType
from pyspark.sql.functions import udf
bound = udf(lambda _, v: v, LongType())
(df
.withColumn("rn", monotonically_increasing_id())
# Due to nondeterministic behavior it has to be a separate step
.withColumn("rn", bound("P", "rn"))
.where("P"))
В общем случае было бы проще добавлять индексы с помощью zipWithIndex
на RDD
, а затем преобразовать его обратно в DataFrame
.
* Обходной путь, показанный выше, уже не является допустимым решением (и не требуется) в Spark 2.x, где UDF Python подвержены оптимизации плана выполнения.
Ответ 2
Я не мог воспроизвести это. Я использую Spark 2.0, хотя, возможно, поведение изменилось, или я не делаю то же, что и вы.
val df = Seq(("one", 1,true),("two", 2,false),("three", 3,true),("four", 4,true))
.toDF("name", "value","flag")
.withColumn("rowd", monotonically_increasing_id())
df.show
val df2 = df.filter(col("flag")=== true)
df2.show
df: org.apache.spark.sql.DataFrame = [name: string, value: int ... 2 more fields]
+-----+-----+-----+----+
| name|value| flag|rowd|
+-----+-----+-----+----+
| one| 1| true| 0|
| two| 2|false| 1|
|three| 3| true| 2|
| four| 4| true| 3|
+-----+-----+-----+----+
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, value: int ... 2 more fields]
+-----+-----+----+----+
| name|value|flag|rowd|
+-----+-----+----+----+
| one| 1|true| 0|
|three| 3|true| 2|
| four| 4|true| 3|
+-----+-----+----+----+
Ответ 3
Я недавно работал над аналогичной проблемой. Хотя monotonically_increasing_id()
очень быстрый, он ненадежен и не даст вам последовательных номеров строк, а только увеличит уникальные целые числа.
Создание раздела Windows с последующим использованием row_number().over(some_windows_partition)
занимает очень много времени.
Наилучшим решением на данный момент является использование zip с индексом, а затем преобразование zip файла обратно в исходный фрейм данных с новой схемой, включающей столбец индекса.
Попробуй это:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType
new_schema = StructType(**original_dataframe**.schema.fields[:] + [StructField("index", LongType(), False)])
zipped_rdd = **original_dataframe**.rdd.zipWithIndex()
indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))
Где original_dataframe
это dataframe
вы должны добавить индекс и row_with_index
является новой схемой с индексом столбца, который вы можете написать, как
row_with_index = Row(
"calendar_date"
,"year_week_number"
,"year_period_number"
,"realization"
,"index"
)
Здесь, calendar_date
, year_week_number
, year_period_number
, и realization
были столбцами моего исходного dataframe
. Вы можете заменить имена именами ваших столбцов. Индекс - это новое имя столбца, которое вы должны были добавить для номеров строк.
Этот процесс значительно эффективнее и плавнее по сравнению с row_number().over(some_windows_partition)
.
Надеюсь это поможет.
Ответ 4
Чтобы обойти сдвигающую оценку monotonically_increasing_id(), вы можете попробовать записать фрейм данных на диск и перечитать. Тогда столбец id теперь просто поле данных, которое считывается, а не динамически вычисляется в какой-то момент в конвейере. Хотя это довольно уродливое решение, оно работало, когда я быстро проверил.
Ответ 5
Это сработало для меня. Создал другой столбец идентификаторов и использовал функцию окна row_number
import org.apache.spark.sql.functions.{row_number}
import org.apache.spark.sql.expressions.Window
val df1: DataFrame = df.withColumn("Id",lit(1))
df1
.select(
...,
row_number()
.over(Window
.partitionBy("Id"
.orderBy(col("...").desc))
)
.alias("Row_Nbr")
)
Ответ 6
Чтобы получить более высокую производительность по сравнению с решением Chris T, вы можете попытаться записать в разделяемый фрейм данных с поддержкой apache вместо записи на диск. https://ignite.apache.org/use-cases/spark/shared-memory-layer.html