Как избежать условий гонки в переменной условия в VxWorks

Мы программируем проприетарную встроенную платформу, сидящую поверх VxWorks 5.5. В нашем наборе инструментов есть переменная условия, которая реализуется с использованием двоичного семафора VxWorks.

Теперь POSIX предоставляет функцию ожидания, которая также принимает мьютекс. Это разблокирует мьютексы (так, чтобы какая-то другая задача могла записывать данные) и ждет, пока другая задача будет сигнализировать (это делается, записывая данные). Я считаю, что это реализует то, что называется Monitor, ICBWT.

Нам нужна такая функция ожидания, но ее реализация сложна. Простой подход сделает это:

bool condition::wait_for(mutex& mutex) const {
    unlocker ul(mutex);    // relinquish mutex
    return wait(event);
}                          // ul dtor grabs mutex again

Тем не менее, это спортивное состояние гонки, потому что это позволяет другой задаче вытеснить этот момент после разблокировки и до ожидания. Другая задача может записываться на дату после ее разблокировки и сигнализировать условие до того, как эта задача начнет ждать семафора. (Мы протестировали это, и это действительно произойдет и навсегда заблокирует задачу ожидания.)

Учитывая, что VxWorks 5.5, похоже, не предоставляет API для временного отказа от семафора при ожидании сигнала, есть ли способ реализовать это поверх предоставленных процедур синхронизации?

Примечание.. Это очень старая версия VxWorks, которая была скомпилирована без поддержки POSIX (от поставщика запатентованного оборудования, из того, что я понял).

Ответы

Ответ 1

Условия гонки можно избежать, если каждое ожидающее задание ожидает отдельный двоичный семафор. Эти семафоры должны быть зарегистрированы в контейнере, который задача сигнализации использует для разблокирования всех ожидающих задач. Контейнер должен быть защищен мьютексом.

Метод wait_for() получает двоичный семафор, ждет его и, наконец, удаляет его.

void condition::wait_for(mutex& mutex) {
    SEM_ID sem = semBCreate(SEM_Q_PRIORITY, SEM_EMPTY);
    {
        lock l(listeners_mutex);    // assure exclusive access to listeners container
        listeners.push_back(sem);       
    }                               // l dtor unlocks listeners_mutex again

    unlocker ul(mutex);             // relinquish mutex
    semTake(sem, WAIT_FOREVER);

    {
        lock l(listeners_mutex);
        // remove sem from listeners
        // ...
        semDelete(sem);
    }
}                                   // ul dtor grabs mutex again

Метод signal() выполняет итерацию по всем зарегистрированным семафорам и разблокирует их.

void condition::signal() {
    lock l(listeners_mutex);
    for_each (listeners.begin(), listeners.end(), /* call semGive()... */ )
}

Этот подход гарантирует, что wait_for() никогда не пропустит сигнал. Недостатком является необходимость дополнительных системных ресурсов. Чтобы избежать создания и уничтожения семафоров для каждого вызова wait_for(), можно использовать пул.

Ответ 2

Это должно быть довольно легко с помощью встроенного vxworks, здесь требуется очередь сообщений. Ваш метод wait_for может использоваться как есть.

bool condition::wait_for(mutex& mutex) const 
{
    unlocker ul(mutex);    // relinquish mutex
    return wait(event);
}                          // ul dtor grabs mutex again

но код ожидания (события) будет выглядеть следующим образом:

wait(event)
{
    if (msgQRecv(event->q, sigMsgBuf, sigMsgSize, timeoutTime) == OK)
    {
        // got it...
    }
    else
    {
        // timeout, report error or something like that....
    }
}

и ваш код сигнала будет выглядеть примерно так:

signal(event)
{
    msgQSend(event->q, sigMsg, sigMsgSize, NO_WAIT, MSG_PRI_NORMAL);
}

Итак, если сигнал запускается до того, как вы начнете ждать, тогда msgQRecv немедленно вернется с сигналом, когда он в конце концов будет вызван, и вы можете снова принять мьютекс в ul dtor, как указано выше.

