Рекурсивно извлекать содержимое файла из подкаталогов с помощью sc.textFile
Кажется, что SparkContext textFile ожидает, что в данной папке будут присутствовать только файлы - это не означает
- (a) recurse или
- (b) даже поддерживать каталоги (пытается читать каталоги в виде файлов)
Любое предложение о том, как структурировать рекурсию - возможно, проще, чем вручную создать рекурсивный список файлов/спуск?
Вот пример использования: файлы под
/данные/таблицы/my_table
Я хочу, чтобы читать через hdfs, вызывать все файлы на всех уровнях каталогов в этом родительском каталоге.
UPDATE
Sc.textFile() вызывает Hadoop FileInputFormat через (подкласс) TextInputFormat. Внутри логики существует рекурсивное чтение каталога - то есть сначала обнаружение, если запись была каталогом, и если да, то спускаться:
<!-- language: java -->
for (FileStatus globStat: matches) {
218 if (globStat.isDir()) {
219 for(FileStatus stat: fs.listStatus(globStat.getPath(),
220 inputFilter)) {
221 result.add(stat);
222 }
223 } else {
224 result.add(globStat);
225 }
226 }
Однако при вызове sc.textFile в записи каталога появляются ошибки: "not a file". Такое поведение запутывает - учитывая, что для обработки каталогов необходима надлежащая поддержка.
Ответы
Ответ 1
Я смотрел старую версию FileInputFormat..
ПЕРЕД настройка рекурсивной конфигурации mapreduce.input.fileinputformat.input.dir.recursive
scala> sc.textFile("dev/*").count
java.io.IOException: Not a file: file:/shared/sparkup/dev/audit-release/blank_maven_build
Значение по умолчанию равно null/не задано как "false":
scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive")
res1: String = null
ПОСЛЕ:
Теперь установите значение:
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")
Теперь повторите рекурсивную операцию:
scala>sc.textFile("dev/*/*").count
..
res5: Long = 3481
So it works.
Обновление добавлено/для полной рекурсии за комментарий @Ben
Ответ 2
Я обнаружил, что эти параметры должны быть установлены следующим образом:
.set("spark.hive.mapred.supports.subdirectories","true")
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true")