Несколько искровых заданий, добавляющих данные паркета к одному базовому пути с разбиением на разделы
У меня есть несколько заданий, которые я хочу выполнить параллельно, чтобы добавить ежедневные данные в один и тот же путь, используя разделение.
например.
dataFrame.write().
partitionBy("eventDate", "category")
.mode(Append)
.parquet("s3://bucket/save/path");
Работа 1 - категория = "billing_events"
Job 2 - category = "click_events"
Оба этих задания будут обрезать любые существующие разделы, которые существуют в ведро s3 до выполнения, а затем сохранить полученные паркетные файлы в соответствующие разделы.
то есть.
задание 1 → s3://bucket/save/path/eventDate = 20160101/channel = billing_events
задание 2 → s3://bucket/save/path/eventDate = 20160101/channel = click_events
Проблема im сталкивается с временными файлами, которые создаются во время выполнения задания с помощью искры. Он сохраняет файлы разработки на базовый путь
s3://ведро/сохранить/путь/_temporary/...
поэтому оба задания заканчиваются совместным использованием одной и той же папки temp и создают конфликт, который, как заметил я, может вызвать одно задание для удаления временных файлов, а другое задание не работает с 404 из s3, поскольку ожидаемый файл temp не существует.
Кто-нибудь столкнулся с этой проблемой и придумал стратегию параллельного выполнения заданий на одном базовом пути?
im с использованием искры 1.6.0 на данный момент
Ответы
Ответ 1
Итак, после долгих чтений о том, как решить эту проблему, я подумал, что я передаю некоторую мудрость назад, чтобы обернуть вещи. Спасибо главным образом комментариям Тэла.
Я также обнаружил, что запись непосредственно в s3://bucket/save/path кажется опасной, потому что если задание убито и очистка временной папки не происходит в конце задания, это похоже на ее остался там для следующего задания, и я заметил, что иногда предыдущие убитые рабочие файлы temp попадают в s3://bucket/save/path и вызывают дублирование... Полностью ненадежный...
Кроме того, операция переименования файлов текущей папки в соответствующие файлы s3 занимает ужасающее количество времени (около 1 секунды на файл), поскольку S3 поддерживает только копирование/удаление, а не переименование. Кроме того, только экземпляр драйвера переименовывает эти файлы с использованием одного потока, так как 1/5 некоторых заданий с большим количеством файлов/разделов расходуется только на ожидание операций переименования.
Я исключил использование DirectOutputCommitter по ряду причин.
Единственный безопасный, эффективный и последовательный способ выполнения этих заданий - сохранить их в уникальной временной папке (уникальной по идентификатору приложения или временной шкале) в hdfs в первую очередь. И скопируйте на S3 по завершении задания.
Это позволяет выполнять параллельные задания, поскольку они будут сохраняться в уникальных временных папках, не нужно использовать DirectOutputCommitter, поскольку операция переименования в HDFS выполняется быстрее, чем S3, и сохраненные данные более согласованы.
Ответ 2
Я подозреваю, что это связано с изменениями в обнаружении разделов, которые были введены в Spark 1.6. Изменения означают, что Spark будет рассматривать только пути, такие как .../xxx=yyy/
, в качестве разделов, если вы указали "базовый путь" -описания (см. Примечания к выпуску Spark здесь).
Итак, я думаю, что ваша проблема будет решена, если вы добавите опцию базового пути, например:
dataFrame
.write()
.partitionBy("eventDate", "category")
.option("basepath", "s3://bucket/save/path")
.mode(Append)
.parquet("s3://bucket/save/path");
(У меня не было возможности проверить его, но, надеюсь, он выполнит трюк:))
Ответ 3
Вместо использования partitionBy
dataFrame.write(). partitionBy ( "eventDate", "category" ) .mode(Append) .parquet( "s3://ведро/сохранить/путь" );
В качестве альтернативы вы можете записать файлы как
В задании-1 укажите путь к файлу паркета как
dataFrame.write(). mode (Добавить).parquet( "s3://bucket/save/path/eventDate = 20160101/channel = billing_events" )
& в задании-2 укажите путь к файлу паркета как
dataFrame.write(). mode (Append).parquet( "s3://bucket/save/path/eventDate = 20160101/channel = click_events" )
- Оба задания создадут отдельный _temporary каталог в соответствующей папке, поэтому проблема concurrency решена.
- И обнаружение раздела также произойдет как eventDate = 20160101 и для столбца канала.
- Недостаток - даже если channel = click_events не существует в файле данных все еще паркета для канала = click_events будет создан.
Ответ 4
Я сделал вариацию ответа parthiv и создал искусственные разделы на основе уникального идентификатора. Вероятно, вы можете использовать идентификатор потока, но я использовал хэш ввода для каждого из моих параллельных заданий.
df = ... \
.withColumn('job_id', lit(hashlib.md5(...).hexdigest()))
df.write() \
.format('parquet') \
.partitionBy('date', 'job_id') \
...
Это немного злоупотребляет функцией раздела, но пока это работает.
Ответ 5
Несколько задач записи на этом пути с "partitionBy", будет FAILED, когда _temporary
было удалить в cleanupJob
из FileOutputCommitter
, как No such file or directory
.
ИСПЫТАТЕЛЬНЫЙ КОД:
def batchTask[A](TASK_tag: String, taskData: TraversableOnce[A], batchSize: Int, fTask: A => Unit, fTaskId: A => String): Unit = {
var list = new scala.collection.mutable.ArrayBuffer[(String, java.util.concurrent.Future[Int])]()
val executors = java.util.concurrent.Executors.newFixedThreadPool(batchSize)
try {
taskData.foreach(d => {
val task = executors.submit(new java.util.concurrent.Callable[Int] {
override def call(): Int = {
fTask(d)
1
}
})
list += ((fTaskId(d), task))
})
var count = 0
list.foreach(r => if (!r._2.isCancelled) count += r._2.get())
} finally {
executors.shutdown()
}
}
def testWriteFail(outPath: String)(implicit spark: SparkSession, sc: SparkContext): Unit = {
println(s"try save: ${outPath}")
import org.apache.spark.sql.functions._
import spark.sqlContext.implicits._
batchTask[Int]("test", 1 to 20, 6, t => {
val df1 =
Seq((1, "First Value", java.sql.Date.valueOf("2010-01-01")), (2, "Second Value", java.sql.Date.valueOf("2010-02-01")))
.toDF("int_column", "string_column", "date_column")
.withColumn("t0", lit(t))
df1.repartition(1).write
.mode("overwrite")
.option("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
.partitionBy("t0").csv(outPath)
}, t => f"task.${t}%4d") // some Exception
println(s"fail: count=${spark.read.csv(outPath).count()}")
}
try {
testWriteFail(outPath + "/fail")
} catch {
case e: Throwable =>
}
Не удалось
Используйте OutputCommitter
:
package org.jar.spark.util
import java.io.IOException
/*
* 用于 DataFrame 多任务写入同一个目录。
* <pre>
* 1. 基于临时目录写入
* 2. 如果【任务的输出】可能会有重叠,不要使用 overwrite 方式,以免误删除
* </pre>
* <p/>
* Created by liao on 2018-12-02.
*/
object JMultiWrite {
val JAR_Write_Cache_Flag = "jar.write.cache.flag"
val JAR_Write_Cache_TaskId = "jar.write.cache.taskId"
/** 自动删除目标目录下同名子目录 */
val JAR_Write_Cache_Overwrite = "jar.write.cache.overwrite"
implicit class ImplicitWrite[T](dw: org.apache.spark.sql.DataFrameWriter[T]) {
/**
* 输出到文件,需要在外面配置 option format mode 等
*
* @param outDir 输出目标目录
* @param taskId 此次任务ID,用于隔离各任务的输出,必须具有唯一性
* @param cacheDir 缓存目录,最好是 '_' 开头的目录,如 "_jarTaskCache"
* @param overwrite 是否删除已经存在的目录,默认 false 表示 Append模式
* <font color=red>(如果 并行任务可能有相同 子目录输出时,会冲掉,此时不要使用 overwrite)</font>
*/
def multiWrite(outDir: String, taskId: String, cacheDir: String = "_jarTaskCache", overwrite: Boolean = false): Boolean = {
val p = path(outDir, cacheDir, taskId)
dw.options(options(cacheDir, taskId))
.option(JAR_Write_Cache_Overwrite, overwrite)
.mode(org.apache.spark.sql.SaveMode.Overwrite)
.save(p)
true
}
}
def options(cacheDir: String, taskId: String): Map[String, String] = {
Map(JAR_Write_Cache_Flag -> cacheDir,
JAR_Write_Cache_TaskId -> taskId,
"mapreduce.fileoutputcommitter.marksuccessfuljobs" -> "false",
"mapreduce.job.outputformat.class" -> classOf[JarOutputFormat].getName
)
}
def path(outDir: String, cacheDir: String, taskId: String): String = {
assert(outDir != "", "need OutDir")
assert(cacheDir != "", "need CacheDir")
assert(taskId != "", "needTaskId")
outDir + "/" + cacheDir + "/" + taskId
}
/*-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-*/
class JarOutputFormat extends org.apache.hadoop.mapreduce.lib.output.TextOutputFormat {
var committer: org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter = _
override def getOutputCommitter(context: org.apache.hadoop.mapreduce.TaskAttemptContext): org.apache.hadoop.mapreduce.OutputCommitter = {
if (this.committer == null) {
val output = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath(context)
this.committer = new JarOutputCommitter(output, context)
}
this.committer
}
}
class JarOutputCommitter(output: org.apache.hadoop.fs.Path, context: org.apache.hadoop.mapreduce.TaskAttemptContext)
extends org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter(output, context) {
override def commitJob(context: org.apache.hadoop.mapreduce.JobContext): Unit = {
val finalOutput = this.output
val cacheFlag = context.getConfiguration.get(JAR_Write_Cache_Flag, "")
val myTaskId = context.getConfiguration.get(JAR_Write_Cache_TaskId, "")
val overwrite = context.getConfiguration.getBoolean(JAR_Write_Cache_Overwrite, false)
val hasCacheFlag = finalOutput.getName == myTaskId && finalOutput.getParent.getName == cacheFlag
val finalReal = if (hasCacheFlag) finalOutput.getParent.getParent else finalOutput // 确定最终目录
// 遍历输出目录
val fs = finalOutput.getFileSystem(context.getConfiguration)
val jobAttemptPath = getJobAttemptPath(context)
val arr$ = fs.listStatus(jobAttemptPath, new org.apache.hadoop.fs.PathFilter {
override def accept(path: org.apache.hadoop.fs.Path): Boolean = !"_temporary".equals(path.getName())
})
if (hasCacheFlag && overwrite) // 移除同名子目录
{
if (fs.isDirectory(finalReal)) arr$.foreach(stat =>
if (fs.isDirectory(stat.getPath)) fs.listStatus(stat.getPath).foreach(stat2 => {
val p1 = stat2.getPath
val p2 = new org.apache.hadoop.fs.Path(finalReal, p1.getName)
if (fs.isDirectory(p1) && fs.isDirectory(p2) && !fs.delete(p2, true)) throw new IOException("Failed to delete " + p2)
})
)
}
arr$.foreach(stat => {
mergePaths(fs, stat, finalReal)
})
cleanupJob(context)
if (hasCacheFlag) { // 移除缓存目录
try {
fs.delete(finalOutput, false)
val pp = finalOutput.getParent
if (fs.listStatus(pp).isEmpty)
fs.delete(pp, false)
} catch {
case e: Exception =>
}
}
// 不用输出 _SUCCESS 了
//if (context.getConfiguration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
// val markerPath = new org.apache.hadoop.fs.Path(this.outputPath, "_SUCCESS")
// fs.create(markerPath).close()
//}
}
}
@throws[IOException]
def mergePaths(fs: org.apache.hadoop.fs.FileSystem, from: org.apache.hadoop.fs.FileStatus, to: org.apache.hadoop.fs.Path): Unit = {
if (from.isFile) {
if (fs.exists(to) && !fs.delete(to, true)) throw new IOException("Failed to delete " + to)
if (!fs.rename(from.getPath, to)) throw new IOException("Failed to rename " + from + " to " + to)
}
else if (from.isDirectory) if (fs.exists(to)) {
val toStat = fs.getFileStatus(to)
if (!toStat.isDirectory) {
if (!fs.delete(to, true)) throw new IOException("Failed to delete " + to)
if (!fs.rename(from.getPath, to)) throw new IOException("Failed to rename " + from + " to " + to)
}
else {
val arr$ = fs.listStatus(from.getPath)
for (subFrom <- arr$) {
mergePaths(fs, subFrom, new org.apache.hadoop.fs.Path(to, subFrom.getPath.getName))
}
}
}
else if (!fs.rename(from.getPath, to)) throw new IOException("Failed to rename " + from + " to " + to)
}
}
А потом:
def testWriteOk(outPath: String)(implicit spark: SparkSession, sc: SparkContext): Unit = {
println(s"try save: ${outPath}")
import org.apache.spark.sql.functions._
import org.jar.spark.util.JMultiWrite.ImplicitWrite // 导入工具
import spark.sqlContext.implicits._
batchTask[Int]("test.ok", 1 to 20, 6, t => {
val taskId = t.toString
val df1 =
Seq((1, "First Value", java.sql.Date.valueOf("2010-01-01")), (2, "Second Value", java.sql.Date.valueOf("2010-02-01")))
.toDF("int_column", "string_column", "date_column")
.withColumn("t0", lit(taskId))
df1.repartition(1).write
.partitionBy("t0")
.format("csv")
.multiWrite(outPath, taskId, overwrite = true) // 这里使用了 overwrite ,如果分区有重叠,请不要使用 overwrite
}, t => f"task.${t}%4d")
println(s"ok: count=${spark.read.csv(outPath).count()}") // 40
}
try {
testWriteOk(outPath + "/ok")
} catch {
case e: Throwable =>
}
Успех:
$ ls ok/
t0=1 t0=10 t0=11 t0=12 t0=13 t0=14 t0=15 t0=16 t0=17 t0=18 t0=19 t0=2 t0=20 t0=3 t0=4 t0=5 t0=6 t0=7 t0=8 t0=9
То же самое относится к другим форматам вывода, обратите внимание на использование overwrite
.
Испытание на искру 2.11.8.
Спасибо за @Tal Joffe