PySpark DataFrames - способ перечисления без преобразования на Pandas?
У меня очень большой pyspark.sql.dataframe.DataFrame с именем df.
Мне нужен какой-то способ перечисления записей, таким образом, доступ к записи с определенным индексом. (или выберите группу записей с диапазоном индексов)
В pandas я мог бы сделать только
indexes=[2,3,6,7]
df[indexes]
Здесь я хочу нечто подобное (и без преобразования данных в pandas)
Ближайшим, к которому я могу добраться, является:
-
Перечисление всех объектов в исходном фрейме данных:
indexes=np.arange(df.count())
df_indexed=df.withColumn('index', indexes)
- Поиск значений, которые мне нужны, с помощью функции where().
ВОПРОСЫ:
- Почему это не работает и как заставить его работать? Как добавить строку в dataframe?
-
Будет ли работать позже, чтобы сделать что-то вроде:
indexes=[2,3,6,7]
df1.where("index in indexes").collect()
-
Какой бы быстрый и простой способ справиться с этим?
Ответы
Ответ 1
Это не работает, потому что:
- второй аргумент для
withColumn
должен быть Column
не коллекцией. np.array
здесь не будет работать.
- когда вы передаете
"index in indexes"
, поскольку выражение SQL для where
indexes
выходит за пределы области видимости и не разрешено как допустимый идентификатор
PySpark >= 1.4.0
Вы можете добавить номера строк, используя соответствующую функцию окна и запрос, используя метод Column.isin
или правильно сформированную строку запроса:
from pyspark.sql.functions import col, rowNumber
from pyspark.sql.window import Window
w = Window.orderBy()
indexed = df.withColumn("index", rowNumber().over(w))
# Using DSL
indexed.where(col("index").isin(set(indexes)))
# Using SQL expression
indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))
Похоже, что функции окна, называемые без предложения PARTITION BY
, перемещают все данные в один раздел, поэтому выше может быть не лучшим решением в конце концов.
Любой более быстрый и простой способ справиться с этим?
Не совсем. Spark DataFrames не поддерживает случайный доступ к строкам.
PairedRDD
можно получить с помощью метода lookup
, который является относительно быстрым, если данные разбиты на разделы с помощью HashPartitioner
. Существует также indexed-rdd проект, который поддерживает эффективный поиск.
Edit
Независимо от версии PySpark вы можете попробовать что-то вроде этого:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType
row = Row("char")
row_with_index = Row("char", "index")
df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF()
df.show(5)
## +----+
## |char|
## +----+
## | a|
## | b|
## | c|
## | d|
## | e|
## +----+
## only showing top 5 rows
# This part is not tested but should work and save some work later
schema = StructType(
df.schema.fields[:] + [StructField("index", LongType(), False)])
indexed = (df.rdd # Extract rdd
.zipWithIndex() # Add index
.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows
.toDF(schema)) # It will work without schema but will be more expensive
# inSet in Spark < 1.3
indexed.where(col("index").isin(indexes))
Ответ 2
Если вам нужен диапазон номеров, который гарантированно не столкнется, но не требует .over(partitionBy())
, вы можете использовать monotonicallyIncreasingId()
.
from pyspark.sql.functions import monotonicallyIncreasingId
df.select(monotonicallyIncreasingId().alias("rowId"),"*")
Обратите внимание, что значения не особенно "опрятные". Каждому разделу присваивается диапазон значений, и вывод не будет смежным. Например. 0, 1, 2, 8589934592, 8589934593, 8589934594
.
Это было добавлено в Spark 28 апреля 2015 года здесь: https://github.com/apache/spark/commit/d94cd1a733d5715792e6c4eac87f0d5c81aebbe2
Ответ 3
Конечно, вы можете добавить массив для индексирования, массив по вашему выбору:
В Scala сначала нам нужно создать индексный массив:
val index_array=(1 to df.count.toInt).toArray
index_array: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Теперь вы можете добавить этот столбец в свой DF. Во-первых, для этого вам нужно открыть наш DF и получить его как массив, а затем застегнуть его с помощью index_array, а затем преобразовать новый массив в RDD. Последний шаг - получить его как DF:
final_df = sc.parallelize((df.collect.map(
x=>(x(0),x(1))) zip index_array).map(
x=>(x._1._1.toString,x._1._2.toString,x._2))).
toDF("column_name")
После этого индексирование станет более понятным.
Ответ 4
monotonicallyIncreasingId()
- это назначит номера строк в порядке incresing, но не в последовательности.
выборка с двумя столбцами:
|---------------------|------------------|
| RowNo | Heading 2 |
|---------------------|------------------|
| 1 | xy |
|---------------------|------------------|
| 12 | xz |
|---------------------|------------------|
Если вы хотите присвоить номера строк, используйте следующий трюк.
Протестировано в версиях искры 2.0.1 и более.
df.createOrReplaceTempView("df")
dfRowId = spark.sql("select *, row_number() over (partition by 0) as rowNo from df")
выборка с двумя столбцами:
|---------------------|------------------|
| RowNo | Heading 2 |
|---------------------|------------------|
| 1 | xy |
|---------------------|------------------|
| 2 | xz |
|---------------------|------------------|
Надеюсь, что это поможет.