Как добиться незакрепленного, но блокирующего поведения?
Я использую одиночную очередь для одиночной очереди без блокировки для интенсивного сетевого приложения. У меня есть куча рабочих потоков, получающих работу в их собственных отдельных очередях, которые они затем деактивируют и обрабатывают.
Удаление блокировок из этих очередей значительно улучшило производительность при высокой нагрузке, , но они больше не блокируются, когда очереди пусты, что, в свою очередь, приводит к резкому увеличению использования ЦП.
Как я могу эффективно блокировать поток до тех пор, пока он не сможет успешно удалить что-либо или будет убит/прерван?
Ответы
Ответ 1
Если вы работаете в Linux, изучите Futex. Он обеспечивает производительность неблокирующей реализации, используя атомарные операции, а не вызовы ядра, такие как мьютекс, но если вам нужно установить процесс в режим ожидания из-за того, что какое-то условие не является истинным (т.е. Блокирование), оно будет затем сделайте соответствующие вызовы ядра, чтобы заставить процесс спать и разбудить его в будущем событии. Это в основном как очень быстрый семафор.
Ответ 2
В Linux futex можно использовать для блокировки потока. Но имейте в виду, что Futexes Tricky!
UPDATE: переменные состояния гораздо безопаснее использовать, чем futexes, и более переносимы. Однако переменная условия используется в сочетании с мьютексом, поэтому, строго говоря, результат больше не будет блокироваться. Однако, если ваша основная цель - это производительность (а не гарантия глобального прогресса), а заблокированная часть (то есть условие для проверки после пробуждения потока) мала, может случиться так, что вы получите удовлетворительные результаты без необходимости входить в тонкости интегрирования futexes в алгоритм.
Ответ 3
Если вы работаете в Windows, вы не сможете использовать futexes, но в Windows Vista аналогичный механизм называется Keyed Events. К сожалению, это не является частью опубликованного API (это собственный API-интерфейс NTDLL), но вы можете использовать его, пока вы принимаете предостережение, которое оно может изменить в будущих версиях Windows (и вам не нужно запускать ядра до Vista). Обязательно прочитайте статью, указанную выше. Здесь непроверенный эскиз того, как он может работать:
/* Interlocked SList queue using keyed event signaling */
struct queue {
SLIST_HEADER slist;
// Note: Multiple queues can (and should) share a keyed event handle
HANDLE keyed_event;
// Initial value: 0
// Prior to blocking, the queue_pop function increments this to 1, then
// rechecks the queue. If it finds an item, it attempts to compxchg back to
// 0; if this fails, then it racing with a push, and has to block
LONG block_flag;
};
void init_queue(queue *qPtr) {
NtCreateKeyedEvent(&qPtr->keyed_event, -1, NULL, 0);
InitializeSListHead(&qPtr->slist);
qPtr->blocking = 0;
}
void queue_push(queue *qPtr, SLIST_ENTRY *entry) {
InterlockedPushEntrySList(&qPtr->slist, entry);
// Transition block flag 1 -> 0. If this succeeds (block flag was 1), we
// have committed to a keyed-event handshake
LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
if (oldv) {
NtReleaseKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
}
}
SLIST_ENTRY *queue_pop(queue *qPtr) {
SLIST_ENTRY *entry = InterlockedPopEntrySList(&qPtr->slist);
if (entry)
return entry; // fast path
// Transition block flag 0 -> 1. We must recheck the queue after this point
// in case we race with queue_push; however since ReleaseKeyedEvent
// blocks until it is matched up with a wait, we must perform the wait if
// queue_push sees us
LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 1, 0);
assert(oldv == 0);
entry = InterlockedPopEntrySList(&qPtr->slist);
if (entry) {
// Try to abort
oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
if (oldv == 1)
return entry; // nobody saw us, we can just exit with the value
}
// Either we don't have an entry, or we are forced to wait because
// queue_push saw our block flag. So do the wait
NtWaitForKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
// block_flag has been reset by queue_push
if (!entry)
entry = InterlockedPopEntrySList(&qPtr->slist);
assert(entry);
return entry;
}
Вы также можете использовать аналогичный протокол, используя Slim Read Write блокировки и Переменные состояния, с бесконтактным быстрым путем. Это обертки по ключевым событиям, поэтому они могут повлечь дополнительные накладные расходы, чем напрямую связанные с ключами.
Ответ 4
Вы пробовали условное ожидание? Когда очередь становится пустой, просто начните ждать нового задания. Поток, помещающий задания в очередь, должен подать сигнал. Таким образом, вы используете только блокировки, когда очередь пуста.
https://computing.llnl.gov/tutorials/pthreads/#ConditionVariables
Ответ 5
Вы можете заставить поток спать с помощью функции sigwait(). Вы можете разбудить поток с помощью pthread_kill. Это намного быстрее, чем переменные условия.
Ответ 6
Вы можете добавить спящие во время ожидания. Просто выберите наибольшее ожидание, которое вы готовы иметь, затем сделайте что-то вроде этого (псевдокод, потому что я не помню синтаксис pthread):
WAIT_TIME = 100; // Set this to whatever you're happy with
while(loop_condition) {
thing = get_from_queue()
if(thing == null) {
sleep(WAIT_TIME);
} else {
handle(thing);
}
}
Даже что-то короткое, как сон 100 мс, должно значительно снизить использование ЦП. Я не уверен, в какой момент переключение контекста сделает его хуже, чем ожидание.