Ответ 1
Некоторое "уродливое, но рабочее решение" может быть создано путем расширения FileInputDStream.
Запись sc.textFileStream(d)
эквивалентна
new FileInputDStream[LongWritable, Text, TextInputFormat](streamingContext, d).map(_._2.toString)
Вы можете создать CustomFileInputDStream, который расширит FileInputDStream. Пользовательский класс скопирует метод вычисления из класса FileInputDStream и настроит метод findNewFiles в соответствии с вашими потребностями.
изменение метода findNewFiles из:
private def findNewFiles(currentTime: Long): Array[String] = {
try {
lastNewFileFindingTime = clock.getTimeMillis()
// Calculate ignore threshold
val modTimeIgnoreThreshold = math.max(
initialModTimeIgnoreThreshold, // initial threshold based on newFilesOnly setting
currentTime - durationToRemember.milliseconds // trailing end of the remember window
)
logDebug(s"Getting new files for time $currentTime, " +
s"ignoring files older than $modTimeIgnoreThreshold")
val filter = new PathFilter {
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
if (timeTaken > slideDuration.milliseconds) {
logWarning(
"Time taken to find new files exceeds the batch size. " +
"Consider increasing the batch size or reducing the number of " +
"files in the monitored directory."
)
}
newFiles
} catch {
case e: Exception =>
logWarning("Error finding new files", e)
reset()
Array.empty
}
}
в
private def findNewFiles(currentTime: Long): Array[String] = {
try {
lastNewFileFindingTime = clock.getTimeMillis()
// Calculate ignore threshold
val modTimeIgnoreThreshold = math.max(
initialModTimeIgnoreThreshold, // initial threshold based on newFilesOnly setting
currentTime - durationToRemember.milliseconds // trailing end of the remember window
)
logDebug(s"Getting new files for time $currentTime, " +
s"ignoring files older than $modTimeIgnoreThreshold")
val filter = new PathFilter {
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val directories = fs.listStatus(directoryPath).filter(_.isDirectory)
val newFiles = ArrayBuffer[FileStatus]()
directories.foreach(directory => newFiles.append(fs.listStatus(directory.getPath, filter) : _*))
val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
if (timeTaken > slideDuration.milliseconds) {
logWarning(
"Time taken to find new files exceeds the batch size. " +
"Consider increasing the batch size or reducing the number of " +
"files in the monitored directory."
)
}
newFiles.map(_.getPath.toString).toArray
} catch {
case e: Exception =>
logWarning("Error finding new files", e)
reset()
Array.empty
}
}
будет проверять файлы во всех подпапках первой степени, вы можете настроить его, чтобы использовать временную метку партии, чтобы получить доступ к соответствующим "подкаталогам".
Я создал CustomFileInputDStream, как я упоминал, и активировал его, позвонив:
new CustomFileInputDStream[LongWritable, Text, TextInputFormat](streamingContext, d).map(_._2.toString)
Кажется, мы себя ожидаем.
Когда я пишу такое решение, я должен добавить некоторые моменты для рассмотрения:
-
Вы нарушаете инкапсуляцию Spark и создаете пользовательский класс, который вам нужно будет поддерживать только в качестве прохода времени.
-
Я считаю, что решение, подобное этому, - последнее средство. Если ваш вариант использования может быть реализован по-разному, обычно лучше избегать такого решения.
-
Если у вас будет много "подкаталогов" на S3 и будет проверять каждый из них, это будет стоить вам.
-
Будет очень интересно понять, не поддерживает ли Databricks вложенные файлы только из-за возможного штрафа за производительность или нет, возможно, есть более глубокая причина, о которой я не думал.