Как передать экземпляр multiprocessing.Pool для функции обратного вызова apply_async?
Вот моя простая программа факторизации, я добавил функцию обратного вызова в pool.apply_async(findK, args=(N,begin,end))
, приглашение сообщения prime factorization is over
, когда факторизация завершена, она отлично работает.
import math
import multiprocessing
def findK(N,begin,end):
for k in range(begin,end):
if N% k == 0:
print(N,"=" ,k ,"*", N/k)
return True
return False
def prompt(result):
if result:
print("prime factorization is over")
def mainFun(N,process_num):
pool = multiprocessing.Pool(process_num)
for i in range(process_num):
if i ==0 :
begin =2
else:
begin = int(math.sqrt(N)/process_num*i)+1
end = int(math.sqrt(N)/process_num*(i+1))
pool.apply_async(findK, args=(N,begin,end) , callback = prompt)
pool.close()
pool.join()
if __name__ == "__main__":
N = 684568031001583853
process_num = 16
mainFun(N,process_num)
Теперь я хочу изменить функцию обратного вызова в apply_async, чтобы изменить приглашение в функцию выключения, чтобы убить все остальные процессы.
def prompt(result):
if result:
pool.terminate()
Экземпляр пула не определен в строке запроса или передается в приглашение.
pool.terminate()
не может работать в быстрой функции.
Как передать экземпляр multiprocessing.Pool функции apply_async'callback?
(Я сделал это в формате класса, просто для добавления метода класса и вызова self.pool.terminate может убить все остальные процессы,
как выполнить задание в формате функции?)
Если не задан пул как глобальная переменная, можно ли передать пул в функцию обратного вызова?
Ответы
Ответ 1
Передача дополнительных аргументов функции обратного вызова не поддерживается. Но у вас есть много элегантных способов обхода этого.
Вы можете инкапсулировать логику пула в объект:
class Executor:
def __init__(self, process_num):
self.pool = multiprocessing.Pool(process_num)
def prompt(self, result):
if result:
print("prime factorization is over")
self.pool.terminate()
def schedule(self, function, args):
self.pool.apply_async(function, args=args, callback=self.prompt)
def wait(self):
self.pool.close()
self.pool.join()
def main(N,process_num):
executor = Executor(process_num)
for i in range(process_num):
...
executor.schedule(findK, (N,begin,end))
executor.wait()
Или вы можете использовать concurrent.futures.Executor, которая возвращает объект Future
. Вы просто добавляете пул к объекту Future
перед установкой обратного вызова.
def prompt(future):
if future.result():
print("prime factorization is over")
future.pool_executor.shutdown(wait=False)
def main(N,process_num):
executor = concurrent.futures.ProcessPoolExecutor(max_workers=process_num)
for i in range(process_num):
...
future = executor.submit(findK, N,begin,end)
future.pool_executor = executor
future.add_done_callback(prompt)
Ответ 2
Вам нужно, чтобы pool
оказался в среде prompt
. Одна из возможностей - переместить pool
в глобальную область действия (хотя это не самая лучшая практика). Это работает:
import math
import multiprocessing
pool = None
def findK(N,begin,end):
for k in range(begin,end):
if N% k == 0:
print(N,"=" ,k ,"*", N/k)
return True
return False
def prompt(result):
if result:
print("prime factorization is over")
pool.terminate()
def mainFun(N,process_num):
global pool
pool = multiprocessing.Pool(process_num)
for i in range(process_num):
if i ==0 :
begin =2
else:
begin = int(math.sqrt(N)/process_num*i)+1
end = int(math.sqrt(N)/process_num*(i+1))
pool.apply_async(findK, args=(N,begin,end) , callback = prompt)
pool.close()
pool.join()
if __name__ == "__main__":
N = 684568031001583853
process_num = 16
mainFun(N,process_num)
Ответ 3
Вы можете просто определить локальную функцию close
как обратный вызов:
import math
import multiprocessing
def findK(N, begin, end):
for k in range(begin, end):
if N % k == 0:
print(N, "=", k, "*", N / k)
return True
return False
def mainFun(N, process_num):
pool = multiprocessing.Pool(process_num)
def close(result):
if result:
print("prime factorization is over")
pool.terminate()
for i in range(process_num):
if i == 0:
begin = 2
else:
begin = int(math.sqrt(N) / process_num * i) + 1
end = int(math.sqrt(N) / process_num * (i + 1))
pool.apply_async(findK, args=(N, begin, end), callback=close)
pool.close()
pool.join()
if __name__ == "__main__":
N = 684568031001583853
process_num = 16
mainFun(N, process_num)
Вы также можете использовать partial
функцию functool
, с
import functools
def close_pool(pool, results):
if result:
pool.terminate()
def mainFun(N, process_num):
pool = multiprocessing.Pool(process_num)
close = funtools.partial(close_pool, pool)
....