Как лучше всего делиться статическими данными между ipyparallel client и удаленными механизмами?

Я запускаю ту же симуляцию в цикле с разными параметрами. В каждой симуляции используется pandas DataFrame (data), который только читается, никогда не изменяется. Используя ipyparallel (параллельный IPython), я могу поместить эти DataFrames в глобальное пространство переменных для каждого движка в моем представлении до начала моделирования:

view['data'] = data

Затем у двигателей есть доступ к DataFrame для всех имитаций, которые запускаются на них. Процесс копирования данных (если маринованный, data составляет 40 МБ) составляет всего несколько секунд. Тем не менее, похоже, что, если число симуляций растет, использование памяти становится очень большим. Я полагаю, что эти общие данные копируются для каждой задачи, а не только для каждого движка. Какая наилучшая практика для обмена статическими данными только для чтения от клиента с двигателями? Копирование его один раз на двигатель приемлемо, но в идеале его нужно будет копировать только один раз на хост (у меня есть 4 механизма для хост-1 и 8 движков на хосте2).

Здесь мой код:

from ipyparallel import Client
import pandas as pd

rc = Client()
view = rc[:]  # use all engines
view.scatter('id', rc.ids, flatten=True)  # So we can track which engine performed what task

def do_simulation(tweaks):
    """ Run simulation with specified tweaks """
    #  Do sim stuff using the global data DataFrame
    return results, id, tweaks

if __name__ == '__main__':
    data = pd.read_sql("SELECT * FROM my_table", engine)
    threads = []  # store list of tweaks dicts
    for i in range(4):
        for j in range(5):
            for k in range(6):
                threads.append(dict(i=i, j=j, k=k)

    # Set up globals for each engine.  This is the read-only DataFrame
    view['data'] = data
    ar = view.map_async(do_simulation, threads)

    # Our async results should pop up over time.  Let measure our progress:
    for idx, (results, id, tweaks) in enumerate(ar):
        print 'Progress: {}%: Simulation {} finished on engine {}'.format(100.0 * ar.progress / len(ar), idx, id)
        # Store results as a pickle for the future
        pfile = '{}_{}_{}.pickle'.format(tweaks['i'], tweaks['j'], tweaks['j'])
        # Save our results to a pickle file
        pd.to_pickle(results, out_file_path + pfile)

    print 'Total execution time: {} (serial time: {})'.format(ar.wall_time, ar.serial_time)

Если подсчет моделирования мал (~ 50), то для начала требуется некоторое время, но я начинаю видеть отчеты о выполнении печати. Как ни странно, несколько задач будут назначены одному движку, и я не увижу ответа, пока все эти назначенные задачи не будут завершены для этого движка. Я ожидаю увидеть ответ от enumerate(ar) каждый раз, когда будет завершена одна задача моделирования.

Если количество вычислений велико (~ 1000), для начала требуется много времени, я вижу, что процессоры дросселируются на всех двигателях, но никаких отчетов о печати печати не видны до долгого времени (~ 40 минут), и когда Я вижу прогресс, кажется, что большой блок ( > 100) задач перешел на тот же движок и ожидал завершения от этого одного движка до достижения определенного прогресса. Когда этот один движок завершился, я увидел, что объект ar предоставил новые ответы за 4 секунды - возможно, это была временная задержка для записи выходных файлов pickle.

Наконец, host1 также запускает задачу ipycontroller, и ее использование памяти идет как сумасшедший (задача Python показывает использование > 6 ГБ ОЗУ, задача ядра показывает использование 3 ГБ). Двигатель host2 на самом деле не показывает много использования памяти. Что может вызвать этот всплеск в памяти?

Ответы

Ответ 1

Я использовал эту логику в коде пару лет назад, и я использовал this. Мой код был примерно таким:

shared_dict = {
    # big dict with ~10k keys, each with a list of dicts
}

balancer = engines.load_balanced_view()

with engines[:].sync_imports(): # your 'view' variable 
    import pandas as pd
    import ujson as json

engines[:].push(shared_dict)

results = balancer.map(lambda i: (i, my_func(i)), id)
results_data = results.get()

Если подсчеты моделирования малы (~ 50), то требуется некоторое время, чтобы получить но я начал видеть отчеты о печати. Как ни странно, несколько задач будут назначены одному движку, и я не вижу до тех пор, пока все назначенные задачи не будут завершены для этого двигатель. Я бы ожидал увидеть ответ от перечисления (ar) каждый раз завершается одна задача моделирования.

В моем случае my_func() был сложным методом, когда я помещал много сообщений о регистрации, записанных в файл, поэтому у меня были свои операторы печати. ​​

О назначении задачи, поскольку я использовал load_balanced_view(), я ушел в библиотеку, и он отлично поработал.

Если количество вычислений велико (~ 1000), требуется много времени, чтобы получить началось, я вижу, что процессоры дросселируются на всех двигателях, но никакого прогресса печатные заявления видны до долгого времени (~ 40 минут), и когда я это делаю см. прогресс, кажется, что большой блок ( > 100) задач перешел к одному и тому же двигатель и ожидаемое завершение от этого одного двигателя до предоставления некоторый прогресс. Когда этот один движок завершился, я увидел объект ar давал новые ответы когда-либо 4 секунды - возможно, это была временная задержка для записи выходных файлов pickle.

В течение долгого времени я этого не испытывал, поэтому ничего не могу сказать.

Я надеюсь, что это может привести к некоторому свету в вашей проблеме.


PS: как я сказал в комментарии, вы можете попробовать multiprocessing.Pool. Наверное, я не пытался использовать большие, доступные только для чтения данные как глобальную переменную. Я бы попробовал, потому что кажется, работает.