Тупик с протоколированием многопроцессорного/многопоточного python script
Я столкнулся с проблемой сбора журналов из следующего script.
Как только я установил значение SLEEP_TIME
слишком маленькое, LoggingThread
потоки каким-то образом блокируют модуль регистрации. Задержка script при регистрации
в функции action
. Если SLEEP_TIME
составляет около 0,1, собирают script
все сообщения журнала, как я ожидаю.
Я попытался выполнить этот ответ, но это не решает мою проблему.
import multiprocessing
import threading
import logging
import time
SLEEP_TIME = 0.000001
logger = logging.getLogger()
ch = logging.StreamHandler()
ch.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(funcName)s(): %(message)s'))
ch.setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
logger.addHandler(ch)
class LoggingThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
while True:
logger.debug('LoggingThread: {}'.format(self))
time.sleep(SLEEP_TIME)
def action(i):
logger.debug('action: {}'.format(i))
def do_parallel_job():
processes = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=processes)
for i in range(20):
pool.apply_async(action, args=(i,))
pool.close()
pool.join()
if __name__ == '__main__':
logger.debug('START')
#
# multithread part
#
for _ in range(10):
lt = LoggingThread()
lt.setDaemon(True)
lt.start()
#
# multiprocess part
#
do_parallel_job()
logger.debug('FINISH')
Как использовать модуль регистрации в многопроцессорных и многопоточных сценариях?
Ответы
Ответ 1
Вероятно, это ошибка 6721.
Проблема распространена в любой ситуации, когда у вас есть замки, нити и вилки. Если у потока 1 была блокировка, а поток 2 вызывает fork, в разветвленном процессе будет только поток 2, и блокировка будет сохраняться вечно. В вашем случае это logging.StreamHandler.lock
.
Исправление можно найти здесь для модуля logging
. Обратите внимание, что вам нужно позаботиться и о любых других замках.
Ответ 2
Я столкнулся с подобной проблемой совсем недавно, когда использовал модуль регистрации вместе с многопроцессорной библиотекой Pathos. Все еще не уверен на 100%, но кажется, что в моем случае проблема могла быть вызвана тем фактом, что обработчик журналов пытался повторно использовать объект блокировки из разных процессов.
Был в состоянии исправить это с помощью простой оболочки вокруг стандартного обработчика журналирования:
import threading
from collections import defaultdict
from multiprocessing import current_process
import colorlog
class ProcessSafeHandler(colorlog.StreamHandler):
def __init__(self):
super().__init__()
self._locks = defaultdict(lambda: threading.RLock())
def acquire(self):
current_process_id = current_process().pid
self._locks[current_process_id].acquire()
def release(self):
current_process_id = current_process().pid
self._locks[current_process_id].release()