Задача Параллельная библиотека WaitAny с заданным результатом
Я пытаюсь написать код, который будет вызывать вызов веб-службы на несколько разных серверов параллельно, поэтому TPL кажется очевидным выбором для использования.
Только один из моих вызовов веб-сервисов всегда вернет результат, который я хочу, и все остальные не будут. Я пытаюсь разработать способ эффективного использования Task.WaitAny
, но только разблокировать, когда возвращается первый Task
, который соответствует условию.
Я пробовал с WaitAny
, но не мог решить, где разместить фильтр. Я добрался до этого:
public void SearchServers()
{
var servers = new[] {"server1", "server2", "server3", "server4"};
var tasks = servers
.Select(s => Task<bool>.Factory.StartNew(server => CallServer((string)server), s))
.ToArray();
Task.WaitAny(tasks); //how do I say "WaitAny where the result is true"?
//Omitted: cancel any outstanding tasks since the correct server has been found
}
private bool CallServer(string server)
{
//... make the call to the server and return the result ...
}
Изменить: Быстрое разъяснение на случай, если возникнет какая-либо путаница выше. Я пытаюсь сделать следующее:
- Для каждого сервера запустите
Task
, чтобы проверить его
- Подождите, пока сервер не вернет true (только один из 1 серверов вернет true)
- Или подождите, пока все серверы не вернут false, т.е. не было соответствия.
Ответы
Ответ 1
Лучшее из того, что я могу представить, это указать ContinueWith
для каждого Task
, проверить результат и true
отменить другие задачи. Для отмены задач вы можете использовать CancellationToken.
var tasks = servers
.Select(s => Task.Run(...)
.ContinueWith(t =>
if (t.Result) {
// cancel other threads
}
)
).ToArray();
UPDATE: альтернативным решением будет WaitAny
до тех пор, пока правильная задача не завершится (но у нее есть некоторые недостатки, например, удаление готовых задач из списка и создание нового массива из оставшихся - довольно тяжелая операция)
List<Task<bool>> tasks = servers.Select(s => Task<bool>.Factory.StartNew(server => CallServer((string)server), s)).ToList();
bool result;
do {
int idx = Task.WaitAny(tasks.ToArray());
result = tasks[idx].Result;
tasks.RemoveAt(idx);
} while (!result && tasks.Count > 0);
// cancel other tasks
ОБНОВЛЕНИЕ 2: В настоящее время я бы сделал это с помощью Rx:
[Fact]
public async Task AwaitFirst()
{
var servers = new[] { "server1", "server2", "server3", "server4" };
var server = await servers
.Select(s => Observable
.FromAsync(ct => CallServer(s, ct))
.Where(p => p)
.Select(_ => s)
)
.Merge()
.FirstAsync();
output.WriteLine($"Got result from {server}");
}
private async Task<bool> CallServer(string server, CancellationToken ct)
{
try
{
if (server == "server1")
{
await Task.Delay(TimeSpan.FromSeconds(1), ct);
output.WriteLine($"{server} finished");
return false;
}
if (server == "server2")
{
await Task.Delay(TimeSpan.FromSeconds(2), ct);
output.WriteLine($"{server} finished");
return false;
}
if (server == "server3")
{
await Task.Delay(TimeSpan.FromSeconds(3), ct);
output.WriteLine($"{server} finished");
return true;
}
if (server == "server4")
{
await Task.Delay(TimeSpan.FromSeconds(4), ct);
output.WriteLine($"{server} finished");
return true;
}
}
catch(OperationCanceledException)
{
output.WriteLine($"{server} Cancelled");
throw;
}
throw new ArgumentOutOfRangeException(nameof(server));
}
Тест занимает 3,32 секунды на моей машине (это означает, что он не дождался 4-го сервера), и я получил следующий вывод:
server1 finished
server2 finished
server3 finished
server4 Cancelled
Got result from server3
Ответ 2
Вы можете использовать OrderByCompletion()
из библиотеки AsyncEx, которая возвращает задания по мере их завершения. Ваш код может выглядеть примерно так:
var tasks = servers
.Select(s => Task.Factory.StartNew(server => CallServer((string)server), s))
.OrderByCompletion();
foreach (var task in tasks)
{
if (task.Result)
{
Console.WriteLine("found");
break;
}
Console.WriteLine("not found yet");
}
// cancel any outstanding tasks since the correct server has been found
Ответ 3
Использование Interlocked.CompareExchange сделает именно это, только одна задача сможет писать на serverReturedDatap >
public void SearchServers()
{
ResultClass serverReturnedData = null;
var servers = new[] {"server1", "server2", "server3", "server4"};
var tasks = servers.Select(s => Task<bool>.Factory.StartNew(server =>
{
var result = CallServer((string)server), s);
Interlocked.CompareExchange(ref serverReturnedData, result, null);
}).ToArray();
Task.WaitAny(tasks); //how do I say "WaitAny where the result is true"?
//
// use serverReturnedData as you want.
//
}
EDIT: Как сказал Джасд, вышеуказанный код может вернуться до того, как переменная serverReturnedData имеет допустимое значение (если сервер возвращает нулевое значение, это может произойти), чтобы вы могли обернуть результат в пользовательский объект.
Ответ 4
Вот общее решение, основанное на svick ответе:
public static async Task<T> GetFirstResult<T>(
this IEnumerable<Func<CancellationToken, Task<T>>> taskFactories,
Action<Exception> exceptionHandler,
Predicate<T> predicate)
{
T ret = default(T);
var cts = new CancellationTokenSource();
var proxified = taskFactories.Select(tf => tf(cts.Token)).ProxifyByCompletion();
int i;
for (i = 0; i < proxified.Length; i++)
{
try
{
ret = await proxified[i].ConfigureAwait(false);
}
catch (Exception e)
{
exceptionHandler(e);
continue;
}
if (predicate(ret))
{
break;
}
}
if (i == proxified.Length)
{
throw new InvalidOperationException("No task returned the expected value");
}
cts.Cancel(); //we have our value, so we can cancel the rest of the tasks
for (int j = i+1; j < proxified.Length; j++)
{
//observe remaining tasks to prevent process crash
proxified[j].ContinueWith(
t => exceptionHandler(t.Exception), TaskContinuationOptions.OnlyOnFaulted)
.Forget();
}
return ret;
}
Где ProxifyByCompletion
реализовано как:
public static Task<T>[] ProxifyByCompletion<T>(this IEnumerable<Task<T>> tasks)
{
var inputTasks = tasks.ToArray();
var buckets = new TaskCompletionSource<T>[inputTasks.Length];
var results = new Task<T>[inputTasks.Length];
for (int i = 0; i < buckets.Length; i++)
{
buckets[i] = new TaskCompletionSource<T>();
results[i] = buckets[i].Task;
}
int nextTaskIndex = -1;
foreach (var inputTask in inputTasks)
{
inputTask.ContinueWith(completed =>
{
var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
if (completed.IsFaulted)
{
Trace.Assert(completed.Exception != null);
bucket.TrySetException(completed.Exception.InnerExceptions);
}
else if (completed.IsCanceled)
{
bucket.TrySetCanceled();
}
else
{
bucket.TrySetResult(completed.Result);
}
}, CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
return results;
}
И Forget
- пустой метод для подавления CS4014:
public static void Forget(this Task task) //suppress CS4014
{
}