Python threading: могу ли я спать на двух threading.Event() s одновременно?
Если у меня есть два объекта threading.Event()
и хотите спать, пока не будет установлен один из них, существует ли эффективный способ сделать это в python? Ясно, что я мог бы сделать что-то с опросом/тайм-аутами, но мне бы хотелось, чтобы поток был спящим до тех пор, пока он не будет установлен, сродни тому, как select
используется для дескрипторов файлов.
Итак, в следующей реализации, как бы выглядела эффективная реализация non-poll wait_for_either
?
a = threading.Event()
b = threading.Event()
wait_for_either(a, b)
Ответы
Ответ 1
Ниже приведено нерегулярное потоковое решение без опроса: измените существующий Event
, чтобы запустить обратный вызов, когда они меняются, и обрабатывать настройку нового события в этом обратном вызове:
import threading
def or_set(self):
self._set()
self.changed()
def or_clear(self):
self._clear()
self.changed()
def orify(e, changed_callback):
e._set = e.set
e._clear = e.clear
e.changed = changed_callback
e.set = lambda: or_set(e)
e.clear = lambda: or_clear(e)
def OrEvent(*events):
or_event = threading.Event()
def changed():
bools = [e.is_set() for e in events]
if any(bools):
or_event.set()
else:
or_event.clear()
for e in events:
orify(e, changed)
changed()
return or_event
Использование образца:
def wait_on(name, e):
print "Waiting on %s..." % (name,)
e.wait()
print "%s fired!" % (name,)
def test():
import time
e1 = threading.Event()
e2 = threading.Event()
or_e = OrEvent(e1, e2)
threading.Thread(target=wait_on, args=('e1', e1)).start()
time.sleep(0.05)
threading.Thread(target=wait_on, args=('e2', e2)).start()
time.sleep(0.05)
threading.Thread(target=wait_on, args=('or_e', or_e)).start()
time.sleep(0.05)
print "Firing e1 in 2 seconds..."
time.sleep(2)
e1.set()
time.sleep(0.05)
print "Firing e2 in 2 seconds..."
time.sleep(2)
e2.set()
time.sleep(0.05)
Результат:
Waiting on e1...
Waiting on e2...
Waiting on or_e...
Firing e1 in 2 seconds...
e1 fired!or_e fired!
Firing e2 in 2 seconds...
e2 fired!
Это должно быть поточно-безопасным. Любые комментарии приветствуются.
EDIT: О, и вот ваша функция wait_for_either
, хотя способ, которым я написал код, лучше всего сделать и передать вокруг or_event
. Обратите внимание, что or_event
не следует устанавливать или очищать вручную.
def wait_for_either(e1, e2):
OrEvent(e1, e2).wait()
Ответ 2
Одним из решений (с опросом) было бы выполнение последовательных ожиданий на каждом Event
в цикле
def wait_for_either(a, b):
while True:
if a.wait(tunable_timeout):
break
if b.wait(tunable_timeout):
break
Я думаю, что если вы настроите время ожидания достаточно хорошо, результаты будут в порядке.
Лучший не-опрос, о котором я могу думать, - подождать каждого в другом потоке и установить общий Event
, которого вы будете ждать в основном потоке.
def repeat_trigger(waiter, trigger):
waiter.wait()
trigger.set()
def wait_for_either(a, b):
trigger = threading.Event()
ta = threading.Thread(target=repeat_trigger, args=(a, trigger))
tb = threading.Thread(target=repeat_trigger, args=(b, trigger))
ta.start()
tb.start()
# Now do the union waiting
trigger.wait()
Довольно интересно, поэтому я написал версию OOP предыдущего решения:
class EventUnion(object):
"""Register Event objects and wait for release when any of them is set"""
def __init__(self, ev_list=None):
self._trigger = Event()
if ev_list:
# Make a list of threads, one for each Event
self._t_list = [
Thread(target=self._triggerer, args=(ev, ))
for ev in ev_list
]
else:
self._t_list = []
def register(self, ev):
"""Register a new Event"""
self._t_list.append(Thread(target=self._triggerer, args=(ev, )))
def wait(self, timeout=None):
"""Start waiting until any one of the registred Event is set"""
# Start all the threads
map(lambda t: t.start(), self._t_list)
# Now do the union waiting
return self._trigger.wait(timeout)
def _triggerer(self, ev):
ev.wait()
self._trigger.set()
Ответ 3
Запуск дополнительных потоков кажется ясным решением, но не очень эффективным.
Функция wait_events будет блокировать использование любого из событий.
def wait_events(*events):
event_share = Event()
def set_event_share(event):
event.wait()
event.clear()
event_share.set()
for event in events:
Thread(target=set_event_share(event)).start()
event_share.wait()
wait_events(event1, event2, event3)
Ответ 4
Расширение ответа Клаудиу, где вы можете либо ждать события 1 ИЛИ событие 2., либо событие 1 И даже 2.
from threading import Thread, Event, _Event
class ConditionalEvent(_Event):
def __init__(self, events_list, condition):
_Event.__init__(self)
self.event_list = events_list
self.condition = condition
for e in events_list:
self._setup(e, self._state_changed)
self._state_changed()
def _state_changed(self):
bools = [e.is_set() for e in self.event_list]
if self.condition == 'or':
if any(bools):
self.set()
else:
self.clear()
elif self.condition == 'and':
if all(bools):
self.set()
else:
self.clear()
def _custom_set(self,e):
e._set()
e._state_changed()
def _custom_clear(self,e):
e._clear()
e._state_changed()
def _setup(self, e, changed_callback):
e._set = e.set
e._clear = e.clear
e._state_changed = changed_callback
e.set = lambda: self._custom_set(e)
e.clear = lambda: self._custom_clear(e)
Пример использования будет очень похож, как раньше
import time
e1 = Event()
e2 = Event()
or_e = ConditionalEvent([e1, e2], 'or')
Thread(target=wait_on, args=('e1', e1)).start()
time.sleep(0.05)
Thread(target=wait_on, args=('e2', e2)).start()
time.sleep(0.05)
Thread(target=wait_on, args=('or_e', or_e)).start()
time.sleep(0.05)
print "Firing e1 in 2 seconds..."
time.sleep(2)
e1.set()
time.sleep(0.05)
print "Firing e2 in 2 seconds..."
time.sleep(2)
e2.set()
time.sleep(0.05)
Ответ 5
Не очень, но вы можете использовать два дополнительных потока для мультиплексирования событий...
def wait_for_either(a, b):
flag = False #some condition variable, event, or similar
class Event_Waiter(threading.Thread):
def __init__(self, event):
self.e = event
def run(self):
self.e.wait()
flag.set()
a_thread = Event_Waiter(a)
b_thread = Event_Waiter(b)
a.start()
b.start()
flag.wait()
Примечание. Возможно, вам придется беспокоиться о том, чтобы случайно получить оба события, если они прибудут слишком быстро. Вспомогательные потоки (a_thread и b_thread) должны блокировать синхронизацию вокруг попытки установить флаг, а затем должны убить другой поток (возможно, сбросив это событие потока, если оно было уничтожено).
Ответ 6
def wait_for_event_timeout(*events):
while not all([e.isSet() for e in events]):
#Check to see if the event is set. Timeout 1 sec.
ev_wait_bool=[e.wait(1) for e in events]
# Process if all events are set. Change all to any to process if any event set
if all(ev_wait_bool):
logging.debug('processing event')
else:
logging.debug('doing other work')
e1 = threading.Event()
e2 = threading.Event()
t3 = threading.Thread(name='non-block-multi',
target=wait_for_event_timeout,
args=(e1,e2))
t3.start()
logging.debug('Waiting before calling Event.set()')
time.sleep(5)
e1.set()
time.sleep(10)
e2.set()
logging.debug('Event is set')