Рекурсивно извлекать содержимое файла из подкаталогов с помощью sc.textFile

Кажется, что SparkContext textFile ожидает, что в данной папке будут присутствовать только файлы - это не означает

  • (a) recurse или
  • (b) даже поддерживать каталоги (пытается читать каталоги в виде файлов)

Любое предложение о том, как структурировать рекурсию - возможно, проще, чем вручную создать рекурсивный список файлов/спуск?

Вот пример использования: файлы под

/данные/таблицы/my_table

Я хочу, чтобы читать через hdfs, вызывать все файлы на всех уровнях каталогов в этом родительском каталоге.

UPDATE

Sc.textFile() вызывает Hadoop FileInputFormat через (подкласс) TextInputFormat. Внутри логики существует рекурсивное чтение каталога - то есть сначала обнаружение, если запись была каталогом, и если да, то спускаться:

<!-- language: java -->
     for (FileStatus globStat: matches) {
218          if (globStat.isDir()) {
219            for(FileStatus stat: fs.listStatus(globStat.getPath(),
220                inputFilter)) {
221              result.add(stat);
222            }          
223          } else {
224            result.add(globStat);
225          }
226        }

Однако при вызове sc.textFile в записи каталога появляются ошибки: "not a file". Такое поведение запутывает - учитывая, что для обработки каталогов необходима надлежащая поддержка.

Ответы

Ответ 1

Я смотрел старую версию FileInputFormat..

ПЕРЕД настройка рекурсивной конфигурации mapreduce.input.fileinputformat.input.dir.recursive

scala> sc.textFile("dev/*").count
     java.io.IOException: Not a file: file:/shared/sparkup/dev/audit-release/blank_maven_build

Значение по умолчанию равно null/не задано как "false":

scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive")
res1: String = null

ПОСЛЕ:

Теперь установите значение:

sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")

Теперь повторите рекурсивную операцию:

scala>sc.textFile("dev/*/*").count

..
res5: Long = 3481

So it works.

Обновление добавлено/для полной рекурсии за комментарий @Ben

Ответ 2

Я обнаружил, что эти параметры должны быть установлены следующим образом:

.set("spark.hive.mapred.supports.subdirectories","true")
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true")