Многопроцессор python apply_async использует только один процесс
У меня есть script, который включает открытие файла из списка, а затем выполнение чего-то в тексте внутри этого файла. Я использую многопроцессор python и пул, чтобы попытаться распараллелить эту операцию. Абстракция script ниже:
import os
from multiprocessing import Pool
results = []
def testFunc(files):
for file in files:
print "Working in Process #%d" % (os.getpid())
#This is just an illustration of some logic. This is not what I'm actually doing.
for line in file:
if 'dog' in line:
results.append(line)
if __name__=="__main__":
p = Pool(processes=2)
files = ['/path/to/file1.txt', '/path/to/file2.txt']
results = p.apply_async(testFunc, args = (files,))
results2 = results.get()
Когда я запускаю это, распечатка идентификатора процесса одинакова для каждой итерации. В основном то, что я пытаюсь сделать, это взять каждый элемент входного списка и развернуть его до отдельного процесса, но похоже, что один процесс выполняет всю работу.
Ответы
Ответ 1
-
apply_async
обрабатывает одну задачу в пуле. Вам нужно будет позвонить
apply_async
много раз, чтобы использовать больше процессоров.
- Не позволяйте обоим процессам пытаться писать в один список,
results
. Поскольку работники пула являются отдельными процессами, два
не будет записываться в тот же список. Один из способов обойти это - использовать очередь вывода. Вы можете настроить его самостоятельно или использовать обратный вызов apply_async
для настройки очереди для вас. apply_async
вызовет обратный вызов после завершения функции.
- Вы можете использовать
map_async
вместо apply_async
, но тогда вы
получите список списков, которые вы должны были бы сгладить.
Итак, попробуйте вместо этого что-то вроде:
import os
import multiprocessing as mp
results = []
def testFunc(file):
result = []
print "Working in Process #%d" % (os.getpid())
# This is just an illustration of some logic. This is not what I'm
# actually doing.
with open(file, 'r') as f:
for line in f:
if 'dog' in line:
result.append(line)
return result
def collect_results(result):
results.extend(result)
if __name__ == "__main__":
p = mp.Pool(processes=2)
files = ['/path/to/file1.txt', '/path/to/file2.txt']
for f in files:
p.apply_async(testFunc, args=(f, ), callback=collect_results)
p.close()
p.join()
print(results)
Ответ 2
Возможно, в этом случае вы должны использовать map_async
:
import os
from multiprocessing import Pool
results = []
def testFunc(file):
message = ("Working in Process #%d" % (os.getpid()))
#This is just an illustration of some logic. This is not what I'm actually doing.
for line in file:
if 'dog' in line:
results.append(line)
return message
if __name__=="__main__":
print("saddsf")
p = Pool(processes=2)
files = ['/path/to/file1.txt', '/path/to/file2.txt']
results = p.map_async(testFunc, files)
print(results.get())