Многопроцессор python apply_async использует только один процесс

У меня есть script, который включает открытие файла из списка, а затем выполнение чего-то в тексте внутри этого файла. Я использую многопроцессор python и пул, чтобы попытаться распараллелить эту операцию. Абстракция script ниже:

import os
from multiprocessing import Pool

results = []
def testFunc(files):
    for file in files:
        print "Working in Process #%d" % (os.getpid())
        #This is just an illustration of some logic. This is not what I'm actually doing.
        for line in file:
            if 'dog' in line:
                results.append(line)

if __name__=="__main__":
    p = Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    results = p.apply_async(testFunc, args = (files,))
    results2 = results.get()

Когда я запускаю это, распечатка идентификатора процесса одинакова для каждой итерации. В основном то, что я пытаюсь сделать, это взять каждый элемент входного списка и развернуть его до отдельного процесса, но похоже, что один процесс выполняет всю работу.

Ответы

Ответ 1

  • apply_async обрабатывает одну задачу в пуле. Вам нужно будет позвонить apply_async много раз, чтобы использовать больше процессоров.
  • Не позволяйте обоим процессам пытаться писать в один список, results. Поскольку работники пула являются отдельными процессами, два не будет записываться в тот же список. Один из способов обойти это - использовать очередь вывода. Вы можете настроить его самостоятельно или использовать обратный вызов apply_async для настройки очереди для вас. apply_async вызовет обратный вызов после завершения функции.
  • Вы можете использовать map_async вместо apply_async, но тогда вы получите список списков, которые вы должны были бы сгладить.

Итак, попробуйте вместо этого что-то вроде:

import os
import multiprocessing as mp

results = []   

def testFunc(file):
    result = []
    print "Working in Process #%d" % (os.getpid())
    # This is just an illustration of some logic. This is not what I'm
    # actually doing.
    with open(file, 'r') as f:
        for line in f:
            if 'dog' in line:
                result.append(line)
    return result


def collect_results(result):
    results.extend(result)

if __name__ == "__main__":
    p = mp.Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    for f in files:
        p.apply_async(testFunc, args=(f, ), callback=collect_results)
    p.close()
    p.join()
    print(results)

Ответ 2

Возможно, в этом случае вы должны использовать map_async:

import os
from multiprocessing import Pool

results = []
def testFunc(file):
    message =  ("Working in Process #%d" % (os.getpid()))
    #This is just an illustration of some logic. This is not what I'm actually doing.
    for line in file:
        if 'dog' in line:
            results.append(line)
    return message

if __name__=="__main__":
    print("saddsf")
    p = Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    results = p.map_async(testFunc, files)
    print(results.get())