Parse CSV как DataFrame/DataSet с Apache Spark и Java

Я новичок в искры, и я хочу использовать групповое и уменьшить, чтобы найти следующее из CSV (одна строка от используемого):

  Department, Designation, costToCompany, State
  Sales, Trainee, 12000, UP
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, TN
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, TN 
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, LA
  Marketing, Associate, 18000, TN
  Marketing, Associate, 18000, TN
  HR, Manager, 58000, TN

Я хотел бы упростить о CSV с группой Отдел, Назначение, Состояние с дополнительными столбцами с sum (costToCompany) и TotalEmployeeCount

Должен получить результат вроде:

  Dept, Desg, state, empCount, totalCost
  Sales,Lead,AP,2,64000
  Sales,Lead,LA,3,96000  
  Sales,Lead,TN,2,64000

Есть ли способ достичь этого с помощью преобразований и действий. Или мы должны идти на операции RDD?

Ответы

Ответ 1

Процедура

  • Создайте класс (схему), чтобы инкапсулировать вашу структуру (ее не требуется для подхода B, но это сделает ваш код более легким для чтения, если вы используете Java)

    public class Record implements Serializable {
      String department;
      String designation;
      long costToCompany;
      String state;
      // constructor , getters and setters  
    }
    
  • Загрузка файла CVS (JSON)

    JavaSparkContext sc;
    JavaRDD<String> data = sc.textFile("path/input.csv");
    JavaSQLContext sqlContext = new JavaSQLContext(sc);
    
    JavaRDD<Record> rdd_records = sc.textFile(data).map(
      new Function<String, Record>() {
          public Record call(String line) throws Exception {
             // Here you can use JSON
             // Gson gson = new Gson();
             // gson.fromJson(line, Record.class);
             String[] fields = line.split(",");
             Record sd = new Record(fields[0], fields[1], fields[2].trim(), fields[3]);
             return sd;
          }
    });
    

В этот момент у вас есть 2 подхода:

A. SparkSQL

  • Зарегистрируйте таблицу (используя свой определенный класс схемы)

    JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
    table.registerAsTable("record_table");
    table.printSchema();
    
  • Запросить таблицу с нужной командой Query-group-by

    JavaSchemaRDD res = sqlContext.sql("
      select department,designation,state,sum(costToCompany),count(*) 
      from record_table 
      group by department,designation,state
    ");
    
  • Здесь вы также сможете выполнить любой другой запрос, используя SQL-подход

В. Спарк

  • Сопоставление с использованием составного ключа: Department, Designation, State

    JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD = 
    rdd_records.mapToPair(new
      PairFunction<Record, String, Tuple2<Long, Integer>>(){
        public Tuple2<String, Tuple2<Long, Integer>> call(Record record){
          Tuple2<String, Tuple2<Long, Integer>> t2 = 
          new Tuple2<String, Tuple2<Long,Integer>>(
            record.Department + record.Designation + record.State,
            new Tuple2<Long, Integer>(record.costToCompany,1)
          );
          return t2;
    }
    

    });

  • reduceByKey, используя составной ключ, суммируя столбец costToCompany и накапливая количество записей с помощью ключа

    JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records = 
     records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>, Tuple2<Long,
     Integer>, Tuple2<Long, Integer>>() {
        public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
        Tuple2<Long, Integer> v2) throws Exception {
            return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2+ v2._2);
        }
    });
    

Ответ 2

Следующие могут быть не совсем корректными, но это должно дать вам некоторое представление о том, как жонглировать данными. Это некрасиво, нужно заменить на классы case и т.д., Но, как быстрый пример использования искры api, я надеюсь, что это достаточно:)

val rawlines = sc.textfile("hdfs://.../*.csv")
case class Employee(dep: String, des: String, cost: Double, state: String)
val employees = rawlines
  .map(_.split(",") /*or use a proper CSV parser*/
  .map( Employee(row(0), row(1), row(2), row(3) )

# the 1 is the amount of employees (which is obviously 1 per line)
val keyVals = employees.map( em => (em.dep, em.des, em.state), (1 , em.cost))

val results = keyVals.reduceByKey{ a,b =>
    (a._1 + b._1, b._1, b._2) # (a.count + b.count , a.cost + b.cost )
}

#debug output
results.take(100).foreach(println)

results
  .map( keyval => someThingToFormatAsCsvStringOrWhatever )
  .saveAsTextFile("hdfs://.../results")

Или вы можете использовать SparkSQL:

val sqlContext = new SQLContext(sparkContext)

# case classes can easily be registered as tables
employees.registerAsTable("employees")

val results = sqlContext.sql("""select dep, des, state, sum(cost), count(*) 
  from employees 
  group by dep,des,state"""

Ответ 3

Для JSON, если ваш текстовый файл содержит один объект JSON в строке, вы можете использовать sqlContext.jsonFile(path), чтобы Spark SQL загружал его как SchemaRDD (схема будет автоматически выведена). Затем вы можете зарегистрировать его в виде таблицы и запросить его с помощью SQL. Вы также можете вручную загрузить текстовый файл как RDD[String], содержащий один объект JSON для каждой записи, и использовать sqlContext.jsonRDD(rdd), чтобы включить его как SchemaRDD. jsonRDD полезен, когда вам нужно предварительно обработать ваши данные.

Ответ 4

Файл CSV может быть проанализирован встроенным считывателем CSV Spark. Он вернется DataFrame/DataSet для успешного чтения файла. На вершине DataFrame/DataSet, вы легко применяете SQL-подобные операции.

Использование Spark 2.x(и выше) с Java

Создать объект SparkSession aka spark

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
    .builder()
    .appName("Java Spark SQL Example")
    .getOrCreate();

Создать схему для строки с StructType

import org.apache.spark.sql.types.StructType;

StructType schema = new StructType()
    .add("department", "string")
    .add("designation", "string")
    .add("ctc", "long")
    .add("state", "string");

Создайте фрейм данных из файла CSV и примените к нему схему

Dataset<Row> df = spark.read()
    .option("mode", "DROPMALFORMED")
    .schema(schema)
    .csv("path/input.csv");

больше параметров при чтении данных из файла CSV

Теперь мы можем агрегировать данные двумя способами

1. SQL-путь

Зарегистрируйте таблицу в метастаре sql искры для выполнения операции SQL

df.createOrReplaceTempView("employee");

Запустить SQL-запрос на зарегистрированном фрейме данных

Dataset<Row> sqlResult = spark.sql(
    "SELECT department, designation, state, SUM(ctc), COUNT(department)" 
        + " FROM employee GROUP BY department, designation, state");

sqlResult.show(); //for testing

Мы даже можем выполнить SQL непосредственно в CSV файле без создания таблицы с Spark SQL


2. Цепочка объектов или программирование или похожий на Java способ Сделайте необходимый импорт для функций sql

import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.sum;

Используйте groupBy и agg в dataframe/dataset для выполнения count и sum по данным

Dataset<Row> dfResult = df.groupBy("department", "designation", "state")
    .agg(sum("ctc"), count("department"));
// After Spark 1.6 columns mentioned in group by will be added to result by default

dfResult.show();//for testing

зависимые библиотеки

"org.apache.spark" % "spark-core_2.11" % "2.0.0" 
"org.apache.spark" % "spark-sql_2.11" % "2.0.0"

Ответ 5

Для входных файлов CSV вы можете использовать spark-csv, который преобразует входной файл CSV в DataFrame.