Добавить столбец в Data Frame в Apache Spark 1.3
Возможно ли, и что было бы самым эффективным аккуратным методом для добавления столбца в Data Frame?
Более конкретно, столбец может служить идентификаторами строк для существующего кадра данных.
В упрощенном случае, читая файл, а не токенизируя его, я могу думать о чем-то, как показано ниже (в Scala), но он завершается с ошибками (в строке 3), и в любом случае это не похоже на лучшее маршрут возможен:
var dataDF = sc.textFile("path/file").toDF()
val rowDF = sc.parallelize(1 to DataDF.count().toInt).toDF("ID")
dataDF = dataDF.withColumn("ID", rowDF("ID"))
Ответы
Ответ 1
Прошло некоторое время с тех пор, как я опубликовал вопрос, и, похоже, некоторые другие люди тоже хотели бы получить ответ. Ниже я нашел.
Таким образом, первоначальная задача состояла в том, чтобы добавить столбец с идентификаторами строк (в основном, последовательность 1 to numRows
) к любому кадру данных, поэтому порядок/присутствие строк можно отслеживать (например, при выборе). Это может быть достигнуто чем-то в этом направлении:
sqlContext.textFile(file).
zipWithIndex().
map(case(d, i)=>i.toString + delimiter + d).
map(_.split(delimiter)).
map(s=>Row.fromSeq(s.toSeq))
Что касается общего случая добавления любого столбца в любой кадр данных:
"Ближайшим" к этой функциональности в Spark API являются withColumn
и withColumnRenamed
. Согласно Scala docs, прежний возвращает новый DataFrame, добавляя столбец. По-моему, это немного запутанное и неполное определение. Обе эти функции могут работать только с фреймом данных this
, т.е. С учетом двух кадров данных df1
и df2
со столбцом col
:
val df = df1.withColumn("newCol", df1("col") + 1) // -- OK
val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL
Поэтому, если вам не удастся преобразовать столбец существующего фрейма данных в нужную вам форму, вы не можете использовать withColumn
или withColumnRenamed
для добавления произвольных столбцов (автономных или других кадров данных).
Как уже отмечалось выше, обходным решением может быть использование join
- это было бы довольно беспорядочно, хотя это возможно - добавление уникальных ключей, таких как выше, с помощью zipWithIndex
к обоим кадрам данных или столбцам, может работать. Хотя эффективность...
Ясно, что добавление столбца в фрейм данных не является простой функциональностью для распределенной среды, и для этого может быть не очень эффективный, аккуратный метод. Но я думаю, что все еще очень важно иметь эту базовую функциональность, даже с предупреждениями о производительности.
Ответ 2
Не уверен, что он работает в искровом свете 1.3, но в искробере 1.5 я использую withColumn:
import sqlContext.implicits._
import org.apache.spark.sql.functions._
df.withColumn("newName",lit("newValue"))
Я использую это, когда мне нужно использовать значение, которое не связано с существующими столбцами блока данных
Это похоже на ответ @NehaM, но проще
Ответ 3
Я получил помощь от ответа сверху. Однако, я считаю, что он неполный, если мы хотим изменить DataFrame
, а существующие API немного отличаются от Spark 1.6
.
zipWithIndex()
возвращает a Tuple
of (Row, Long)
, который содержит каждую строку и соответствующий индекс. Мы можем использовать его для создания нового Row
в соответствии с нашей потребностью.
val rdd = df.rdd.zipWithIndex()
.map(indexedRow => Row.fromSeq(indexedRow._2.toString +: indexedRow._1.toSeq))
val newstructure = StructType(Seq(StructField("Row number", StringType, true)).++(df.schema.fields))
sqlContext.createDataFrame(rdd, newstructure ).show
Я надеюсь, что это будет полезно.
Ответ 4
Вы можете использовать row_number с Функция окна, как показано ниже, чтобы получить отдельный идентификатор для каждой строки в фрейме данных.
df.withColumn("ID", row_number() over Window.orderBy("any column name in the dataframe"))
Вы также можете использовать monotonically_increasing_id
для того же, что и
df.withColumn("ID", monotonically_increasing_id())
И есть еще другие способы.