Как я разорвать отношения наблюдателя в многопоточном С++?
У меня есть объект, который предлагает клиентам Subscribe(Observer*)
и Unsubscribe(Observer*)
. Тема запускается в своем собственном потоке (из которого он вызывает Notify()
для подписанных наблюдателей), а мьютекс защищает свой внутренний список наблюдателей.
Я бы хотел, чтобы клиентский код, который я не контролирую, чтобы безопасно удалить Observer после его отмены. Как это можно достичь?
- Удержание мьютекса - даже рекурсивное
мьютекс - пока я уведомляю наблюдателей
не является вариантом из-за
риск взаимоблокировки.
- Я мог бы отметить наблюдателя для удаления
в вызове Unsubscribe и удалите его
из темы Тема. затем
клиенты могут ждать специального
Уведомление "Безопасное удаление". Эта
выглядит безопасно, но обременительно для
клиенты.
Edit
Ниже приведен пример иллюстративного кода. Проблема заключается в том, как предотвратить Unsubscribe, когда Run находится в комментарии "Проблема здесь". Затем я мог бы вернуться к удаленному объекту. В качестве альтернативы, если я держу мьютекс повсюду, а не копирую, я могу затормозить определенных клиентов.
#include <set>
#include <functional>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
using namespace std;
using namespace boost;
class Observer
{
public:
void Notify() {}
};
class Subject
{
public:
Subject() : t(bind(&Subject::Run, this))
{
}
void Subscribe(Observer* o)
{
mutex::scoped_lock l(m);
observers.insert(o);
}
void Unsubscribe(Observer* o)
{
mutex::scoped_lock l(m);
observers.erase(o);
}
void Run()
{
for (;;)
{
WaitForSomethingInterestingToHappen();
set<Observer*> notifyList;
{
mutex::scoped_lock l(m);
notifyList = observers;
}
// Problem here
for_each(notifyList.begin(), notifyList.end(),
mem_fun(&Observer::Notify));
}
}
private:
set<Observer*> observers;
thread t;
mutex m;
};
Изменить
Я не могу уведомить наблюдателей, удерживая мьютекс из-за риска взаимоблокировки. Самый очевидный способ, которым это может случиться, - вызов клиента Подписаться или Отменить подписку изнутри Notify - легко исправить, сделав мьютекс рекурсивным. Более коварным является риск прерывистого тупика на разных потоках.
Я работаю в многопоточной среде, поэтому в любой момент выполнения потока он обычно будет содержать последовательность блокировок L1, L2,... Ln. В другой нити будут зафиксированы замки K1, K2,... Km. Правильно написанный клиент гарантирует, что разные потоки будут всегда получать блокировки в том же порядке. Но когда клиенты взаимодействуют с моим мьютексом Subject - назовите его X - эта стратегия будет нарушена: вызовы для подписки/отмены подписки получают блокировки в порядке L1, L2,... Ln, X. Звонки на уведомление из моей темы темы получают блокировки в порядок X, K1, K2,... Km. Если какой-либо из Li или Kj может совпадать по любому пути вызова, клиент страдает прерывистым тупиком, с небольшой перспективой отладки его. Поскольку я не контролирую клиентский код, я не могу этого сделать.
Ответы
Ответ 1
"Идеальное" решение предполагает использование shared_ptr
и weak_ptr
. Однако для того, чтобы быть общим, он также должен учитывать, что проблема Subject
была удалена до некоторой части ее Observer
(да, это тоже может произойти).
class Subject {
public:
void Subscribe(std::weak_ptr<Observer> o);
void Unsubscribe(std::weak_ptr<Observer> o);
private:
std::mutex mutex;
std::set< std::weak_ptr<Observer> > observers;
};
class Observer: boost::noncopyable {
public:
~Observer();
void Notify();
private:
std::mutex;
std::weak_ptr<Subject> subject;
};
В этой структуре мы создаем циклический граф, но с разумным использованием weak_ptr
, чтобы обе Observer
и Subject
могли быть уничтожены без согласования.
Примечание. Для простоты я предположил, что a Observer
наблюдает за одним Subject
за раз, но он может легко наблюдать несколько объектов.
Теперь кажется, что вы застряли в небезопасном управлении памятью. Как вы можете себе представить, это довольно сложная ситуация. В этом случае я бы предложил эксперимент: асинхронный Unsubscribe
. Или, по крайней мере, вызов Unsubscribe
будет синхронным извне, но будет выполняться асинхронно.
Идея проста: мы будем использовать очередь событий для достижения синхронизации. То есть:
- вызов
Unsubscribe
отправляет событие в очередь (полезная нагрузка Observer*
), а затем ждет
- когда поток
Subject
обработал событие Unsubscribe
, он просыпает ожидающий поток (ы)
Вы можете использовать либо ожидание, либо переменную условия, я бы посоветовал переменную условия, если производительность не указала иначе.
Примечание: это решение полностью не учитывает Subject
преждевременное умирание.
Ответ 2
Unsubscribe() должен быть синхронным, чтобы он не возвращался до тех пор, пока Observer не будет больше не находиться в списке объектов. Это единственный способ сделать это безопасно.
ETA (перемещение моего комментария к ответу):
Поскольку время, похоже, не является проблемой, возьмите и отпустите мьютекс между уведомлением каждого наблюдателя. Вы не сможете использовать for_each так, как сейчас, и вам нужно будет проверить итератор, чтобы убедиться, что он все еще действителен.
for ( ... )
{
take mutex
check iterator validity
notify
release mutex
}
Это сделает то, что вы хотите.
Ответ 3
Можете ли вы изменить подпись Subscribe() Unsubscribe()? Замена наблюдателя * на что-то вроде shared_ptr <Observer> упростит ситуацию.
РЕДАКТИРОВАТЬ: "Легко" заменить "проще" выше.
Пример того, как это сложно "получить", см. В истории Boost.Signals и принят -but-not-yet-in-the-distribution Boost.Signals2 (ранее библиотеки Boost.ThreadSafeSignals).
Ответ 4
Ммм... Я действительно не понимаю ваш вопрос, потому что если клиент вызывает Unsubscribe, вы сможете позволить клиенту удалить его (он не используется вами). Однако, если по какой-то причине вы не можете закрыть связь, как только клиент отменит подписку на наблюдателя, вы можете добавить "Subject" новую операцию для безопасного удаления Observer или просто для клиентов, чтобы сигнализировать о том, что они больше не заинтересованы в Observer.
Rethink edit: Хорошо, теперь я думаю, что понимаю вашу проблему. Я думаю, что лучшим решением вашей проблемы является следующее:
- У каждого сохраненного элемента наблюдателя есть "действительный" флаг. Этот флаг будет использоваться для уведомления об этом или нет, пока вы находитесь в цикле уведомлений.
- Для защиты доступа к этому "действительному" флажку вам нужен мьютекс. Затем операция отмены подписки блокирует мьютекс для "допустимого" флага, устанавливает для него значение false для выбранного наблюдателя.
- Цикл уведомлений также должен блокировать и разблокировать мьютекс допустимого флага и действовать только на "действительные" наблюдатели.
Учитывая, что операция отмены подписки заблокирует мьютекс до reset допустимого флага (и что этот конкретный Observer больше не будет использоваться в вашем потоке), код будет потокобезопасным, а клиенты могут удалить любого наблюдателя как только отменит подписку.
Ответ 5
Будет ли что-то подобное удовлетворительным? По-прежнему небезопасно отменять подписку на наблюдателя, хотя вас уведомляют, для этого вам понадобится интерфейс, как вы упомянули (насколько я могу судить).
Subscribe(Observer *x)
{
mutex.lock();
// add x to the list
mutex.unlock();
}
Unsubscribe(Observer *x)
{
mutex.lock();
while (!ok_to_delete)
cond.wait(mutex);
// remove x from list
mutex.unlock();
}
NotifyLoop()
{
while (true) {
// wait for something to trigger a notify
mutex.lock();
ok_to_delete = false;
// build a list of observers to notify
mutex.unlock();
// notify all observers from the list saved earlier
mutex.lock();
ok_to_delete = true;
cond.notify_all();
mutex.unlock();
}
}
Если вы хотите иметь возможность Unsubscribe() внутри Notify() - (плохое дизайнерское решение на IMO клиента), вы можете добавить идентификатор потока потока уведомлений в свою структуру данных. В функции "Отменить подписку" вы можете проверить этот идентификатор потока на текущий идентификатор потока (большинство библиотек потоков предоставляют это - например, pthread_self). Если они одинаковы, вы можете продолжить, не дожидаясь условия переменной.
ПРИМЕЧАНИЕ. Если клиент отвечает за удаление наблюдателя, это означает, что вы столкнулись с ситуацией, когда внутри обратного вызова Notify вы отменили подписку и удалили наблюдателя, но все еще выполняете что-то с этим, помеченным этим указателем. Это то, о чем клиент должен знать и только удалить его в конце Notify().
Ответ 6
Вместо того, чтобы клиенты получали уведомление "SafeToDelete", предоставьте им метод IsSubscribed (Observer *). Затем код клиента становится:
subject.Unsubscribe( obsever );l
while( subject.IsSubscribed( observer ) ) {
sleep_some_short_time; // OS specific sleep stuff
}
delete observer;
что не слишком обременительно.
Ответ 7
Вы можете создать "очередь для удаления" в типе CSubject. Когда вы удаляете Observer, вы можете вызвать pSubject- > QueueForDelete (pObserver). Затем, когда предметный поток находится между уведомлениями, он может безопасно удалять наблюдателей из очереди.
Ответ 8
Я думаю, что это трюк, если не очень элегантно:
class Subject {
public:
Subject() : t(bind(&Subject::Run, this)),m_key(0) { }
void Subscribe(Observer* o) {
mutex::scoped_lock l(m);
InternalObserver io( o );
boost::shared_ptr<InternalObserver> sp(&io);
observers.insert(pair<int,boost::shared_ptr<InternalObserver>> (MakeKey(o),sp));
}
void Unsubscribe(Observer* o) {
mutex::scoped_lock l(m);
observers.find( MakeKey(o) )->second->exists = false; }
void WaitForSomethingInterestingToHappen() {}
void Run()
{
for (;;)
{
WaitForSomethingInterestingToHappen();
for( unsigned int i = 0; i < observers.size(); ++ i )
{
mutex::scoped_lock l(m);
if( observers[i]->exists )
{
mem_fun(&Observer::Notify);//needs changing
}
else
{
observers.erase(i);
--i;
}
}
}
}
private:
int MakeKey(Observer* o) {
return ++m_key;//needs changeing, sha of the object?
}
class InternalObserver {
public:
InternalObserver(Observer* o) : m_o( o ), exists( true ) {}
Observer* m_o;
bool exists;
};
map< int, boost::shared_ptr<InternalObserver> > observers;
thread t;
mutex m;
int m_key;
};
Ответ 9
Измените observers
на map
с помощью клавиши Observer*
и введите обертку Observer
. Оболочка включает volatile
boolean, чтобы указать, действительно ли Observer
. В методе subscribe
объект-оболочка создается в состоянии действительный. В методе unsubscribe
обертка помечена как недействительная. Notify
вызывается в оболочке вместо фактического Observer. Обертка вызовет Notify
в фактическом Observer, если она действительна (по-прежнему подписана)
#include <map>
#include <functional>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
using namespace std;
using namespace boost;
class Observer
{
public:
void Notify() {}
};
class ObserverWrapper : public Observer
{
public:
Observer* wrappee;
volatile bool valid;
ObserverWrapper(Observer* o)
{
wrappee = o;
valid = true;
}
void Notify()
{
if (valid) wrappee->Notify();
}
}
class Subject
{
public:
Subject() : t(bind(&Subject::Run, this))
{
}
void Subscribe(Observer* o)
{
mutex::scoped_lock l(m);
boost::shared_ptr<ObserverWrapper> sptr(new ObserverWrapper(o));
observers.insert(pair<Observer*, sptr));
}
void Unsubscribe(Observer* o)
{
mutex::scoped_lock l(m);
observers.find(o)->second->valid = false;
observers.erase(o);
}
void Run()
{
for (;;)
{
WaitForSomethingInterestingToHappen();
vector<ObserverWrapper*> notifyList;
{
mutex::scoped_lock l(m);
boost::copy(observers | boost::adaptors::map_values, std::back_inserter(notifyList));
}
// Should be no problem here
for_each(notifyList.begin(), notifyList.end(),
mem_fun(&ObserverWrapper::Notify));
}
}
private:
map<Observer*, ObserverWrapper*> observers;
thread t;
mutex m;
};