Пул потоков, похожий на многопроцессорный пул?
Существует ли класс пула для рабочих потоков, аналогичный модулю многопроцессорности класс пула?
Мне нравится, например, простой способ распараллеливать функцию отображения
def long_running_func(p):
c_func_no_gil(p)
p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))
однако я хотел бы сделать это без накладных расходов на создание новых процессов.
Я знаю о GIL. Тем не менее, в моей функции usecase, функция будет связанной с IO функцией C, для которой оболочка python освободит GIL до фактического вызова функции.
Нужно ли мне писать собственный пул потоков?
Ответы
Ответ 1
Я только выяснил, что на самом деле есть интерфейс пула на основе потоков в модуле multiprocessing
, однако он несколько спрятан и неправильно документирован.
Его можно импортировать через
from multiprocessing.pool import ThreadPool
Он реализуется с использованием фиктивного класса Process, обертывающего поток python. Этот класс процессов, основанный на потоках, можно найти в multiprocessing.dummy
, который кратко упоминается в docs. Этот фиктивный модуль предположительно обеспечивает весь интерфейс многопроцессорности на основе потоков.
Ответ 2
В Python 3 вы можете использовать concurrent.futures.ThreadPoolExecutor
, то есть:
executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)
Подробнее см. docs.
Ответ 3
Да, и похоже, что у него (более или менее) тот же API.
import multiprocessing
def worker(lnk):
....
def start_process():
.....
....
if(PROCESS):
pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE,
initializer=start_process)
pool.map(worker, inputs)
....
Ответ 4
Для чего-то очень простого и легкого (слегка измененного отсюда):
from Queue import Queue
from threading import Thread
class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
def run(self):
while True:
func, args, kargs = self.tasks.get()
try:
func(*args, **kargs)
except Exception, e:
print e
finally:
self.tasks.task_done()
class ThreadPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_threads):
self.tasks = Queue(num_threads)
for _ in range(num_threads):
Worker(self.tasks)
def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
self.tasks.put((func, args, kargs))
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
if __name__ == '__main__':
from random import randrange
from time import sleep
delays = [randrange(1, 10) for i in range(100)]
def wait_delay(d):
print 'sleeping for (%d)sec' % d
sleep(d)
pool = ThreadPool(20)
for i, d in enumerate(delays):
pool.add_task(wait_delay, d)
pool.wait_completion()
Чтобы поддерживать обратные вызовы при завершении задачи, вы можете просто добавить обратный вызов в кортеж задачи.
Ответ 5
Здесь что-то, что выглядит многообещающим в Поваренной книге Python:
Рецепт 576519: пул потоков с таким же API как (multi) processing.Pool(Python)
Ответ 6
Привет, чтобы использовать пул потоков в Python, вы можете использовать эту библиотеку:
from multiprocessing.dummy import Pool as ThreadPool
а затем для использования, эта библиотека делает следующее:
pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results
Потоки - это количество потоков, которые вы хотите, а задачи - это список задач, большинство из которых относятся к службе.
Ответ 7
Накладные расходы на создание новых процессов минимальны, особенно когда их всего 4. Я сомневаюсь, что это высокая производительность вашего приложения. Держите его простым, оптимизируйте, где вам нужно, и где указывают результаты профилирования.
Ответ 8
В потоковом пуле нет встроенного пула. Тем не менее, очень быстро реализовать очередь производителей/потребителей с классом Queue
.
С:
https://docs.python.org/2/library/queue.html
from threading import Thread
from Queue import Queue
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
q = Queue()
for i in range(num_worker_threads):
t = Thread(target=worker)
t.daemon = True
t.start()
for item in source():
q.put(item)
q.join() # block until all tasks are done
Ответ 9
Здесь результат, который я, наконец, использовал. Это модифицированная версия классов, написанная выше.
Файл: threadpool.py
from queue import Queue, Empty
import threading
from threading import Thread
class Worker(Thread):
_TIMEOUT = 2
""" Thread executing tasks from a given tasks queue. Thread is signalable,
to exit
"""
def __init__(self, tasks, th_num):
Thread.__init__(self)
self.tasks = tasks
self.daemon, self.th_num = True, th_num
self.done = threading.Event()
self.start()
def run(self):
while not self.done.is_set():
try:
func, args, kwargs = self.tasks.get(block=True,
timeout=self._TIMEOUT)
try:
func(*args, **kwargs)
except Exception as e:
print(e)
finally:
self.tasks.task_done()
except Empty as e:
pass
return
def signal_exit(self):
""" Signal to thread to exit """
self.done.set()
class ThreadPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_threads, tasks=[]):
self.tasks = Queue(num_threads)
self.workers = []
self.done = False
self._init_workers(num_threads)
for task in tasks:
self.tasks.put(task)
def _init_workers(self, num_threads):
for i in range(num_threads):
self.workers.append(Worker(self.tasks, i))
def add_task(self, func, *args, **kwargs):
"""Add a task to the queue"""
self.tasks.put((func, args, kwargs))
def _close_all_threads(self):
""" Signal all threads to exit and lose the references to them """
for workr in self.workers:
workr.signal_exit()
self.workers = []
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
def __del__(self):
self._close_all_threads()
def create_task(func, *args, **kwargs):
return (func, args, kwargs)
Использовать пул
from random import randrange
from time import sleep
delays = [randrange(1, 10) for i in range(30)]
def wait_delay(d):
print('sleeping for (%d)sec' % d)
sleep(d)
pool = ThreadPool(20)
for i, d in enumerate(delays):
pool.add_task(wait_delay, d)
pool.wait_completion()