Apache Spark on YARN: большое количество файлов входных данных (объединение нескольких входных файлов в искру)
Необходима помощь для лучшей практики внедрения.
Рабочая среда выглядит следующим образом:
- Файл данных журнала поступает нерегулярно.
- Размер файла данных журнала составляет от 3,9 КБ до 8,5 МБ. В среднем около 1 МБ.
- Количество записей файла данных составляет от 13 строк до 22000 строк. В среднем около 2700 строк.
- Файл данных должен быть обработан после агрегирования.
- Алгоритм последующей обработки можно изменить.
- Постобработочный файл управляется отдельно с исходным файлом данных, поскольку алгоритм пост-обработки может быть изменен.
- Выполняется ежедневная агрегация. Все файлы, обработанные после обработки, должны быть отфильтрованы по записям и агрегированию (среднее значение, max min...).
- Поскольку агрегация мелкозернистая, количество записей после агрегации не так мало. Это может быть около половины количества исходных записей.
- В какой-то момент количество файлов после обработки может составлять около 200 000.
- Файл данных должен быть удален отдельно.
В тесте я попытался обработать 160 000 обработанных после обработки файлов Spark, начиная с sc.textFile() с помощью glob-пути, с ошибкой OutOfMemory в процессе драйвера не удалось.
Какова наилучшая практика для обработки данных такого типа?
Должен ли я использовать HBase вместо обычных файлов для сохранения данных после обработки?
Ответы
Ответ 1
Мы написали собственный загрузчик. Он решил проблему с небольшими файлами в HDFS. Он использует Hadoop CombineFileInputFormat.
В нашем случае это уменьшило количество преобразователей от 100000 до 3000 и значительно ускорило работу.
https://github.com/RetailRocket/SparkMultiTool
Пример:
import ru.retailrocket.spark.multitool.Loaders
val sessions = Loaders.combineTextFile(sc, "file:///test/*")
// or val sessions = Loaders.combineTextFile(sc, conf.weblogs(), size = 256, delim = "\n")
// where size is split size in Megabytes, delim - line break character
println(sessions.count())
Ответ 2
Я уверен, что причиной получения OOM является обработка большого количества небольших файлов. Вы хотите объединить входные файлы, чтобы не было так много разделов. Я пытаюсь ограничить свои задания примерно до 10 тыс. Разделов.
После textFile
вы можете использовать .coalesce(10000, false)
... не на 100% уверен, что это сработает, потому что это было какое-то время, так как я сделал это, пожалуйста, дайте мне знать. Поэтому попробуйте
sc.textFile(path).coalesce(10000, false)
Ответ 3
Вы можете использовать этот
Сначала вы можете получить Buffer/List of S3 Paths/Same для HDFS или локального пути
Если вы пытаетесь с Amazon S3, то:
import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest
def listFiles(s3_bucket:String, base_prefix : String) = {
var files = new ArrayList[String]
//S3 Client and List Object Request
var s3Client = new AmazonS3Client();
var objectListing: ObjectListing = null;
var listObjectsRequest = new ListObjectsRequest();
//Your S3 Bucket
listObjectsRequest.setBucketName(s3_bucket)
//Your Folder path or Prefix
listObjectsRequest.setPrefix(base_prefix)
//Adding s3:// to the paths and adding to a list
do {
objectListing = s3Client.listObjects(listObjectsRequest);
for (objectSummary <- objectListing.getObjectSummaries().asScala) {
files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
}
listObjectsRequest.setMarker(objectListing.getNextMarker());
} while (objectListing.isTruncated());
//Removing Base Directory Name
files.remove(0)
//Creating a Scala List for same
files.asScala
}
Теперь передайте этот объект List следующему фрагменту кода, обратите внимание: sc является объектом SQLContext
var df: DataFrame = null;
for (file <- files) {
val fileDf= sc.textFile(file)
if (df!= null) {
df= df.unionAll(fileDf)
} else {
df= fileDf
}
}
Теперь вы получили окончательный унифицированный RDD, т.е. df
Необязательно, и вы также можете переделать его в одном BigRDD
val files = sc.textFile(filename, 1).repartition(1)
Перераспределение всегда работает: D