Как я могу распараллелить простой цикл Python?
Это, вероятно, тривиальный вопрос, но как я могу распараллелить следующий цикл в python?
# setup output lists
output1 = list()
output2 = list()
output3 = list()
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter = parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
Я знаю, как запускать отдельные потоки в Python, но я не знаю, как "собрать" результаты.
Несколько процессов тоже будут прекрасными - все, что проще для этого случая. Я использую в настоящее время Linux, но код должен работать как на Windows, так и на Mac как хорошо.
Какой самый простой способ распараллеливать этот код?
Ответы
Ответ 1
Использование нескольких потоков на CPython не даст вам лучшей производительности для кода pure-Python из-за глобальной блокировки интерпретатора (GIL). Я предлагаю вместо этого использовать multiprocessing
:
pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
Обратите внимание, что это не будет работать в интерактивном интерпретаторе.
Чтобы избежать обычной FUD вокруг GIL: в любом случае не было бы никакого преимущества использовать потоки для этого примера. Вы хотите использовать здесь процессы, а не потоки, потому что они избегают целой проблемы.
Ответ 2
Чтобы распараллелить простой цикл for, joblib придает большое значение необработанному использованию многопроцессорной обработки. Не только короткий синтаксис, но и такие вещи, как прозрачное группирование итераций, когда они выполняются очень быстро (для устранения накладных расходов) или захват трассировки дочернего процесса, для лучшей отчетности об ошибках.
Отказ от ответственности: я оригинальный автор joblib.
Ответ 3
Какой самый простой способ распараллеливать этот код?
Мне очень нравится concurrent.futures
для этого, доступный в Python3 начиная с версии 3.2 - и через backport до 2.6 и 2.7 на PyPi.
Вы можете использовать потоки или процессы и использовать тот же самый интерфейс.
Multiprocessing
Поместите это в файл - futuretest.py:
import concurrent.futures
import time, random # add some random sleep time
offset = 2 # you don't supply these so
def calc_stuff(parameter=None): # these are examples.
sleep_time = random.choice([0, 1, 2, 3, 4, 5])
time.sleep(sleep_time)
return parameter / 2, sleep_time, parameter * parameter
def procedure(j): # just factoring out the
parameter = j * offset # procedure
# call the calculation
return calc_stuff(parameter=parameter)
def main():
output1 = list()
output2 = list()
output3 = list()
start = time.time() # let see how long this takes
# we can swap out ProcessPoolExecutor for ThreadPoolExecutor
with concurrent.futures.ProcessPoolExecutor() as executor:
for out1, out2, out3 in executor.map(procedure, range(0, 10)):
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
finish = time.time()
# these kinds of format strings are only available on Python 3.6:
# time to upgrade!
print(f'original inputs: {repr(output1)}')
print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
print(f'returned in order given: {repr(output3)}')
if __name__ == '__main__':
main()
И вот вывод:
$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallellizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
Многопоточность
Теперь измените ProcessPoolExecutor
на ThreadPoolExecutor
и снова запустите модуль:
$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallellizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
Теперь вы сделали многопоточность и многопроцессорность!
Обратите внимание на производительность и использование обоих вместе.
Сэмплирование слишком мало для сравнения результатов.
Тем не менее, я подозреваю, что многопоточность будет быстрее, чем многопроцессорность в целом, особенно в Windows, поскольку Windows не поддерживает форкирование, поэтому каждый новый процесс требует времени для запуска. На Linux или Mac они, вероятно, будут ближе.
Вы можете вложить несколько потоков внутри нескольких процессов, но не рекомендуется использовать несколько потоков для выделения нескольких процессов.
Ответ 4
from joblib import Parallel, delayed
import multiprocessing
inputs = range(10)
def processInput(i):
return i * i
num_cores = multiprocessing.cpu_count()
results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)
Вышеописанное прекрасно работает на моей машине (Ubuntu, пакет joblib был предварительно установлен, но может быть установлен через pip install joblib
).
Взято с https://blog.dominodatalab.com/simple-parallelization/
Ответ 5
Есть несколько преимуществ использования Ray:
- Вы можете распараллеливать на нескольких машинах в дополнение к нескольким ядрам (с одним и тем же кодом).
- Эффективная обработка числовых данных через разделяемую память (и сериализация без копирования).
- Высокая пропускная способность при распределенном планировании.
- Отказоустойчивость.
В вашем случае вы можете запустить Ray и определить удаленную функцию
import ray
ray.init()
@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
# Do something.
return 1, 2, 3
а затем вызвать его параллельно
output1, output2, output3 = [], [], []
# Launch the tasks.
for j in range(10):
id1, id2, id3 = calc_stuff.remote(parameter=j)
output1.append(id1)
output2.append(id2)
output3.append(id3)
# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)
Чтобы запустить тот же пример в кластере, единственной строкой, которая могла бы измениться, был бы вызов ray.init(). Соответствующую документацию можно найти здесь.
Обратите внимание, что я помогаю развивать Рэя.
Ответ 6
почему вы не используете потоки и один мьютекс для защиты одного глобального списка?
import os
import re
import time
import sys
import thread
from threading import Thread
class thread_it(Thread):
def __init__ (self,param):
Thread.__init__(self)
self.param = param
def run(self):
mutex.acquire()
output.append(calc_stuff(self.param))
mutex.release()
threads = []
output = []
mutex = thread.allocate_lock()
for j in range(0, 10):
current = thread_it(j * offset)
threads.append(current)
current.start()
for t in threads:
t.join()
#here you have output list filled with data
помните, что вы будете так же быстро, как ваш самый медленный поток
Ответ 7
Я обнаружил, что joblib
очень полезен для меня. Пожалуйста, смотрите следующий пример:
from joblib import Parallel, delayed
def yourfunction(k):
s=3.14*k*k
print "Area of a circle with a radius ", k, " is:", s
element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))
n_jobs = -1: использовать все доступные ядра
Ответ 8
очень простой пример параллельной обработки
from multiprocessing import Process
output1 = list()
output2 = list()
output3 = list()
def yourfunction():
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter=parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
if __name__ == '__main__':
p = Process(target=pa.yourfunction, args=('bob',))
p.start()
p.join()
Ответ 9
Это может быть полезно при реализации многопроцессорных и параллельных/распределенных вычислений в Python.
Учебник YouTube по использованию пакета techila
Techila - это распределенное вычислительное промежуточное программное обеспечение, которое интегрируется непосредственно с Python с использованием пакета techila. Функция персика в пакете может быть полезна при распараллеливании структур цикла. (Следующий фрагмент кода из Форумы сообщества Techila)
techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
jobs = jobcount # Number of Jobs in the Project
)
Ответ 10
Допустим, у нас есть асинхронная функция
async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
# Do some async procesing
Это должно быть запущено на большом массиве. Некоторые атрибуты передаются в программу, а некоторые используются из свойства элемента словаря в массиве.
async def process_students(self, student_name: str, loop):
market = sys.argv[2]
subjects = [...] #Some large array
batchsize = 5
for i in range(0, len(subjects), batchsize):
batch = subjects[i:i+batchsize]
await asyncio.gather(*(self.work_async(student_name,
sub['Code'],
loop)
for sub in batch))
Ответ 11
Посмотрите на это:
http://docs.python.org/library/queue.html
Это может быть неправильный способ сделать это, но я бы сделал что-то вроде:
Фактический код;
from multiprocessing import Process, JoinableQueue as Queue
class CustomWorker(Process):
def __init__(self,workQueue, out1,out2,out3):
Process.__init__(self)
self.input=workQueue
self.out1=out1
self.out2=out2
self.out3=out3
def run(self):
while True:
try:
value = self.input.get()
#value modifier
temp1,temp2,temp3 = self.calc_stuff(value)
self.out1.put(temp1)
self.out2.put(temp2)
self.out3.put(temp3)
self.input.task_done()
except Queue.Empty:
return
#Catch things better here
def calc_stuff(self,param):
out1 = param * 2
out2 = param * 4
out3 = param * 8
return out1,out2,out3
def Main():
inputQueue = Queue()
for i in range(10):
inputQueue.put(i)
out1 = Queue()
out2 = Queue()
out3 = Queue()
processes = []
for x in range(2):
p = CustomWorker(inputQueue,out1,out2,out3)
p.daemon = True
p.start()
processes.append(p)
inputQueue.join()
while(not out1.empty()):
print out1.get()
print out2.get()
print out3.get()
if __name__ == '__main__':
Main()
Надеюсь, что это поможет.
Ответ 12
спасибо @iuryxavier
from multiprocessing import Pool
from multiprocessing import cpu_count
def add_1(x):
return x + 1
if __name__ == "__main__":
pool = Pool(cpu_count())
results = pool.map(add_1, range(10**12))
pool.close() # 'TERM'
pool.join() # 'KILL'