Ответ 1
Как сказал в своем комментарии @Khris, вы должны разделить свой блок данных на несколько больших кусков и перебрать каждый кусок параллельно. Вы можете произвольно разделить блок данных на куски случайного размера, но имеет смысл разделить блок данных на равные по размеру фрагменты на основе количества процессов, которые вы планируете использовать. К счастью, у кого-то еще уже выяснили, как сделать эту часть для нас:
# don't forget to import
import pandas as pd
import multiprocessing
# create as many processes as there are CPUs on your machine
num_processes = multiprocessing.cpu_count()
# calculate the chunk size as an integer
chunk_size = int(df.shape[0]/num_processes)
# this solution was reworked from the above link.
# will work even if the length of the dataframe is not evenly divisible by num_processes
chunks = [df.ix[df.index[i:i + chunk_size]] for i in range(0, df.shape[0], chunk_size)]
Это создает список, содержащий наш блок данных в кусках. Теперь нам нужно передать его в наш пул вместе с функцией, которая будет управлять данными.
def func(d):
# let create a function that squares every value in the dataframe
return d * d
# create our pool with `num_processes` processes
pool = multiprocessing.Pool(processes=num_processes)
# apply our function to each chunk in the list
result = pool.map(func, chunks)
В этот момент result
будет список, содержащий каждый кусок после того, как он будет обработан. В этом случае все значения были квадратами. Теперь проблема заключается в том, что исходный фреймворк не был изменен, поэтому мы должны заменить все его существующие значения результатами нашего пула.
for i in range(len(result)):
# since result[i] is just a dataframe
# we can reassign the original dataframe based on the index of each chunk
df.ix[result[i].index] = result[i]
Теперь моя функция манипулировать моим фреймворком данных векторизована и, скорее всего, была бы быстрее, если бы я просто применил ее ко всей моей файловой системе вместо того, чтобы расщепляться на куски. Однако в вашем случае ваша функция будет перебирать каждую строку каждого фрагмента, а затем возвращать кусок. Это позволяет обрабатывать строки num_process
за раз.
def func(d):
for row in d.iterrow():
idx = row[0]
k = row[1]['Chromosome']
start,end = row[1]['Bin'].split('-')
sequence = sequence_from_coordinates(k,1,start,end) #slow download form http
d.set_value(idx,'GC%',gc_content(sequence,percent=False,verbose=False))
d.set_value(idx,'G4 repeats', sum([len(list(i)) for i in g4_scanner(sequence)]))
d.set_value(idx,'max flexibility',max([item[1] for item in dna_flex(sequence,verbose=False)]))
# return the chunk!
return d
Затем вы переназначаете значения в исходном фрейме данных, и вы успешно распараллеливали этот процесс.
Сколько процессов следует использовать?
Ваша оптимальная производительность будет зависеть от ответа на этот вопрос. Пока "ВСЕ ПРОЦЕССЫ!!!!" это один ответ, лучший ответ гораздо более тонкий. После определенного момента бросание большего количества процессов на проблему на самом деле создает дополнительные накладные расходы, чем это стоит. Это называется Закон Amdahl. Опять же, нам повезло, что другие уже решили этот вопрос для нас:
Хорошим по умолчанию является использование multiprocessing.cpu_count()
, которое является поведением по умолчанию multiprocessing.Pool
. В соответствии с документацией "Если процессы имеют значение" Нет ", тогда используется число, возвращаемое cpu_count()". Поэтому я установил num_processes
в начале multiprocessing.cpu_count()
. Таким образом, если вы перейдете на более жесткую машину, вы получите преимущества от нее, не изменяя непосредственно переменную num_processes
.