Получение индекса текущего ввода ввода в многопроцессорность python

    from multiprocessing import Pool
    with Pool(processes=5) as p:
        p.starmap(name_of_function, all_inputs)

У меня есть часть кода, как указано выше, которая выполняет параллельную функцию. Предполагая, что all_inputs имеет 10 000 элементов, я хотел бы знать, какой из них выполняется в настоящее время, например. 100 из 10 000... Есть ли способ получить этот индекс?

Ответы

Ответ 1

Рабочий процесс в multiprocessing.Pool является экземпляром Process, он сохраняет внутренний счетчик, чтобы идентифицировать себя, вы можете использовать это счетчик вместе с идентификатор процесса OS:

import os
from multiprocessing import current_process, Pool


def x(a):
    p = current_process()
    print('process counter:', p._identity[0], 'pid:', os.getpid())


if __name__ == '__main__':
    with Pool(2) as p:
        r = p.map(x, range(4))
    p.join()

дает:

process counter: 1 pid: 29443
process counter: 2 pid: 29444
process counter: 2 pid: 29444
process counter: 1 pid: 29443

Ответ 2

Вы можете использовать метод current_process для многопроцессорности. Если это недостаточно точное, вы можете даже передать процессы a name с помощью uuid

from multiprocessing import current_process


def x(a):
    print(current_process(), a)
    return a*a

