Как преобразовать файл csv в rdd
Я новичок в искру. Я хочу выполнить некоторые операции над конкретными данными в записи CSV.
Я пытаюсь прочитать CSV файл и преобразовать его в RDD. Мои дальнейшие операции основаны на заголовке, представленном в файле CSV.
(Из комментариев)
Это мой код:
final JavaRDD<String> File = sc.textFile(Filename).cache();
final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() {
@Override public Iterable<String> call(String s) {
return Arrays.asList(EOL.split(s));
}
});
final String heading=lines.first().toString();
Я могу получить значения заголовка следующим образом. Я хочу сопоставить это с каждой записью в CSV файле.
final String[] header=heading.split(" ");
Я могу получить значения заголовка следующим образом. Я хочу сопоставить это с каждой записью в CSV файле.
В java Im, использующем CSVReader record.getColumnValue(Column header)
, чтобы получить конкретное значение. Мне нужно сделать что-то подобное этому.
Ответы
Ответ 1
Упрощенный подход состоял бы в том, чтобы сохранить заголовок.
Скажем, у вас есть файл. CSV:
user, topic, hits
om, scala, 120
daniel, spark, 80
3754978, spark, 1
Мы можем определить класс заголовка, который использует разборную версию первой строки:
class SimpleCSVHeader(header:Array[String]) extends Serializable {
val index = header.zipWithIndex.toMap
def apply(array:Array[String], key:String):String = array(index(key))
}
Чтобы мы могли использовать данные в дальнейшем, мы можем использовать этот заголовок:
val csv = sc.textFile("file.csv") // original file
val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows
val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
val rows = data.filter(line => header(line,"user") != "user") // filter the header out
val users = rows.map(row => header(row,"user")
val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt)
...
Обратите внимание, что header
не намного больше, чем простое отображение мнемоники на индекс массива. Практически все это можно сделать на порядковом месте элемента в массиве, например user = row(0)
PS: Добро пожаловать в Scala: -)
Ответ 2
Вы можете использовать библиотеку spark-csv: https://github.com/databricks/spark-csv
Это непосредственно из документации:
import org.apache.spark.sql.SQLContext
SQLContext sqlContext = new SQLContext(sc);
HashMap<String, String> options = new HashMap<String, String>();
options.put("header", "true");
options.put("path", "cars.csv");
DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
Ответ 3
Во-первых, я должен сказать, что это намного проще, если вы помещаете заголовки в отдельные файлы - это соглашение в больших данных.
В любом случае ответ Даниэля довольно хорош, но у него есть неэффективность и ошибка, поэтому я собираюсь опубликовать свои собственные. Неэффективность заключается в том, что вам не нужно проверять каждую запись, чтобы увидеть, является ли она заголовком, вам просто нужно проверить первую запись для каждого раздела. Ошибка заключается в том, что с помощью .split(",")
вы можете получить исключение или получить неправильный столбец, когда записи являются пустой строкой и происходят в начале или конце записи, - чтобы исправить это, вам нужно использовать .split(",", -1)
. Итак, вот полный код:
val header =
scala.io.Source.fromInputStream(
hadoop.fs.FileSystem.get(new java.net.URI(filename), sc.hadoopConfiguration)
.open(new hadoop.fs.Path(path)))
.getLines.head
val columnIndex = header.split(",").indexOf(columnName)
sc.textFile(path).mapPartitions(iterator => {
val head = iterator.next()
if (head == header) iterator else Iterator(head) ++ iterator
})
.map(_.split(",", -1)(columnIndex))
Конечные точки, рассмотрите Паркет, если вы хотите только ловить определенные столбцы. Или, по крайней мере, подумайте о реализации лениво оцениваемой функции разделения, если у вас широкие ряды.
Ответ 4
Мы можем использовать новый DataFrameRDD для чтения и записи CSV-данных.
Существует несколько преимуществ DataFrameRDD над NormalRDD:
- DataFrameRDD бит быстрее, чем NormalRDD, поскольку мы определяем схему и которая помогает значительно оптимизировать время выполнения и обеспечить нам значительное увеличение производительности.
- Даже если столбец сдвинется в CSV, он автоматически примет правильный столбец, поскольку мы не будем жестко кодировать номер столбца, который присутствовал при чтении данных как textFile, а затем разделил его, а затем, используя число столбцов, чтобы получить данные.
- В нескольких строках кода вы можете напрямую прочитать файл CSV.
Вам потребуется библиотека: добавьте ее в build.sbt
libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.2.0"
Искры Scala код для него:
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val csvInPath = "/path/to/csv/abc.csv"
val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(csvInPath)
//format is for specifying the type of file you are reading
//header = true indicates that the first line is header in it
Чтобы преобразовать в обычный RDD, взяв из него некоторые столбцы и
val rddData = df.map(x=>Row(x.getAs("colA")))
//Do other RDD operation on it
Сохранение формата RDD в формате CSV:
val aDf = sqlContext.createDataFrame(rddData,StructType(Array(StructField("colANew",StringType,true))))
aDF.write.format("com.databricks.spark.csv").option("header","true").save("/csvOutPath/aCSVOp")
Поскольку заголовок имеет значение true, мы получим имя заголовка во всех выходных файлах.
Ответ 5
Я бы рекомендовал прочитать заголовок непосредственно из драйвера, а не через Spark. Две причины для этого: 1) Это одна строка. Нет никакого преимущества для распределенного подхода. 2) Нам нужна эта строка в драйвере, а не в рабочих узлах.
Это выглядит примерно так:
// Ridiculous amount of code to read one line.
val uri = new java.net.URI(filename)
val conf = sc.hadoopConfiguration
val fs = hadoop.fs.FileSystem.get(uri, conf)
val path = new hadoop.fs.Path(filename)
val stream = fs.open(path)
val source = scala.io.Source.fromInputStream(stream)
val header = source.getLines.head
Теперь, когда вы делаете RDD, вы можете отбросить заголовок.
val csvRDD = sc.textFile(filename).filter(_ != header)
Затем мы можем сделать RDD из одного столбца, например:
val idx = header.split(",").indexOf(columnName)
val columnRDD = csvRDD.map(_.split(",")(idx))
Ответ 6
Вот еще один пример использования Spark/ Scala to конвертировать CSV в RDD. Более подробное описание см. В post.
def main(args: Array[String]): Unit = {
val csv = sc.textFile("/path/to/your/file.csv")
// split / clean data
val headerAndRows = csv.map(line => line.split(",").map(_.trim))
// get header
val header = headerAndRows.first
// filter out header (eh. just check if the first val matches the first header name)
val data = headerAndRows.filter(_(0) != header(0))
// splits to map (header/value pairs)
val maps = data.map(splits => header.zip(splits).toMap)
// filter out the user "me"
val result = maps.filter(map => map("user") != "me")
// print result
result.foreach(println)
}
Ответ 7
Другой альтернативой является использование метода mapPartitionsWithIndex
, так как вы получите номер индекса раздела и список всех строк в этом разделе. Раздел 0 и строка 0 будут заголовком
val rows = sc.textFile(path)
.mapPartitionsWithIndex({ (index: Int, rows: Iterator[String]) =>
val results = new ArrayBuffer[(String, Int)]
var first = true
while (rows.hasNext) {
// check for first line
if (index == 0 && first) {
first = false
rows.next // skip the first row
} else {
results += rows.next
}
}
results.toIterator
}, true)
rows.flatMap { row => row.split(",") }
Ответ 8
Как насчет этого?
val Delimeter = ","
val textFile = sc.textFile("data.csv").map(line => line.split(Delimeter))
Ответ 9
Я предлагаю вам попробовать
https://spark.apache.org/docs/latest/sql-programming-guide.html#rdds
JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
new Function<String, Person>() {
public Person call(String line) throws Exception {
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts[0]);
person.setAge(Integer.parseInt(parts[1].trim()));
return person;
}
});
У вас должен быть класс в этом примере с спецификацией заголовка файла и связать ваши данные с схемой и применить критерии, как в mysql.., чтобы получить желаемый результат
Ответ 10
Я думаю, вы можете попробовать загрузить этот csv в RDD, а затем создать фрейм данных из этого RDD, вот документ создания dataframe из rdd: http://spark.apache.org/docs/latest/sql-programming-guide.html#interoperating-with-rdds
Ответ 11
Для искры scala Я обычно использую, когда я не могу использовать пакеты искры csv...
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val rawdata = sc.textFile("hdfs://example.host:8020/user/example/example.csv")
val header = rawdata.first()
val tbldata = rawdata.filter(_(0) != header(0))
Ответ 12
Начиная с Spark 2.0, CSV можно читать непосредственно в DataFrame
.
Если файл данных не имеет строки заголовка, это будет:
val df = spark.read.csv("file://path/to/data.csv")
Это приведет к загрузке данных, но даст каждому столбцу общие имена, такие как _c0
, _c1
и т.д.
Если есть заголовки, то добавление .option("header", "true")
будет использовать первую строку для определения столбцов в DataFrame
:
val df = spark.read
.option("header", "true")
.csv("file://path/to/data.csv")
Для конкретного примера предположим, что у вас есть файл с содержимым:
user,topic,hits
om,scala,120
daniel,spark,80
3754978,spark,1
Затем получится общее количество хитов, сгруппированных по темам:
import org.apache.spark.sql.functions._
import spark.implicits._
val rawData = spark.read
.option("header", "true")
.csv("file://path/to/data.csv")
// specifies the query, but does not execute it
val grouped = rawData.groupBy($"topic").agg(sum($"hits))
// runs the query, pulling the data to the master node
// can fail if the amount of data is too much to fit
// into the master node memory!
val collected = grouped.collect
// runs the query, writing the result back out
// in this case, changing format to Parquet since that can
// be nicer to work with in Spark
grouped.write.parquet("hdfs://some/output/directory/")
// runs the query, writing the result back out
// in this case, in CSV format with a header and
// coalesced to a single file. This is easier for human
// consumption but usually much slower.
grouped.coalesce(1)
.write
.option("header", "true")
.csv("hdfs://some/output/directory/")