Как получить возвращаемое значение из потока в Python?
Функция foo
ниже возвращает строку 'foo'
. Как я могу получить значение 'foo'
которое возвращается из цели потока?
from threading import Thread
def foo(bar):
print('hello {}'.format(bar))
return 'foo'
thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()
"Очевидный способ сделать это", показанный выше, не работает: thread.join()
вернул None
.
Ответы
Ответ 1
FWIW, модуль multiprocessing
имеет приятный интерфейс для этого, используя класс Pool
. И если вы хотите придерживаться потоков, а не процессов, вы можете просто использовать класс multiprocessing.pool.ThreadPool
как замену.
def foo(bar, baz):
print 'hello {0}'.format(bar)
return 'foo' + baz
from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)
async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo
# do some other stuff in the main process
return_val = async_result.get() # get the return value from your function.
Ответ 2
Один из способов, которые я видел, - это передать изменяемый объект, такой как список или словарь, в конструктор потока вместе с индексом или другим идентификатором некоторого вида. Затем поток может сохранить свои результаты в своем выделенном слоте в этом объекте. Например:
def foo(bar, result, index):
print 'hello {0}'.format(bar)
result[index] = "foo"
from threading import Thread
threads = [None] * 10
results = [None] * 10
for i in range(len(threads)):
threads[i] = Thread(target=foo, args=('world!', results, i))
threads[i].start()
# do some other stuff
for i in range(len(threads)):
threads[i].join()
print " ".join(results) # what sound does a metasyntactic locomotive make?
Если вы действительно хотите, чтобы join()
возвращал возвращаемое значение вызванной функции, вы можете сделать это с помощью подкласса Thread
как показано ниже:
from threading import Thread
def foo(bar):
print 'hello {0}'.format(bar)
return "foo"
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs, Verbose)
self._return = None
def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args,
**self._Thread__kwargs)
def join(self):
Thread.join(self)
return self._return
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
twrv.start()
print twrv.join() # prints foo
Это становится немного странным из-за некоторого искажения имени, и оно получает доступ к "частным" структурам данных, которые специфичны для реализации Thread
... но это работает.
Для python3
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs)
self._return = None
def run(self):
print(type(self._target))
if self._target is not None:
self._return = self._target(*self._args,
**self._kwargs)
def join(self, *args):
Thread.join(self, *args)
return self._return
Ответ 3
Ответ Джейка хорош, но если вы не хотите использовать threadpool (вы не знаете, сколько потоков вам понадобится, но создавайте их по мере необходимости), тогда хороший способ передачи информации между потоками является встроенным Queue.Queue, поскольку он обеспечивает безопасность потоков.
Я создал следующий декоратор, чтобы заставить его действовать так же, как и threadpool:
def threaded(f, daemon=False):
import Queue
def wrapped_f(q, *args, **kwargs):
'''this function calls the decorated function and puts the
result in a queue'''
ret = f(*args, **kwargs)
q.put(ret)
def wrap(*args, **kwargs):
'''this is the function returned from the decorator. It fires off
wrapped_f in a new thread and returns the thread object with
the result queue attached'''
q = Queue.Queue()
t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
t.daemon = daemon
t.start()
t.result_queue = q
return t
return wrap
Затем вы просто используете его как:
@threaded
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x
# does not block, returns Thread object
y = long_task(10)
print y
# this blocks, waiting for the result
result = y.result_queue.get()
print result
Декорированная функция создает новый поток при каждом вызове и возвращает объект Thread, который содержит очередь, которая получит результат.
ОБНОВИТЬ
Прошло довольно много времени с тех пор, как я опубликовал этот ответ, но он по-прежнему получает представления, поэтому я решил обновить его, чтобы отразить то, как я это делаю в новых версиях Python:
Python 3.2 добавлен в модуль concurrent.futures
который обеспечивает интерфейс высокого уровня для параллельных задач. Он предоставляет ThreadPoolExecutor
и ProcessPoolExecutor
, поэтому вы можете использовать поток или пул процессов с тем же api.
Одно из преимуществ этого api заключается в том, что отправка задачи в Executor
возвращает объект Future
, который будет содержать возвращаемое значение вызываемого вами объекта.
Это делает ненужным ненужный объект queue
, что довольно упрощает декор:
_DEFAULT_POOL = ThreadPoolExecutor()
def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)
return wrap
Это будет использовать исполняющий модуль нитью по умолчанию, если он не передан.
Использование очень похоже на:
@threadpool
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x
# does not block, returns Future object
y = long_task(10)
print y
# this blocks, waiting for the result
result = y.result()
print result
Если вы используете Python 3. 4+, то очень хорошая особенность использования этого метода (и объектов Future в целом) заключается в том, что возвращенное будущее можно обернуть, чтобы превратить его в asyncio.Future
с asyncio.wrap_future
. Это позволяет легко работать с сопрограммами:
result = await asyncio.wrap_future(long_task(10))
Если вам не нужен доступ к базовому concurrent.Future
объекту concurrent.Future
, вы можете включить обертку в декоратор:
_DEFAULT_POOL = ThreadPoolExecutor()
def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))
return wrap
Затем, когда вам нужно вытолкнуть интенсивный процессор или блокировать код из потока цикла событий, вы можете поместить его в украшенную функцию:
@threadpool
def some_long_calculation():
...
# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()
Ответ 4
Другое решение, которое не требует изменения существующего кода:
import Queue
from threading import Thread
def foo(bar):
print 'hello {0}'.format(bar)
return 'foo'
que = Queue.Queue()
t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result
Его также можно легко настроить в многопоточной среде:
import Queue
from threading import Thread
def foo(bar):
print 'hello {0}'.format(bar)
return 'foo'
que = Queue.Queue()
threads_list = list()
t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)
# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...
# Join all the threads
for t in threads_list:
t.join()
# Check thread return value
while not que.empty():
result = que.get()
print result
Ответ 5
Parris/kindall answer join
/return
answer портирован на Python 3:
from threading import Thread
def foo(bar):
print('hello {0}'.format(bar))
return "foo"
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)
self._return = None
def run(self):
if self._target is not None:
self._return = self._target(*self._args, **self._kwargs)
def join(self):
Thread.join(self)
return self._return
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
twrv.start()
print(twrv.join()) # prints foo
Обратите внимание, что класс Thread
реализован по-разному в Python 3.
Ответ 6
Я украл добрый ответ и немного очистил его.
Ключевой частью является добавление * args и ** kwargs для join() для обработки таймаута
class threadWithReturn(Thread):
def __init__(self, *args, **kwargs):
super(threadWithReturn, self).__init__(*args, **kwargs)
self._return = None
def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
def join(self, *args, **kwargs):
super(threadWithReturn, self).join(*args, **kwargs)
return self._return
ОБНОВЛЕННЫЙ ОТВЕТ НИЖЕ
Это мой самый популярный ответ, поэтому я решил обновить код, который будет работать как на py2, так и на py3.
Кроме того, я вижу много ответов на этот вопрос, которые показывают отсутствие понимания Thread.join(). Некоторые полностью не справляются с timeout
arg. Но есть и угловой случай, когда вы должны знать о случаях, когда у вас есть (1) целевая функция, которая может вернуть None
и (2) вы также передаете аргумент timeout
для join(). Пожалуйста, см. "ТЕСТ 4", чтобы понять этот угловой случай.
ThreadWithReturn класс, который работает с py2 и py3:
import sys
from threading import Thread
from builtins import super # https://stackoverflow.com/a/30159479
if sys.version_info >= (3, 0):
_thread_target_key = '_target'
_thread_args_key = '_args'
_thread_kwargs_key = '_kwargs'
else:
_thread_target_key = '_Thread__target'
_thread_args_key = '_Thread__args'
_thread_kwargs_key = '_Thread__kwargs'
class ThreadWithReturn(Thread):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._return = None
def run(self):
target = getattr(self, _thread_target_key)
if not target is None:
self._return = target(*getattr(self, _thread_args_key), **getattr(self, _thread_kwargs_key))
def join(self, *args, **kwargs):
super().join(*args, **kwargs)
return self._return
Ниже приведены некоторые примеры тестов:
import time, random
# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
if not seconds is None:
time.sleep(seconds)
return arg
# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')
# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)
# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished
# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))
Можете ли вы определить угловой случай, с которым мы можем столкнуться с TEST 4?
Проблема в том, что мы ожидаем, что giveMe() вернет None (см. TEST 2), но мы также ожидаем, что join() вернет None, если он истечет.
returned is None
означает либо:
(1), что возвращаемое свойство giveMe(), или
(2) время соединения()
Этот пример тривиален, так как мы знаем, что giveMe() всегда будет возвращать None. Но в реальном экземпляре (где цель может законно вернуть None или что-то еще), мы хотим явно проверить, что произошло.
Ниже приведено, как обратиться к этому углу:
# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))
if my_thread.isAlive():
# returned is None because join() timed out
# this also means that giveMe() is still running in the background
pass
# handle this based on your app logic
else:
# join() is finished, and so is giveMe()
# BUT we could also be in a race condition, so we need to update returned, just in case
returned = my_thread.join()
Ответ 7
Использование очереди:
import threading, queue
def calc_square(num, out_queue1):
l = []
for x in num:
l.append(x*x)
out_queue1.put(l)
arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())
Ответ 8
Мое решение проблемы состоит в том, чтобы обернуть функцию и поток в классе. Не требует использования пулов, очередей или переменной переменной типа c. Он также не блокирует. Вместо этого вы проверяете статус. См. Пример использования в конце кода.
import threading
class ThreadWorker():
'''
The basic idea is given a function create an object.
The object can then run the function in a thread.
It provides a wrapper to start it,check its status,and get data out the function.
'''
def __init__(self,func):
self.thread = None
self.data = None
self.func = self.save_data(func)
def save_data(self,func):
'''modify function to save its returned data'''
def new_func(*args, **kwargs):
self.data=func(*args, **kwargs)
return new_func
def start(self,params):
self.data = None
if self.thread is not None:
if self.thread.isAlive():
return 'running' #could raise exception here
#unless thread exists and is alive start or restart it
self.thread = threading.Thread(target=self.func,args=params)
self.thread.start()
return 'started'
def status(self):
if self.thread is None:
return 'not_started'
else:
if self.thread.isAlive():
return 'running'
else:
return 'finished'
def get_results(self):
if self.thread is None:
return 'not_started' #could return exception
else:
if self.thread.isAlive():
return 'running'
else:
return self.data
def add(x,y):
return x +y
add_worker = ThreadWorker(add)
print add_worker.start((1,2,))
print add_worker.status()
print add_worker.get_results()
Ответ 9
Вы можете определить mutable над областью функции threaded и добавить к ней результат. (Я также модифицировал код, совместимый с python3)
returns = {}
def foo(bar):
print('hello {0}'.format(bar))
returns[bar] = 'foo'
from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)
Это возвращает {'world!': 'foo'}
Если вы используете ввод функции в качестве ключа к вашему запросу dict, каждый уникальный вход гарантированно даст запись в результатах
Ответ 10
Вы можете использовать Пул в качестве пула рабочих процессов, как показано ниже:
from multiprocessing import Pool
def f1(x, y):
return x*y
if __name__ == '__main__':
with Pool(processes=10) as pool:
result = pool.apply(f1, (2, 3))
print(result)
Ответ 11
Принимая во внимание комментарий @iman в ответе @JakeBiesinger, я переписал его для различного количества потоков:
from multiprocessing.pool import ThreadPool
def foo(bar, baz):
print 'hello {0}'.format(bar)
return 'foo' + baz
numOfThreads = 3
results = []
pool = ThreadPool(numOfThreads)
for i in range(0, numOfThreads):
results.append(pool.apply_async(foo, ('world', 'foo'))) # tuple of args for foo)
# do some other stuff in the main process
# ...
# ...
results = [r.get() for r in results]
print results
pool.close()
pool.join()
Приветствия,
Гай.
Ответ 12
join
всегда возвращать None
, я думаю, что вы должны подклассом Thread
обрабатывать коды возврата и т.д.
Ответ 13
Я использую эту оболочку, которая удобно превращает любую функцию для работы в Thread
- заботясь о ее возвращаемом значении или исключении. Он не добавляет накладные расходы Queue
.
def threading_func(f):
"""Decorator for running a function in a thread and handling its return
value or exception"""
def start(*args, **kw):
def run():
try:
th.ret = f(*args, **kw)
except:
th.exc = sys.exc_info()
def get(timeout=None):
th.join(timeout)
if th.exc:
raise th.exc[0], th.exc[1], th.exc[2] # py2
##raise th.exc[1] #py3
return th.ret
th = threading.Thread(None, run)
th.exc = None
th.get = get
th.start()
return th
return start
Примеры использования
def f(x):
return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))
@threading_func
def th_mul(a, b):
return a * b
th = th_mul("text", 2.5)
try:
print(th.get())
except TypeError:
print("exception thrown ok.")
Примечания к модулю threading
Удобное возвращаемое значение и обработка исключений для потоковой функции является частой "питонической" потребностью и должно быть уже предложено модулем threading
- возможно, непосредственно в стандартном классе Thread
. ThreadPool
имеет слишком много накладных расходов для простых задач - 3 управления потоками, много бюрократии. К сожалению, макет Thread
был скопирован с Java изначально - что вы видите, например. из бесполезного 1-го (!) параметра конструктора group
.
Ответ 14
Как уже упоминалось, многопроцессорный пул намного медленнее основного потока. Использование очередей, предложенных в некоторых ответах, является очень эффективной альтернативой. Я использую его со словарями, чтобы иметь возможность запускать множество небольших потоков и восстанавливать несколько ответов, объединяя их со словарями:
#!/usr/bin/env python3
import threading
# use Queue for python2
import queue
import random
LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]
NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
def randoms(k, q):
result = dict()
result['letter'] = random.choice(LETTERS)
result['number'] = random.choice(NUMBERS)
q.put({k: result})
threads = list()
q = queue.Queue()
results = dict()
for name in ('alpha', 'oscar', 'yankee',):
threads.append( threading.Thread(target=randoms, args=(name, q)) )
threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
results.update(q.get())
print(results)
Ответ 15
Определите свою цель для поиска
1) принять аргумент q
2) замените любые утверждения return foo
на q.put(foo); return
поэтому функция
def func(a):
ans = a * a
return ans
станет
def func(a, q):
ans = a * a
q.put(ans)
return
а затем вы будете действовать как таковой
from Queue import Queue
from threading import Thread
ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]
threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [q.get() for _ in xrange(len(threads))]
И вы можете использовать декораторы/обертки функций, чтобы вы могли использовать ваши существующие функции как target
без их модификации, но следуйте этой базовой схеме.
Ответ 16
Одно обычное решение состоит в том, чтобы обернуть вашу функцию foo
декоратором, например
result = queue.Queue()
def task_wrapper(*args):
result.put(target(*args))
Тогда весь код может выглядеть так:
result = queue.Queue()
def task_wrapper(*args):
result.put(target(*args))
threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]
for t in threads:
t.start()
while(True):
if(len(threading.enumerate()) < max_num):
break
for t in threads:
t.join()
return result
Примечание
Одна из важных проблем заключается в том, что возвращаемые значения могут быть unorderred.
(Фактически, return value
не обязательно сохраняется в queue
, так как вы можете выбрать произвольную структуру потокобезопасной)
Ответ 17
Почему бы просто не использовать глобальную переменную?
import threading
class myThread(threading.Thread):
def __init__(self, ind, lock):
threading.Thread.__init__(self)
self.ind = ind
self.lock = lock
def run(self):
global results
with self.lock:
results.append(self.ind)
results = []
lock = threading.Lock()
threads = [myThread(x, lock) for x in range(1, 4)]
for t in threads:
t.start()
for t in threads:
t.join()
print(results)
Ответ 18
Очень простой способ сделать это для таких манекенов, как я:
import queue
import threading
# creating queue instance
q = queue.Queue()
# creating threading class
class AnyThread():
def __init__ (self):
threading.Thread.__init__(self)
def run(self):
# in this class and function we will put our test target function
test()
t = AnyThread()
# having our test target function
def test():
# do something in this function:
result = 3 + 2
# and put result to a queue instance
q.put(result)
for i in range(3): #calling our threading fucntion 3 times (just for example)
t.run()
output = q.get() # here we get output from queue instance
print(output)
>>> 5
>>> 5
>>> 5
главное здесь - модуль queue
. Мы создаем экземпляр queue.Queue()
и включаем его в нашу функцию. Мы кормим его своим результатом, который позже мы выходим за пределы потока.
См. еще один пример с аргументами, переданными нашей тестовой функции:
import queue
import threading
# creating queue instance
q = queue.Queue()
# creating threading class
class AnyThread():
def __init__ (self):
threading.Thread.__init__(self)
def run(self, a, b):
# in this class and function we will put our execution test function
test(a, b)
t = AnyThread()
# having our test target function
def test(a, b):
# do something in this function:
result = a + b
# and put result to a queue instance
q.put(result)
for i in range(3): #calling our threading fucntion 3 times (just for example)
t.run(3+i, 2+i)
output = q.get() # here we get output from queue instance
print(output)
>>> 5
>>> 7
>>> 9
Ответ 19
Идея GuySoft отличная, но я думаю, что объект не обязательно должен наследовать от Thread, а start() может быть удален из интерфейса:
from threading import Thread
import queue
class ThreadWithReturnValue(object):
def __init__(self, target=None, args=(), **kwargs):
self._que = queue.Queue()
self._t = Thread(target=lambda q,arg1,kwargs1: q.put(target(*arg1, **kwargs1)) ,
args=(self._que, args, kwargs), )
self._t.start()
def join(self):
self._t.join()
return self._que.get()
def foo(bar):
print('hello {0}'.format(bar))
return "foo"
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
print(twrv.join()) # prints foo
Ответ 20
Ответ на Kindall в Python3
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, *, daemon=None):
Thread.__init__(self, group, target, name, args, kwargs, daemon)
self._return = None
def run(self):
try:
if self._target:
self._return = self._target(*self._args, **self._kwargs)
finally:
del self._target, self._args, self._kwargs
def join(self,timeout=None):
Thread.join(self,timeout)
return self._return
Ответ 21
Если только True или False должны быть проверены при вызове функции, более простое решение, которое я нахожу, это обновление глобального списка.
import threading
lists = {"A":"True", "B":"True"}
def myfunc(name: str, mylist):
for i in mylist:
if i == 31:
lists[name] = "False"
return False
else:
print("name {} : {}".format(name, i))
t1 = threading.Thread(target=myfunc, args=("A", [1, 2, 3, 4, 5, 6], ))
t2 = threading.Thread(target=myfunc, args=("B", [11, 21, 31, 41, 51, 61], ))
t1.start()
t2.start()
t1.join()
t2.join()
for value in lists.values():
if value == False:
# Something is suspicious
# Take necessary action
Это более полезно, если вы хотите узнать, вернул ли какой-либо из потоков ложное состояние, чтобы предпринять необходимые действия.