Как прокручивать каждую строку dataFrame в pyspark
например
sqlContext = SQLContext(sc)
sample=sqlContext.sql("select Name ,age ,city from user")
sample.show()
Вышеуказанный оператор печатает всю таблицу на терминале, но я хочу получить доступ к каждой строке в этой таблице, используя для или для выполнения дальнейших вычислений.
Ответы
Ответ 1
Чтобы "зациклить" и воспользоваться преимуществами платформы параллельных вычислений Spark, вы можете определить пользовательскую функцию и использовать карту.
def customFunction(row):
return (row.name, row.age, row.city)
sample2 = sample.rdd.map(customFunction)
или же
sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city))
Затем пользовательская функция будет применена к каждой строке кадра данных. Обратите внимание, что sample2 будет RDD
, а не фреймом данных.
Карта может понадобиться, если вы собираетесь выполнять более сложные вычисления. Если вам просто нужно добавить простой производный столбец, вы можете использовать withColumn
, с возвратом информационного кадра.
sample3 = sample.withColumn('age2', sample.age + 2)
Ответ 2
Вы просто не можете. DataFrames
, так же как и другие распределенные структуры данных, не повторяемы и могут быть доступны с использованием только выделенной функции высшего порядка и/или методов SQL.
Можно конечно collect
for row in df.rdd.collect():
do_something(row)
или преобразовать в toLocalIterator
for row in df.rdd.toLocalIterator():
do_something(row)
и локально повторять, как показано выше, но это превосходит все цели использования Spark.
Ответ 3
Используя списки в python, вы можете собрать весь столбец значений в список, используя только две строки:
df = sqlContext.sql("show tables in default")
tableList = [x["tableName"] for x in df.rdd.collect()]
В приведенном выше примере мы возвращаем список таблиц в базе данных "по умолчанию", но то же самое можно адаптировать, заменив запрос, используемый в sql().
Или более сокращенно:
tableList = [x["tableName"] for x in sqlContext.sql("show tables in default").rdd.collect()]
И для вашего примера из трех столбцов мы можем создать список словарей, а затем перебрать их в цикле for.
sql_text = "select name, age, city from user"
tupleList = [{name:x["name"], age:x["age"], city:x["city"]}
for x in sqlContext.sql(sql_text).rdd.collect()]
for row in tupleList:
print("{} is a {} year old from {}".format(
row["name"],
row["age"],
row["city"]))
Ответ 4
Если вы хотите что-то сделать для каждой строки в объекте DataFrame, используйте map
. Это позволит вам выполнять дальнейшие вычисления в каждой строке. Это эквивалент цикла для всего набора данных от 0
до len(dataset)-1
.
Обратите внимание, что это вернет PipelinedRDD, а не DataFrame.
Ответ 5
выше
tupleList = [{name:x["name"], age:x["age"], city:x["city"]}
должен быть
tupleList = [{'name':x["name"], 'age':x["age"], 'city':x["city"]}
для name
, age
и city
не являются переменными, а просто клавишами словаря.
Ответ 6
Попробуй вот так
result = spark.createDataFrame([('SpeciesId','int'), ('SpeciesName','string')],["col_name", "data_type"]);
for f in result.collect():
print (f.col_name)
Ответ 7
Мне нужно назначить значение в каждом поле каждой строки DataFrame. Но значение исходит из другой строки DataFrame.
Пример: pepe Process1 10 Charles Process2 1 Jhonas null 1
Джонас назначил Process1, почему максимальное value = 10.
Благодарю вас!
Ответ 8
попробуйте вот так
result = spark.createDataFrame([('SpeciesId', 'int'), ('SpeciesName', 'string')], ["col_name", "data_type"])); для f в result.collect(): print (f.col_name)