PySpark Drop Строки

как вы отбрасываете строки из RDD в PySpark? В частности, первая строка, поскольку она имеет тенденцию содержать имена столбцов в моих наборах данных. Из изучения API я не могу найти простой способ сделать это. Конечно, я мог бы сделать это через Bash/HDFS, но я просто хочу знать, можно ли это сделать из PySpark.

Ответы

Ответ 1

AFAIK там нет "легкого" способа сделать это.

Это должно сделать трюк, хотя:

val header = data.first
val rows = data.filter(line => line != header)

Ответ 2

Конкретно для PySpark:

В соответствии с @maasg вы можете сделать это:

header = rdd.first()
rdd.filter(lambda line: line != header)

но это не технически правильно, так как это возможно, вы исключаете строки, содержащие данные, а также заголовок. Однако, похоже, это работает для меня:

def remove_header(itr_index, itr):
    return iter(list(itr)[1:]) if itr_index == 0 else itr
rdd.mapPartitionsWithIndex(remove_header)

Аналогично:

rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0])

Я новичок в Spark, поэтому не могу разумно комментировать, что будет быстрее.

Ответ 3

Простой способ добиться этого в PySpark (API Python):

noHeaderRDD = rawRDD.zipWithIndex().filter(lambda (row,index): index > 0).keys()

Ответ 4

Лично я считаю, что просто использовать фильтр, чтобы избавиться от этого материала, это самый простой способ. Но в вашем комментарии у меня есть другой подход. Glom the RDD, поэтому каждый раздел представляет собой массив (я предполагаю, что у вас есть 1 файл на раздел, и каждый файл имеет оскорбительную строку сверху), а затем просто пропустите первый элемент (это с помощью scala api).

data.glom().map(x => for (elem <- x.drop(1){/*do stuff*/}) //x is an array so just skip the 0th index

Помните, что одна из больших возможностей RDD заключается в том, что они неизменяемы, поэтому естественное удаление строки - это сложная задача сделать

UPDATE: Лучшее решение.
rdd.mapPartions(x => for (elem <- x.drop(1){/*do stuff*/} )
То же, что и glom, но не имеет накладных расходов на то, чтобы поместить все в массив, так как x - это итератор в этом случае

Ответ 5

Я тестировал с помощью spark2.1. Скажем, вы хотите удалить первые 14 строк, не зная о количестве файлов столбцов.

sc = spark.sparkContext
lines = sc.textFile("s3://location_of_csv")
parts = lines.map(lambda l: l.split(","))
parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])

withColumn - это функция df. Итак, ниже не будет работать в стиле RDD, как используется в приведенном выше случае.

parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)