Запуск pyspark script на EMR
В настоящее время я автоматизирую скрипты Apache Spark Pyspark с использованием кластеров EC2 с использованием предварительно сконфигурированного каталога. /ec 2. Для целей автоматизации и планирования я хотел бы использовать модуль Boto EMR для отправки сценариев в кластер.
Я смог загрузить и установить Spark в кластере EMR. Я также могу запустить script в EMR, используя мою локальную версию pyspark и установив мастер как таковой:
$: MASTER=spark://<insert EMR master node of cluster here> ./bin/pyspark <myscriptname.py>
Однако для этого требуется, чтобы я запускал этот script локально, и поэтому я не могу полностью использовать способность Boto: 1) запустить кластер 2) добавить шаги script и 3) остановить кластер. Я нашел примеры, используя команды script -runner.sh и emr "step" для spark-shell (scala), но я предполагаю, что есть более простой способ сделать это с помощью модуля Python (pyspark). Спасибо за это заранее!
Ответы
Ответ 1
Вот отличный пример того, как его нужно настроить. Перейдите к "Быстрый пример" для кода Python.
Однако для того, чтобы все работало в emr-4.7.2, нужно было сделать несколько настроек, так что вот команда AWS CLI, которая работала для меня:
aws emr add-steps --cluster-id <Your EMR cluster id> --steps Type=spark,Name=TestJob,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,s3a://your-source-bucket/code/pythonjob.py,s3a://your-source-bucket/data/data.csv,s3a://your-destination-bucket/test-output/],ActionOnFailure=CONTINUE
И вот содержимое файла pythonjob.py
:
from __future__ import print_function
from pyspark import SparkContext
import sys
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: testjob ", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="MyTestJob")
dataTextAll = sc.textFile(sys.argv[1])
dataRDD = dataTextAll.map(lambda x: x.split(",")).map(lambda y: (str(y[0]), float(y[1]))).reduceByKey(lambda a, b: a + b)
dataRDD.saveAsTextFile(sys.argv[2])
sc.stop()
Он считывает файл data.csv
из S3, разбивает каждую строку, преобразует первое значение в строку и второй - плавает, группирует по первому значению и суммирует значения во втором столбце и записывает результат обратно в S3.
Несколько комментариев:
- Я решил оставить
spark.yarn.submit.waitAppCompletion=true
так
что я могу контролировать выполнение задания в консоли.
- Пути ввода и вывода (
sys.argv[1]
и sys.argv[2]
соответственно) передаются в script как часть задания sumbission (Args
в команде add-steps
).
- Имейте в виду, что при настройке задания вы должны использовать
s3a://
URI вместо s3n://
и s3://
для Hadoop 2.7+.
- Если ваш кластер находится в VPC, вам нужно создать конечную точку VPC для Amazon S3, если вы намерены читать/писать оттуда в своем Рабочие места EMR.
Ответ 2
Это может быть полезно, хотя он не использует boto.
Используйте aws cli, чтобы создать кластер и добавить к нему шаги (искровое задание).
1) Создайте кластер:
aws emr create-cluster --name "Spark cluster" --ami-version 3.8 --applications Name=Spark --ec2-attributes KeyName=ir --log-uri s3://Path/logs --instance-type m3.xlarge --instance-count 1 --use-default-roles
2) добавьте шаг (искровое задание). Обратите внимание, что ваш python script должен храниться в master node (в этом случае он находится в /home/hadoop/spark ).
aws emr add-steps --cluster-id j-xxxxxxx --steps Name=Spark,Jar=s3://eu-west-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[/home/hadoop/spark/bin/spark-submit,--deploy-mode,client,/home/hadoop/spark/myscript.py],ActionOnFailure=CONTINUE
вы также можете объединить два шага в один и создать задание кластера/запуска и завершить работу кластера.
Несколько заметок: 1) Я пробовал несколько способов прочитать script из S3, но не Удача:(
поэтому я закончил копирование с помощью boto или aws cli в node. 2), так как я тестировал, что на одном node в emr режим развертывания на шаге является клиентом для клиента, вы должны изменить его на кластер.
Ответ 3
вам нужно изменить режим развертывания на кластер (вместо клиента) для доступа к script из S3.