Ответ 1
Вы можете получить регистратор из объекта SparkContext:
log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("pyspark script logger initialized")
У меня есть программа Python Spark, которую я запускаю с spark-submit
. Я хочу поставить в него записи ведения журнала.
logging.info("This is an informative message.")
logging.debug("This is a debug message.")
Я хочу использовать тот же журнал, что и Spark, чтобы сообщения журнала выходили в одном формате, а уровень управлялся теми же конфигурационными файлами. Как это сделать?
Я попытался помещать инструкции logging
в код и начинать с logging.getLogger()
. В обоих случаях я вижу сообщения журнала Spark, но не мои. Я смотрел документацию по протоколу Python, но не смог понять ее там.
Не уверен, что это что-то особенное для скриптов, представленных Spark, или просто я не понимаю, как работает журнал.
Вы можете получить регистратор из объекта SparkContext:
log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("pyspark script logger initialized")
Вам нужно получить журнал для самой искры, по умолчанию getLogger() вернет регистратор для вашего собственного модуля. Попробуйте что-то вроде:
logger = logging.getLogger('py4j')
logger.info("My test info statement")
Он также может быть "pyspark" вместо "py4j".
Если функция, которую вы используете в своей искровой программе (и которая выполняет некоторые протоколирования), определена в том же модуле, что и основная функция, она приведет к некоторой ошибке сериализации.
Это объясняется здесь, и пример того же человека дается здесь
Я также испытал это на искре 1.3.1
ИЗМЕНИТЬ:
Чтобы изменить регистрацию с STDERR на STDOUT, вам нужно будет удалить текущий StreamHandler и добавить новый.
Найти существующего обработчика потока (эта строка может быть удалена по окончании)
print(logger.handlers)
# will look like [<logging.StreamHandler object at 0x7fd8f4b00208>]
Вероятно, будет только один, но если нет, вам нужно будет обновить позицию.
logger.removeHandler(logger.handlers[0])
Добавить новый обработчик для sys.stdout
import sys # Put at top if not already there
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(logging.DEBUG)
logger.addHandler(sh)
В моем случае я просто рад, что мои сообщения в журнале добавлены к рабочим stderr вместе с обычными сообщениями журнала искробезопасности.
Если это соответствует вашим потребностям, тогда трюк должен перенаправить конкретный регистратор Python на stderr
.
Например, следующее, вдохновленное этот ответ, отлично подходит для меня:
def getlogger(name, level=logging.INFO):
import logging
import sys
logger = logging.getLogger(name)
logger.setLevel(level)
if logger.handlers:
# or else, as I found out, we keep adding handlers and duplicate messages
pass
else:
ch = logging.StreamHandler(sys.stderr)
ch.setLevel(level)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
return logger
Использование:
def tst_log():
logger = getlogger('my-worker')
logger.debug('a')
logger.info('b')
logger.warning('c')
logger.error('d')
logger.critical('e')
...
Выход (плюс несколько окружающих линий для контекста):
17/05/03 03:25:32 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 5.8 KB, free 319.2 MB)
2017-05-03 03:25:32,849 - my-worker - INFO - b
2017-05-03 03:25:32,849 - my-worker - WARNING - c
2017-05-03 03:25:32,849 - my-worker - ERROR - d
2017-05-03 03:25:32,849 - my-worker - CRITICAL - e
17/05/03 03:25:32 INFO PythonRunner: Times: total = 2, boot = -40969, init = 40971, finish = 0
17/05/03 03:25:32 INFO Executor: Finished task 7.0 in stage 20.0 (TID 213). 2109 bytes result sent to driver
Ключ взаимодействия pyspark и java log4j - это jvm. Ниже приведен код python, в conf отсутствует URL-адрес, но это касается протоколирования.
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
my_jars = os.environ.get("SPARK_HOME")
myconf = SparkConf()
myconf.setMaster("local").setAppName("DB2_Test")
myconf.set("spark.jars","%s/jars/log4j-1.2.17.jar" % my_jars)
spark = SparkSession\
.builder\
.appName("DB2_Test")\
.config(conf = myconf) \
.getOrCreate()
Logger= spark._jvm.org.apache.log4j.Logger
mylogger = Logger.getLogger(__name__)
mylogger.error("some error trace")
mylogger.info("some info trace")