Ответ 1
Это мой любимый вид проблемы: тот, который можно решить с помощью конвейера.
Обратите внимание, что в зависимости от ваших обстоятельств этот подход может фактически отрицательно повлиять на производительность, но поскольку вы явно задали вопрос о том, как вы можете использовать запись в выделенном потоке, приведенный ниже код демонстрирует именно это.
Отказ от ответственности: вы должны в идеале рассмотреть TPL Dataflow для этого, но это не то, что я хорошо разбираюсь, поэтому я просто возьму знакомый маршрут Task
+ BlockingCollection<T>
.
Сначала я собирался предложить трехэтапный конвейер (чтение, процесс, запись), но потом я понял, что вы уже объединили первые два этапа с тем, как вы "потоки" узлов, поскольку они читать и подавать их на ваш Parallel.ForEach
(да, вы уже реализовали конвейер сортов). Еще лучше - меньше синхронизации потоков.
С учетом этого теперь код становится следующим:
public class Result
{
public string Case { get; set; }
public string State { get; set; }
public string Investor { get; set; }
public decimal Price { get; set; }
public string Product { get; set; }
}
...
using (var reader = CreateXmlReader())
{
// I highly doubt that this collection will
// ever reach its bounded capacity since
// the processing stage takes so long,
// but in case it does, Parallel.ForEach
// will be throttled.
using (var handover = new BlockingCollection<Result>(boundedCapacity: 100))
{
var processStage = Task.Run(() =>
{
try
{
Parallel.ForEach(EnumerateAxis(reader, "Input"), node =>
{
// Do calc.
Thread.Sleep(1000);
// Hand over to the writer.
// This handover is not blocking (unless our
// blocking collection has reached its bounded
// capacity, which would indicate that the
// writer is running slower than expected).
handover.Add(new Result());
});
}
finally
{
handover.CompleteAdding();
}
});
var writeStage = Task.Run(() =>
{
using (var writer = CreateXmlReader())
{
foreach (var result in handover.GetConsumingEnumerable())
{
// Write element.
}
}
});
// Note: the two stages are now running in parallel.
// You could technically use Parallel.Invoke to
// achieve the same result with a bit less code.
Task.WaitAll(processStage, writeStage);
}
}