Ответ 1
Я издевался над некоторым кодом ниже, который может ответить на ваш вопрос. Основной момент заключается в том, что вы получаете fork/join parallelism с Parallel.ForEach, поэтому вам не нужно беспокоиться о состоянии гонки вне параллельной задачи (вызывающий поток блокируется, пока задачи не будут завершены, успешно или иначе). Вы просто хотите убедиться, что используете переменную LoopState (второй аргумент лямбда) для управления состоянием цикла.
Если какая-либо итерация цикла породила необработанное исключение, общий цикл будет вызывать исключение AggregateException в конце.
Другие ссылки, в которых упоминается этот раздел:
Параллельно. ForEach выдает исключение при обработке чрезвычайно больших наборов данных
http://msdn.microsoft.com/en-us/library/dd460720.aspx
Ограничивает ли Parallel.ForEach количество активных потоков?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.ServiceModel;
namespace Temp
{
public class Class1
{
private class MockWcfProxy
{
internal object ProcessIntervalConfiguration(string item)
{
return new Object();
}
public CommunicationState State { get; set; }
}
private void myFunction()
{
IList<string> iListOfItems = new List<string>();
// populate iListOfItems
CancellationTokenSource cts = new CancellationTokenSource();
ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = 20; // max threads
po.CancellationToken = cts.Token;
try
{
var myWcfProxy = new MockWcfProxy();
if (Parallel.ForEach(iListOfItems, po, (item, loopState) =>
{
try
{
if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional)
loopState.Stop();
// long running blocking WS call, check before and after
var response = myWcfProxy.ProcessIntervalConfiguration(item);
if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional)
loopState.Stop();
// perform some local processing of the response object
}
catch (Exception ex)
{
// cannot continue game over.
if (myWcfProxy.State == CommunicationState.Faulted)
{
loopState.Stop();
throw;
}
// FYI you are swallowing all other exceptions here...
}
// else carry on..
// raise some events and other actions that could all risk an unhanded error.
}
).IsCompleted)
{
RaiseAllItemsCompleteEvent();
}
}
catch (AggregateException aggEx)
{
// This section will be entered if any of the loops threw an unhandled exception.
// Because we re-threw the WCF exeption above, you can use aggEx.InnerExceptions here
// to see those (if you want).
}
// Execution will not get to this point until all of the iterations have completed (or one
// has failed, and all that were running when that failure occurred complete).
}
private void RaiseAllItemsCompleteEvent()
{
// Everything completed...
}
}
}