Почему нет ускорения при использовании многопроцессорной обработки питонов для неудобной параллельной проблемы внутри цикла for, с общими данными numpy?

Я хочу ускорить смущающую параллельную проблему, связанную с байесовским выводом. Цель состоит в выводе коэффициентов u для набора изображений x, заданных матрицей A, таких, что X = A * U. X имеет размеры mxn, A mxp и U pxn. Для каждого столбца X нужно вывести оптимальный соответствующий столбец коэффициентов U. В итоге эта информация используется для обновления A. Я использую m = 3000, p = 1500 и n = 100. Итак, поскольку это линейная модель, вывод коэффициента-матрицы u состоит из n независимых вычислений. Таким образом, я попытался работать с модулем многопроцессорности Python, но ускорения не было.

Вот что я сделал:

Основная структура без распараллеливания:

import numpy as np
from convex import Crwlasso_cd

S = np.empty((m, batch_size))

for t in xrange(start_iter, niter):

    ## Begin Warm Start ##
    # Take 5 gradient steps w/ this batch using last coef. to warm start inf.
    for ws in range(5):
        # Initialize the coefficients
        if ws:
            theta = U
        else:
            theta = np.dot(A.T, X)

        # Infer the Coefficients for the given data batch X of size mxn (n=batch_size)
        # Crwlasso_cd is the function that does the inference per data sample
        # It basically a C-inline code
        for k in range(batch_size):
            U[:,k] = Crwlasso_cd(X[:, k].copy(), A, theta=theta[:,k].copy())

        # Given the inferred coefficients, update and renormalize
        # the basis functions A 
        dA1 = np.dot(X - np.dot(A, U), U.T) # Gaussian data likelihood
        A += (eta / batch_size) * dA1
        A = np.dot(A, np.diag(1/np.sqrt((A**2).sum(axis=0))))

Реализация многопроцессорной обработки:

Я попытался реализовать многопроцессорность. У меня есть 8-ядерная машина, которую я могу использовать.

  • Есть три типа for-loop. Единственный, который кажется "параллелизуемым", является третьим, где коэффициенты выводятся:
    • Создайте очередь и соберите номера итераций от 0 до batch_size-1 в очереди
    • Сгенерировать 8 процессов и позволить им работать через очередь
  • Поделитесь данными U с использованием многопроцессорности. Array

Итак, я заменил этот третий цикл следующим образом:

from multiprocessing import Process, Queue
import multiprocessing as mp
from Queue import Empty

num_cpu = mp.cpu_count()
work_queue = Queue()

# Generate the empty ndarray U and a multiprocessing.Array-Wrapper U_mp around U
# The class Wrap_mp is attached. Basically, U_mp.asarray() gives the corresponding
# ndarray
U = np.empty((p, batch_size))
U_mp = Wrap_mp(U)

...

        # Within the for-loops:
        for p in xrange(batch_size):
        work_queue.put(p)

        processes = [Process(target=infer_coefficients_mp, args=(work_queue,U_mp,A,X)) for p in range(num_cpu)]

        for p in processes:
            p.start()
            print p.pid
        for p in processes:
            p.join()

Вот класс Wrap_mp:

class Wrap_mp(object):
""" Wrapper around multiprocessing.Array to share an array across
    processes. Store the array as a multiprocessing.Array, but compute with it
as a numpy.ndarray
"""

    def __init__(self, arr):
        """ Initialize a shared array from a numpy array.

            The data is copied.
        """
        self.data = ndarray_to_shmem(arr)
        self.dtype = arr.dtype
        self.shape = arr.shape

    def __array__(self):
        """ Implement the array protocole.
        """
        arr = shmem_as_ndarray(self.data, dtype=self.dtype)
        arr.shape = self.shape
        return arr

    def asarray(self):
        return self.__array__()

И вот функция infer_coefficients_mp:

def infer_feature_coefficients_mp(work_queue,U_mp,A,X):

    while True:
        try:
            index = work_queue.get(block=False)
            x = X[:,index]
            U = U_mp.asarray()
            theta = np.dot(phit,x)

            # Infer the coefficients of the column index
            U[:,index] = Crwlasso_cd(x.copy(), A, theta=theta.copy())

         except Empty:
            break

Теперь проблема заключается в следующем:

  • Многопроцессорная версия не быстрее, чем версия одного потока для данных размеров данных.
  • Идентификатор процесса увеличивается с каждой итерацией. Означает ли это, что постоянно создается новый процесс? Разве это не создает огромные накладные расходы? Как я могу избежать этого? Есть ли возможность создать внутри всего цикла для 8 циклов и просто обновить их данными?
  • Является ли способ, которым я разделяю коэффициенты U между процессами, замедляет вычисление? Есть ли другой, лучший способ сделать это?
  • Будет ли пул процессов лучше?

Я очень благодарен за любую помощь! Я начал работать с Python месяц назад, и теперь я довольно потерялся.

Engin

Ответы

Ответ 1

Каждый раз, когда вы создаете Process, вы создаете новый процесс. Если вы делаете это в своем цикле for, то да, вы начинаете новые процессы каждый раз через цикл. Похоже на то, что вы хотите сделать, это инициализировать свою очередь и процессы за пределами цикла, а затем заполнить очередь внутри цикла.

Я использовал multiprocessing.Pool раньше, и он полезен, но он не предлагает многое из того, что вы уже реализовали с помощью очереди.

Ответ 2

В конце концов, все это сводится к одному вопросу: возможно ли запускать процессы вне основного цикла for и для каждой итерации, загружать в них обновленные переменные, обрабатывать эти данные и собирать вновь вычисленные данные из всех процессов, без необходимости запускать новые процессы на каждой итерации?