Ответ 1
DataTable
просто не предназначен или не предназначен для одновременного использования (в частности, если есть какая-либо форма мутации). На мой взгляд, рекомендуемая "обертка" будет либо:
- удалить необходимость работать с
DataTable
одновременно (при включении мутации) или: - удалите
DataTable
, вместо этого используя структуру данных, которая либо напрямую поддерживает то, что вам нужно (например, параллельный сбор), либо намного проще и может быть тривиально синхронизирована (либо эксклюзивная, либо считывающая/записывающая)
В основном: измените проблему.
Из комментариев:
Код выглядит так:
Parallel.ForEach(strings, str=> { DataRow row; lock(table){ row= table.NewRow(); } MyParser.Parse(str, out row); lock(table){ table.Rows.Add(row) } });
Я могу только надеяться, что out row
является опечаткой здесь, поскольку это фактически не приведет к заполнению строки, созданной с помощью NewRow()
, но: если вам абсолютно необходимо использовать этот подход, вы не можете использовать NewRow
, так как ожидающая строка является разделяемой. Лучше всего:
Parallel.ForEach(strings, str=> {
object[] values = MyParser.Parse(str);
lock(table) {
table.Rows.Add(values);
}
});
Важное изменение в том, что lock
охватывает весь процесс новой строки. Обратите внимание, что при использовании Parallel.ForEach
у вас не будет гарантии заказа, поэтому важно, чтобы окончательный заказ не нуждался в точности (что не должно быть проблемой, если данные включают компонент времени).
Однако! Я все еще думаю, что вы приближаетесь к этому неправильно: для parallelism, чтобы быть релевантным, это должны быть нетривиальные данные. Если у вас нетривиальные данные, вам действительно не нужно буферизировать все это в памяти. Я сильно предлагаю сделать что-то вроде следующего, которое отлично работает в одном потоке:
using(var bcp = new SqlBulkCopy())
using(var reader = ObjectReader.Create(ParseFile(path)))
{
bcp.DestinationTable = "MyLog";
bcp.WriteToServer(reader);
}
...
static IEnumerable<LogRow> ParseFile(string path)
{
using(var reader = File.OpenText(path))
{
string line;
while((line = reader.ReadLine()) != null)
{
yield return new LogRow {
// TODO: populate the row from line here
};
}
}
}
...
public sealed class LogRow {
/* define your schema here */
}
Преимущества:
- нет буферизации - это полностью потоковая операция (
yield return
не помещает вещи в список или аналогичный) - по этой причине строки могут начать потоковое теги немедленно, не дожидаясь, прежде чем весь файл будет предварительно обработан.
- Отсутствие проблем с насыщением памяти.
- отсутствие проблем с потоком/накладные расходы
- вы можете сохранить исходный порядок (обычно не критический, но приятный)
- вы ограничены только тем, насколько быстро вы можете прочитать исходный файл, который обычно быстрее в одном потоке, чем из нескольких потоков (конкуренция на одном устройстве ввода-вывода просто накладная)
- избегает всех накладных расходов
DataTable
, который здесь слишком много, потому что он настолько гибкий, что имеет значительные накладные расходы - чтение (из файла журнала) и запись (в базу данных) теперь являются параллельными, а не последовательными
В своей работе я делаю много вещей, таких как ^^^, и по опыту обычно , по крайней мере, вдвое быстрее, чем заполнение DataTable
в памяти.
И наконец - вот пример реализации IEnumerable<T>
, который допускает одновременные чтения и записи, не требуя, чтобы все было буферизировано в памяти, - что позволило бы нескольким потокам анализировать данные (вызывая Add
и, наконец, Close
)) с помощью одного потока для SqlBulkCopy
через API IEnumerable<T>
:
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Acts as a container for concurrent read/write flushing (for example, parsing a
/// file while concurrently uploading the contents); supports any number of concurrent
/// writers and readers, but note that each item will only be returned once (and once
/// fetched, is discarded). It is necessary to Close() the bucket after adding the last
/// of the data, otherwise any iterators will never finish
/// </summary>
class ThreadSafeBucket<T> : IEnumerable<T>
{
private readonly Queue<T> queue = new Queue<T>();
public void Add(T value)
{
lock (queue)
{
if (closed) // no more data once closed
throw new InvalidOperationException("The bucket has been marked as closed");
queue.Enqueue(value);
if (queue.Count == 1)
{ // someone may be waiting for data
Monitor.PulseAll(queue);
}
}
}
public void Close()
{
lock (queue)
{
closed = true;
Monitor.PulseAll(queue);
}
}
private bool closed;
public IEnumerator<T> GetEnumerator()
{
while (true)
{
T value;
lock (queue)
{
if (queue.Count == 0)
{
// no data; should we expect any?
if (closed) yield break; // nothing more ever coming
// else wait to be woken, and redo from start
Monitor.Wait(queue);
continue;
}
value = queue.Dequeue();
}
// yield it **outside** of the lock
yield return value;
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
static class Program
{
static void Main()
{
var bucket = new ThreadSafeBucket<int>();
int expectedTotal = 0;
ThreadPool.QueueUserWorkItem(delegate
{
int count = 0, sum = 0;
foreach(var item in bucket)
{
count++;
sum += item;
if ((count % 100) == 0)
Console.WriteLine("After {0}: {1}", count, sum);
}
Console.WriteLine("Total over {0}: {1}", count, sum);
});
Parallel.For(0, 5000,
new ParallelOptions { MaxDegreeOfParallelism = 3 },
i => {
bucket.Add(i);
Interlocked.Add(ref expectedTotal, i);
}
);
Console.WriteLine("all data added; closing bucket");
bucket.Close();
Thread.Sleep(100);
Console.WriteLine("expecting total: {0}",
Interlocked.CompareExchange(ref expectedTotal, 0, 0));
Console.ReadLine();
}
}