Ответ 1
Вместо использования Pool().imap()
возможно, вам лучше вручную создать дочерние процессы с помощью Process()
. Я уверен, что возвращенный объект позволит вам получить статус жизни любого ребенка. Вы узнаете, вешают ли они.
Я хочу применить функцию параллельно с использованием multiprocessing.Pool. Проблема в том, что если один вызов функции вызывает ошибку сегментации, пул вешает навсегда. Кто-нибудь знает, как я могу создать пул, который обнаруживает, когда что-то подобное происходит и вызывает ошибку?
В следующем примере показано, как воспроизвести его (требуется scikit-learn > 0.14)
import numpy as np
from sklearn.ensemble import gradient_boosting
import time
from multiprocessing import Pool
class Bad(object):
tree_ = None
def fit_one(i):
if i == 3:
# this will segfault
bad = np.array([[Bad()] * 2], dtype=np.object)
gradient_boosting.predict_stages(bad,
np.random.rand(20, 2).astype(np.float32),
1.0, np.random.rand(20, 2))
else:
time.sleep(1)
return i
pool = Pool(2)
out = pool.imap_unordered(fit_one, range(10))
# we will never see 3
for o in out:
print o
Вместо использования Pool().imap()
возможно, вам лучше вручную создать дочерние процессы с помощью Process()
. Я уверен, что возвращенный объект позволит вам получить статус жизни любого ребенка. Вы узнаете, вешают ли они.
Это известная ошибка, проблема # 22393, на Python. Нет значимого обходного пути, пока вы используете multiprocessing.pool
до тех пор, пока он не будет исправлен. Патч доступен по этой ссылке, но он еще не интегрирован в основную версию, поэтому стабильная версия Python не устраняет проблему.
Я не запускаю ваш пример, чтобы узнать, может ли он обрабатывать эту ошибку, но попытайтесь выполнить параллельные фьючерсы. Просто замените my_function (i) вашей fit_one (i). Сохраните структуру __name__=='__main__':
. Кажется, что это сопряжено с параллельными фьючерсами. Код ниже проверяется на моей машине, поэтому мы надеемся, что работаем прямо на вашем компьютере.
import concurrent.futures
def my_function(i):
print('function running')
return i
def run():
number_processes=4
executor = concurrent.futures.ProcessPoolExecutor(number_processes)
futures = [executor.submit(my_function,i) for i in range(10)]
concurrent.futures.wait(futures)
for f in futures:
print(f.result())
if __name__ == '__main__':
run()
Как описано в комментариях, это просто работает в Python 3, если вы используете concurrent.Futures.ProcessPoolExecutor
вместо multiprocessing.Pool
.
Если вы застряли на Python 2, лучшим вариантом, который я нашел, является использование аргумента timeout
для объектов результатов, возвращаемых Pool.apply_async
и Pool.map_async
. Например:
pool = Pool(2)
out = pool.map_async(fit_one, range(10))
for o in out:
print o.get(timeout=1000) # allow 1000 seconds max
Это работает до тех пор, пока у вас есть верхняя граница того, как долго должен выполняться дочерний процесс для выполнения задачи.