Ответ 1
Для файла в HDFS вы можете использовать способ hadoop:
val conf = sc.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)
val exists = fs.exists(new org.apache.hadoop.fs.Path("/path/on/hdfs/to/SUCCESS.txt"))
Я новичок в искру, и у меня есть вопрос. У меня есть двухэтапный процесс, в котором первый шаг записывает файл SUCCESS.txt в местоположение на HDFS. Мой второй шаг, который является искровым заданием, должен проверить, существует ли этот файл SUCCESS.txt до того, как он начнет обработку данных.
Я проверил искровой API и не нашел никакого метода, который проверяет, существует ли файл. Любые идеи, как справиться с этим?
Единственный найденный метод - sc.textFile(hdfs:///SUCCESS.txt).count(), который генерирует исключение, если файл не существует. Я должен поймать это исключение и написать свою программу соответственно. Мне не понравился этот подход. Надеюсь найти лучшую альтернативу.
Для файла в HDFS вы можете использовать способ hadoop:
val conf = sc.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)
val exists = fs.exists(new org.apache.hadoop.fs.Path("/path/on/hdfs/to/SUCCESS.txt"))
Я скажу, лучший способ вызвать это через функцию, которая внутренне проверяет наличие файла в традиционной проверке файла hadoop.
object OutputDirCheck {
def dirExists(hdfsDirectory: String): Boolean = {
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
fs.exists(new org.apache.hadoop.fs.Path(hdfsDirectory))
}
}
Для Pyspark вы можете достичь этого, не вызывая подпроцесс, используя что-то вроде:
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path("path/to/SUCCESS.txt"))
Для Java-кодеров;
SparkConf sparkConf = new SparkConf().setAppName("myClassname");
SparkContext sparky = new SparkContext(sparkConf);
JavaSparkContext context = new JavaSparkContext(sparky);
FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(context.hadoopConfiguration());
Path path = new Path(sparkConf.get(path_to_File));
if (!hdfs.exists(path)) {
//Path does not exist.
}
else{
//Path exist.
}
Для пользователей pyspark pyson:
Я ничего не нашел с python или pyspark, поэтому нам нужно выполнить команду hdfs из кода python. Это сработало для меня.
Команда hdfs, чтобы получить, если папка Exisits: возвращение 0, если true
hdfs dfs -test -d /folder-path
команда hdfs для получения, если файл существует: возврат 0, если true
hdfs dfs -test -d /folder-path
Для размещения этого кода в python я следую ниже строкам кода:
import subprocess
def run_cmd(args_list):
proc = subprocess.Popen(args_list, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc.communicate()
return proc.returncode
cmd = ['hdfs', 'dfs', '-test', '-d', "/folder-path"]
code = run_cmd(cmd)
if code == 0:
print('folder exist')
print(code)
Вывод, если папка существует:
существует папка 0
Для PySpark:
from py4j.protocol import Py4JJavaError
def path_exist(path):
try:
rdd = sc.textFile(path)
rdd.take(1)
return True
except Py4JJavaError as e:
return False