Python: как я могу запускать функции python параллельно?
Я исследовал сначала и не смог найти ответ на мой вопрос. Я пытаюсь запустить несколько функций параллельно в Python.
У меня есть что-то вроде этого:
files.py
import common #common is a util class that handles all the IO stuff
dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]
def func1():
c = common.Common()
for i in range(len(addFiles)):
c.createFiles(addFiles[i], filename, dir1)
c.getFiles(dir1)
time.sleep(10)
c.removeFiles(addFiles[i], dir1)
c.getFiles(dir1)
def func2():
c = common.Common()
for i in range(len(addFiles)):
c.createFiles(addFiles[i], filename, dir2)
c.getFiles(dir2)
time.sleep(10)
c.removeFiles(addFiles[i], dir2)
c.getFiles(dir2)
Я хочу вызвать func1 и func2 и запустить их одновременно. Функции не взаимодействуют друг с другом или с одним и тем же объектом. Прямо сейчас мне нужно дождаться, когда func1 завершится до начала func2. Как мне сделать что-то вроде ниже:
process.py
from files import func1, func2
runBothFunc(func1(), func2())
Я хочу, чтобы иметь возможность создавать оба каталога довольно близко к одному времени, потому что каждый мин я подсчитываю, сколько файлов создается. Если каталог не существует, он отключит мое время.
Ответы
Ответ 1
Вы можете использовать threading
или multiprocessing
.
Из-за особенностей CPython, threading
вряд ли достигнет истины parallelism. По этой причине multiprocessing
обычно лучше.
Вот полный пример:
from multiprocessing import Process
def func1():
print 'func1: starting'
for i in xrange(10000000): pass
print 'func1: finishing'
def func2():
print 'func2: starting'
for i in xrange(10000000): pass
print 'func2: finishing'
if __name__ == '__main__':
p1 = Process(target=func1)
p1.start()
p2 = Process(target=func2)
p2.start()
p1.join()
p2.join()
Механизм запуска/объединения дочерних процессов может быть легко инкапсулирован в функцию по строкам вашего runBothFunc
:
def runInParallel(*fns):
proc = []
for fn in fns:
p = Process(target=fn)
p.start()
proc.append(p)
for p in proc:
p.join()
runInParallel(func1, func2)
Ответ 2
Это может быть сделано элегантно с Ray, системой, которая позволяет вам легко распараллеливать и распространять ваш код Python.
Чтобы распараллелить ваш пример, вам нужно определить свои функции с @ray.remote
декоратора @ray.remote
, а затем вызвать их с помощью .remote
.
import ray
ray.init()
dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]
# Define the functions.
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
# func1() code here...
@ray.remote
def func2(filename, addFiles, dir):
# func2() code here...
# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)])
Если вы передаете один и тот же аргумент обеим функциям, а аргумент велик, более эффективный способ сделать это - использовать ray.put()
. Это позволяет избежать сериализации большого аргумента и создания двух его копий в памяти:
largeData_id = ray.put(largeData)
ray.get([func1(largeData_id), func2(largeData_id)])
Если func1()
и func2()
возвращают результаты, вам нужно переписать код следующим образом:
ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func1.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])
Существует несколько преимуществ использования Ray по сравнению с многопроцессорным модулем. В частности, один и тот же код будет работать как на одной машине, так и на кластере машин. Для получения дополнительных преимуществ Рэй см. Этот пост.
Ответ 3
Нельзя гарантировать, что две функции будут выполняться синхронно друг с другом, что похоже на то, что вы хотите сделать.
Самое лучшее, что вы можете сделать, - разделить функцию на несколько шагов, а затем дождаться, когда оба будут завершены в критических точках синхронизации, используя Process.join
, как упоминания @aix answer.
Это лучше, чем time.sleep(10)
, потому что вы не можете гарантировать точное время. При явном ожидании вы говорите, что функции должны выполняться, выполняя этот шаг, прежде чем перейти к следующему, вместо того, чтобы предполагать, что это будет сделано в течение 10 мс, что не гарантируется на основе того, что еще происходит на машине.
Ответ 4
Если вы являетесь пользователем Windows и используете python 3, этот пост поможет вам выполнить параллельное программирование в python. Когда вы запускаете обычное программирование пула библиотеки многопроцессорности, вы получите сообщение об ошибке в отношении основной функции вашей программы. Это связано с тем, что в Windows нет функции fork(). Следующая статья дает решение указанной проблемы.
http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html
Поскольку я использовал python 3, я немного изменил программу:
from types import FunctionType
import marshal
def _applicable(*args, **kwargs):
name = kwargs['__pw_name']
code = marshal.loads(kwargs['__pw_code'])
gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
defs = marshal.loads(kwargs['__pw_defs'])
clsr = marshal.loads(kwargs['__pw_clsr'])
fdct = marshal.loads(kwargs['__pw_fdct'])
func = FunctionType(code, gbls, name, defs, clsr)
func.fdct = fdct
del kwargs['__pw_name']
del kwargs['__pw_code']
del kwargs['__pw_defs']
del kwargs['__pw_clsr']
del kwargs['__pw_fdct']
return func(*args, **kwargs)
def make_applicable(f, *args, **kwargs):
if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
kwargs['__pw_name'] = f.__name__ # edited
kwargs['__pw_code'] = marshal.dumps(f.__code__) # edited
kwargs['__pw_defs'] = marshal.dumps(f.__defaults__) # edited
kwargs['__pw_clsr'] = marshal.dumps(f.__closure__) # edited
kwargs['__pw_fdct'] = marshal.dumps(f.__dict__) # edited
return _applicable, args, kwargs
def _mappable(x):
x,name,code,defs,clsr,fdct = x
code = marshal.loads(code)
gbls = globals() #gbls = marshal.loads(gbls)
defs = marshal.loads(defs)
clsr = marshal.loads(clsr)
fdct = marshal.loads(fdct)
func = FunctionType(code, gbls, name, defs, clsr)
func.fdct = fdct
return func(x)
def make_mappable(f, iterable):
if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
name = f.__name__ # edited
code = marshal.dumps(f.__code__) # edited
defs = marshal.dumps(f.__defaults__) # edited
clsr = marshal.dumps(f.__closure__) # edited
fdct = marshal.dumps(f.__dict__) # edited
return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)
После этой функции вышеупомянутый код проблемы также немного изменился:
from multiprocessing import Pool
from poolable import make_applicable, make_mappable
def cube(x):
return x**3
if __name__ == "__main__":
pool = Pool(processes=2)
results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
print([result.get(timeout=10) for result in results])
И я получил вывод как:
[1, 8, 27, 64, 125, 216]
Я думаю, что этот пост может быть полезен для некоторых пользователей Windows.
Ответ 5
Проверьте эту библиотеку для распараллеливания https://github.com/arturmrowca/parallel_job_execution
Ответ 6
Если ваши функции в основном выполняют работу по вводу/выводу (и меньше работы с ЦП), и у вас есть Python 3. 2+, вы можете использовать ThreadPoolExecutor:
from concurrent.futures import ThreadPoolExecutor
def run_io_tasks_in_parallel(tasks):
with ThreadPoolExecutor() as executor:
running_tasks = [executor.submit(task) for task in tasks]
for running_task in running_tasks:
running_task.result()
run_io_tasks_in_parallel([
lambda: print('IO task 1 running!'),
lambda: print('IO task 2 running!'),
])
Если ваши функции в основном выполняют работу ЦП (и меньше операций ввода-вывода) и у вас есть Python 2. 6+, вы можете использовать модуль многопроцессорной обработки:
from multiprocessing import Process
def run_cpu_tasks_in_parallel(tasks):
running_tasks = [Process(target=task) for task in tasks]
for running_task in running_tasks:
running_task.start()
for running_task in running_tasks:
running_task.join()
run_cpu_tasks_in_parallel([
lambda: print('CPU task 1 running!'),
lambda: print('CPU task 2 running!'),
])