Событие- > q - это MSG_Q_ID, созданный во время создания события с вызовом msgQCreate, и данные в sigMsg определены вами... но могут быть просто случайным байтом данных, или вы можете прийти с более интеллектуальной структурой с информацией о том, кто сигнализирует или что-то еще, что может быть приятно знать.

Обновление для нескольких официантов, это немного сложно: Итак, есть несколько предположений, которые я сделаю для упрощения вещей

  • Количество задач, которые будут ожидаться, известно во время создания события и является постоянным.
  • Будет одна задача, которая всегда отвечает за указание, когда это нормально, чтобы разблокировать мьютекс, все другие задачи просто хотят уведомления, когда событие сигнализируется/завершается.

В этом подходе используется счетный семафор, похожий на приведенный выше, с небольшой дополнительной логикой:

wait(event)
{
    if (semTake(event->csm, timeoutTime) == OK)
    {
        // got it...
    }
    else
    {
        // timeout, report error or something like that....
    }
}

и ваш код сигнала будет выглядеть примерно так:

signal(event)
{
    for (int x = 0; x < event->numberOfWaiters; x++)
    {
        semGive(event->csm);
    }
}

Создание события похоже на это, помните, что в этом примере количество официантов постоянно и известно во время создания события. Вы можете сделать его динамичным, но ключ заключается в том, что каждый раз, когда событие будет происходить, numberOfWaiters должны быть правильными, прежде чем разблокировка разблокирует мьютексы.

createEvent(numberOfWaiters)
{
    event->numberOfWaiters = numberOfWaiters;
    event->csv = semCCreate(SEM_Q_FIFO, 0);
    return event;
}

Вы не можете быть спокойны в отношении numberOfWaiters: D Я еще раз скажу: numberOfWaiters должны быть правильными, прежде чем разблокировка разблокирует мьютекс. Чтобы сделать его динамическим (если это необходимо), вы можете добавить функцию setNumWaiters (numOfWaiters) и вызвать это в функции wait_for перед тем, как разблокировка разблокирует мьютекс, если он всегда правильно устанавливает номер.

Теперь для последнего трюка, как указано выше, предполагается, что одна задача отвечает за разблокирование мьютекса, остальные просто ждут сигнала, а это означает, что одна и только одна задача вызовет функцию wait_for() выше, а остальные задачи просто вызывают функцию ожидания (события).

С учетом этого numberOfWaiters вычисляется следующим образом:

  • Количество задач, которые будут называть wait()
  • плюс 1 для задачи, которая вызывает wait_for()

Конечно, вы также можете сделать это более сложным, если вам действительно нужно, но, скорее всего, это сработает, потому что обычно одна задача запускает событие, но многие задачи хотят знать, что это завершено, и это то, что это обеспечивает.

Но ваш основной поток выглядит следующим образом:

init()
{
    event->createEvent(3);
}

eventHandler()
{
    locker l(mutex);
    doEventProcessing();
    signal(event);
}

taskA()
{
    doOperationThatTriggersAnEvent();
    wait_for(mutex);
    eventComplete();
}

taskB()
{
    doWhateverIWant();
    // now I need to know if the event has occurred...
    wait(event);
    coolNowIKnowThatIsDone();
}

taskC()
{
    taskCIsFun();
    wait(event);
    printf("event done!\n");
}

Когда я пишу выше, я чувствую, что все концепции OO мертвы, но, надеюсь, вы получите эту идею, на самом деле ждать и ждать_ должны принимать тот же параметр или не параметр, а скорее быть членами того же класса, который также имеет все данные, которые им нужно знать... но тем не менее это обзор того, как это работает.

Ответ 3

Из описания, похоже, вы можете реализовать (или использовать) семафор - это стандартный алгоритм CS с семантикой, подобный condvars, и есть тонны учебников о том, как их реализовать (https://www.google.com/search?q=semaphore+algorithm).

Случайный результат Google, который объясняет семафоры, находится по адресу: http://www.cs.cornell.edu/courses/cs414/2007sp/lectures/08-bakery.ppt (см. слайд 32).