Ответ 1
Обратите внимание, что вы можете начать с массива сложного типа dtype:
In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32')
и рассматривать его как массив однородного типа dtype:
In [5]: data2 = data.view('float32')
а затем преобразуйте его обратно в сложный тип dtype:
In [7]: data3 = data2.view('float32, (250000,2)float32')
Изменение dtype - очень быстрая операция; это не влияет на базовые данные, только то, как NumPy интерпретирует его. Таким образом, изменение типа dtype практически безрезультатно.
Итак, то, что вы читали о массивах с простыми (однородными) типами, может быть легко применено к вашему сложному dtype с трюком выше.
Приведенный ниже код заимствует многие идеи из J.F. Ответ Себастьяна здесь.
import numpy as np
import multiprocessing as mp
import contextlib
import ctypes
import struct
import base64
def decode(arg):
chunk, counter = arg
print len(chunk), counter
for x in chunk:
peak_counter = 0
data_buff = base64.b64decode(x)
buff_size = len(data_buff) / 4
unpack_format = ">%dL" % buff_size
index = 0
for y in struct.unpack(unpack_format, data_buff):
buff1 = struct.pack("I", y)
buff2 = struct.unpack("f", buff1)[0]
with shared_arr.get_lock():
data = tonumpyarray(shared_arr).view(
[('f0', '<f4'), ('f1', '<f4', (250000, 2))])
if (index % 2 == 0):
data[counter][1][peak_counter][0] = float(buff2)
else:
data[counter][1][peak_counter][1] = float(buff2)
peak_counter += 1
index += 1
counter += 1
def pool_init(shared_arr_):
global shared_arr
shared_arr = shared_arr_ # must be inherited, not passed as an argument
def tonumpyarray(mp_arr):
return np.frombuffer(mp_arr.get_obj())
def numpy_array(shared_arr, peaks):
"""Fills the NumPy array 'data' with m/z-intensity values acquired
from b64 decoding and unpacking the binary string read from the
mzXML file, which is stored in the list 'peaks'.
The m/z values are assumed to be ordered without validating this
assumption.
Note: This function uses multi-processing
"""
processors = mp.cpu_count()
with contextlib.closing(mp.Pool(processes=processors,
initializer=pool_init,
initargs=(shared_arr, ))) as pool:
chunk_size = int(len(peaks) / processors)
map_parameters = []
for i in range(processors):
counter = i * chunk_size
# WARNING: I removed -1 from (i + 1)*chunk_size, since the right
# index is non-inclusive.
chunk = peaks[i*chunk_size : (i + 1)*chunk_size]
map_parameters.append((chunk, counter))
pool.map(decode, map_parameters)
if __name__ == '__main__':
shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250)
peaks = ...
numpy_array(shared_arr, peaks)
Если вы можете гарантировать, что различные процессы, выполняющие назначения
if (index % 2 == 0):
data[counter][1][peak_counter][0] = float(buff2)
else:
data[counter][1][peak_counter][1] = float(buff2)
никогда не конкурируют, чтобы изменять данные в тех же местах, тогда я считаю, что вы действительно можете отказаться от блокировки
with shared_arr.get_lock():
но я недостаточно понимаю ваш код, чтобы точно знать, поэтому, чтобы быть в безопасности, я включил замок.