Как перезвонить асинхронную функцию из rx subscribe?
Я хотел бы перезвонить асинхронную функцию в Rx-подписке.
например. например:
public class Consumer
{
private readonly Service _service = new Service();
public ReplaySubject<string> Results = new ReplaySubject<string>();
public void Trigger()
{
Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
}
public Task RunAsync()
{
return _service.DoAsync();
}
}
public class Service
{
public async Task<string> DoAsync()
{
return await Task.Run(() => Do());
}
private static string Do()
{
Thread.Sleep(TimeSpan.FromMilliseconds(200));
throw new ArgumentException("invalid!");
return "foobar";
}
}
[Test]
public async Task Test()
{
var sut = new Consumer();
sut.Trigger();
var result = await sut.Results.FirstAsync();
}
Что нужно сделать, чтобы правильно поймать исключение?
Ответы
Ответ 1
Вы не хотите передавать метод async
в Subscribe
, потому что это создаст метод async void
. Сделайте все возможное, чтобы избежать async void
.
В вашем случае я думаю, что вы хотите вызвать метод async
для каждого элемента последовательности и затем кешировать все результаты. В этом случае используйте SelectMany
для вызова метода async
для каждого элемента и Replay
для кеширования (плюс a Connect
, чтобы получить мяч):
public class Consumer
{
private readonly Service _service = new Service();
public IObservable<string> Trigger()
{
var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100))
.SelectMany(_ => RunAsync())
.Replay();
connectible.Connect();
return connectible;
}
public Task<string> RunAsync()
{
return _service.DoAsync();
}
}
Я изменил свойство Results
, которое будет возвращено из метода Trigger
, что, я думаю, имеет больше смысла, поэтому тест теперь выглядит следующим образом:
[Test]
public async Task Test()
{
var sut = new Consumer();
var results = sut.Trigger();
var result = await results.FirstAsync();
}
Ответ 2
Ответ Paul Betts работает в большинстве сценариев, но если вы хотите заблокировать поток, ожидая завершения функции async, вам нужно что-то вроде этого:
Observable.Interval(TimeSpan.FromSeconds(1))
.Select(l => Observable.FromAsync(asyncMethod))
.Concat()
.Subscribe();
Или:
Observable.Interval(TimeSpan.FromSeconds(1))
.Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
.Concat()
.Subscribe();
Ответ 3
Измените это на:
Observable.Timer(TimeSpan.FromMilliseconds(100))
.SelectMany(async _ => await RunAsync())
.Subscribe();
Подписка не поддерживает асинхронную операцию внутри наблюдаемого.