Как эффективно регистрировать асинхронно?
Я использую Enterprise Library 4 в одном из моих проектов для ведения журнала (и других целей). Я заметил, что есть некоторые затраты на регистрацию, которую я делаю, что я могу смягчить, выполнив ведение журнала в отдельном потоке.
То, как я делаю это сейчас, это то, что я создаю объект LogEntry, а затем вызываю BeginInvoke на делегат, который вызывает Logger.Write.
new Action<LogEntry>(Logger.Write).BeginInvoke(le, null, null);
То, что я действительно хотел бы сделать, это добавить сообщение журнала в очередь, а затем создать один поток, который вытягивает экземпляры LogEntry из очереди и выполняет операцию журнала. Преимущество этого заключалось бы в том, что ведение журнала не мешает выполнению операции, и не каждая операция ведения журнала приводит к тому, что задание запускается в пуле потоков.
Как создать общую очередь, которая поддерживает много писателей и одного читателя потоковым безопасным способом? Некоторые примеры реализации очереди, которые предназначены для поддержки многих авторов (без создания синхронизации/блокировки) и одного читателя, будут действительно оценены.
Рекомендация относительно альтернативных подходов также будет оценена, но я не заинтересован в изменении рамок регистрации.
Ответы
Ответ 1
Я написал этот код некоторое время назад, не стесняйтесь его использовать.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace MediaBrowser.Library.Logging {
public abstract class ThreadedLogger : LoggerBase {
Queue<Action> queue = new Queue<Action>();
AutoResetEvent hasNewItems = new AutoResetEvent(false);
volatile bool waiting = false;
public ThreadedLogger() : base() {
Thread loggingThread = new Thread(new ThreadStart(ProcessQueue));
loggingThread.IsBackground = true;
loggingThread.Start();
}
void ProcessQueue() {
while (true) {
waiting = true;
hasNewItems.WaitOne(10000,true);
waiting = false;
Queue<Action> queueCopy;
lock (queue) {
queueCopy = new Queue<Action>(queue);
queue.Clear();
}
foreach (var log in queueCopy) {
log();
}
}
}
public override void LogMessage(LogRow row) {
lock (queue) {
queue.Enqueue(() => AsyncLogMessage(row));
}
hasNewItems.Set();
}
protected abstract void AsyncLogMessage(LogRow row);
public override void Flush() {
while (!waiting) {
Thread.Sleep(1);
}
}
}
}
Некоторые преимущества:
- Он сохраняет фоновый журнал вживую, поэтому ему не нужно вращаться и вращаться вниз.
- Он использует один поток для обслуживания очереди, что означает, что никогда не будет ситуации, когда 100 потоков обслуживают очередь.
- Он копирует очереди, чтобы гарантировать, что очередь не заблокирована во время выполнения операции журнала.
- Он использует AutoResetEvent, чтобы гарантировать, что поток bg находится в состоянии ожидания.
- Это, ИМХО, очень легко следовать
Вот немного улучшенная версия, имейте в виду, что я провел очень мало тестирования на ней, но она затрагивает несколько незначительных проблем.
public abstract class ThreadedLogger : IDisposable {
Queue<Action> queue = new Queue<Action>();
ManualResetEvent hasNewItems = new ManualResetEvent(false);
ManualResetEvent terminate = new ManualResetEvent(false);
ManualResetEvent waiting = new ManualResetEvent(false);
Thread loggingThread;
public ThreadedLogger() {
loggingThread = new Thread(new ThreadStart(ProcessQueue));
loggingThread.IsBackground = true;
// this is performed from a bg thread, to ensure the queue is serviced from a single thread
loggingThread.Start();
}
void ProcessQueue() {
while (true) {
waiting.Set();
int i = ManualResetEvent.WaitAny(new WaitHandle[] { hasNewItems, terminate });
// terminate was signaled
if (i == 1) return;
hasNewItems.Reset();
waiting.Reset();
Queue<Action> queueCopy;
lock (queue) {
queueCopy = new Queue<Action>(queue);
queue.Clear();
}
foreach (var log in queueCopy) {
log();
}
}
}
public void LogMessage(LogRow row) {
lock (queue) {
queue.Enqueue(() => AsyncLogMessage(row));
}
hasNewItems.Set();
}
protected abstract void AsyncLogMessage(LogRow row);
public void Flush() {
waiting.WaitOne();
}
public void Dispose() {
terminate.Set();
loggingThread.Join();
}
}
Преимущества над оригиналом:
- Это одноразовый, поэтому вы можете избавиться от асинхронного регистратора
- Улучшена смысловая семантика
- Он будет немного лучше реагировать на всплеск, за которым следует молчание.
Ответ 2
Да, вам нужна очередь производителей/потребителей. У меня есть один пример этого в моем учебнике по многопоточности - если вы посмотрите на мою страницу "deadlocks/monitor methods" , вы найдете код во втором половина.
Конечно, есть много других примеров в Интернете, и .NET 4.0 будет поставляться с одним в рамках (скорее, более полно, чем у меня!). В .NET 4.0 вы, вероятно, обернете ConcurrentQueue<T>
в BlockingCollection<T>
.
Версия на этой странице не является универсальной (она была написана давно), но вы, вероятно, захотите сделать ее обобщенной - было бы тривиально делать.
Вы вызывали бы Produce
из каждого "нормального" потока и Consume
из одного потока, просто зацикливаясь и регистрируя все, что он потребляет. Это, наверное, проще всего сделать, чтобы поток потребителя стал фоновым потоком, поэтому вам не нужно беспокоиться о "остановке" очереди, когда ваше приложение выходит. Это означает, что удаленная возможность пропускать окончательную запись в журнале (хотя бы на полпути через ее запись при выходе из приложения) - или даже больше, если вы создаете быстрее, чем она может потреблять/регистрировать.
Ответ 3
Я предлагаю начать с измерения фактического воздействия на производительность системы в целом (то есть путем запуска профилировщика) и, возможно, переключения на что-то более быстрое, например log4net (я лично перешел к нему из журнала EntLib давным-давно).
Если это не сработает, вы можете попробовать использовать этот простой метод из .NET Framework:
ThreadPool.QueueUserWorkItem
Очередность метода для выполнения. Метод выполняется, когда поток пула потоков становится доступным.
Сведения о MSDN
Если это не сработает, то вы можете прибегнуть к чему-то вроде того, как Джон Скит предложил и фактически сам кодировал фреймворк асинхронного журналирования.
Ответ 4
Вот что я придумал... также см. ответ Сэма Шаффрона. Этот ответ является вики-сообществом в случае возникновения каких-либо проблем, которые люди видят в коде и хотят обновить.
/// <summary>
/// A singleton queue that manages writing log entries to the different logging sources (Enterprise Library Logging) off the executing thread.
/// This queue ensures that log entries are written in the order that they were executed and that logging is only utilizing one thread (backgroundworker) at any given time.
/// </summary>
public class AsyncLoggerQueue
{
//create singleton instance of logger queue
public static AsyncLoggerQueue Current = new AsyncLoggerQueue();
private static readonly object logEntryQueueLock = new object();
private Queue<LogEntry> _LogEntryQueue = new Queue<LogEntry>();
private BackgroundWorker _Logger = new BackgroundWorker();
private AsyncLoggerQueue()
{
//configure background worker
_Logger.WorkerSupportsCancellation = false;
_Logger.DoWork += new DoWorkEventHandler(_Logger_DoWork);
}
public void Enqueue(LogEntry le)
{
//lock during write
lock (logEntryQueueLock)
{
_LogEntryQueue.Enqueue(le);
//while locked check to see if the BW is running, if not start it
if (!_Logger.IsBusy)
_Logger.RunWorkerAsync();
}
}
private void _Logger_DoWork(object sender, DoWorkEventArgs e)
{
while (true)
{
LogEntry le = null;
bool skipEmptyCheck = false;
lock (logEntryQueueLock)
{
if (_LogEntryQueue.Count <= 0) //if queue is empty than BW is done
return;
else if (_LogEntryQueue.Count > 1) //if greater than 1 we can skip checking to see if anything has been enqueued during the logging operation
skipEmptyCheck = true;
//dequeue the LogEntry that will be written to the log
le = _LogEntryQueue.Dequeue();
}
//pass LogEntry to Enterprise Library
Logger.Write(le);
if (skipEmptyCheck) //if LogEntryQueue.Count was > 1 before we wrote the last LogEntry we know to continue without double checking
{
lock (logEntryQueueLock)
{
if (_LogEntryQueue.Count <= 0) //if queue is still empty than BW is done
return;
}
}
}
}
}
Ответ 5
В ответ на сообщение Сэма Сафронса я хотел назвать флеш и убедиться, что все действительно закончено. В моем случае я пишу в базу данных в потоке очереди, и все мои события журнала выходили в очередь, но иногда приложение останавливалось, прежде чем все было закончено, что было неприемлемо в моей ситуации. Я изменил несколько фрагментов кода, но главное, что я хотел разделить, - это флеш:
public static void FlushLogs()
{
bool queueHasValues = true;
while (queueHasValues)
{
//wait for the current iteration to complete
m_waitingThreadEvent.WaitOne();
lock (m_loggerQueueSync)
{
queueHasValues = m_loggerQueue.Count > 0;
}
}
//force MEL to flush all its listeners
foreach (MEL.LogSource logSource in MEL.Logger.Writer.TraceSources.Values)
{
foreach (TraceListener listener in logSource.Listeners)
{
listener.Flush();
}
}
}
Надеюсь, это избавит кого-то от разочарования. Это особенно заметно в параллельных процессах, регистрирующих много данных.
Спасибо, что поделились своим решением, он поставил меня в хорошее направление!
- Джонни S
Ответ 6
Я хотел сказать, что мой предыдущий пост был бесполезным. Вы можете просто установить AutoFlush в true, и вам не придется перебирать все слушатели. Тем не менее, у меня все еще была сумасшедшая проблема с параллельными потоками, пытаясь сбросить регистратор. Мне пришлось создать еще одно логическое значение, которое было установлено в true при копировании очереди и выполнении записей LogEntry, а затем в рутине флеша мне пришлось проверить, что логическое значение, чтобы убедиться, что что-то еще не было в очереди, и ничего не обрабатывалось перед возвратом.
Теперь несколько потоков параллельно могут ударить по этой вещи, и когда я вызываю флеш, я знаю, что это действительно покраснело.
public static void FlushLogs()
{
int queueCount;
bool isProcessingLogs;
while (true)
{
//wait for the current iteration to complete
m_waitingThreadEvent.WaitOne();
//check to see if we are currently processing logs
lock (m_isProcessingLogsSync)
{
isProcessingLogs = m_isProcessingLogs;
}
//check to see if more events were added while the logger was processing the last batch
lock (m_loggerQueueSync)
{
queueCount = m_loggerQueue.Count;
}
if (queueCount == 0 && !isProcessingLogs)
break;
//since something is in the queue, reset the signal so we will not keep looping
Thread.Sleep(400);
}
}
Ответ 7
Просто обновление:
Используя eneprise library 5.0 с .NET 4.0, это можно легко сделать:
static public void LogMessageAsync(LogEntry logEntry)
{
Task.Factory.StartNew(() => LogMessage(logEntry));
}
См:
http://randypaulo.wordpress.com/2011/07/28/c-enterprise-library-asynchronous-logging/
Ответ 8
Здесь может помочь дополнительный уровень косвенности.
Ваш первый вызов метода async может помещать сообщения в синхронизированную очередь и устанавливать событие - поэтому блокировки происходят в пуле потоков, а не в ваших рабочих потоках, а затем еще один поток вытягивает сообщения из очереди когда событие поднято.
Ответ 9
Если вы регистрируете что-то в отдельном потоке, сообщение может не быть записано, если приложение сработает, что делает его бесполезным.
Причина заключается в том, почему вы должны всегда скрываться после каждой записи.
Ответ 10
Если вы имеете в виду очередь SHARED, то я думаю, вам придется синхронизировать записи с ней, нажатиями и попками.
Но я все же считаю, что стоит сосредоточиться на дизайне общей очереди. По сравнению с IO регистрации и, вероятно, по сравнению с другой работой, выполняемой вашим приложением, краткое количество блокировок для нажатий и всплывающих окон, вероятно, не будет значительным.