Как реализовать пользовательский приемник/трекер задач в Spark?
У меня есть класс, как показано ниже, и когда я запускаю это через командную строку, я хочу видеть статус прогресса. что-то вроде <
10% completed...
30% completed...
100% completed...Job done!
Я использую искру 1.0 на пряжу и использую Java API.
public class MyJavaWordCount {
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: MyJavaWordCount <master> <file>");
System.exit(1);
}
System.out.println("args[0]: <master>="+args[0]);
System.out.println("args[1]: <file>="+args[1]);
JavaSparkContext ctx = new JavaSparkContext(
args[0],
"MyJavaWordCount",
System.getenv("SPARK_HOME"),
System.getenv("SPARK_EXAMPLES_JAR"));
JavaRDD<String> lines = ctx.textFile(args[1], 1);
// output input output
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
// output input
public Iterable<String> call(String s) {
return Arrays.asList(s.split(" "));
}
});
// K V input K V
JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
// K V input
public Tuple2<String, Integer> call(String s) {
// K V
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
List<Tuple2<String, Integer>> output = counts.collect();
for (Tuple2 tuple : output) {
System.out.println(tuple._1 + ": " + tuple._2);
}
System.exit(0);
}
}
Ответы
Ответ 1
Если вы используете scala -spark, этот код поможет вам добавить световой приемник.
Создайте свой SparkContext
val sc=new SparkContext(sparkConf)
Теперь вы можете добавить своего искрового прослушивателя в искровом контексте
sc.addSparkListener(new SparkListener() {
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
println("Spark ApplicationStart: " + applicationStart.appName);
}
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
println("Spark ApplicationEnd: " + applicationEnd.time);
}
});
Вот список интерфейсов для прослушивания событий из расписания Spark.
Ответ 2
Вы должны реализовать SparkListener
. Просто переопределите любые события, которые вас интересуют (задание/этап/начало/конец события), затем вызовите sc.addSparkListener(myListener)
.
Это не дает вам прямого отслеживания прогресса, основанного на процентах, но, по крайней мере, вы можете отслеживать этот прогресс и его интенсивность. Трудность объясняется тем, насколько непредсказуемым может быть количество этапов искры, а также то, как время работы каждого этапа может значительно отличаться. Прогресс на этапе должен быть более предсказуемым.
Ответ 3
Прежде всего, если вы хотите отслеживать прогресс, то вы можете рассмотреть spark.ui.showConsoleProgress
PLS check @Yijie Shens answer (Исходный результат: стиль журнала и прогресс-стиль) для этого.
Я думаю, что нет необходимости внедрять слушателя Spark для такой вещи. Если вы не очень конкретны.
Вопрос: Как реализовать пользовательский приемник/трекер задач в Spark?
Вы можете использовать SparkListener и перехватывать события SparkListener.
Классический пример этой реализации с помощью Spark Framework - это HeartBeatReceiver.
Пример: HeartBeatReceiver.scala
/**
* Lives in the driver to receive heartbeats from executors..
*/
private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
extends SparkListener with ThreadSafeRpcEndpoint with Logging {
def this(sc: SparkContext) {
this(sc, new SystemClock)
}
sc.addSparkListener(this) ...
Ниже приведен список доступных событий прослушивателя. из которых приложения/задания должны быть полезны для вас
-
SparkListenerApplicationStart
-
SparkListenerJobStart
-
SparkListenerStageSubmitted
-
SparkListenerTaskStart
-
SparkListenerTaskGettingResult
-
SparkListenerTaskEnd
-
SparkListenerStageCompleted
-
SparkListenerJobEnd
-
SparkListenerApplicationEnd
-
SparkListenerEnvironmentUpdate
-
SparkListenerBlockManagerAdded
-
SparkListenerBlockManagerRemoved
-
SparkListenerBlockUpdated
-
SparkListenerUnpersistRDD
-
SparkListenerExecutorAdded
-
SparkListenerExecutorRemoved