Ответ 1
Я считаю, что здесь нет простого и универсального ответа. Многое зависит от семантики приложения, типа источников данных (надежного приемника, надежного приемника, файловой системы, без приемника) и требований.
В общем случае вы не должны допускать сбой приложения при одном сбое IO. Предполагая, что у вас есть какое-то действие:
outputAction[T](rdd: RDD[T]): Unit = ???
по крайней мере убедитесь, что он не будет распространять исключение на ваш драйвер.
outputActionWithDelay[T](d: Duration)(rdd: RDD[T]) = ???
stream foreachRDD { rdd => Try(outputAction(rdd)) }
Вопрос остается следующим. Самое простое, что вы можете сделать, это отказаться от данного окна. В зависимости от приложения это может быть приемлемым решением или нет, но в целом существует много случаев, когда потеря некоторых данных вполне приемлема.
Это может быть дополнительно улучшено путем отслеживания сбоев и принятия некоторых других действий, если достигнут какой-либо порог.
Если данные сброса неприемлемы, следующий шаг - повторить попытку после некоторой задержки:
outputActionWithDelay[T](d: Duration)(rdd: RDD[T]) = ???
stream foreachRDD {
rdd => Try(outputAction(rdd))
.recoverWith { case _ => Try(outputActionWithDelay(d1)(rdd)) }
.recoverWith { case _ => Try(outputActionWithDelay(d2)(rdd)) }
...
}
Количество повторений и длительности задержки будет варьироваться в зависимости от случая и депонирования источника и возможности хранения входящих данных.
Что вы можете сделать, когда ударяете последнюю попытку? Для начала мы можем добавить альтернативный источник вывода. Вместо использования первичного источника вы можете, например, выталкивать все в надежное хранилище внешних файлов и беспокоиться об этом позже. Это может неприменимо, если источник вывода требует определенного порядка входящих данных, но в противном случае стоит попробовать.
alternativeOutputAction[T](rdd: RDD[T]) = ???
stream foreachRDD {
rdd => Try(outputAction(rdd))
.recoverWith { case _ => Try(outputActionWithDelay(d1)
...
.recoverWith { case _ => Try(outputActionWithDelay(dn)(rdd)) }
.recoverWith { case _ => Try(alternativeOutputAction(rdd))
}
Если это терпит неудачу, это, вероятно, является симптомом серьезных проблем, и мы не можем много сделать на уровне приложений. Мы можем вернуться к первому подходу и просто надеяться, что ситуация скоро решится или выберет более сложный подход.
Если источник ввода может буферизовать данные, и мы используем надежное хранилище и репликацию, мы можем включить контрольную точку и просто убить приложение.
Если вы попытаетесь восстановить, вероятно, неплохо добавить некоторый вариант CircuitBreaker, и если приложение столкнулось с несколькими неудачами, для достижения попыток восстановления первичного выхода без задержки.