Как управлять результатами потоков python?
Я использую этот код:
def startThreads(arrayofkeywords):
global i
i = 0
while len(arrayofkeywords):
try:
if i<maxThreads:
keyword = arrayofkeywords.pop(0)
i = i+1
thread = doStuffWith(keyword)
thread.start()
except KeyboardInterrupt:
sys.exit()
thread.join()
для потоковой передачи в python, я почти все сделал, но я не знаю, как управлять результатами каждого потока, в каждом потоке у меня есть массив строк в качестве результата, как я могу объединить все эти массивы в один безопасный? Потому что, если я попытаюсь записать в глобальный массив, два потока могут писать одновременно.
Ответы
Ответ 1
Сначала вам нужно сохранить все те объекты thread
для вызова join()
на них. Как написано, вы сохраняете только последний из них, а затем только в случае отсутствия исключения.
Простым способом многопоточного программирования является предоставление каждому потоку всех необходимых ему данных, а затем не писать ни о чем за пределами этого рабочего набора. Если все нити соответствуют этому руководству, их записи не будут мешать друг другу. Затем, как только поток завершится, основной поток только агрегирует результаты в глобальный массив. Это известно как "fork/join parallelism".
Если вы подклассифицируете объект Thread, вы можете предоставить ему пространство для хранения этого возвращаемого значения без вмешательства в другие потоки. Затем вы можете сделать что-то вроде этого:
class MyThread(threading.Thread):
def __init__(self, ...):
self.result = []
...
def main():
# doStuffWith() returns a MyThread instance
threads = [ doStuffWith(k).start() for k in arrayofkeywords[:maxThreads] ]
for t in threads:
t.join()
ret = t.result
# process return value here
Edit:
Оглядываясь немного, кажется, что описанный выше метод не является предпочтительным способом создания потоков в Python. Вышеприведенное является скорее шаблоном Java-esque для потоков. Вместо этого вы можете сделать что-то вроде:
def handler(outList)
...
# Modify existing object (important!)
outList.append(1)
...
def doStuffWith(keyword):
...
result = []
thread = Thread(target=handler, args=(result,))
return (thread, result)
def main():
threads = [ doStuffWith(k) for k in arrayofkeywords[:maxThreads] ]
for t in threads:
t[0].start()
for t in threads:
t[0].join()
ret = t[1]
# process return value here
Ответ 2
Используйте экземпляр Queue.Queue
, который по своей сути является потокобезопасным. Каждый поток может .put
его результаты в этот глобальный экземпляр, когда он был сделан, а основной поток (когда он знает, что все рабочие потоки выполняются, .join
их, например, как в ответе @unholysampler) может зацикливаться .get
ting каждый результат от него и использовать каждый результат для .extend
списка "общий результат", пока очередь не опустеет.
Изменить: есть другие большие проблемы с вашим кодом - если максимальное количество потоков меньше числа ключевых слов, оно никогда не прекратится (вы пытаетесь запустить поток за ключевое слово - не меньше - но если вы уже запустили максимальные числа, вы навсегда зацикливаетесь, чтобы не иметь никакой другой цели).
Вместо этого используйте пул потоков, вроде как в этот рецепт, за исключением того, что вместо очереди вызовов вы будете ставить в очередь ключевые слова - поскольку вызываемый, который вы хотите запустить в потоке, одинаковый в каждом потоке, просто изменяя аргумент. Разумеется, этот вызываемый будет изменен, чтобы очистить что-то из очереди входящих задач (с .get
) и .put
список результатов в очередь исходящих результатов, когда это будет сделано.
Чтобы прервать N потоков, вы могли бы, после всех ключевых слов, .put
N "часовых" (например, None
, если ни одно ключевое слово не может быть None
): поток, вызываемый, будет завершен, если "ключевое слово" это просто вытащено None
.
Чаще всего Queue.Queue
предлагает лучший способ организовать потоки (и многопроцессорные!) архитектуры в Python, будь то общие, как в рецепте, который я указал вам, или более специализированном, как я предлагаю для вашего использования в двух последних абзацах.
Ответ 3
Вам нужно указывать указатели на каждую созданную вами нить. Как и в случае, ваш код обеспечивает только завершение последнего созданного потока. Это не означает, что все те, которые вы начали, еще не закончили.
def startThreads(arrayofkeywords):
global i
i = 0
threads = []
while len(arrayofkeywords):
try:
if i<maxThreads:
keyword = arrayofkeywords.pop(0)
i = i+1
thread = doStuffWith(keyword)
thread.start()
threads.append(thread)
except KeyboardInterrupt:
sys.exit()
for t in threads:
t.join()
//process results stored in each thread
Это также решает проблему доступа к записи, поскольку каждый поток будет хранить его локально. Затем, после того как все они выполнены, вы можете выполнить работу, чтобы объединить локальные данные каждого потока.
Ответ 4
Я знаю, что этот вопрос немного стар, но лучший способ сделать это - не слишком сильно навредить тому, что предложили другие коллеги:)
Пожалуйста, прочитайте ссылку на Pool. Таким образом, вы откажетесь в своей работе:
def doStuffWith(keyword):
return keyword + ' processed in thread'
def startThreads(arrayofkeywords):
pool = Pool(processes=maxThreads)
result = pool.map(doStuffWith, arrayofkeywords)
print result
Ответ 5
Запись в глобальный массив прекрасна, если вы используете семафор для защиты критического раздела. Вы "приобретаете" блокировку, когда хотите добавить к глобальному массиву, а затем "отпустите", когда вы закончите. Таким образом, каждый массив добавляет только один поток.
Просмотрите http://docs.python.org/library/threading.html и найдите семафор для получения дополнительной информации.
sem = threading.Semaphore()
...
sem.acquire()
# do dangerous stuff
sem.release()
Ответ 6
попробуйте некоторые методы семафора, такие как получение и выпуск.
http://docs.python.org/library/threading.html