Есть ли простая параллельная карта на основе процессов для python?
Я ищу простую параллельную карту на основе процессов для python, то есть функцию
parmap(function,[data])
который будет запускать функцию для каждого элемента [данных] в другом процессе (ну, на другом ядре, но AFAIK, единственный способ запускать материал на разных ядрах на python - это запустить несколько интерпретаторов) и вернуть список результатов.
Есть ли что-то подобное? Мне бы хотелось что-то просто, поэтому простой модуль был бы приятным. Конечно, если такой вещи не существует, я соглашусь на большую библиотеку: -/
Ответы
Ответ 1
Мне кажется, что вам нужен метод map в multiprocessing.Pool():
map (func, iterable [, chunksize])
A parallel equivalent of the map() built-in function (it supports only
one iterable argument though). It blocks till the result is ready.
This method chops the iterable into a number of chunks which it submits to the
process pool as separate tasks. The (approximate) size of these chunks can be
specified by setting chunksize to a positive integ
Например, если вы хотите отобразить эту функцию:
def f(x):
return x**2
чтобы ранжировать (10), вы можете сделать это, используя встроенную функцию map():
map(f, range(10))
или с помощью метода объекта multiprocessing.Pool() map():
import multiprocessing
pool = multiprocessing.Pool()
print pool.map(f, range(10))
Ответ 2
Для тех, кто ищет Python-эквивалент R mclapply(), вот моя реализация. Это улучшение следующих двух примеров:
Это может применяться к функциям карты с одним или несколькими аргументами.
import numpy as np, pandas as pd
from scipy import sparse
import functools, multiprocessing
from multiprocessing import Pool
num_cores = multiprocessing.cpu_count()
def parallelize_dataframe(df, func, U=None, V=None):
#blockSize = 5000
num_partitions = 5 # int( np.ceil(df.shape[0]*(1.0/blockSize)) )
blocks = np.array_split(df, num_partitions)
pool = Pool(num_cores)
if V is not None and U is not None:
# apply func with multiple arguments to dataframe (i.e. involves multiple columns)
df = pd.concat(pool.map(functools.partial(func, U=U, V=V), blocks))
else:
# apply func with one argument to dataframe (i.e. involves single column)
df = pd.concat(pool.map(func, blocks))
pool.close()
pool.join()
return df
def square(x):
return x**2
def test_func(data):
print("Process working on: ", data.shape)
data["squareV"] = data["testV"].apply(square)
return data
def vecProd(row, U, V):
return np.sum( np.multiply(U[int(row["obsI"]),:], V[int(row["obsJ"]),:]) )
def mProd_func(data, U, V):
data["predV"] = data.apply( lambda row: vecProd(row, U, V), axis=1 )
return data
def generate_simulated_data():
N, D, nnz, K = [302, 184, 5000, 5]
I = np.random.choice(N, size=nnz, replace=True)
J = np.random.choice(D, size=nnz, replace=True)
vals = np.random.sample(nnz)
sparseY = sparse.csc_matrix((vals, (I, J)), shape=[N, D])
# Generate parameters U and V which could be used to reconstruct the matrix Y
U = np.random.sample(N*K).reshape([N,K])
V = np.random.sample(D*K).reshape([D,K])
return sparseY, U, V
def main():
Y, U, V = generate_simulated_data()
# find row, column indices and obvseved values for sparse matrix Y
(testI, testJ, testV) = sparse.find(Y)
colNames = ["obsI", "obsJ", "testV", "predV", "squareV"]
dtypes = {"obsI":int, "obsJ":int, "testV":float, "predV":float, "squareV": float}
obsValDF = pd.DataFrame(np.zeros((len(testV), len(colNames))), columns=colNames)
obsValDF["obsI"] = testI
obsValDF["obsJ"] = testJ
obsValDF["testV"] = testV
obsValDF = obsValDF.astype(dtype=dtypes)
print("Y.shape: {!s}, #obsVals: {}, obsValDF.shape: {!s}".format(Y.shape, len(testV), obsValDF.shape))
# calculate the square of testVals
obsValDF = parallelize_dataframe(obsValDF, test_func)
# reconstruct prediction of testVals using parameters U and V
obsValDF = parallelize_dataframe(obsValDF, mProd_func, U, V)
print("obsValDF.shape after reconstruction: {!s}".format(obsValDF.shape))
print("First 5 elements of obsValDF:\n", obsValDF.iloc[:5,:])
if __name__ == '__main__':
main()
Ответ 3
Это может быть сделано элегантно с Ray, системой, которая позволяет вам легко распараллеливать и распространять ваш код Python.
Чтобы распараллелить ваш пример, вам нужно определить функцию карты с @ray.remote
декоратора @ray.remote
, а затем вызвать ее с помощью .remote
. Это обеспечит выполнение каждого экземпляра удаленной функции в отдельном процессе.
import time
import ray
ray.init()
# Define the function you want to apply map on, as remote function.
@ray.remote
def f(x):
# Do some work...
time.sleep(1)
return x*x
# Define a helper parmap(f, list) function.
# This function executes a copy of f() on each element in "list".
# Each copy of f() runs in a different process.
# Note f.remote(x) returns a future of its result (i.e.,
# an identifier of the result) rather than the result itself.
def parmap(f, list):
return [f.remote(x) for x in list]
# Call parmap() on a list consisting of first 5 integers.
result_ids = parmap(f, range(1, 6))
# Get the results
results = ray.get(result_ids)
print(results)
Это напечатает:
[1, 4, 9, 16, 25]
и это закончится примерно в len(list)/p
(округляется до ближайшего целого числа), где p
- количество ядер на вашем компьютере. Предполагая, что машина с 2 ядрами, наш пример будет выполняться с округлением в 5/2
, т.е. примерно за 3
секунды.
Существует несколько преимуществ использования Ray по сравнению с многопроцессорным модулем. В частности, один и тот же код будет работать как на одной машине, так и на кластере машин. Для получения дополнительных преимуществ Рэй см. Этот пост.
Ответ 4
Я знаю, что это старый пост, но на всякий случай я написал инструмент для создания этого супер, супер простого, называемого parmapper (я на самом деле называю его parmap в моем использовании, но название было принято).
Он выполняет множество настроек и деконструкций процессов и добавляет множество функций. В грубом порядке важности
- Может принимать лямбду и другие непробиваемые функции
- Может применять starmap и другие подобные методы вызова, чтобы сделать его очень простым для непосредственного использования.
- Может разделяться между потоками и/или процессами
- Включает такие функции, как индикаторы выполнения
Это влечет за собой небольшую стоимость, но для большинства применений это незначительно.
Я надеюсь, что вы найдете это полезным.
(Примечание: он, как и map
в Python 3+, возвращает итерацию, поэтому, если вы ожидаете, что все результаты пройдут через него немедленно, используйте list()
)