Ответ 1
В то время как мне нравятся компоненты TPL Dataflow (которые, как подсказывает, вы используете), переход к этой системе требует значительных обязательств - это не то, что вы можете просто добавить к существующему дизайну. Он предлагает значительные преимущества, если вы выполняете большие объемы обработки данных с интенсивным использованием процессора и хотите использовать многие ядра ЦП. Но получить лучшее из этого нетривиально.
Его другое предложение, используя Rx, может быть проще интегрировать с существующим решением. (См. оригинальную документацию, но для последнего кода используйте Rx-Main пакет nuget. Или если вы хотели бы посмотреть на источник, см. сайт Rx CodePlex). Возможно, даже если код вызова будет продолжен, используя IEnumerable<Symbol>
, если вы хотите - вы можете использовать Rx только как деталь реализации, [ edit 2013/11/09, чтобы добавить:], хотя, как указывает svick, это, вероятно, не очень хорошая идея, учитывая вашу конечную цель.
Прежде чем я покажу вам пример, я хочу четко сказать, что именно мы делаем. В вашем примере был метод с этой сигнатурой:
public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
Этот тип возвращаемого значения Task<IEnumerable<Symbol>>
, по существу, говорит: "Это метод, который производит единственный результат типа IEnumerable<Symbol>
, и он может не сразу произвести этот результат".
Это тот единственный бит результата, который, как я думаю, вызывает у вас горе, потому что это не то, что вы хотите. A Task<T>
(независимо от того, что T
может быть) представляет собой одиночную асинхронную операцию. Он может иметь много шагов (многие используют await
, если вы реализуете его как метод С# async
), но в конечном итоге он производит одно. Вы хотите создавать несколько вещей в разное время, поэтому Task<T>
не подходит.
Если бы вы действительно собирались сделать то, что ваша сигнатура метода promises - в конечном итоге произведет один результат - один способ, которым вы могли бы это сделать, - это создать метод async для создания списка, а затем произвести это как результат, когда он будет хорошим и готовым
// Note: this first example is *not* what you want.
// However, it is what your method signature promises to do.
public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
{
var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();
foreach (var symbol in await _listSymbols)
{
historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
}
var results = new List<Symbol>();
while (historicalFinancialTask.Count > 0)
{
var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
historicalFinancialTask.Remove(historicalFinancial);
results.Add(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data));
}
return results;
}
Этот метод делает то, что говорит его подпись: он асинхронно создает последовательность символов.
Но предположительно вы хотели бы создать IEnumerable<Symbol>
, который будет создавать элементы по мере их появления, а не ждать, пока они не будут доступны. (В противном случае вы могли бы просто использовать WhenAll
.) Вы можете это сделать, но yield return
не подходит.
Короче говоря, я думаю, что вы хотите сделать, это создать асинхронный список. Там тип для этого: IObservable<T>
выражает то, что, как я полагаю, вы надеялись выразить с помощью Task<IEnumerable<Symbol>>
: это последовательность элементов (точно так же, как IEnumerable<T>
), но асинхронная.
Это может помочь понять это по аналогии:
public Symbol GetSymbol() ...
имеет значение
public Task<Symbol> GetSymbolAsync() ...
а
public IEnumerable<Symbol> GetSymbols() ...
:
public IObservable<Symbol> GetSymbolsObservable() ...
(К сожалению, в отличие от Task<T>
не существует общего соглашения об именах для вызова асинхронного метода, ориентированного на последовательность. Я добавил здесь "Observable", но это не универсальная практика. не назовет его GetSymbolsAsync
, потому что люди ожидают, что вернут Task
.)
Другими словами, Task<IEnumerable<T>>
говорит: "Я создам эту коллекцию, когда буду хорош и готов", тогда как IObservable<T>
говорит: "Вот коллекция. Я буду производить каждый элемент, когда я буду хорош и готов."
Итак, вам нужен метод, который возвращает последовательность объектов Symbol
, где эти объекты создаются асинхронно. Это говорит нам, что вы действительно должны возвращать IObservable<Symbol>
. Вот реализация:
// Unlike this first example, this *is* what you want.
public IObservable<Symbol> GetSymbolsRx()
{
return Observable.Create<Symbol>(async obs =>
{
var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();
foreach (var symbol in await _listSymbols)
{
historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
}
while (historicalFinancialTask.Count > 0)
{
var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
historicalFinancialTask.Remove(historicalFinancial);
obs.OnNext(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data));
}
});
}
Как вы можете видеть, это позволяет писать в значительной степени то, что вы надеялись написать, - тело этого кода почти идентично вашему. Единственное различие заключается в том, что, когда вы использовали yield return
(который не компилировался), это вызывает метод OnNext
для объекта, предоставленного Rx.
Написав это, вы можете легко обернуть это в IEnumerable<Symbol>
([ Edited 2013/11/29, чтобы добавить:], хотя вы, вероятно, на самом деле не хотите этого делать - см. дополнение в конце ответа):
public IEnumerable<Symbol> GetSymbols()
{
return GetSymbolsRx().ToEnumerable();
}
Это может выглядеть несинхронно, но на самом деле позволяет асинхронно работать с базовым кодом. Когда вы вызываете этот метод, он не будет блокироваться - даже если базовый код, который выполняет работу по извлечению финансовой информации, не может сразу произвести результат, этот метод, тем не менее, немедленно вернет IEnumerable<Symbol>
. Теперь, конечно, любой код, который пытается выполнить итерацию через эту коллекцию, в конечном итоге блокируется, если данные еще не доступны. Но критически важно то, что я думаю, что вы изначально пытались достичь:
- Вы можете написать метод
async
, который выполняет работу (делегат в моем примере передан как аргументObservable.Create<T>
, но вы можете написать автономныйasync
метод, если хотите) - Вызывающий код не будет заблокирован просто в результате запроса начать загрузку символов
- Получаемый
IEnumerable<Symbol>
будет производить каждый отдельный элемент, как только он станет доступен
Это работает, потому что метод Rx ToEnumerable
имеет в себе некоторый умный код, который соединяет разрыв между синхронным мировоззрением IEnumerable<T>
и асинхронным производством результатов. (Другими словами, это делает именно то, что вы были разочарованы, обнаружив, что С# не смог сделать для вас.)
Если вам интересно, вы можете посмотреть на источник. Код, который лежит в основе того, что ToEnumerable
можно найти на https://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs
[ Отредактировано 2013/11/29, чтобы добавить:]
svick указал в комментариях, что я пропустил: ваша конечная цель - поместить содержимое в ObservableCollection<Symbol>
. Как-то я этого не видел. Это означает, что IEnumerable<T>
- неправильный путь - вы хотите заполнить коллекцию, поскольку элементы становятся доступными, а не выполняются с помощью цикла foreach
. Поэтому вы просто сделаете это:
GetSymbolsRx().Subscribe(symbol => SymbolsObservableCollection.Add(symbol));
или что-то в этом роде. Это добавит элементы в коллекцию по мере их появления.
Это зависит от того, что все началось с потока пользовательского интерфейса. Если это так, ваш асинхронный код должен работать в потоке пользовательского интерфейса, что означает, что когда элементы добавляются в коллекцию, это также происходит в потоке пользовательского интерфейса. Но если по какой-то причине вы в конечном итоге запускаете вещи из рабочего потока (или если вы будете использовать ConfigureAwait
для любого из ожидающих, тем самым нарушая соединение с потоком пользовательского интерфейса), вам нужно будет организовать обработку элементов из поток Rx в правом потоке:
GetSymbolsRx()
.ObserveOnDispatcher()
.Subscribe(symbol => SymbolsObservableCollection.Add(symbol));
Если вы находитесь в потоке пользовательского интерфейса, когда вы это делаете, он подберет текущего диспетчера и обеспечит его получение. Если вы подписались на неправильный поток, вы можете использовать перегрузку ObserveOn
, которая принимает диспетчера. (Для этого требуется, чтобы у вас была ссылка на System.Reactive.Windows.Threading
. И это методы расширения, поэтому для их содержащего пространства имен вам понадобится using
, который также называется System.Reactive.Windows.Threading
)