Каков правильный способ запуска/остановки работы с искровыми потоками в пряже?
Я экспериментирую и отправляюсь в поиски в течение многих часов, без везения.
У меня есть приложение для искрообразования, которое отлично работает в локальном искровом кластере. Теперь мне нужно развернуть его на cloudera 5.4.4. Мне нужно иметь возможность запустить его, постоянно ли работать в фоновом режиме и быть в состоянии остановить его.
Я пробовал это:
$ spark-submit --master yarn-cluster --class MyMain my.jar myArgs
Но он просто печатает эти строки бесконечно.
15/07/28 17:58:18 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)
15/07/28 17:58:19 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)
Номер вопроса 1: поскольку это потоковое приложение, оно должно запускаться непрерывно. Итак, как мне запустить его в фоновом режиме? Все примеры, которые я могу найти для подачи искровых работ на пряжу, похоже, предполагают, что приложение будет выполнять некоторую работу и прекратить работу, и поэтому вы хотите запустить ее на переднем плане. Но это не так для потоковой передачи.
Далее... в этот момент приложение, похоже, не работает. Я полагаю, что это может быть ошибкой или неправильной конфигурацией с моей стороны, поэтому я попытался посмотреть в журналах, чтобы увидеть, что происходит:
$ yarn logs -applicationId application_1438092860895_012
Но это говорит мне:
/tmp/logs/hdfs/logs/application_1438092860895_0012does not have any log files.
Итак вопрос номер 2: Если приложение работает, почему у него нет файлов журналов?
Так что в итоге мне просто пришлось убить его:
$ yarn application -kill application_1438092860895_012
Что вызывает вопрос номер 3: предполагая, что я могу в конечном итоге запустить приложение и запустить его в фоновом режиме, является ли "приложение пряжи -kill" предпочтительным способом его остановки?
Ответы
Ответ 1
- Вы можете закрыть консоль
spark-submit
. Работа выполняется в фоновом режиме уже при записи состояния RUNNING.
- Журналы видны сразу после завершения приложения. Во время выполнения все журналы доступны непосредственно на рабочих узлах локально (вы можете видеть в веб-интерфейсе пользователя ресурса YARN) и агрегируются в HDFS после завершения задания.
-
yarn application -kill
, вероятно, лучший способ остановить приложение Spark streaming, но оно не идеально. Было бы лучше сделать некоторое изящное завершение, чтобы остановить все потоковые приемники и остановить потоковый контекст, но я лично не знаю, как это сделать.
Ответ 2
Наконец-то я нахожу способ безопасно закрыть работу с искровым потоком.
- записать поток сервера сокетов ждать остановки потока.
package xxx.xxx.xxx
import java.io.{BufferedReader, InputStreamReader}
import java.net.{ServerSocket, Socket}
import org.apache.spark.streaming.StreamingContext
object KillServer {
class NetworkService(port: Int, ssc: StreamingContext) extends Runnable {
val serverSocket = new ServerSocket(port)
def run() {
Thread.currentThread().setName("Zhuangdy | Waiting for graceful stop at port " + port)
while (true) {
val socket = serverSocket.accept()
(new Handler(socket, ssc)).run()
}
}
}
class Handler(socket: Socket, ssc: StreamingContext) extends Runnable {
def run() {
val reader = new InputStreamReader(socket.getInputStream)
val br = new BufferedReader(reader)
if (br.readLine() == "kill") {
ssc.stop(true, true)
}
br.close();
}
}
def run(port:Int, ssc: StreamingContext): Unit ={
(new NetworkService(port, ssc)).run
}
}
-
в вашем методе main
, где вы начинаете потоковый контекст, добавьте следующий код
ssc.start()
KillServer.run(11212, ssc)
ssc.awaitTermination()
-
Напишите spark-submit для отправки заданий в пряжу и прямой вывод в файл, который вы будете использовать позже
spark-submit --class "com.Mainclass" \
--conf "spark.streaming.stopGracefullyOnShutdown=true" \
--master yarn-cluster --queue "root" \
--deploy-mode cluster \
--executor-cores 4 --num-executors 8 --executor-memory 3G \
hdfs:///xxx.jar > output 2>&1 &
- Наконец, безопасная работа по отключению искрового потока без потери данных или результата вычисления не сохраняется!!! (Штрих-сервер, который используется для прекращения потокового контекста, изящно работает на драйвере, поэтому вы получите результат шага 3, чтобы получить драйвер addr, и используя echo nc для отправки команды уничтожения сокета)
#!/bin/bash
driver=`cat output | grep ApplicationMaster | grep -Po '\d+.\d+.\d+.\d+'`
echo "kill" | nc $driver 11212
driverid=`yarn application -list 2>&1 | grep ad.Stat | grep -Po 'application_\d+_\d+'`
yarn application -kill $driverid
Ответ 3
- Каков ваш источник данных? Если он надежный, как прямой приемник Kafka, остановка уничтожения пряжи должна быть прекрасной. Когда ваше приложение перезагрузится, оно будет считываться с последнего полного смещения партии. Если источник данных не является надежным или вы хотите обработать изящное завершение самостоятельно, вы должны реализовать какой-то внешний крючок в потоковом контексте. Я столкнулся с одной и той же проблемой, и в итоге я создал небольшой взлом, чтобы добавить новую вкладку в webui, которая действует как кнопка остановки.
Ответ 4
Последний элемент головоломки - это то, как остановить приложение Spark Streaming, развернутое на YARN, изящно. Стандартный метод остановки (или, скорее, убийства) приложения YARN используется командой yarn application -kill [applicationId]
. И эта команда останавливает приложение Spark Streaming, но это может произойти в середине партии. Поэтому, если задание считывает данные из Kafka, сохраняет результаты обработки на HDFS и, наконец, совершает смещения Kafka, вы должны ожидать дублирования данных на HDFS, когда работа была остановлена непосредственно перед выполнением смещений.
Первой попыткой решить изящную проблему выключения было вызов метода остановки контекстного потока Spark в hookdown завершения.
sys.addShutdownHook {
streamingContext.stop(stopSparkContext = true, stopGracefully = true)
}
Разочарование крюка выключения слишком поздно, чтобы закончить начатую партию, и приложение Spark убито почти сразу. Кроме того, нет никакой гарантии, что курок остановки будет вызван JVM вообще.
Во время написания этого сообщения в блоге единственный подтвержденный способ законсервирования отключить приложение Spark Streaming на YARN - это как-то уведомлять приложение о запланированном завершении работы, а затем останавливать потоковый контекст программно (но не от завершения остановки). Команда yarn application -kill
должна использоваться только в качестве последнего средства, если заявленное приложение не прекратилось после определенного таймаута.
Приложение может быть уведомлено о запланированном завершении работы с использованием файла маркера на HDFS (самый простой способ) или с использованием простой конечной точки Socket/HTTP, отображаемой на драйвере (сложным способом).
Поскольку мне нравится принцип KISS, ниже вы можете найти псевдокод оболочки script для запуска/остановки приложения Spark Streaming с использованием файла маркера:
start() {
hdfs dfs -touchz /path/to/marker/my_job_unique_name
spark-submit ...
}
stop() {
hdfs dfs -rm /path/to/marker/my_job_unique_name
force_kill=true
application_id=$(yarn application -list | grep -oe "application_[0-9]*_[0-9]*"`)
for i in `seq 1 10`; do
application_status=$(yarn application -status ${application_id} | grep "State : \(RUNNING\|ACCEPTED\)")
if [ -n "$application_status" ]; then
sleep 60s
else
force_kill=false
break
fi
done
$force_kill && yarn application -kill ${application_id}
}
В приложении Spark Streaming фоновый поток должен контролировать файл маркера, а когда файл исчезает, остановите контекст, вызывающий
streamingContext.stop(stopSparkContext = true, stopGracefully = true)
Также вы можете сослаться на http://blog.parseconsulting.com/2017/02/how-to-shutdown-spark-streaming-job.html