Ответ 1
Я собираюсь сделать еще один треск на примере с реализацией примера pthread_barrier_wait()
, которая использует функциональные возможности mutex и condition variable, которые могут быть реализованы реализацией pthreads. Обратите внимание, что этот пример не пытается справиться с соображениями производительности (в частности, когда потоки ожидания разблокированы, все они повторно сериализованы при выходе из ожидания). Я думаю, что использование чего-то вроде объектов Linux Futex могло бы помочь с проблемами производительности, но Futexes по-прежнему в значительной степени не соответствуют моему опыту.
Кроме того, я сомневаюсь, что этот пример правильно обрабатывает сигналы или ошибки (если вообще в случае сигналов). Но я думаю, что правильная поддержка этих вещей может быть добавлена как упражнение для читателя.
Мой главный страх в том, что пример может иметь состояние гонки или тупик (обработка мьютексов сложнее, чем мне нравится). Также обратите внимание, что это пример, который даже не был скомпилирован. Рассматривайте его как псевдокод. Также имейте в виду, что мой опыт в основном в Windows - я занимаюсь этим скорее как образовательная возможность, чем что-либо другое. Таким образом, качество псевдокода может быть довольно низким.
Однако, отказ от ответственности, я думаю, что это может дать представление о том, как можно решить проблему, заданную в вопросе (например, как функция pthread_barrier_wait()
разрешает объект pthread_barrier_t
, который он использует для уничтожения любым от выпущенных потоков без опасности использования барьерного объекта одним или несколькими потоками на выходе).
Здесь:
/*
* Since this is a part of the implementation of the pthread API, it uses
* reserved names that start with "__" for internal structures and functions
*
* Functions such as __mutex_lock() and __cond_wait() perform the same function
* as the corresponding pthread API.
*/
// struct __barrier_wait data is intended to hold all the data
// that `pthread_barrier_wait()` will need after releasing
// waiting threads. This will allow the function to avoid
// touching the passed in pthread_barrier_t object after
// the wait is satisfied (since any of the released threads
// can destroy it)
struct __barrier_waitdata {
struct __mutex cond_mutex;
struct __cond cond;
unsigned waiter_count;
int wait_complete;
};
struct __barrier {
unsigned count;
struct __mutex waitdata_mutex;
struct __barrier_waitdata* pwaitdata;
};
typedef struct __barrier pthread_barrier_t;
int __barrier_waitdata_init( struct __barrier_waitdata* pwaitdata)
{
waitdata.waiter_count = 0;
waitdata.wait_complete = 0;
rc = __mutex_init( &waitdata.cond_mutex, NULL);
if (!rc) {
return rc;
}
rc = __cond_init( &waitdata.cond, NULL);
if (!rc) {
__mutex_destroy( &pwaitdata->waitdata_mutex);
return rc;
}
return 0;
}
int pthread_barrier_init(pthread_barrier_t *barrier, const pthread_barrierattr_t *attr, unsigned int count)
{
int rc;
result = __mutex_init( &barrier->waitdata_mutex, NULL);
if (!rc) return result;
barrier->pwaitdata = NULL;
barrier->count = count;
//TODO: deal with attr
}
int pthread_barrier_wait(pthread_barrier_t *barrier)
{
int rc;
struct __barrier_waitdata* pwaitdata;
unsigned target_count;
// potential waitdata block (only one thread will actually be used)
struct __barrier_waitdata waitdata;
// nothing to do if we only need to wait for one thread...
if (barrier->count == 1) return PTHREAD_BARRIER_SERIAL_THREAD;
rc = __mutex_lock( &barrier->waitdata_mutex);
if (!rc) return rc;
if (!barrier->pwaitdata) {
// no other thread has claimed the waitdata block yet -
// we'll use this thread's
rc = __barrier_waitdata_init( &waitdata);
if (!rc) {
__mutex_unlock( &barrier->waitdata_mutex);
return rc;
}
barrier->pwaitdata = &waitdata;
}
pwaitdata = barrier->pwaitdata;
target_count = barrier->count;
// all data necessary for handling the return from a wait is pointed to
// by `pwaitdata`, and `pwaitdata` points to a block of data on the stack of
// one of the waiting threads. We have to make sure that the thread that owns
// that block waits until all others have finished with the information
// pointed to by `pwaitdata` before it returns. However, after the 'big' wait
// is completed, the `pthread_barrier_t` object that passed into this
// function isn't used. The last operation done to `*barrier` is to set
// `barrier->pwaitdata = NULL` to satisfy the requirement that this function
// leaves `*barrier` in a state as if `pthread_barrier_init()` had been called - and
// that operation is done by the thread that signals the wait condition
// completion before the completion is signaled.
// note: we're still holding `barrier->waitdata_mutex`;
rc = __mutex_lock( &pwaitdata->cond_mutex);
pwaitdata->waiter_count += 1;
if (pwaitdata->waiter_count < target_count) {
// need to wait for other threads
__mutex_unlock( &barrier->waitdata_mutex);
do {
// TODO: handle the return code from `__cond_wait()` to break out of this
// if a signal makes that necessary
__cond_wait( &pwaitdata->cond, &pwaitdata->cond_mutex);
} while (!pwaitdata->wait_complete);
}
else {
// this thread satisfies the wait - unblock all the other waiters
pwaitdata->wait_complete = 1;
// 'release' our use of the passed in pthread_barrier_t object
barrier->pwaitdata = NULL;
// unlock the barrier waitdata_mutex - the barrier is
// ready for use by another set of threads
__mutex_unlock( barrier->waitdata_mutex);
// finally, unblock the waiting threads
__cond_broadcast( &pwaitdata->cond);
}
// at this point, barrier->waitdata_mutex is unlocked, the
// barrier->pwaitdata pointer has been cleared, and no further
// use of `*barrier` is permitted...
// however, each thread still has a valid `pwaitdata` pointer - the
// thread that owns that block needs to wait until all others have
// dropped the pwaitdata->waiter_count
// also, at this point the `pwaitdata->cond_mutex` is locked, so
// we're in a critical section
rc = 0;
pwaitdata->waiter_count--;
if (pwaitdata == &waitdata) {
// this thread owns the waitdata block - it needs to hang around until
// all other threads are done
// as a convenience, this thread will be the one that returns
// PTHREAD_BARRIER_SERIAL_THREAD
rc = PTHREAD_BARRIER_SERIAL_THREAD;
while (pwaitdata->waiter_count!= 0) {
__cond_wait( &pwaitdata->cond, &pwaitdata->cond_mutex);
};
__mutex_unlock( &pwaitdata->cond_mutex);
__cond_destroy( &pwaitdata->cond);
__mutex_destroy( &pwaitdata_cond_mutex);
}
else if (pwaitdata->waiter_count == 0) {
__cond_signal( &pwaitdata->cond);
__mutex_unlock( &pwaitdata->cond_mutex);
}
return rc;
}
17 июля 2011 года: обновление в ответ на комментарий/вопрос о связанных с процессом барьерах
Я полностью забыл о ситуации с барьерами, которые разделяются между процессами. И, как вы упомянули, идея, которую я изложил, в этом случае будет неудачной. У меня действительно нет опыта использования разделяемой памяти POSIX, поэтому любые предложения, которые я делаю, должны быть смягчены скептицизмом.
Подводя итог (для моей выгоды, если не кто-то другой):
Когда любой из потоков получает контроль после возврата pthread_barrier_wait()
, объект-барьер должен находиться в состоянии "init" (однако, последний pthread_barrier_init()
на этом объекте установлен). Также подразумевается API, что после возврата любого из потоков может произойти одна или несколько из следующих вещей:
- другой вызов
pthread_barrier_wait()
, чтобы начать новый раунд синхронизации потоков -
pthread_barrier_destroy()
на барьерном объекте - память, выделенная для объекта барьера, может быть освобождена или не разделена, если она находится в области разделяемой памяти.
Эти вещи означают, что до вызова pthread_barrier_wait()
позволяет любому потоку возвращаться, он в значительной степени должен обеспечить, чтобы все ожидающие потоки больше не использовали объект барьера в контексте этого вызова. Мой первый ответ обратился к этому, создав "локальный" набор объектов синхронизации (мьютекс и связанную переменную условия) за пределами объекта барьера, который блокировал бы все потоки. Эти локальные объекты синхронизации были выделены в стеке потока, который сначала вызывал pthread_barrier_wait()
.
Я думаю, что что-то подобное нужно будет сделать для барьеров, которые разделены по процессам. Однако в этом случае простое выделение этих объектов синхронизации в стеке потоков не является адекватным (так как другие процессы не будут иметь доступа). Для барьера, связанного с процессом, эти объекты должны быть распределены в памяти с разделением процессов. Я думаю, что описанная выше техника может быть применена аналогично:
-
waitdata_mutex
, который управляет "распределением" локальных переменных синхронизации (блок waitdata), будет находиться в разделяемой процессами памяти уже благодаря тому, что он находится в барьерной структуре. Конечно, когда барьер установлен наTHEAD_PROCESS_SHARED
, этот атрибут также должен применяться кwaitdata_mutex
- когда
__barrier_waitdata_init()
вызывается для инициализации локального мьютекса и переменной условия, ему придется выделять эти объекты в общей памяти вместо простого использования переменнойwaitdata
на основе стека. - когда поток "cleanup" уничтожает мьютекс и переменную условия в блоке
waitdata
, ему также необходимо очистить выделенное разделение памяти для блока. - в случае использования разделяемой памяти должен быть некоторый механизм для обеспечения того, чтобы объект общей памяти открывался по крайней мере один раз в каждом процессе и закрывал правильное количество раз в каждом процессе (но не закрывался полностью до каждый поток процесса завершается с его использованием). Я не продумал точно, как это будет сделано...
Я думаю, что эти изменения позволят схеме работать с барьерами, связанными с процессом. последняя маркерная точка выше - это ключевой элемент для определения. Другой способ - создать имя для объекта общей памяти, в котором будет храниться "локальный" общий процесс waitdata
. Для этого имени есть определенные атрибуты:
- вы хотите, чтобы хранилище для имени находилось в структуре
struct pthread_barrier_t
, чтобы все процессы имели к нему доступ; что означает известный предел длины имени - вы хотите, чтобы имя было уникальным для каждого "экземпляра" набора вызовов
pthread_barrier_wait()
, потому что это может быть возможно для второго раунда ожидания, прежде чем все потоки пройдут весь путь первый раунд ожидания (так что блок памяти с разделяемыми процессами, установленный дляwaitdata
, возможно, еще не был освобожден). Поэтому имя, вероятно, должно основываться на таких вещах, как идентификатор процесса, идентификатор потока, адрес барьерного объекта и атомный счетчик. - Я не знаю, есть ли последствия безопасности для того, чтобы имя было "допустимым". если это так, нужно добавить некоторую рандомизацию - не знаю, сколько. Возможно, вам также понадобится хэш-данные, упомянутые выше вместе со случайными битами. Как я уже сказал, я действительно не знаю, важно это или нет.