Регистрация Apache Spark в Scala
Я ищу решение, позволяющее записывать дополнительные данные при выполнении кода в Apache Spark Nodes, которые могут помочь в дальнейшем выявить некоторые проблемы, которые могут возникнуть во время выполнения. Пытаться использовать традиционное решение, например, com.typesafe.scalalogging.LazyLogging
, не удается, потому что экземпляр журнала не может быть сериализован в распределенной среде, такой как Apache Spark.
Я исследовал эту проблему, и на данный момент решение, которое я нашел, это использовать черту org.apache.spark.Logging
следующим образом:
class SparkExample with Logging {
val someRDD = ...
someRDD.map {
rddElement => logInfo(s"$rddElement will be processed.")
doSomething(rddElement)
}
}
Однако похоже, что свойство Logging не является постоянным решением для Apache Spark, поскольку оно помечено как @DeveloperApi
, и в документации по классу упоминается:
Вероятно, это будет изменено или удалено в будущих выпусках.
Мне интересно - это любое известное решение для ведения журнала, которое я могу использовать, и позволяю мне регистрировать данные, когда RDD выполняются на узлах Apache Spark?
@Later Edit. Некоторые из комментариев ниже предлагают использовать Log4J. Я попытался использовать Log4J, но у меня все еще возникают проблемы при использовании регистратора из класса Scala (а не объекта Scala).
Вот мой полный код:
import org.apache.log4j.Logger
import org.apache.spark._
object Main {
def main(args: Array[String]) {
new LoggingTestWithRDD().doTest()
}
}
class LoggingTestWithRDD extends Serializable {
val log = Logger.getLogger(getClass.getName)
def doTest(): Unit = {
val conf = new SparkConf().setMaster("local[4]").setAppName("LogTest")
val spark = new SparkContext(conf)
val someRdd = spark.parallelize(List(1, 2, 3))
someRdd.map {
element =>
log.info(s"$element will be processed")
element + 1
}
spark.stop()
}
}
Исключением, которое я вижу, является:
Исключение в потоке "main" org.apache.spark.SparkException: Задача не сериализуема → Причиняется: java.io.NotSerializableException: org.apache.log4j.Logger
Ответы
Ответ 1
Вы можете использовать решение Akhil, предложенное в
https://www.mail-archive.com/[email protected]/msg29010.html.
Я использовал сам, и он работает.
Ахиль Дас Мон, 25 мая 2015 года 08:20:40 -0700
Попробуйте следующим образом:
object Holder extends Serializable {
@transient lazy val log = Logger.getLogger(getClass.getName)
}
val someRdd = spark.parallelize(List(1, 2, 3)).foreach { element =>
Holder.log.info(element)
}
Ответ 2
val log = Logger.getLogger(getClass.getName),
Вы можете использовать "журнал" для записи журналов. Кроме того, если вам нужны свойства журнала изменений, вам нужно иметь log4j.properties в папке /conf. По умолчанию у нас будет шаблон в этом месте.
Ответ 3
Использовать Log4j 2.x. Основной регистратор был сериализуемым. Проблема решена.
Обсуждение Jira: https://issues.apache.org/jira/browse/LOG4J2-801
"org.apache.logging.log4j" % "log4j-api" % "2.x.x"
"org.apache.logging.log4j" % "log4j-core" % "2.x.x"
"org.apache.logging.log4j" %% "log4j-api- scala" % "2.x.x"
Ответ 4
Вот мое решение:
Я использую SLF4j (с привязкой Log4j),
в моем базовом классе каждой искровой работы у меня есть что-то вроде этого:
import org.slf4j.LoggerFactory
val LOG = LoggerFactory.getLogger(getClass)
Как раз перед тем местом, где я использую LOG
в распределенном функциональном коде, я скопирую ссылку на локатор на локальную константу.
val LOG = this.LOG
Это сработало для меня!