Ответ 1
Вероятно, проще, если вы перевернете операцию в метод, который будет обрабатывать один запрос асинхронно, а затем вызовите его 100 раз.
Чтобы начать, укажите нужный конечный результат. Поскольку вы работаете с MemoryStream
, это означает, что вы захотите вернуть Task<MemoryStream>
из вашего метода. Подпись будет выглядеть примерно так:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
GetObjectRequest request)
Поскольку ваш объект AmazonS3
реализует шаблон асинхронного проектирования, вы можете использовать FromAsync
в TaskFactory
class для создания Task<T>
из класса, реализующего асинхронный дизайн Шаблон, например:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
GetObjectRequest request)
{
Task<GetObjectResponse> response =
Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
s3.BeginGetObject, s3.EndGetObject, request, null);
// But what goes here?
Итак, вы уже в хорошем месте, у вас есть Task<T>
, на который вы можете дождаться или получить обратный вызов при завершении вызова. Однако вам нужно каким-то образом перевести GetObjectResponse
, возвращенный с вызова на Task<GetObjectResponse>
, в MemoryStream
.
С этой целью вы хотите использовать метод ContinueWith
в классе Task<T>
. Подумайте об этом как о асинхронной версии метода Select
на Enumerable
class, это просто проецирование в другое Task<T>
, за исключением того, что каждый раз, когда вы вызываете ContinueWith
, вы потенциально создаете новую задачу, которая запускает этот раздел кода.
При этом ваш метод выглядит следующим образом:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
GetObjectRequest request)
{
// Start the task of downloading.
Task<GetObjectResponse> response =
Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
s3.BeginGetObject, s3.EndGetObject, request, null
);
// Translate.
Task<MemoryStream> translation = response.ContinueWith(t => {
using (Task<GetObjectResponse> resp = t ){
var ms = new MemoryStream();
t.Result.ResponseStream.CopyTo(ms);
return ms;
}
});
// Return the full task chain.
return translation;
}
Обратите внимание, что в приведенном выше примере вы можете вызвать перегрузку ContinueWith
, передав TaskContinuationOptions.ExecuteSynchronously
, так как кажется, что вы выполняете минимальную работу (я не могу сказать, ответы могут быть огромными). В тех случаях, когда вы выполняете очень минимальную работу, когда было бы вредно запускать новую задачу для завершения работы, вы должны пройти TaskContinuationOptions.ExecuteSynchronously
, чтобы не тратить время на создание новых задач для минимальных операций.
Теперь, когда у вас есть метод, который может преобразовать один запрос в Task<MemoryStream>
, создание обертки, которая будет обрабатывать любое число из них, проста:
static Task<MemoryStream>[] GetMemoryStreamsAsync(AmazonS3 s3,
IEnumerable<GetObjectRequest> requests)
{
// Just call Select on the requests, passing our translation into
// a Task<MemoryStream>.
// Also, materialize here, so that the tasks are "hot" when
// returned.
return requests.Select(r => GetMemoryStreamAsync(s3, r)).
ToArray();
}
В приведенном выше примере вы просто берете последовательность своих экземпляров GetObjectRequest
и возвращаете массив из Task<MemoryStream>
. Важен тот факт, что он возвращает материализованную последовательность. Если вы не материализуете его перед возвратом, то задачи не будут созданы до тех пор, пока последовательность не будет выполнена.
Конечно, если вы хотите этого поведения, то, во что бы то ни стало, просто удалите вызов .ToArray()
, верните метод IEnumerable<Task<MemoryStream>>
, а затем запросы будут сделаны, когда вы выполните итерацию по задачам.
Оттуда вы можете обрабатывать их по одному (используя метод Task.WaitAny
в цикле) или дождаться их всех (путем вызова метода Task.WaitAll
). Примером последнего может быть:
static IList<MemoryStream> GetMemoryStreams(AmazonS3 s3,
IEnumerable<GetObjectRequest> requests)
{
Task<MemoryStream>[] tasks = GetMemoryStreamsAsync(s3, requests);
Task.WaitAll(tasks);
return tasks.Select(t => t.Result).ToList();
}
Кроме того, следует отметить, что это довольно хорошо подходит для Reactive Extensions framework, так как это очень хорошо подходит для IObservable<T>
.