Python: параллельный запуск подпроцесса
У меня есть следующий код, который записывает md5sums в файл журнала
for file in files_output:
p=subprocess.Popen(['md5sum',file],stdout=logfile)
p.wait()
-
Будут ли они написаны параллельно? т.е. если md5sum занимает много времени для одного из файлов, будет ли еще один запуск до того, как будет завершен предыдущий?
-
Если ответ на вышеизложенное да, могу ли я предположить, что порядок md5sums, записанный в лог файл, может отличаться в зависимости от того, как долго md5sum берет для каждого файла? (некоторые файлы могут быть огромными, некоторые небольшие)
Ответы
Ответ 1
Все подпроцессы выполняются параллельно. (Чтобы избежать этого, нужно ждать явно для их завершения.) Они даже могут записывать в файл журнала одновременно, тем самым искажая вывод. Чтобы этого избежать, вы должны позволить каждому процессу записывать в другой файл журнала и собирать все выходы, когда все процессы завершены.
q = Queue.Queue()
result = {} # used to store the results
for fileName in fileNames:
q.put(fileName)
def worker():
while True:
fileName = q.get()
if fileName is None: # EOF?
return
subprocess_stuff_using(fileName)
wait_for_finishing_subprocess()
checksum = collect_md5_result_for(fileName)
result[fileName] = checksum # store it
threads = [ threading.Thread(target=worker) for _i in range(20) ]
for thread in threads:
thread.start()
q.put(None) # one EOF marker for each thread
После этого результаты должны быть сохранены в result
.
Ответ 2
- Да, эти процессы md5sum будут запущены параллельно.
- Да, порядок записи md5sums будет непредсказуемым. И, как правило, считается, что плохая практика заключается в совместном использовании одного ресурса, такого как файл, из многих процессов.
Также ваш способ сделать p.wait()
после цикла for
будет ждать завершения последнего процесса md5sum, а остальные из них все еще могут работать.
Но вы можете немного изменить этот код, чтобы иметь преимущества параллельной обработки и предсказуемости синхронизированного вывода, если вы собираете вывод md5sum во временные файлы и собираете его обратно в один файл после завершения всех процессов.
import subprocess
import os
processes = []
for file in files_output:
f = os.tmpfile()
p = subprocess.Popen(['md5sum',file],stdout=f)
processes.append((p, f))
for p, f in processes:
p.wait()
f.seek(0)
logfile.write(f.read())
f.close()
Ответ 3
Простым способом сбора данных из параллельных подпроцессов md5sum является использование пула потоков и запись в файл из основного процесса:
from multiprocessing.dummy import Pool # use threads
from subprocess import check_output
def md5sum(filename):
try:
return check_output(["md5sum", filename]), None
except Exception as e:
return None, e
if __name__ == "__main__":
p = Pool(number_of_processes) # specify number of concurrent processes
with open("md5sums.txt", "wb") as logfile:
for output, error in p.imap(md5sum, filenames): # provide filenames
if error is None:
logfile.write(output)
- вывод из
md5sum
невелик, поэтому вы можете сохранить его в памяти
-
imap
сохраняет порядок
-
number_of_processes
может отличаться от количества файлов или ядер процессора (большие значения не означают быстрее: это зависит от относительной производительности IO (дисков) и CPU)
Вы можете попробовать передать несколько файлов сразу в подпроцессы md5sum.
В этом случае вам не нужен внешний подпроцесс; вы можете вычислить md5 в Python:
import hashlib
from functools import partial
def md5sum(filename, chunksize=2**15, bufsize=-1):
m = hashlib.md5()
with open(filename, 'rb', bufsize) as f:
for chunk in iter(partial(f.read, chunksize), b''):
m.update(chunk)
return m.hexdigest()
Чтобы использовать несколько процессов вместо потоков (чтобы простой Python md5sum()
выполнялся параллельно с использованием нескольких процессоров), просто удалите .dummy
из импорта в указанном выше коде.