Python threading: могу ли я спать на двух threading.Event() s одновременно?

Если у меня есть два объекта threading.Event() и хотите спать, пока не будет установлен один из них, существует ли эффективный способ сделать это в python? Ясно, что я мог бы сделать что-то с опросом/тайм-аутами, но мне бы хотелось, чтобы поток был спящим до тех пор, пока он не будет установлен, сродни тому, как select используется для дескрипторов файлов.

Итак, в следующей реализации, как бы выглядела эффективная реализация non-poll wait_for_either?

a = threading.Event()
b = threading.Event()

wait_for_either(a, b)

Ответы

Ответ 1

Ниже приведено нерегулярное потоковое решение без опроса: измените существующий Event, чтобы запустить обратный вызов, когда они меняются, и обрабатывать настройку нового события в этом обратном вызове:

import threading

def or_set(self):
    self._set()
    self.changed()

def or_clear(self):
    self._clear()
    self.changed()

def orify(e, changed_callback):
    e._set = e.set
    e._clear = e.clear
    e.changed = changed_callback
    e.set = lambda: or_set(e)
    e.clear = lambda: or_clear(e)

def OrEvent(*events):
    or_event = threading.Event()
    def changed():
        bools = [e.is_set() for e in events]
        if any(bools):
            or_event.set()
        else:
            or_event.clear()
    for e in events:
        orify(e, changed)
    changed()
    return or_event

Использование образца:

def wait_on(name, e):
    print "Waiting on %s..." % (name,)
    e.wait()
    print "%s fired!" % (name,)

def test():
    import time

    e1 = threading.Event()
    e2 = threading.Event()

    or_e = OrEvent(e1, e2)

    threading.Thread(target=wait_on, args=('e1', e1)).start()
    time.sleep(0.05)
    threading.Thread(target=wait_on, args=('e2', e2)).start()
    time.sleep(0.05)
    threading.Thread(target=wait_on, args=('or_e', or_e)).start()
    time.sleep(0.05)

    print "Firing e1 in 2 seconds..."
    time.sleep(2)
    e1.set()
    time.sleep(0.05)

    print "Firing e2 in 2 seconds..."
    time.sleep(2)
    e2.set()
    time.sleep(0.05)

Результат:

Waiting on e1...
Waiting on e2...
Waiting on or_e...
Firing e1 in 2 seconds...
e1 fired!or_e fired!

Firing e2 in 2 seconds...
e2 fired!

Это должно быть поточно-безопасным. Любые комментарии приветствуются.

EDIT: О, и вот ваша функция wait_for_either, хотя способ, которым я написал код, лучше всего сделать и передать вокруг or_event. Обратите внимание, что or_event не следует устанавливать или очищать вручную.

def wait_for_either(e1, e2):
    OrEvent(e1, e2).wait()

Ответ 2

Одним из решений (с опросом) было бы выполнение последовательных ожиданий на каждом Event в цикле

def wait_for_either(a, b):
    while True:
        if a.wait(tunable_timeout):
            break
        if b.wait(tunable_timeout):
            break

Я думаю, что если вы настроите время ожидания достаточно хорошо, результаты будут в порядке.


Лучший не-опрос, о котором я могу думать, - подождать каждого в другом потоке и установить общий Event, которого вы будете ждать в основном потоке.

def repeat_trigger(waiter, trigger):
    waiter.wait()
    trigger.set()

def wait_for_either(a, b):
    trigger = threading.Event()
    ta = threading.Thread(target=repeat_trigger, args=(a, trigger))
    tb = threading.Thread(target=repeat_trigger, args=(b, trigger))
    ta.start()
    tb.start()
    # Now do the union waiting
    trigger.wait()

Довольно интересно, поэтому я написал версию OOP предыдущего решения:

