Ответ 1
1) Что мне здесь не хватает; почему нельзя распределять пул между процессами?
Не все объекты/экземпляры выбираются/сериализуются, в этом случае пул использует threading.lock, который не подбирается:
>>> import threading, pickle
>>> pickle.dumps(threading.Lock())
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
[...]
File "/Users/rafael/dev/venvs/general/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex
raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle lock objects
или лучше:
>>> import threading, pickle
>>> from concurrent.futures import ThreadPoolExecutor
>>> pickle.dumps(ThreadPoolExecutor(1))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1374, in dumps
Pickler(file, protocol).dump(obj)
File
[...]
"/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
File "/Users/rafael/dev/venvs/general/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex
raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle lock objects
Если вы думаете об этом, это имеет смысл, блокировка - это примитив семафора, управляемый операционной системой (поскольку python использует собственные потоки). Возможность рассортировать и сохранять это состояние объекта внутри исполняемого файла python действительно не будет достигать ничего значимого, поскольку его истинное состояние хранится ОС.
2) Что такое шаблон для реализации вложенных parallelism в Python? Если возможно, сохранение рекурсивной структуры, а не торговля ею для итерации
Теперь, для престижа, все, о чем я упоминал выше, на самом деле не относится к вашему примеру, поскольку вы используете потоки (ThreadPoolExecutor), а не процессы (ProcessPoolExecutor), поэтому никакого обмена данными между процессами не должно произойти.
Ваш пример java просто кажется более эффективным, так как пул потоков, который вы используете (CachedThreadPool), создает новые потоки по мере необходимости, тогда как реализации исполнителей python ограничены и требуют явного максимального количества потоков (max_workers). Там немного различий синтаксиса между языками, которые также, кажется, отбрасывают вас (статические экземпляры в python - это, по сути, все, что явно не охвачено), но по сути оба примера создавали бы точно такое же количество потоков для выполнения. Например, здесь приведен пример использования довольно наивной реализации CachedThreadPoolExecutor в python:
from concurrent.futures import ThreadPoolExecutor
class CachedThreadPoolExecutor(ThreadPoolExecutor):
def __init__(self):
super(CachedThreadPoolExecutor, self).__init__(max_workers=1)
def submit(self, fn, *args, **extra):
if self._work_queue.qsize() > 0:
print('increasing pool size from %d to %d' % (self._max_workers, self._max_workers+1))
self._max_workers +=1
return super(CachedThreadPoolExecutor, self).submit(fn, *args, **extra)
pool = CachedThreadPoolExecutor()
def fibonacci(n):
print n
if n < 2:
return n
a = pool.submit(fibonacci, n - 1)
b = pool.submit(fibonacci, n - 2)
return a.result() + b.result()
print(fibonacci(10))
Настройка производительности:
Я настоятельно рекомендую заглянуть в gevent, так как это даст вам высокий concurrency без накладных расходов на поток. Это не всегда так, но ваш код на самом деле является плакатным ребенком для использования gevent. Вот пример:
import gevent
def fibonacci(n):
print n
if n < 2:
return n
a = gevent.spawn(fibonacci, n - 1)
b = gevent.spawn(fibonacci, n - 2)
return a.get() + b.get()
print(fibonacci(10))
Полностью ненаучный, но на моем компьютере приведенный выше код работает быстрее 9x, чем его эквивалент в потоке.
Надеюсь, это поможет.