Пул потоков Python, который обрабатывает исключения
Я искал хорошую реализацию простого пула пула пула python и действительно не могу найти ничего, что бы соответствовало моим потребностям. Я использую python 2.7, и все модули, которые я нашел, либо не работают, либо не обрабатывают исключения в рабочих правильно. Мне было интересно, знал ли кто-нибудь о библиотеке, которая могла бы предложить тип функциональности, которую я ищу. Помогите с благодарностью.
Multiprocessing
Моя первая попытка заключалась в встроенном модуле multiprocessing
, но поскольку это не использует потоки, а подпроцессы, вместо этого мы сталкиваемся с проблемой, что объекты не могут быть маринованными. Не идите сюда.
from multiprocessing import Pool
class Sample(object):
def compute_fib(self, n):
phi = (1 + 5**0.5) / 2
self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))
samples = [Sample() for i in range(8)]
pool = Pool(processes=8)
for s in samples: pool.apply_async(s.compute_fib, [20])
pool.join()
for s in samples: print s.fib
# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
Фьючерс
Итак, я вижу, что есть обратный порт некоторых из крутых параллельных функций python 3.2 здесь. Это кажется идеальным и простым в использовании. Проблема в том, что когда вы получаете исключение в одном из рабочих, вы получаете только тип исключения, такого как "ZeroDivisionError", но не трассируетесь и, следовательно, не указали, какая строка вызвала исключение. Код становится невозможным для отладки. Нет.
from concurrent import futures
class Sample(object):
def compute_fib(self, n):
phi = (1 + 5**0.5) / 2
1/0
self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))
samples = [Sample() for i in range(8)]
pool = futures.ThreadPoolExecutor(max_workers=8)
threads = [pool.submit(s.compute_fib, 20) for s in samples]
futures.wait(threads, return_when=futures.FIRST_EXCEPTION)
for t in threads: t.result()
for s in samples: print s.fib
# futures-2.1.3-py2.7.egg/concurrent/futures/_base.pyc in __get_result(self)
# 354 def __get_result(self):
# 355 if self._exception:
#--> 356 raise self._exception
# 357 else:
# 358 return self._result
#
# ZeroDivisionError: integer division or modulo by zero
Workerpool
Я нашел другую реализацию этого шаблона здесь. На этот раз, когда возникает исключение, оно печатается, но тогда мой интерактивный интерпретатор ipython остается в зависании и его нужно убить из другой оболочки. Нет.
import workerpool
class Sample(object):
def compute_fib(self, n):
phi = (1 + 5**0.5) / 2
1/0
self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))
samples = [Sample() for i in range(8)]
pool = workerpool.WorkerPool(size=8)
for s in samples: pool.map(s.compute_fib, [20])
pool.wait()
for s in samples: print s.fib
# ZeroDivisionError: integer division or modulo by zero
# ^C^C^C^C^C^C^C^C^D^D
# $ kill 1783
Threadpool
Еще одна реализация здесь. На этот раз, когда возникает исключение, оно печатается на stderr
, но script не прерывается и вместо этого продолжает выполнение, что не соответствует цели исключения и может сделать вещи небезопасными. Все еще не используется.
import threadpool
class Sample(object):
def compute_fib(self, n):
phi = (1 + 5**0.5) / 2
1/0
self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))
samples = [Sample() for i in range(8)]
pool = threadpool.ThreadPool(8)
requests = [threadpool.makeRequests(s.compute_fib, [20]) for s in samples]
requests = [y for x in requests for y in x]
for r in requests: pool.putRequest(r)
pool.wait()
for s in samples: print s.fib
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
#---> 17 for s in samples: print s.fib
#
#AttributeError: 'Sample' object has no attribute 'fib'
- Обновление -
Похоже, что в отношении библиотеки futures
поведение python 3 не совпадает с python 2.
futures_exceptions.py
:
from concurrent.futures import ThreadPoolExecutor, as_completed
def div_zero(x):
return x / 0
with ThreadPoolExecutor(max_workers=4) as executor:
futures = executor.map(div_zero, range(4))
for future in as_completed(futures): print(future)
Выход Python 2.7.6:
Traceback (most recent call last):
File "...futures_exceptions.py", line 12, in <module>
for future in as_completed(futures):
File "...python2.7/site-packages/concurrent/futures/_base.py", line 198, in as_completed
with _AcquireFutures(fs):
File "...python2.7/site-packages/concurrent/futures/_base.py", line 147, in __init__
self.futures = sorted(futures, key=id)
File "...python2.7/site-packages/concurrent/futures/_base.py", line 549, in map
yield future.result()
File "...python2.7/site-packages/concurrent/futures/_base.py", line 397, in result
return self.__get_result()
File "...python2.7/site-packages/concurrent/futures/_base.py", line 356, in __get_result
raise self._exception
ZeroDivisionError: integer division or modulo by zero
Выход Python 3.3.2:
Traceback (most recent call last):
File "...futures_exceptions.py", line 11, in <module>
for future in as_completed(futures):
File "...python3.3/concurrent/futures/_base.py", line 193, in as_completed
with _AcquireFutures(fs):
File "...python3.3/concurrent/futures/_base.py", line 142, in __init__
self.futures = sorted(futures, key=id)
File "...python3.3/concurrent/futures/_base.py", line 546, in result_iterator
yield future.result()
File "...python3.3/concurrent/futures/_base.py", line 392, in result
return self.__get_result()
File "...python3.3/concurrent/futures/_base.py", line 351, in __get_result
raise self._exception
File "...python3.3/concurrent/futures/thread.py", line 54, in run
result = self.fn(*self.args, **self.kwargs)
File "...futures_exceptions.py", line 7, in div_zero
return x / 0
ZeroDivisionError: division by zero
Ответы
Ответ 1
Я лично использую Futures
, так как интерфейс очень прост. Для проблемы трассировки я нашел обходное решение для его сохранения. Оформите мой ответ на этот другой вопрос:
Получение исходного номера строки для исключения в concurrent.futures
Ответ 2
Простое решение: используйте любую альтернативу, которая вам подходит, и реализуйте свой собственный блок try-except
для ваших работников. Если вы хотите, вызовите корневой вызов.
Я бы не сказал, что эти библиотеки обрабатывают исключения "неправильно". Они имеют поведение по умолчанию, но примитивное. Вы должны будете справиться с этим сами, если по умолчанию вас не устраивает.