class EventUnion(object):
    """Register Event objects and wait for release when any of them is set"""
    def __init__(self, ev_list=None):
        self._trigger = Event()
        if ev_list:
            # Make a list of threads, one for each Event
            self._t_list = [
                Thread(target=self._triggerer, args=(ev, ))
                for ev in ev_list
            ]
        else:
            self._t_list = []

    def register(self, ev):
        """Register a new Event"""
        self._t_list.append(Thread(target=self._triggerer, args=(ev, )))

    def wait(self, timeout=None):
        """Start waiting until any one of the registred Event is set"""
        # Start all the threads
        map(lambda t: t.start(), self._t_list)
        # Now do the union waiting
        return self._trigger.wait(timeout)

    def _triggerer(self, ev):
        ev.wait()
        self._trigger.set()

Ответ 3

Запуск дополнительных потоков кажется ясным решением, но не очень эффективным. Функция wait_events будет блокировать использование любого из событий.

def wait_events(*events):
    event_share = Event()

    def set_event_share(event):
        event.wait()
        event.clear()
        event_share.set()
    for event in events:
        Thread(target=set_event_share(event)).start()

    event_share.wait()

wait_events(event1, event2, event3)

Ответ 4

Расширение ответа Клаудиу, где вы можете либо ждать события 1 ИЛИ событие 2., либо событие 1 И даже 2.

from threading import Thread, Event, _Event

class ConditionalEvent(_Event):
    def __init__(self, events_list, condition):
        _Event.__init__(self)

        self.event_list = events_list
        self.condition = condition

        for e in events_list:
            self._setup(e, self._state_changed)

        self._state_changed()

    def _state_changed(self):
        bools = [e.is_set() for e in self.event_list]
        if self.condition == 'or':

            if any(bools):
                self.set()
            else:
                self.clear()

        elif self.condition == 'and':

            if all(bools):
                self.set()
            else:
                self.clear()

    def _custom_set(self,e):
        e._set()
        e._state_changed()

    def _custom_clear(self,e):
        e._clear()
        e._state_changed()

    def _setup(self, e, changed_callback):
        e._set = e.set
        e._clear = e.clear
        e._state_changed = changed_callback
        e.set = lambda: self._custom_set(e)
        e.clear = lambda: self._custom_clear(e)

Пример использования будет очень похож, как раньше

import time

e1 = Event()
e2 = Event()

or_e = ConditionalEvent([e1, e2], 'or')


Thread(target=wait_on, args=('e1', e1)).start()
time.sleep(0.05)
Thread(target=wait_on, args=('e2', e2)).start()
time.sleep(0.05)
Thread(target=wait_on, args=('or_e', or_e)).start()
time.sleep(0.05)

print "Firing e1 in 2 seconds..."
time.sleep(2)
e1.set()
time.sleep(0.05)

print "Firing e2 in 2 seconds..."
time.sleep(2)
e2.set()
time.sleep(0.05)

Ответ 5

Не очень, но вы можете использовать два дополнительных потока для мультиплексирования событий...

def wait_for_either(a, b):
  flag = False #some condition variable, event, or similar

  class Event_Waiter(threading.Thread):
    def __init__(self, event):
        self.e = event
    def run(self):
        self.e.wait()
        flag.set()

  a_thread = Event_Waiter(a)
  b_thread = Event_Waiter(b)
  a.start()
  b.start()
  flag.wait()

Примечание. Возможно, вам придется беспокоиться о том, чтобы случайно получить оба события, если они прибудут слишком быстро. Вспомогательные потоки (a_thread и b_thread) должны блокировать синхронизацию вокруг попытки установить флаг, а затем должны убить другой поток (возможно, сбросив это событие потока, если оно было уничтожено).

Ответ 6

def wait_for_event_timeout(*events):
    while not all([e.isSet() for e in events]):
        #Check to see if the event is set. Timeout 1 sec.
        ev_wait_bool=[e.wait(1) for e in events]
        # Process if all events are set. Change all to any to process if any event set
        if all(ev_wait_bool):
            logging.debug('processing event')
        else:
            logging.debug('doing other work')


e1 = threading.Event()
e2 = threading.Event()

t3 = threading.Thread(name='non-block-multi',
                      target=wait_for_event_timeout,
                      args=(e1,e2))
t3.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(5)
e1.set()
time.sleep(10)
e2.set()
logging.debug('Event is set')