Python останавливает несколько процессов, когда возвращается результат?
Я пытаюсь написать простой прокси-искатель на рабочем столе в python.
def proof_of_work(b, nBytes):
nonce = 0
# while the first nBytes of hash(b + nonce) are not 0
while sha256(b + uint2bytes(nonce))[:nBytes] != bytes(nBytes):
nonce = nonce + 1
return nonce
Теперь я пытаюсь сделать это многопроцессорным, поэтому он может использовать все ядра ЦП и быстрее найти nonce. Моя идея состоит в том, чтобы использовать multiprocessing.Pool
и выполнять функцию proof_of_work несколько раз, передавая два параметра num_of_cpus_running
и this_cpu_id
следующим образом:
def proof_of_work(b, nBytes, num_of_cpus_running, this_cpu_id):
nonce = this_cpu_id
while sha256(b + uint2bytes(nonce))[:nBytes] != bytes(nBytes):
nonce = nonce + num_of_cpus_running
return nonce
Итак, если есть 4 ядра, каждый из них будет вычислять такие числа:
core 0: 0, 4, 8, 16, 32 ...
core 1: 1, 5, 9, 17, 33 ...
core 2: 2, 6, 10, 18, 34 ...
core 3: 3, 7, 15, 31, 38 ...
Итак, я должен переписать proof_of_work
, поэтому, когда любой из процессов находит nonce, все остальные перестают искать nonces, принимая во внимание, что найденное nonce должно быть самым низким значением, для которого требуемые байты равны 0. Если по какой-то причине ЦП ускоряется, и возвращает действительное значение nonce, превышающее минимальное допустимое значение nonce, тогда доказательство работы недействительно.
Единственное, что я не знаю, как это сделать, - это часть, в которой процесс A остановится только в том случае, если в процессе B обнаружен nonce, который ниже, чем nonce, который вычисляется прямо сейчас процессом A. Если его более высокий, A продолжает вычислять (на всякий случай) до тех пор, пока он не достигнет nonce, предоставленного B.
Надеюсь, я правильно объяснил. Кроме того, если есть более быстрая реализация всего, что я написал, я хотел бы услышать об этом. Большое вам спасибо!
Ответы
Ответ 1
Один простой вариант - использовать микропакеты и проверить, был ли найден ответ. Слишком маленькие партии несут накладные расходы при запуске параллельных заданий, слишком большой размер заставляет другие процессы выполнять дополнительную работу, в то время как один процесс уже нашел ответ. Каждая партия должна занимать 1 - 10 секунд, чтобы быть эффективной.
Пример кода:
from multiprocessing import Pool
from hashlib import sha256
from time import time
def find_solution(args):
salt, nBytes, nonce_range = args
target = '0' * nBytes
for nonce in xrange(nonce_range[0], nonce_range[1]):
result = sha256(salt + str(nonce)).hexdigest()
#print('%s %s vs %s' % (result, result[:nBytes], target)); sleep(0.1)
if result[:nBytes] == target:
return (nonce, result)
return None
def proof_of_work(salt, nBytes):
n_processes = 8
batch_size = int(2.5e5)
pool = Pool(n_processes)
nonce = 0
while True:
nonce_ranges = [
(nonce + i * batch_size, nonce + (i+1) * batch_size)
for i in range(n_processes)
]
params = [
(salt, nBytes, nonce_range) for nonce_range in nonce_ranges
]
# Single-process search:
#solutions = map(find_solution, params)
# Multi-process search:
solutions = pool.map(find_solution, params)
print('Searched %d to %d' % (nonce_ranges[0][0], nonce_ranges[-1][1]-1))
# Find non-None results
solutions = filter(None, solutions)
if solutions:
return solutions
nonce += n_processes * batch_size
if __name__ == '__main__':
start = time()
solutions = proof_of_work('abc', 6)
print('\n'.join('%d => %s' % s for s in solutions))
print('Solution found in %.3f seconds' % (time() - start))
Выход (ноутбук с Core i7):
Searched 0 to 1999999
Searched 2000000 to 3999999
Searched 4000000 to 5999999
Searched 6000000 to 7999999
Searched 8000000 to 9999999
Searched 10000000 to 11999999
Searched 12000000 to 13999999
Searched 14000000 to 15999999
Searched 16000000 to 17999999
Searched 18000000 to 19999999
Searched 20000000 to 21999999
Searched 22000000 to 23999999
Searched 24000000 to 25999999
Searched 26000000 to 27999999
Searched 28000000 to 29999999
Searched 30000000 to 31999999
Searched 32000000 to 33999999
Searched 34000000 to 35999999
Searched 36000000 to 37999999
37196346 => 000000f4c9aee9d427dc94316fd49192a07f1aeca52f6b7c3bb76be10c5adf4d
Solution found in 20.536 seconds
С одним ядром потребовалось 76,468 секунды. В любом случае, это далеко не самый эффективный способ найти решение, но он работает. Например, если длина salt
длинна, состояние SHA-256
может быть предварительно вычислено после того, как соль будет поглощена и продолжит поиск грубой силы. Также массив байтов может быть более эффективным, чем hexdigest()
.
Ответ 2
Общий способ сделать это:
- подумайте о рабочих пакетах, например. для выполнения расчета для определенного диапазона диапазон не должен занимать много времени, скажем, от 0,1 секунды до секунды
- У некоторых менеджеров распределите рабочие пакеты с рабочим
- после того, как рабочий пакет завершен, сообщите менеджеру результат и запросите новый рабочий пакет
- Если работа выполнена и результат найден, примите результаты от работников и дайте им сигнал о том, что больше не нужно выполнять работу - теперь рабочие могут безопасно завершать
Таким образом, вам не нужно проверять с менеджером каждую итерацию (которая будет замедлять все) или делать неприятные вещи, такие как остановка нити в середине сеанса. Излишне говорить, что менеджер должен быть потокобезопасным.
Это идеально подходит для вашей модели, поскольку вам все еще нужны результаты других работников, даже если результат найден.
Обратите внимание, что в вашей модели может быть, что поток может не синхронизироваться с другими потоками, отставая. Вы не хотите делать еще миллион вычислений, как только результат будет найден. Я просто повторяю это из вопроса, потому что я думаю, что модель неверна. Вы должны исправить эту модель, а не исправлять ее реализацию.
Ответ 3
Вы можете использовать multiprocessing.Queue(). У вас есть очередь на процессор/процесс. Когда процесс находит nonce, он помещает его в очередь других процессов. Другие процессы проверяют свою очередь (неблокирующую) на каждой итерации цикла while, и если в ней есть что-то, они решают продолжить или прекратить работу на основе значения в очереди:
def proof_of_work(b, nBytes, num_of_cpus_running, this_cpu_id, qSelf, qOthers):
nonce = this_cpu_id
while sha256(b + uint2bytes(nonce))[:nBytes] != bytes(nBytes):
nonce = nonce + num_of_cpus_running
try:
otherNonce = qSelf.get(block=False)
if otherNonce < nonce:
return
except:
pass
for q in qOthers:
q.put(nonce)
return nonce
qДругие - это список очередей (каждая очередь = multiprocessing.Queue()), принадлежащая другим процессам.
Если вы решили использовать очереди, как я предложил, вы должны иметь возможность написать лучшую/лучшую реализацию вышеуказанного подхода.