Как отслеживать асинхронные результаты, возвращаемые из многопроцессорного пула
Я пытаюсь добавить multiprocessing в некоторый код, который содержит функции, которые я не могу изменить. Я хочу, чтобы эти функции выполнялись как задания для многопроцессорного пула асинхронно. Я делаю что-то вроде кода здесь. Однако я не уверен, как отслеживать результаты. Как узнать, к какой прикладной функции соответствует возвращенный результат?
Важные моменты, которые нужно подчеркнуть, это то, что я не могу изменять существующие функции (другие вещи полагаются на них, оставаясь такими, какими они есть), и что результаты могут быть возвращены в порядке, отличном от порядка, в котором задания функции применяются к бассейн.
Спасибо за любые мысли по этому поводу!
РЕДАКТИРОВАТЬ: Ниже приведен код ошибки:
import multiprocessing
from multiprocessing import Pool
import os
import signal
import time
import inspect
def multiply(multiplicand1=0, multiplicand2=0):
return multiplicand1*multiplicand2
def workFunctionTest(**kwargs):
time.sleep(3)
return kwargs
def printHR(object):
"""
This function prints a specified object in a human readable way.
"""
# dictionary
if isinstance(object, dict):
for key, value in sorted(object.items()):
print u'{a1}: {a2}'.format(a1=key, a2=value)
# list or tuple
elif isinstance(object, list) or isinstance(object, tuple):
for element in object:
print element
# other
else:
print object
class Job(object):
def __init__(
self,
workFunction=workFunctionTest,
workFunctionKeywordArguments={'testString': "hello world"},
workFunctionTimeout=1,
naturalLanguageString=None,
classInstance=None,
resultGetter=None,
result=None
):
self.workFunction=workFunction
self.workFunctionKeywordArguments=workFunctionKeywordArguments
self.workFunctionTimeout=workFunctionTimeout
self.naturalLanguageString=naturalLanguageString
self.classInstance=self.__class__.__name__
self.resultGetter=resultGetter
self.result=result
def description(self):
descriptionString=""
for key, value in sorted(vars(self).items()):
descriptionString+=str("{a1}:{a2} ".format(a1=key, a2=value))
return descriptionString
def printout(self):
"""
This method prints a dictionary of all data attributes.
"""
printHR(vars(self))
class JobGroup(object):
"""
This class acts as a container for jobs. The data attribute jobs is a list of job objects.
"""
def __init__(
self,
jobs=None,
naturalLanguageString="null",
classInstance=None,
result=None
):
self.jobs=jobs
self.naturalLanguageString=naturalLanguageString
self.classInstance=self.__class__.__name__
self.result=result
def description(self):
descriptionString=""
for key, value in sorted(vars(self).items()):
descriptionString+=str("{a1}:{a2} ".format(a1=key, a2=value))
return descriptionString
def printout(self):
"""
This method prints a dictionary of all data attributes.
"""
printHR(vars(self))
def initialise_processes():
signal.signal(signal.SIGINT, signal.SIG_IGN)
def execute(
jobObject=None,
numberOfProcesses=multiprocessing.cpu_count()
):
# Determine the current function name.
functionName=str(inspect.stack()[0][3])
def collateResults(result):
"""
This is a process pool callback function which collates a list of results returned.
"""
# Determine the caller function name.
functionName=str(inspect.stack()[1][3])
print("{a1}: result: {a2}".format(a1=functionName, a2=result))
results.append(result)
def getResults(job):
# Determine the current function name.
functionName=str(inspect.stack()[0][3])
while True:
try:
result=job.resultGetter.get(job.workFunctionTimeout)
break
except multiprocessing.TimeoutError:
print("{a1}: subprocess timeout for job".format(a1=functionName, a2=job.description()))
#job.result=result
return result
# Create a process pool.
pool1 = multiprocessing.Pool(numberOfProcesses, initialise_processes)
print("{a1}: pool {a2} of {a3} processes created".format(a1=functionName, a2=str(pool1), a3=str(numberOfProcesses)))
# Unpack the input job object and submit it to the process pool.
print("{a1}: unpacking and applying job object {a2} to pool...".format(a1=functionName, a2=jobObject))
if isinstance(jobObject, Job):
# If the input job object is a job, apply it to the pool with its associated timeout specification.
# Return a list of results.
job=jobObject
print("{a1}: job submitted to pool: {a2}".format(a1=functionName, a2=job.description()))
# Apply the job to the pool, saving the object pool.ApplyResult to the job object.
job.resultGetter=pool1.apply_async(
func=job.workFunction,
kwds=job.workFunctionKeywordArguments
)
# Get results.
# Acquire the job result with respect to the specified job timeout and apply this result to the job data attribute result.
print("{a1}: getting results for job...".format(a1=functionName))
job.result=getResults(job)
print("{a1}: job completed: {a2}".format(a1=functionName, a2=job.description()))
print("{a1}: job result: {a2}".format(a1=functionName, a2=job.result))
# Return the job result from execute.
return job.result
pool1.terminate()
pool1.join()
elif isinstance(jobObject, JobGroup):
# If the input job object is a job group, cycle through each job and apply it to the pool with its associated timeout specification.
for job in jobObject.jobs:
print("{a1}: job submitted to pool: {a2}".format(a1=functionName, a2=job.description()))
# Apply the job to the pool, saving the object pool.ApplyResult to the job object.
job.resultGetter=pool1.apply_async(
func=job.workFunction,
kwds=job.workFunctionKeywordArguments
)
# Get results.
# Cycle through each job and and append the result for the job to a list of results.
results=[]
for job in jobObject.jobs:
# Acquire the job result with respect to the specified job timeout and apply this result to the job data attribute result.
print("{a1}: getting results for job...".format(a1=functionName))
job.result=getResults(job)
print("{a1}: job completed: {a2}".format(a1=functionName, a2=job.description()))
#print("{a1}: job result: {a2}".format(a1=functionName, a2=job.result))
# Collate the results.
results.append(job.result)
# Apply the list of results to the job group data attribute results.
jobObject.results=results
print("{a1}: job group results: {a2}".format(a1=functionName, a2=jobObject.results))
# Return the job result list from execute.
return jobObject.results
pool1.terminate()
pool1.join()
else:
# invalid input object
print("{a1}: invalid job object {a2}".format(a1=functionName, a2=jobObject))
def main():
print('-'*80)
print("MULTIPROCESSING SYSTEM DEMONSTRATION\n")
# Create a job.
print("# creating a job...\n")
job1=Job(
workFunction=workFunctionTest,
workFunctionKeywordArguments={'testString': "hello world"},
workFunctionTimeout=4
)
print("- printout of new job object:")
job1.printout()
print("\n- printout of new job object in logging format:")
print job1.description()
# Create another job.
print("\n# creating another job...\n")
job2=Job(
workFunction=multiply,
workFunctionKeywordArguments={'multiplicand1': 2, 'multiplicand2': 3},
workFunctionTimeout=6
)
print("- printout of new job object:")
job2.printout()
print("\n- printout of new job object in logging format:")
print job2.description()
# Create a JobGroup object.
print("\n# creating a job group (of jobs 1 and 2)...\n")
jobGroup1=JobGroup(
jobs=[job1, job2],
)
print("- printout of new job group object:")
jobGroup1.printout()
print("\n- printout of new job group object in logging format:")
print jobGroup1.description()
# Submit the job group.
print("\nready to submit job group")
response=raw_input("\nPress Enter to continue...\n")
execute(jobGroup1)
response=raw_input("\nNote the results printed above. Press Enter to continue the demonstration.\n")
# Demonstrate timeout.
print("\n # creating a new job in order to demonstrate timeout functionality...\n")
job3=Job(
workFunction=workFunctionTest,
workFunctionKeywordArguments={'testString': "hello world"},
workFunctionTimeout=1
)
print("- printout of new job object:")
job3.printout()
print("\n- printout of new job object in logging format:")
print job3.description()
print("\nNote the timeout specification of only 1 second.")
# Submit the job.
print("\nready to submit job")
response=raw_input("\nPress Enter to continue...\n")
execute(job3)
response=raw_input("\nNote the recognition of timeouts printed above. This concludes the demonstration.")
print('-'*80)
if __name__ == '__main__':
main()
EDIT: этот вопрос был помещен [в состояние удержания] по следующей заявленной причине:
"Вопросы, требующие кода, должны продемонстрировать минимальное понимание проблемы, которая будет решена. Включите попытки решения, почему они не работают и ожидаемые результаты. См. также: Переполнение стека контрольный список вопросов"
Этот вопрос не запрашивает код; он просит мысли, общее руководство. Продемонстрировано минимальное понимание рассматриваемой проблемы (обратите внимание на правильное использование терминов "многопроцессорность", "пул" и "асинхронно" и отметьте ссылку на предыдущий код). Что касается попыток решения, я признаю, что попытки решения этих проблем были бы полезными. Я добавил такой код сейчас. Надеюсь, что я затронул поднятые проблемы, которые привели к статусу [приостановлено].
Ответы
Ответ 1
Не видя фактического кода, я могу ответить только в общих чертах. Но есть два общих решения.
Во-первых, вместо использования callback
и игнорирования AsyncResult
s сохраните их в какой-то коллекции. Тогда вы можете просто использовать эту коллекцию. Например, если вы хотите иметь возможность искать результаты для функции, используя эту функцию в качестве ключа, просто создайте dict
с помощью функций:
def in_parallel(funcs):
results = {}
pool = mp.Pool()
for func in funcs:
results[func] = pool.apply_async(func)
pool.close()
pool.join()
return {func: result.get() for func, result in results.items()}
В качестве альтернативы вы можете изменить функцию обратного вызова, чтобы сохранить результаты в своей коллекции по ключу. Например:
def in_parallel(funcs):
results = {}
pool = mp.Pool()
for func in funcs:
def callback(result, func=func):
results[func] = result
pool.apply_async(func, callback=callback)
pool.close()
pool.join()
return results
Я использую эту функцию как ключ. Но вместо этого вы хотите использовать индекс, это так же просто. Любое значение, которое у вас есть, можно использовать в качестве ключа.
Между тем, пример, который вы связали, на самом деле просто вызывает одну и ту же функцию на множестве аргументов, ожидая завершения всех этих действий и оставив результаты в некотором итерабельном порядке в произвольном порядке. То, что делает imap_unordered
, но намного проще. Вы можете заменить сложную вещь из связанного кода следующим образом:
pool = mp.Pool()
results = list(pool.imap_unordered(foo_pool, range(10)))
pool.close()
pool.join()
И затем, если вы хотите, чтобы результаты были в исходном порядке, а не в произвольном порядке, вы можете просто переключиться на imap
или map
. Итак:
pool = mp.Pool()
results = pool.map(foo_pool, range(10))
pool.close()
pool.join()
Если вам понадобится нечто подобное, но слишком сложное, чтобы вписаться в парадигму map
, concurrent.futures
, вероятно, сделает вашу жизнь проще, чем multiprocessing
. Если вы находитесь на Python 2.x, вам нужно будет установить backport. Но тогда вы можете делать то, что гораздо сложнее сделать с AsyncResult
или callback
(или map
), например, составление целого ряда будущих фьючерсов в одно большое будущее. См. Примеры в связанных документах.
Последнее примечание:
Важные моменты, которые нужно подчеркнуть, это то, что я не могу изменить существующие функции...
Если вы не можете изменить функцию, вы всегда можете ее обернуть. Например, допустим, что у меня есть функция, возвращающая квадрат числа, но я пытаюсь построить арифметические числа отображения битов на свои квадраты, поэтому мне нужно также иметь исходный номер как часть результата. Это легко:
def number_and_square(x):
return x, square(x)
И теперь я могу просто apply_async(number_and_square)
вместо просто square
и получить результаты, которые я хочу.
Я не делал этого в приведенных выше примерах, потому что в первом случае я хранил ключ в коллекции с вызывающей стороны, а во-вторых, я привязал его к функции обратного вызова. Но привязка его к обертке вокруг функции так же просто, как и любая из них, и может быть подходящей, когда ни одна из них не является.