Перемещение файлов в Hadoop с использованием Java API?

Я хочу перемещать файлы в HDFS с помощью API Java. Я не могу понять, как это сделать. Кажется, что только класс FileSystem хочет разрешить перемещение в локальную файловую систему и обратно... но я хочу сохранить их в HDFS и переместить туда.

Я пропустил что-то основное? Единственный способ, который я могу сделать, это прочитать его из потока ввода и записать его обратно... и затем удалить старую копию (yuck).

спасибо

Ответы

Ответ 1

Используйте FileSystem.rename():

public abstract boolean rename(Path src, Path dst) throws IOException

Переименует путь src в путь dst. Может выполняться на локальном fs или удаленном DFS.

Параметры:
src - путь для переименования
dst - новый путь после переименования
Возврат:
true если переименовать успешно Броски:
IOException - при сбое

Ответ 2

Подход java.nio. * не всегда работает на HDFS. Таким образом, найдено следующее решение, которое работает.

Переместить файлы из одного каталога в другой, используя org.apache.hadoop.fs.FileUtil.copy API

val fs = FileSystem.get(new Configuration())
        val conf = new org.apache.hadoop.conf.Configuration()
        val srcFs = FileSystem.get(new org.apache.hadoop.conf.Configuration())
        val dstFs = FileSystem.get(new org.apache.hadoop.conf.Configuration())
        val dstPath = new org.apache.hadoop.fs.Path(DEST_FILE_DIR)

        for (file <- fileList) {
          // The 5th parameter indicates whether source should be deleted or not
          FileUtil.copy(srcFs, file, dstFs, dstPath, true, conf)

Ответ 4

hdfsDirectory="hdfs://srcPath"   
 val conf = new org.apache.hadoop.conf.Configuration()
        val src:Path = new org.apache.hadoop.fs.Path(hdfsDirectory)
        val fs = FileSystem.get(src.toUri,conf)
        val srcPath: Path = new Path("hdfs://srcPath")
        val srcFs =FileSystem.get(srcPath.toUri,conf)
        val dstPath:Path =new Path("hdfs://targetPath/")
        val dstFs =FileSystem.get(dstPath.toUri,conf)
        val exists = fs.exists(new org.apache.hadoop.fs.Path(hdfsDirectory))
        val status:Array[FileStatus] = fs.listStatus(new Path(hdfsDirectory))
        if (status.length>0) {
          status.foreach(x => {
            println("My files: " + x.getPath)
            FileUtil.copy(srcFs, x.getPath, dstFs, dstPath, true, conf)
            println("Files moved !!" +x.getPath)
          }
          )}
        else{
          println("No Files Found !!")
        }