Как работать с потоком с ConcurrentQueue <T>
Я пытаюсь понять, каким будет лучший способ работы с очередью. У меня есть процесс, который возвращает DataTable. Каждый DataTable, в свою очередь, объединяется с предыдущим DataTable. Существует одна проблема: слишком много записей для хранения до окончательного BulkCopy (OutOfMemory).
Итак, я решил, что я должен немедленно обработать каждый входящий DataTable. Думая о ConcurrentQueue<T>
... но я не вижу, как метод WriteQueuedData()
знал бы, чтобы удалить из таблицы таблицу и записать ее в базу данных.
Например:
public class TableTransporter
{
private ConcurrentQueue<DataTable> tableQueue = new ConcurrentQueue<DataTable>();
public TableTransporter()
{
tableQueue.OnItemQueued += new EventHandler(WriteQueuedData); // no events available
}
public void ExtractData()
{
DataTable table;
// perform data extraction
tableQueue.Enqueue(table);
}
private void WriteQueuedData(object sender, EventArgs e)
{
BulkCopy(e.Table);
}
}
Мой первый вопрос: кроме того, что на самом деле у меня нет каких-либо событий для подписки, если я называю ExtractData()
асинхронно, это все, что мне нужно? Во-вторых, есть ли что-то, что мне не хватает в функции ConcurrentQueue<T>
и нужно, чтобы какая-то форма триггера работала асинхронно с объектами в очереди?
Обновление
Я только что получил класс из ConcurrentQueue<T>
, у которого есть обработчик события OnItemQueued. Тогда:
new public void Enqueue (DataTable Table)
{
base.Enqueue(Table);
OnTableQueued(new TableQueuedEventArgs(Table));
}
public void OnTableQueued(TableQueuedEventArgs table)
{
EventHandler<TableQueuedEventArgs> handler = TableQueued;
if (handler != null)
{
handler(this, table);
}
}
Любые опасения по поводу этой реализации?
Ответы
Ответ 1
Из моего понимания проблемы вам не хватает нескольких вещей.
Параллельная очередь представляет собой структуру данных, предназначенную для приема нескольких потоков, просматривающих и записывающих в очередь, без необходимости явно блокировать структуру данных. (Все, что касается джаза за кулисами, или коллекция реализована таким образом, что ей не нужно блокировать.)
Имея это в виду, похоже, что шаблон, который вы пытаетесь использовать, - это "Produce/Consumer". Во-первых, у вас есть некоторые задачи, которые создают работу (и добавление элементов в очередь). И второй у вас есть вторая задача. Потребление вещей из очереди (и объектов очереди).
Итак, вы хотите два потока: одно добавление элементов и второе удаление элементов. Поскольку вы используете параллельную коллекцию, вы можете иметь несколько потоков, добавляющих элементы и несколько потоков, удаляющих элементы. Но, очевидно, чем больше у вас соперничества в параллельной очереди, тем быстрее это станет узким местом.
Ответ 2
Я думаю, что ConcurrentQueue полезен только в очень немногих случаях. Его главное преимущество заключается в том, что он свободен от блокировки. Однако обычно потоки (-и) производителя должны каким-то образом информировать потребительский поток (-ы) о том, что имеются данные для обработки. Эта сигнализация между потоками требует блокировок и отменяет преимущества использования ConcurrentQueue. Самый быстрый способ синхронизации потоков - использовать Monitor.Pulse(), который работает только внутри блокировки. Все другие инструменты синхронизации еще медленнее.
Конечно, потребитель может просто постоянно проверять, есть ли что-то в очереди, которое работает без блокировок, но это огромная трата ресурсов процессора. Чуть лучше, если потребитель ждет между проверкой.
Поднятие потока при записи в очередь - очень плохая идея. Использование ConcurrentQueue для сохранения mabe 1 микросекунды будет полностью потрачено впустую, выполнив обработчик событий, который может занять 1000 раз.
Если вся обработка выполняется в обработчике событий или асинхронном вызове, возникает вопрос, почему все еще нужна очередь? Лучше передать данные непосредственно обработчику и вообще не использовать очередь.
Обратите внимание, что реализация ConcurrentQueue довольно сложна, чтобы разрешить concurrency. В большинстве случаев лучше использовать обычную Queue < > и блокировать каждый доступ к очереди. Поскольку для доступа к очереди требуется только микросекунды, крайне маловероятно, что 2 потока будут обращаться к очереди в одну и ту же микросекунду, и из-за блокировки почти не будет задержки. Использование обычной Queue < > с блокировкой часто приводит к более быстрому выполнению кода, чем ConcurrentQueue.
Ответ 3
Это полное решение для того, что я придумал:
public class TableTransporter
{
private static int _indexer;
private CustomQueue tableQueue = new CustomQueue();
private Func<DataTable, String> RunPostProcess;
private string filename;
public TableTransporter()
{
RunPostProcess = new Func<DataTable, String>(SerializeTable);
tableQueue.TableQueued += new EventHandler<TableQueuedEventArgs>(tableQueue_TableQueued);
}
void tableQueue_TableQueued(object sender, TableQueuedEventArgs e)
{
// do something with table
// I can't figure out is how to pass custom object in 3rd parameter
RunPostProcess.BeginInvoke(e.Table,new AsyncCallback(PostComplete), filename);
}
public void ExtractData()
{
// perform data extraction
tableQueue.Enqueue(MakeTable());
Console.WriteLine("Table count [{0}]", tableQueue.Count);
}
private DataTable MakeTable()
{ return new DataTable(String.Format("Table{0}", _indexer++)); }
private string SerializeTable(DataTable Table)
{
string file = Table.TableName + ".xml";
DataSet dataSet = new DataSet(Table.TableName);
dataSet.Tables.Add(Table);
Console.WriteLine("[{0}]Writing {1}", Thread.CurrentThread.ManagedThreadId, file);
string xmlstream = String.Empty;
using (MemoryStream memstream = new MemoryStream())
{
XmlSerializer xmlSerializer = new XmlSerializer(typeof(DataSet));
XmlTextWriter xmlWriter = new XmlTextWriter(memstream, Encoding.UTF8);
xmlSerializer.Serialize(xmlWriter, dataSet);
xmlstream = UTF8ByteArrayToString(((MemoryStream)xmlWriter.BaseStream).ToArray());
using (var fileStream = new FileStream(file, FileMode.Create))
fileStream.Write(StringToUTF8ByteArray(xmlstream), 0, xmlstream.Length + 2);
}
filename = file;
return file;
}
private void PostComplete(IAsyncResult iasResult)
{
string file = (string)iasResult.AsyncState;
Console.WriteLine("[{0}]Completed: {1}", Thread.CurrentThread.ManagedThreadId, file);
RunPostProcess.EndInvoke(iasResult);
}
public static String UTF8ByteArrayToString(Byte[] ArrBytes)
{ return new UTF8Encoding().GetString(ArrBytes); }
public static Byte[] StringToUTF8ByteArray(String XmlString)
{ return new UTF8Encoding().GetBytes(XmlString); }
}
public sealed class CustomQueue : ConcurrentQueue<DataTable>
{
public event EventHandler<TableQueuedEventArgs> TableQueued;
public CustomQueue()
{ }
public CustomQueue(IEnumerable<DataTable> TableCollection)
: base(TableCollection)
{ }
new public void Enqueue (DataTable Table)
{
base.Enqueue(Table);
OnTableQueued(new TableQueuedEventArgs(Table));
}
public void OnTableQueued(TableQueuedEventArgs table)
{
EventHandler<TableQueuedEventArgs> handler = TableQueued;
if (handler != null)
{
handler(this, table);
}
}
}
public class TableQueuedEventArgs : EventArgs
{
#region Fields
#endregion
#region Init
public TableQueuedEventArgs(DataTable Table)
{this.Table = Table;}
#endregion
#region Functions
#endregion
#region Properties
public DataTable Table
{get;set;}
#endregion
}
Как доказательство концепции, это работает очень хорошо. В большинстве случаев я видел 4 рабочих потока.