Многопроцессорность Python безопасно записывается в файл
Я пытаюсь решить большую числовую проблему, которая включает в себя множество подзадач, и я использую модуль многопроцессорности Python (в частности Pool.map) для разделения различных независимых подзадач на разные ядра. Каждая подзадача включает в себя вычисление множества подзадач, и я пытаюсь эффективно запоминать эти результаты, сохраняя их в файле, если они еще не были вычислены каким-либо процессом, иначе пропустите вычисление и просто прочитайте результаты из файла.
У меня возникают проблемы с файлами concurrency: иногда разные процессы проверяют, была ли еще вычисленная подзадача (путем поиска файла, в котором будут храниться результаты), см., что у него нет, запустите вычисление, затем попробуйте записать результаты в один и тот же файл одновременно. Как избежать столкновений, подобных этому?
Ответы
Ответ 1
@GP89 упомянул хорошее решение. Используйте очередь для отправки задач записи выделенному процессу, у которого есть единственный доступ для записи в файл. Все остальные работники имеют доступ только для чтения. Это устранит столкновения. Вот пример, который использует apply_async, но он также будет работать с картой:
import multiprocessing as mp
import time
fn = 'c:/temp/temp.txt'
def worker(arg, q):
'''stupidly simulates long running process'''
start = time.clock()
s = 'this is a test'
txt = s
for i in range(200000):
txt += s
done = time.clock() - start
with open(fn, 'rb') as f:
size = len(f.read())
res = 'Process' + str(arg), str(size), done
q.put(res)
return res
def listener(q):
'''listens for messages on the q, writes to file. '''
with open(fn, 'w') as f:
while 1:
m = q.get()
if m == 'kill':
f.write('killed')
break
f.write(str(m) + '\n')
f.flush()
def main():
#must use Manager queue here, or will not work
manager = mp.Manager()
q = manager.Queue()
pool = mp.Pool(mp.cpu_count() + 2)
#put listener to work first
watcher = pool.apply_async(listener, (q,))
#fire off workers
jobs = []
for i in range(80):
job = pool.apply_async(worker, (i, q))
jobs.append(job)
# collect results from the workers through the pool result queue
for job in jobs:
job.get()
#now we are done, kill the listener
q.put('kill')
pool.close()
pool.join()
if __name__ == "__main__":
main()
Ответ 2
Мне кажется, что вам нужно использовать Manager
чтобы временно сохранить результаты в список, а затем записать результаты из списка в файл. Также используйте starmap
для передачи объекта, который вы хотите обработать, и управляемого списка. Первым шагом является создание параметра для передачи в starmap
, который включает в себя управляемый список.
from multiprocessing import Manager
from multiprocessing import Pool
import pandas as pd
def worker(row, param):
# do something here and then append it to row
x = param**2
row.append(x)
if __name__ == '__main__':
pool_parameter = [] # list of objects to process
with Manager() as mgr:
row = mgr.list([])
# build list of parameters to send to starmap
for param in pool_parameter:
params.append([row,param])
with Pool() as p:
p.starmap(worker, params)
С этого момента вам нужно решить, как вы собираетесь обрабатывать список. Если у вас есть тонны оперативной памяти и огромный набор данных, не стесняйтесь объединять, используя панд. Затем вы можете очень легко сохранить файл в формате CSV или маринад.
df = pd.concat(row, ignore_index=True)
df.to_pickle('data.pickle')
df.to_csv('data.csv')