with Pool(5) as p:
    p.map(x, [1,2,3,4,5]

Ответ 3

IIUC, вы также можете передать индексы. (Украдите настройку с @user1767754) (Пожалуйста, дайте мне знать, если это не то, что вы ищете.)

from multiprocessing import Pool

arr = [1,2,3,4,5]
arr_with_idx = zip(arr, range(len(arr)))

def x(a, idx):
    print(idx)
    return a*a

with Pool(5) as p:
    p.starmap(x, arr_with_idx)

Или более кратко, используйте enumerate

from multiprocessing import Pool

arr = [1,2,3,4,5]

def x(idx, a):  # different here
    print(idx)
    return a*a

with Pool(5) as p:
    p.starmap(x, enumerate(arr))

starmap будет распаковывать каждый кортеж, и вы можете распечатать часть индекса.

Ответ 4

Я бы предложил передать индекс вместе с другими аргументами. Вы можете использовать enumerate, возможно, в сочетании с выражением генератора, добавьте значение к существующим аргументам. Здесь код, который принимает all_inputs, является итерируемым кортежами:

with Pool(processes=5) as p:
    p.starmap(name_of_function, ((i,) + args for i, args in enumerate(all_inputs)))

Вы можете выбрать из нескольких вариантов этой общей темы. Например, вы можете поместить индекс в конец аргументов, а не в начале (просто замените (i,) + args на args + (i,)).

Ответ 5

Самый простой способ узнать, какой процесс обрабатывает значение 100, - это разделить массив в равных частях до запуска пула. Это позволяет вам контролировать, какие процессы в пуле обрабатывают какую часть массива (поскольку вы передаете предопределенные начальные и конечные индексы для каждого из них для работы).

Например, если all_inputs имеет 50 элементов со значениями от 80 до 130, простое разделение задач для 8-ядерного ЦП будет возвращать следующие пары индексов:

Process #0 will work on indexes [0:5] with values: 80 - 85
Process #1 will work on indexes [6:11] with values: 86 - 91
Process #2 will work on indexes [12:17] with values: 92 - 97
Process #3 will work on indexes [18:23] with values: 98 - 103
Process #4 will work on indexes [24:29] with values: 104 - 109
Process #5 will work on indexes [30:35] with values: 110 - 115
Process #6 will work on indexes [36:41] with values: 116 - 121
Process #7 will work on indexes [42:50] with values: 122 - 130

Когда пул запускается, вы уже знаете, какой из них будет отвечать за обработку 100.

Успех этого подхода основан на jobDiv(), чтобы сделать это волшебство и разделить данные для процессов на основе размера входного массива и количества доступных ядер процессора.

Исходный код:

import multiprocessing as mp    

# processArray(): a parallel function that process an array based on start and 
#   end index positions.
def processArray(procId, array, indexes):
    startIdx, endIdx = indexes
    print("  Process #" + str(procId) + " startIdx=" + str(startIdx), " endIdx=" + str(endIdx))

    # Do some work:
    for i in range(startIdx, endIdx+1):
        print("    Process #" + str(procId) + " is computing index " + str(i), " with value " + str(array[i]))


# jobDiv(): performs a simple job division between available CPU cores
def jobDiv(inputArray, numCPUs):
    jobs = []
    arrayLength = len(inputArray)

    jobRange = int(arrayLength / numCPUs)
    extra = arrayLength - (jobRange * numCPUs)

    prevEnd = 0
    for c in range(numCPUs):
        endIdx = (c * jobRange) + jobRange - 1
        if (c == (numCPUs-1)):
            endIdx += extra

        startIdx = prevEnd
        if ( (c > 0) and (startIdx+1 < arrayLength) ):
            startIdx += 1

        jobs.append( (startIdx, endIdx) )
        prevEnd = endIdx

    return jobs


if __name__ == '__main__':
    # Initialize dataset for multiprocessing with 50 numbers, with values from 80 to 131
    nums = range(80, 131)

    # How many CPU cores can be used for this dataset
    numCPUs = mp.cpu_count()
    if (numCPUs > len(nums)):
        numCPUs = len(nums)

    # This function returns a list of tuples containing array indexes for
    # each process to work on. When nums has 100 elements and numCPUs is 8,
    # it returns the following list:
    #   (0, 11), (12, 23), (24, 35), (36, 47), (48, 59), (60, 71), (72, 83), (84, 99)
    indexes = jobDiv(nums, numCPUs)

    # Prepare parameters for every process in the pool, where each process gets one tuple of:
    #   (cpu_id, array, array_indexes)
    jobArgs = []
    for id, arg in enumerate(indexes):
        start, end = arg
        print("Process #" + str(id) + " will work on indexes [" + str(start) + ":" + str(end) +
              "] with values: " + str(nums[start]) + " - " + str(nums[end]))
        jobArgs.append( (id, nums, arg) )

    print("* Starting Pool")

    # For every process, send the data for processing along with it respective tuple of parameters
    with mp.Pool(processes=numCPUs) as p:
        sums = p.starmap(processArray, jobArgs)

    print("* Finished")

Выход

* Starting Pool
  Process #0 startIdx=0  endIdx=5
    Process #0 is computing index 0  with value 80
    Process #0 is computing index 1  with value 81
    Process #0 is computing index 2  with value 82
    Process #0 is computing index 3  with value 83
    Process #0 is computing index 4  with value 84
    Process #0 is computing index 5  with value 85
  Process #1 startIdx=6  endIdx=11
    Process #1 is computing index 6  with value 86
    Process #1 is computing index 7  with value 87
    Process #1 is computing index 8  with value 88
    Process #1 is computing index 9  with value 89
    Process #1 is computing index 10  with value 90
    Process #1 is computing index 11  with value 91
  Process #2 startIdx=12  endIdx=17
    Process #2 is computing index 12  with value 92
    Process #2 is computing index 13  with value 93
    Process #2 is computing index 14  with value 94
    Process #2 is computing index 15  with value 95
    Process #2 is computing index 16  with value 96
    Process #2 is computing index 17  with value 97
  Process #3 startIdx=18  endIdx=23
    Process #3 is computing index 18  with value 98
    Process #3 is computing index 19  with value 99
    Process #3 is computing index 20  with value 100
    Process #3 is computing index 21  with value 101
    Process #3 is computing index 22  with value 102
  Process #4 startIdx=24  endIdx=29
    Process #3 is computing index 23  with value 103
    Process #4 is computing index 24  with value 104
    Process #4 is computing index 25  with value 105
    Process #4 is computing index 26  with value 106
    Process #4 is computing index 27  with value 107
    Process #4 is computing index 28  with value 108
    Process #4 is computing index 29  with value 109
  Process #5 startIdx=30  endIdx=35
    Process #5 is computing index 30  with value 110
    Process #5 is computing index 31  with value 111
    Process #5 is computing index 32  with value 112
    Process #5 is computing index 33  with value 113
    Process #5 is computing index 34  with value 114
    Process #5 is computing index 35  with value 115
  Process #6 startIdx=36  endIdx=41
    Process #6 is computing index 36  with value 116
    Process #6 is computing index 37  with value 117
    Process #6 is computing index 38  with value 118
  Process #7 startIdx=42  endIdx=50
    Process #6 is computing index 39  with value 119
    Process #6 is computing index 40  with value 120
    Process #7 is computing index 42  with value 122
    Process #6 is computing index 41  with value 121
    Process #7 is computing index 43  with value 123
    Process #7 is computing index 44  with value 124
    Process #7 is computing index 45  with value 125
    Process #7 is computing index 46  with value 126
    Process #7 is computing index 47  with value 127
    Process #7 is computing index 48  with value 128
    Process #7 is computing index 49  with value 129
    Process #7 is computing index 50  with value 130
* Finished

Стоит отметить очевидное и сказать, что каждый процесс знает, какое значение обрабатывается на данный момент, однако main() не.

main() просто знает диапазон индексов, в котором каждый процесс будет работать, но он не знает (в режиме реального времени), какие значения в настоящее время обрабатываются этими процессами.

Если вам нужна main(), чтобы иметь доступ к этой информации во время работы процессов, лучше всего настроить Queue в main() и запустить ее на отдельном Thread до запуска пула. Затем обязательно отправьте объект Queue как часть параметров, которые передаются каждому процессу, чтобы все они могли совместно использовать один и тот же объект и хранить данные, которые они обрабатывают в данный момент.

Ответ 6

Если вы используете свой код на современном многоядерном процессоре в данный момент времени, ваши рабочие процессы будут выполнять параллельно несколько задач. Вы можете использовать очередь для реализации пользовательского протокола, который ваши работники позволят основному процессу узнать, с какой задачей (индексом задачи) они запускаются и заканчивают работу.

import os
import time
import queue
import random
import multiprocessing

def fn(st_queue, i):
    st_queue.put((multiprocessing.current_process().name, i))
    time.sleep(random.random())  # your long calculation
    st_queue.put((multiprocessing.current_process().name, None))
    return i ** 2

def main():
    status = {}
    st_queue = multiprocessing.Manager().Queue()

    result = []
    pool = multiprocessing.Pool(4)
    args = zip([st_queue] * 20, range(20))
    async_res = pool.starmap_async(
        fn, args, callback = lambda r: result.append(r))

    while not async_res.ready():
        try:
            msg = st_queue.get(True, 0.1)
        except queue.Empty:
            pass
        else:
            status.update([msg])
            print(status)      
    print(result.pop())
    pool.close()

if __name__ == '__main__':
    main()

Словарь состояния выглядит так:

{
    'ForkPoolWorker-4': None, 
    'ForkPoolWorker-5': 16, 
    'ForkPoolWorker-2': 18, 
    'ForkPoolWorker-3': 15
}

Ответ 7

Если вы с удовольствием сообщаете индекс из рабочих процессов, и вы не возражаете сообщать о них из последовательности, то вы можете использовать подходы перечисления, которые другие предложили. Если вы хотите отслеживать работу из основного процесса (например, для управления строкой состояния) и/или вам нужно знать общее количество начатых элементов, вам нужно будет, чтобы каждый рабочий отчет вернулся к родитель, когда он начинается. Это можно сделать с помощью трубы, как показано ниже.

Я не уверен, хотите ли вы сообщить индекс элемента или общее количество элементов, которые были запущены (они могут начинаться с последовательности), поэтому я сообщаю обо всех.

# dummy data and function
all_inputs = list(zip(range(10), range(20,30)))
def name_of_function(a, b):
    return a+b

# main code
from multiprocessing import Pool, Pipe, Lock

parent_conn, child_conn = Pipe()
lock = Lock()

def wrapper(idx, args):
    with lock:
        child_conn.send(idx)
    return name_of_function(*args)

with Pool(processes=5) as p:
    p.starmap_async(wrapper, enumerate(all_inputs))
    # receive status updates
    num_items = len(all_inputs)
    for i in range(num_items):
        idx = parent_conn.recv()
        print("processing index {} ({}/{})".format(idx, i+1, num_items))

child_conn.close()
parent_conn.close()

# output (note that items may be started out of sequence):
# processing index 0 (1/10)
# processing index 1 (2/10)
# processing index 2 (3/10)
# processing index 3 (4/10)
# processing index 5 (5/10)
# processing index 6 (6/10)
# processing index 4 (7/10)
# processing index 7 (8/10)
# processing index 8 (9/10)
# processing index 9 (10/10)

Обратите внимание, что вместо starmap используется starmap_async, чтобы продолжить выполнение основного потока во время выполнения подпроцессов. Кроме того, вы можете использовать starmap и запустить отдельный поток для отчета о прогрессе, например:

from threading import Thread
from multiprocessing import Pool, Pipe, Lock
parent_conn, child_conn = Pipe()
lock = Lock()

def receive_updates(num_items):
    for i in range(num_items):
        idx = parent_conn.recv()
        print("processing index {} ({}/{})".format(idx, i+1, num_items))

def wrapper(idx, args):
    with lock:
        child_conn.send(idx)
    return name_of_function(*args)

# launch another thread to receive the results, since the main thread
# will wait for starmap
result_thread = Thread(target=receive_updates, args=(len(all_inputs),))
result_thread.Daemon = True  # stop if main thread is killed
result_thread.start()

# your original code
with Pool(processes=5) as p:
    p.starmap(wrapper, all_inputs)

child_conn.close()
parent_conn.close()

Ответ 8

Если вам интересно узнать, когда каждый элемент будет завершен (вместо запуска), вы можете использовать apply_async с обратным вызовом, например:

# dummy data and function
all_inputs = list(zip(range(10), range(20,30)))
def name_of_function(a, b):
    return a+b

# main code
from multiprocessing import Pool

num_items = len(all_inputs)
num_done = 0
def handle_result(res):
    global num_done
    num_done += 1
    print('finished item {} of {}.'.format(num_done, num_items))

p = Pool(5)
for args in all_inputs:
    p.apply_async(name_of_function, args, callback=handle_result)
p.close()
p.join() # wait for tasks to finish

результат:

finished item 1 of 10.
finished item 2 of 10.
finished item 3 of 10.
finished item 4 of 10.
finished item 5 of 10.
finished item 6 of 10.
finished item 7 of 10.
finished item 8 of 10.
finished item 9 of 10.
finished item 10 of 10.

Обратите внимание, что результаты не обязательно будут в том же порядке, что и all_inputs. Если вам нужно точно знать, какой элемент был обработан, вы можете перечислить такие аргументы:

from multiprocessing import Pool

num_items = len(all_inputs)
num_done = 0
def handle_result(idx):
    global num_done
    num_done += 1
    print('finished index {} ({}/{}).'.format(idx, num_done, num_items))

def wrapper(idx, args):
    name_of_function(*args)
    return idx

p = Pool(5)
for args in enumerate(all_inputs):
    p.apply_async(wrapper, args, callback=handle_result)
p.close()
p.join() # wait for tasks to finish

результат:

finished index 0 (1/10).
finished index 1 (2/10).
finished index 2 (3/10).
finished index 3 (4/10).
finished index 4 (5/10).
finished index 6 (6/10).
finished index 8 (7/10).
finished index 7 (8/10).
finished index 9 (9/10).
finished index 5 (10/10).