Multiprocessing.Pool - PicklingError: не может pickle <type 'thread.lock'>: поиск атрибута thread.lock не удалось
multiprocessing.Pool
сводит меня с ума...
Я хочу обновить многие пакеты, и для каждого из них я должен проверить, есть ли большая версия или нет. Это выполняется с помощью функции check_one
.
Основной код находится в методе Updater.update
: там я создаю объект пула и вызываю метод map()
.
Вот код:
def check_one(args):
res, total, package, version = args
i = res.qsize()
logger.info('\r[{0:.1%} - {1}, {2} / {3}]',
i / float(total), package, i, total, addn=False)
try:
json = PyPIJson(package).retrieve()
new_version = Version(json['info']['version'])
except Exception as e:
logger.error('Error: Failed to fetch data for {0} ({1})', package, e)
return
if new_version > version:
res.put_nowait((package, version, new_version, json))
class Updater(FileManager):
# __init__ and other methods...
def update(self):
logger.info('Searching for updates')
packages = Queue.Queue()
data = ((packages, self.set_len, dist.project_name, Version(dist.version)) \
for dist in self.working_set)
pool = multiprocessing.Pool()
pool.map(check_one, data)
pool.close()
pool.join()
while True:
try:
package, version, new_version, json = packages.get_nowait()
except Queue.Empty:
break
txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(package,
new_version,
version)
u = logger.ask(txt, bool=('upgrade version', 'keep working version'), dont_ask=self.yes)
if u:
self.upgrade(package, json, new_version)
else:
logger.info('{0} has not been upgraded', package)
self._clean()
logger.success('Updating finished successfully')
Когда я запускаю его, я получаю эту странную ошибку:
Searching for updates
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/local/lib/python2.7/dist-packages/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed
Ответы
Ответ 1
мультипроцессор передает задачи (которые включают check_one
и data
) рабочим процессам через mp.SimpleQueue
. В отличие от Queue.Queue
, все, что помещено в mp.SimpleQueue
должно быть доступно для выбора. Queue.Queue
нельзя выбрать:
import multiprocessing as mp
import Queue
def foo(queue):
pass
pool=mp.Pool()
q=Queue.Queue()
pool.map(foo,(q,))
дает это исключение:
UnpickleableError: Cannot pickle <type 'thread.lock'> objects
Ваши data
включают в себя packages
, которые являются Queue.Queue. Это может быть источником проблемы.
Вот возможный обходной путь: Queue
используется для двух целей:
- узнать приблизительный размер (позвонив по
qsize
) - сохранить результаты для последующего поиска.
Вместо вызова qsize
, чтобы разделить значение между несколькими процессами, мы могли бы использовать mp.Value
.
Вместо того, чтобы хранить результаты в очереди, мы можем (и должны) просто возвращать значения из вызовов check_one
. pool.map
собирает результаты в очередь своего собственного создания и возвращает результаты как возвращаемое значение pool.map
.
Например:
import multiprocessing as mp
import Queue
import random
import logging
# logger=mp.log_to_stderr(logging.DEBUG)
logger = logging.getLogger(__name__)
qsize = mp.Value('i', 1)
def check_one(args):
total, package, version = args
i = qsize.value
logger.info('\r[{0:.1%} - {1}, {2} / {3}]'.format(
i / float(total), package, i, total))
new_version = random.randrange(0,100)
qsize.value += 1
if new_version > version:
return (package, version, new_version, None)
else:
return None
def update():
logger.info('Searching for updates')
set_len=10
data = ( (set_len, 'project-{0}'.format(i), random.randrange(0,100))
for i in range(set_len) )
pool = mp.Pool()
results = pool.map(check_one, data)
pool.close()
pool.join()
for result in results:
if result is None: continue
package, version, new_version, json = result
txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(
package, new_version, version)
logger.info(txt)
logger.info('Updating finished successfully')
if __name__=='__main__':
logging.basicConfig(level=logging.DEBUG)
update()
Ответ 2
После многого рытья по аналогичной проблеме...
Также выясняется, что любой объект, который содержит объект threading.Condition(), НИКОГДА не будет работать с multiprocessing.Pool.
Вот пример
import multiprocessing as mp
import threading
class MyClass(object):
def __init__(self):
self.cond = threading.Condition()
def foo(mc):
pass
pool=mp.Pool()
mc=MyClass()
pool.map(foo,(mc,))
Я запускал это с Python 2.7.5 и получал ту же ошибку:
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.7/threading.py", line 764, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed
Но затем запустил его на python 3.4.1, и эта проблема была исправлена.
Хотя я еще не сталкивался с какими-либо полезными обходными решениями для тех из нас, кто еще на 2.7.x.
Ответ 3
Я испытал эту проблему с версией Python 3.6 на докере. Изменили версию на 3.7.3 и это было решено.