Чтение файлов, отправленных с помощью spark-submit драйвером
Я отправляю задание Spark для запуска в удаленном кластере, запустив
spark-submit ... --deploy-mode cluster --files some.properties ...
Я хочу прочитать содержимое файла some.properties
с помощью кода драйвера, т.е. до создания контекста Spark и запуска задач RDD. Файл копируется в удаленный драйвер, но не в рабочий каталог драйвера.
Способы этой проблемы, о которых я знаю, следующие:
- Загрузите файл в HDFS
- Сохраните файл в банке приложения
Оба неудобны, поскольку этот файл часто изменяется на отправляющей машине dev.
Есть ли способ прочитать файл, который был загружен с использованием флага --files
во время основного метода кода драйвера?
Ответы
Ответ 1
Да, вы можете получить доступ к файлам, загруженным с помощью аргумента --files
.
Вот как я могу получить доступ к файлам, переданным через --files
:
./bin/spark-submit \
--class com.MyClass \
--master yarn-cluster \
--files /path/to/some/file.ext \
--jars lib/datanucleus-api-jdo-3.2.6.jar,lib/datanucleus-rdbms-3.2.9.jar,lib/datanucleus-core-3.2.10.jar \
/path/to/app.jar file.ext
и в моем искровом коде:
val filename = args(0)
val linecount = Source.fromFile(filename).getLines.size
Я действительно верю, что эти файлы загружаются на рабочих в том же каталоге, что и jar, поэтому просто передается имя файла, а не абсолютный путь к Source.fromFile
.
Ответ 2
Параметры -files и --archives поддерживают указание имен файлов С#, аналогичным Hadoop. Например, вы можете указать: -files localtest.txt # appSees.txt, и это выведет файл, который вы локально назвали localtest.txt в HDFS, но это будет связано с именем appSees.txt, и ваше приложение должно использовать имя как appSees.txt, чтобы ссылаться на него при запуске на YARN.
это работает для моего искрового потокового приложения как в режиме пряжи/клиента, так и в режиме пряжи/кластера. может помочь вам
Ответ 3
После расследования я нашел одно решение по вышеуказанному вопросу. Отправьте конфигурацию any.properties во время spark-submit и используйте ее драйвером spark до и после инициализации SparkSession. Надеюсь, это поможет вам.
any.properties
spark.key=value
spark.app.name=MyApp
SparkTest.java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
public class SparkTest{
public Static void main(String[] args){
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
Config conf = loadConf();
System.out.println(conf.getString("spark.key"));
// Initialize SparkContext and use configuration from properties
SparkConf sparkConf = new SparkConf(true).setAppName(conf.getString("spark.app.name"));
SparkSession sparkSession =
SparkSession.builder().config(sparkConf).config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport().getOrCreate();
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
}
public static Config loadConf() {
String configFileName = "any.properties";
System.out.println(configFileName);
Config configs = ConfigFactory.load(ConfigFactory.parseFile(new java.io.File(configFileName)));
System.out.println(configs.getString("spark.key")); // get value from properties file
return configs;
}
}
Искра Отправить:
spark-submit --class SparkTest --master yarn --deploy-mode client --files any.properties,yy-site.xml --jars ...........
Ответ 4
Вот хорошее решение, которое я разработал в Python Spark, чтобы интегрировать любые данные в виде файла извне на платформу Big Data.
Удачи.
# Load from the Spark driver any local text file and return a RDD (really useful in YARN mode to integrate new data at the fly)
# (See https://community.hortonworks.com/questions/38482/loading-local-file-to-apache-spark.html)
def parallelizeTextFileToRDD(sparkContext, localTextFilePath, splitChar):
localTextFilePath = localTextFilePath.strip(' ')
if (localTextFilePath.startswith("file://")):
localTextFilePath = localTextFilePath[7:]
import subprocess
dataBytes = subprocess.check_output("cat " + localTextFilePath, shell=True)
textRDD = sparkContext.parallelize(dataBytes.split(splitChar))
return textRDD
# Usage example
myRDD = parallelizeTextFileToRDD(sc, '~/myTextFile.txt', '\n') # Load my local file as a RDD
myRDD.saveAsTextFile('/user/foo/myTextFile') # Store my data to HDFS
Ответ 5
Поворот проблемы заключается в том, что вы можете создать временный SparkContext
просто, вызвав SparkContext.getOrCreate()
, а затем прочитайте файл, который вы передали в --files
, с помощью SparkFiles.get('FILE')
.
Как только вы прочитаете файл, вы получите всю необходимую конфигурацию, необходимую в переменной SparkConf()
.
После этого вызовите эту функцию:
SparkContext.stop(SparkContext.getOrCreate())
Это приведет к отключению существующего SparkContext
, а в следующей строке просто инициализирует новый SparkContext
с необходимыми конфигурациями, подобными этому.
sc = SparkContext(conf=conf).getOrCreate()
Вы получили SparkContext
с желаемыми настройками
Ответ 6
используйте spark-submit --help
, обнаружите, что эта опция только для рабочего каталога исполнителя, а не драйвера.
--files FILES: Comma-separated list of files to be placed in the working directory of each executor.