Как синхронизировать три потока?
Мое приложение состоит из основного процесса и двух потоков, все работают одновременно и используют три очереди fifo:
Фигу-q - Qmain, Q1 и Q2. Внутри каждой очереди каждый использует счетчик, который увеличивается, когда элемент помещается в очередь, и уменьшается, когда элемент "получает" из очереди.
Обработка включает в себя два потока,
QMaster, которые получают от Q1 и Q2 и помещаются в Qmain,
Монитор, который помещается в Q2,
и основной процесс, который поступает из Qmain и помещается в Q1.
Цикл цикла QMaster последовательно проверяет количество Q1 и Q2, и если какие-либо элементы находятся в q, они получают их и помещают в Qmain.
Цикл Monitor-thread получает данные из внешних источников, упаковывает их и помещает в Q2.
В главном процессе приложения также выполняется цикл, проверяющий количество Qmain, и если какие-либо элементы получают элемент
от Qmain на каждой итерации цикла и обрабатывать его дальше. Во время этой обработки это время от времени
помещает элемент в Q1, который будет обработан позже (когда он будет получен от Qmain по очереди).
Проблема:
Я реализовал все, как описано выше, и работает в случайном (коротком) времени, а затем зависает.
Мне удалось определить источник сбоя, чтобы произойти в приращении/уменьшении
счетчик fifo-q (это может произойти в любом из них).
Что я пробовал:
Использование трех мьютексов: QMAIN_LOCK, Q1_LOCK и Q2_LOCK, которые я блокирую при любой операции get/put
делается на соответствующем fifo-q. Результат: приложение не работает, просто зависает.
Основной процесс должен продолжаться все время, не должен быть заблокирован на "read" (сбой имени named-pipe, failer socketpair).
Любые советы?
Я думаю, что я не правильно использую мьютекс, как это сделать?
(Любые комментарии по улучшению вышеуказанного дизайна также приветствуются)
[править] ниже - процессы и fifo-q-template:
Где и как в этом случае я должен помещать мьютексы, чтобы избежать описанных выше проблем?
main-process:
...
start thread QMaster
start thread Monitor
...
while (!quit)
{
...
if (Qmain.count() > 0)
{
X = Qmain.get();
process(X)
delete X;
}
...
//at some random time:
Q2.put(Y);
...
}
Monitor:
{
while (1)
{
//obtain & package data
Q2.put(data)
}
}
QMaster:
{
while(1)
{
if (Q1.count() > 0)
Qmain.put(Q1.get());
if (Q2.count() > 0)
Qmain.put(Q2.get());
}
}
fifo_q:
template < class X* > class fifo_q
{
struct item
{
X* data;
item *next;
item() { data=NULL; next=NULL; }
}
item *head, *tail;
int count;
public:
fifo_q() { head=tail=NULL; count=0; }
~fifo_q() { clear(); /*deletes all items*/ }
void put(X x) { item i=new item(); (... adds to tail...); count++; }
X* get() { X *d = h.data; (...deletes head ...); count--; return d; }
clear() {...}
};
Ответы
Ответ 1
Пример того, как я буду адаптировать проект и заблокировать доступ к очереди с помощью posix.
Помните, что я бы обернул мьютекс, чтобы использовать RAII или использовать boost-threading, и что я бы использовал stl:: deque или stl:: queue в качестве очереди, но оставаясь как можно ближе к вашему коду:
main-process:
...
start thread Monitor
...
while (!quit)
{
...
if (Qmain.count() > 0)
{
X = Qmain.get();
process(X)
delete X;
}
...
//at some random time:
QMain.put(Y);
...
}
Monitor:
{
while (1)
{
//obtain & package data
QMain.put(data)
}
}
fifo_q:
template < class X* > class fifo_q
{
struct item
{
X* data;
item *next;
item() { data=NULL; next=NULL; }
}
item *head, *tail;
int count;
pthread_mutex_t m;
public:
fifo_q() { head=tail=NULL; count=0; }
~fifo_q() { clear(); /*deletes all items*/ }
void put(X x)
{
pthread_mutex_lock(&m);
item i=new item();
(... adds to tail...);
count++;
pthread_mutex_unlock(&m);
}
X* get()
{
pthread_mutex_lock(&m);
X *d = h.data;
(...deletes head ...);
count--;
pthread_mutex_unlock(&m);
return d;
}
clear() {...}
};
Заметьте, что мьютекс еще нужно инициализировать, как в примере здесь и что count() также должен использовать mutex
Ответ 2
Вы не должны блокировать второй мьютекс, когда вы уже заблокировали его.
Поскольку вопрос отмечен С++, я предлагаю реализовать блокировку внутри логики get/add класса очереди (например, с использованием блокировок boost) или написать оболочку, если ваша очередь не является классом.
Это позволяет упростить логику блокировки.
Относительно добавленных вами источников: проверка размера очереди и последующий put/get должны выполняться в одной транзакции, иначе другой поток может редактировать очередь между
Ответ 3
Используйте отладчик. Когда ваше решение с мьютексами зависает, посмотрите, что делают потоки, и вы получите хорошее представление о причине проблемы.
Какова ваша платформа? В Unix/Linux вы можете использовать очереди сообщений POSIX (вы также можете использовать очереди сообщений System V, сокеты, FIFO,...), поэтому вам не нужны мьютексы.
Узнайте о переменных условий. По вашему описанию это похоже на то, что ваш Qmaster-поток занят циклом, сжигающим ваш процессор.
Один из ваших ответов предполагает, что вы делаете что-то вроде:
Q2_mutex.lock()
Qmain_mutex.lock()
Qmain.put(Q2.get())
Qmain_mutex.unlock()
Q2_mutex.unlock()
но вы, вероятно, захотите сделать это, как:
Q2_mutex.lock()
X = Q2.get()
Q2_mutex.unlock()
Qmain_mutex.lock()
Qmain.put(X)
Qmain_mutex.unlock()
и, как предложил Грегори, инкапсулировать логику в get/put.
EDIT: Теперь, когда вы разместили свой код, мне интересно, это учебное упражнение?
Потому что я вижу, что вы кодируете свой собственный класс очереди FIFO вместо того, чтобы использовать стандартную std:: queue С++. Я полагаю, вы хорошо проверили свой класс, и проблемы там нет.
Кроме того, я не понимаю, почему вам нужны три разные очереди. Кажется, что очереди Qmain будет достаточно, и тогда вам не понадобится поток Qmaster, который действительно оживает.
Об инкапсуляции вы можете создать класс synch_fifo_q, который инкапсулирует класс fifo_q. Добавьте частную переменную mutex, а затем общедоступные методы (put, get, clear, count,...) должны быть как put (X) {lock m_mutex; m_fifo_q.put(Х); разблокировать m_mutex; }
вопрос: что произойдет, если у вас будет несколько читателей из очереди? Гарантировано ли, что после "count() > 0" вы можете сделать "get()" и получить элемент?
Ответ 4
Я написал простое приложение ниже:
#include <queue>
#include <windows.h>
#include <process.h>
using namespace std;
queue<int> QMain, Q1, Q2;
CRITICAL_SECTION csMain, cs1, cs2;
unsigned __stdcall TMaster(void*)
{
while(1)
{
if( Q1.size() > 0)
{
::EnterCriticalSection(&cs1);
::EnterCriticalSection(&csMain);
int i1 = Q1.front();
Q1.pop();
//use i1;
i1 = 2 * i1;
//end use;
QMain.push(i1);
::LeaveCriticalSection(&csMain);
::LeaveCriticalSection(&cs1);
}
if( Q2.size() > 0)
{
::EnterCriticalSection(&cs2);
::EnterCriticalSection(&csMain);
int i1 = Q2.front();
Q2.pop();
//use i1;
i1 = 3 * i1;
//end use;
QMain.push(i1);
::LeaveCriticalSection(&csMain);
::LeaveCriticalSection(&cs2);
}
}
return 0;
}
unsigned __stdcall TMoniter(void*)
{
while(1)
{
int irand = ::rand();
if ( irand % 6 >= 3)
{
::EnterCriticalSection(&cs2);
Q2.push(irand % 6);
::LeaveCriticalSection(&cs2);
}
}
return 0;
}
unsigned __stdcall TMain(void)
{
while(1)
{
if (QMain.size() > 0)
{
::EnterCriticalSection(&cs1);
::EnterCriticalSection(&csMain);
int i = QMain.front();
QMain.pop();
i = 4 * i;
Q1.push(i);
::LeaveCriticalSection(&csMain);
::LeaveCriticalSection(&cs1);
}
}
return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
::InitializeCriticalSection(&cs1);
::InitializeCriticalSection(&cs2);
::InitializeCriticalSection(&csMain);
unsigned threadID;
::_beginthreadex(NULL, 0, &TMaster, NULL, 0, &threadID);
::_beginthreadex(NULL, 0, &TMoniter, NULL, 0, &threadID);
TMain();
return 0;
}
Ответ 5
Вы приобретаете одновременно несколько замков? Обычно это то, чего вы хотите избежать. Если необходимо, убедитесь, что вы всегда приобретаете блокировки в том же порядке в каждом потоке (это более ограничивает ваш concurrency и почему вы вообще хотите его избежать).
Другие советы concurrency: Вы приобретаете блокировку перед чтением размеров очереди? Если вы используете мьютексы для защиты очередей, то реализация вашей очереди не является параллельной, и вам, вероятно, потребуется получить блокировку перед чтением размера очереди.
Ответ 6
1 может возникнуть проблема из-за этого правила: "Основной процесс должен продолжаться все время, не должен быть заблокирован при чтении". Как вы его реализовали? в чем разница между "get" и "read"?
Проблема, кажется, в вашей реализации, а не в логике. И, как вы заявили, вы не должны находиться в каком-либо мертвом замке, потому что вы не приобретаете другой замок, будь то в замке.