Выполнение задач параллельно в python
Я использую python 2.7, у меня есть код, который выглядит так:
task1()
task2()
task3()
dependent1()
task4()
task5()
task6()
dependent2()
dependent3()
Единственными зависимостями здесь являются следующие: depend1 должен ждать задач1-3, depend2 должен ждать задач 4-6 и depend3 должен ждать иждивенцев1-2... Следующее было бы в порядке: запуск целого 6 задач сначала параллельно, затем первые два иждивенца параллельно.. затем конечная зависимая
Я предпочитаю иметь как можно больше задач параллельно, я искал некоторые модули, но я надеялся избежать внешних библиотек и не знаю, как техника Queue-Thread может решить мою проблему (может кто-то может порекомендовать хороший ресурс?)
Ответы
Ответ 1
Встроенный класс threading.Thread предлагает все, что вам нужно: start, чтобы начать новый поток и join, чтобы дождаться окончания потока.
import threading
def task1():
pass
def task2():
pass
def task3():
pass
def task4():
pass
def task5():
pass
def task6():
pass
def dep1():
t1 = threading.Thread(target=task1)
t2 = threading.Thread(target=task2)
t3 = threading.Thread(target=task3)
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
def dep2():
t4 = threading.Thread(target=task4)
t5 = threading.Thread(target=task5)
t4.start()
t5.start()
t4.join()
t5.join()
def dep3():
d1 = threading.Thread(target=dep1)
d2 = threading.Thread(target=dep2)
d1.start()
d2.start()
d1.join()
d2.join()
d3 = threading.Thread(target=dep3)
d3.start()
d3.join()
В качестве альтернативы для присоединения вы можете использовать Queue.join, чтобы ждать окончания потоков.
Ответ 2
Посмотрите Gevent.
Пример использования:
import gevent
from gevent import socket
def destination(jobs):
gevent.joinall(jobs, timeout=2)
print [job.value for job in jobs]
def task1():
return gevent.spawn(socket.gethostbyname, 'www.google.com')
def task2():
return gevent.spawn(socket.gethostbyname, 'www.example.com')
def task3():
return gevent.spawn(socket.gethostbyname, 'www.python.org')
jobs = []
jobs.append(task1())
jobs.append(task2())
jobs.append(task3())
destination(jobs)
Надеюсь, это то, что вы искали.
Ответ 3
Если вы хотите попробовать внешние библиотеки, вы можете элегантно выразить задачи и их зависимости с помощью Ray. Это хорошо работает на одной машине, преимущество в том, что параллелизм и зависимости легче выразить с помощью Ray, чем с многопроцессорной обработкой Python, и у него нет проблемы GIL (глобальная блокировка интерпретатора), которая часто мешает эффективной работе многопоточности. Кроме того, очень легко увеличить нагрузку на кластер, если вам это понадобится в будущем.
Решение выглядит так:
import ray
ray.init()
@ray.remote
def task1():
pass
@ray.remote
def task2():
pass
@ray.remote
def task3():
pass
@ray.remote
def dependent1(x1, x2, x3):
pass
@ray.remote
def task4():
pass
@ray.remote
def task5():
pass
@ray.remote
def task6():
pass
@ray.remote
def dependent2(x1, x2, x3):
pass
@ray.remote
def dependent3(x, y):
pass
id1 = task1.remote()
id2 = task2.remote()
id3 = task3.remote()
dependent_id1 = dependent1.remote(id1, id2, id3)
id4 = task4.remote()
id5 = task5.remote()
id6 = task6.remote()
dependent_id2 = dependent2.remote(id4, id5, id6)
dependent_id3 = dependent3.remote(dependent_id1, dependent_id2)
ray.get(dependent_id3) # This is optional, you can get the results if the tasks return an object
Вы также можете передавать фактические объекты Python между задачами, используя аргументы внутри задач и возвращая результаты (например, говоря "возвращаемое значение" вместо "проходного" выше).
Используя "pip install ray", вышеприведенный код работает "из коробки" на одной машине, и также легко распараллеливать приложения в кластере, либо в облаке, либо в вашем собственном кластере, см . Https://ray.readthedocs. io/en/latest/autoscaling.html и https://ray.readthedocs.io/en/latest/using-ray-on-a-cluster.html). Это может пригодиться, если ваша рабочая нагрузка будет расти позже.
Отказ от ответственности: я один из разработчиков Ray.