Лучший способ конвертировать поле строки в метку времени в Spark

У меня есть CSV, в котором поле имеет дату и время в определенном формате. Я не могу импортировать его непосредственно в свой Dataframe, потому что это должна быть метка времени. Поэтому я импортирую его как строку и преобразую его в Timestamp, как этот

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.Row

def getTimestamp(x:Any) : Timestamp = {
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
    if (x.toString() == "") 
    return null
    else {
        val d = format.parse(x.toString());
        val t = new Timestamp(d.getTime());
        return t
    }
}

def convert(row : Row) : Row = {
    val d1 = getTimestamp(row(3))
    return Row(row(0),row(1),row(2),d1)
}

Есть ли лучший, более сжатый способ сделать это с помощью API Dataframe или spark-sql? Вышеупомянутый метод требует создания RDD и снова предоставить схему для Dataframe.

Ответы

Ответ 1

Искры> = 2,2

Начиная с версии 2.2 вы можете напрямую предоставить строку формата:

import org.apache.spark.sql.functions.to_timestamp

val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")

df.withColumn("ts", ts).show(2, false)

// +---+-------------------+-------------------+
// |id |dts                |ts                 |
// +---+-------------------+-------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01|
// |2  |#[email protected]#@#             |null               |
// +---+-------------------+-------------------+

Spark> = 1,6, <2,2

Вы можете использовать функции обработки даты, которые были введены в Spark 1.5. Предполагая, что у вас есть следующие данные:

val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#[email protected]#@#")).toDF("id", "dts")

Вы можете использовать unix_timestamp для синтаксического анализа строк и unix_timestamp их в timestamp

import org.apache.spark.sql.functions.unix_timestamp

val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp")

df.withColumn("ts", ts).show(2, false)

// +---+-------------------+---------------------+
// |id |dts                |ts                   |
// +---+-------------------+---------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01.0|
// |2  |#[email protected]#@#             |null                 |
// +---+-------------------+---------------------+

Как вы можете видеть, он охватывает как синтаксический анализ, так и обработку ошибок. Строка формата должна быть совместима с Java SimpleDateFormat.

Spark> 1,5, <1,6

Вам нужно будет использовать что-то вроде этого:

unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp")

или же

(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp")

из-за SPARK-11724.

Искры <1,5

вы должны использовать их с expr и HiveContext.

Ответ 2

Я еще не играл с Spark SQL, но я думаю, что это было бы более идиоматично scala (нулевое использование не считается хорошей практикой):

def getTimestamp(s: String) : Option[Timestamp] = s match {
  case "" => None
  case _ => {
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
    Try(new Timestamp(format.parse(s).getTime)) match {
      case Success(t) => Some(t)
      case Failure(_) => None
    }    
  }
}

Обратите внимание, что я предполагаю, что вы знаете типы элементов Row заранее (если вы читаете его из файла csv, все они String), поэтому я использую правильный тип типа String, а не Any ( все подтип Any).

Это также зависит от того, как вы хотите обрабатывать исключения для синтаксического анализа. В этом случае, если возникает исключение синтаксического анализа, возвращается None.

Вы можете использовать его дальше:

rows.map(row => Row(row(0),row(1),row(2), getTimestamp(row(3))

Ответ 3

У меня есть метка времени ISO8601 в моем наборе данных, и мне нужно было преобразовать ее в формат "yyyy-MM-dd". Это то, что я сделал:

import org.joda.time.{DateTime, DateTimeZone}
object DateUtils extends Serializable {
  def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC)
  def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC)
}

sqlContext.udf.register("formatTimeStamp", (isoTimestamp : String) => DateUtils.dtFromIso8601(isoTimestamp).toString("yyyy-MM-dd"))

И вы можете просто использовать UDF в своем искровом SQL-запросе.

Ответ 4

Я хотел бы переместить метод getTimeStamp, написанный вами в rdd mapPartitions, и повторно использовать GenericMutableRow среди строк в итераторе:

val strRdd = sc.textFile("hdfs://path/to/cvs-file")
val rowRdd: RDD[Row] = strRdd.map(_.split('\t')).mapPartitions { iter =>
  new Iterator[Row] {
    val row = new GenericMutableRow(4)
    var current: Array[String] = _

    def hasNext = iter.hasNext
    def next() = {
      current = iter.next()
      row(0) = current(0)
      row(1) = current(1)
      row(2) = current(2)

      val ts = getTimestamp(current(3))
      if(ts != null) {
        row.update(3, ts)
      } else {
        row.setNullAt(3)
      }
      row
    }
  }
}

И вы все равно должны использовать схему для создания DataFrame

val df = sqlContext.createDataFrame(rowRdd, tableSchema)

Использование GenericMutableRow внутри реализации итератора можно найти в Агрегированный оператор, InMemoryColumnarTableScan, ParquetTableOperations и т.д.

Ответ 5

Я бы использовал https://github.com/databricks/spark-csv

Это позволит вам установить временные метки для вас.

import com.databricks.spark.csv._
val rdd: RDD[String] = sc.textFile("csvfile.csv")

val df : DataFrame = new CsvParser().withDelimiter('|')
      .withInferSchema(true)
      .withParseMode("DROPMALFORMED")
      .csvRdd(sqlContext, rdd)

Ответ 6

У меня были некоторые проблемы с to_timestamp, где он возвращал пустую строку. После большого количества проб и ошибок, я смог обойти его, выставив как временную метку, а затем отбросив назад как строку. Надеюсь, это поможет кому-то еще с той же проблемой:

df.columns.intersect(cols).foldLeft(df)((newDf, col) => {
  val conversionFunc = to_timestamp(newDf(col).cast("timestamp"), "MM/dd/yyyy HH:mm:ss").cast("string")
  newDf.withColumn(col, conversionFunc)
})