Лучший способ конвертировать поле строки в метку времени в Spark
У меня есть CSV, в котором поле имеет дату и время в определенном формате. Я не могу импортировать его непосредственно в свой Dataframe, потому что это должна быть метка времени. Поэтому я импортирую его как строку и преобразую его в Timestamp
, как этот
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.Row
def getTimestamp(x:Any) : Timestamp = {
val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
if (x.toString() == "")
return null
else {
val d = format.parse(x.toString());
val t = new Timestamp(d.getTime());
return t
}
}
def convert(row : Row) : Row = {
val d1 = getTimestamp(row(3))
return Row(row(0),row(1),row(2),d1)
}
Есть ли лучший, более сжатый способ сделать это с помощью API Dataframe или spark-sql? Вышеупомянутый метод требует создания RDD и снова предоставить схему для Dataframe.
Ответы
Ответ 1
Искры> = 2,2
Начиная с версии 2.2 вы можете напрямую предоставить строку формата:
import org.apache.spark.sql.functions.to_timestamp
val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")
df.withColumn("ts", ts).show(2, false)
// +---+-------------------+-------------------+
// |id |dts |ts |
// +---+-------------------+-------------------+
// |1 |05/26/2016 01:01:01|2016-05-26 01:01:01|
// |2 |#[email protected]#@# |null |
// +---+-------------------+-------------------+
Spark> = 1,6, <2,2
Вы можете использовать функции обработки даты, которые были введены в Spark 1.5. Предполагая, что у вас есть следующие данные:
val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#[email protected]#@#")).toDF("id", "dts")
Вы можете использовать unix_timestamp
для синтаксического анализа строк и unix_timestamp
их в timestamp
import org.apache.spark.sql.functions.unix_timestamp
val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp")
df.withColumn("ts", ts).show(2, false)
// +---+-------------------+---------------------+
// |id |dts |ts |
// +---+-------------------+---------------------+
// |1 |05/26/2016 01:01:01|2016-05-26 01:01:01.0|
// |2 |#[email protected]#@# |null |
// +---+-------------------+---------------------+
Как вы можете видеть, он охватывает как синтаксический анализ, так и обработку ошибок. Строка формата должна быть совместима с Java SimpleDateFormat
.
Spark> 1,5, <1,6
Вам нужно будет использовать что-то вроде этого:
unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp")
или же
(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp")
из-за SPARK-11724.
Искры <1,5
вы должны использовать их с expr
и HiveContext
.
Ответ 2
Я еще не играл с Spark SQL, но я думаю, что это было бы более идиоматично scala (нулевое использование не считается хорошей практикой):
def getTimestamp(s: String) : Option[Timestamp] = s match {
case "" => None
case _ => {
val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
Try(new Timestamp(format.parse(s).getTime)) match {
case Success(t) => Some(t)
case Failure(_) => None
}
}
}
Обратите внимание, что я предполагаю, что вы знаете типы элементов Row
заранее (если вы читаете его из файла csv, все они String
), поэтому я использую правильный тип типа String
, а не Any
( все подтип Any
).
Это также зависит от того, как вы хотите обрабатывать исключения для синтаксического анализа. В этом случае, если возникает исключение синтаксического анализа, возвращается None
.
Вы можете использовать его дальше:
rows.map(row => Row(row(0),row(1),row(2), getTimestamp(row(3))
Ответ 3
У меня есть метка времени ISO8601 в моем наборе данных, и мне нужно было преобразовать ее в формат "yyyy-MM-dd". Это то, что я сделал:
import org.joda.time.{DateTime, DateTimeZone}
object DateUtils extends Serializable {
def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC)
def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC)
}
sqlContext.udf.register("formatTimeStamp", (isoTimestamp : String) => DateUtils.dtFromIso8601(isoTimestamp).toString("yyyy-MM-dd"))
И вы можете просто использовать UDF в своем искровом SQL-запросе.
Ответ 4
Я хотел бы переместить метод getTimeStamp
, написанный вами в rdd mapPartitions, и повторно использовать GenericMutableRow среди строк в итераторе:
val strRdd = sc.textFile("hdfs://path/to/cvs-file")
val rowRdd: RDD[Row] = strRdd.map(_.split('\t')).mapPartitions { iter =>
new Iterator[Row] {
val row = new GenericMutableRow(4)
var current: Array[String] = _
def hasNext = iter.hasNext
def next() = {
current = iter.next()
row(0) = current(0)
row(1) = current(1)
row(2) = current(2)
val ts = getTimestamp(current(3))
if(ts != null) {
row.update(3, ts)
} else {
row.setNullAt(3)
}
row
}
}
}
И вы все равно должны использовать схему для создания DataFrame
val df = sqlContext.createDataFrame(rowRdd, tableSchema)
Использование GenericMutableRow внутри реализации итератора можно найти в Агрегированный оператор, InMemoryColumnarTableScan, ParquetTableOperations и т.д.
Ответ 5
Я бы использовал https://github.com/databricks/spark-csv
Это позволит вам установить временные метки для вас.
import com.databricks.spark.csv._
val rdd: RDD[String] = sc.textFile("csvfile.csv")
val df : DataFrame = new CsvParser().withDelimiter('|')
.withInferSchema(true)
.withParseMode("DROPMALFORMED")
.csvRdd(sqlContext, rdd)
Ответ 6
У меня были некоторые проблемы с to_timestamp, где он возвращал пустую строку. После большого количества проб и ошибок, я смог обойти его, выставив как временную метку, а затем отбросив назад как строку. Надеюсь, это поможет кому-то еще с той же проблемой:
df.columns.intersect(cols).foldLeft(df)((newDf, col) => {
val conversionFunc = to_timestamp(newDf(col).cast("timestamp"), "MM/dd/yyyy HH:mm:ss").cast("string")
newDf.withColumn(col, conversionFunc)
})