Реализация правильного завершения повторного блока
Teaser: ребята, этот вопрос не о том, как реализовать политику повтора. Это о правильном завершении блока потока данных TPL.
Этот вопрос в основном является продолжением моего предыдущего вопроса Повторить политику в ITargetBlock. Ответом на этот вопрос было решение @svick smart, которое использует TransformBlock
(источник) и TransformManyBlock
(target). Единственная проблема заключается в том, чтобы закончить этот блок правильно: дождитесь завершения всех попыток, а затем заполните целевой блок. Вот что я закончил (это просто фрагмент, не обращайте слишком много внимания на не-потоковое приложение retries
):
var retries = new HashSet<RetryingMessage<TInput>>();
TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
async message =>
{
try
{
var result = new[] { await transform(message.Data) };
retries.Remove(message);
return result;
}
catch (Exception ex)
{
message.Exceptions.Add(ex);
if (message.RetriesRemaining == 0)
{
if (failureHandler != null)
failureHandler(message.Exceptions);
retries.Remove(message);
}
else
{
retries.Add(message);
message.RetriesRemaining--;
Task.Delay(retryDelay)
.ContinueWith(_ => target.Post(message));
}
return null;
}
}, dataflowBlockOptions);
source.LinkTo(target);
source.Completion.ContinueWith(async _ =>
{
while (target.InputCount > 0 || retries.Any())
await Task.Delay(100);
target.Complete();
});
Идея состоит в том, чтобы выполнить какой-то опрос и проверить, есть ли еще сообщения, ожидающие обработки, и нет сообщений, требующих повторной попытки. Но в этом решении мне не нравится идея опроса.
Да, я могу инкапсулировать логику добавления/удаления повторений в отдельный класс и даже, например, выполнить некоторое действие, когда набор попыток станет пустым, но как справиться с условием target.InputCount > 0
? Не существует такого обратного вызова, вызываемого при отсутствии ожидающих сообщений для блока, поэтому кажется, что проверка target.ItemCount
в цикле с небольшой задержкой является единственным вариантом.
Кто-нибудь знает более умный способ достичь этого?
Ответы
Ответ 1
Сочетание hwcverwe ответа и комментария JamieSee может быть идеальным решением.
Сначала вам нужно создать несколько событий:
var signal = new ManualResetEvent(false);
var completedEvent = new ManualResetEvent(false);
Затем вам нужно создать наблюдателя и подписаться на TransformManyBlock
, поэтому вы будете уведомлены, когда произойдет соответствующее событие:
var observer = new RetryingBlockObserver<TOutput>(completedEvent);
var observable = target.AsObservable();
observable.Subscribe(observer);
Наблюдаемое может быть довольно простым:
private class RetryingBlockObserver<T> : IObserver<T> {
private ManualResetEvent completedEvent;
public RetryingBlockObserver(ManualResetEvent completedEvent) {
this.completedEvent = completedEvent;
}
public void OnCompleted() {
completedEvent.Set();
}
public void OnError(Exception error) {
//TODO
}
public void OnNext(T value) {
//TODO
}
}
И вы можете дождаться либо сигнала, либо завершения (исчерпания всех исходных элементов), либо обоих
source.Completion.ContinueWith(async _ => {
WaitHandle.WaitAll(completedEvent, signal);
// Or WaitHandle.WaitAny, depending on your needs!
target.Complete();
});
Вы можете проверить значение результата WaitAll, чтобы понять, какое событие было установлено, и реагировать соответствующим образом.
Вы также можете добавить в код другие события, передавая их наблюдателю, чтобы он мог установить их при необходимости. Вы можете различать свое поведение и реагировать по-разному, когда возникает ошибка, например
Ответ 2
Возможно, ManualResetEvent может сделать трюк для вас.
Добавьте публичное свойство в TransformManyBlock
private ManualResetEvent _signal = new ManualResetEvent(false);
public ManualResetEvent Signal { get { return _signal; } }
И вот вы идете:
var retries = new HashSet<RetryingMessage<TInput>>();
TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
async message =>
{
try
{
var result = new[] { await transform(message.Data) };
retries.Remove(message);
// Sets the state of the event to signaled, allowing one or more waiting threads to proceed
if(!retries.Any()) Signal.Set();
return result;
}
catch (Exception ex)
{
message.Exceptions.Add(ex);
if (message.RetriesRemaining == 0)
{
if (failureHandler != null)
failureHandler(message.Exceptions);
retries.Remove(message);
// Sets the state of the event to signaled, allowing one or more waiting threads to proceed
if(!retries.Any()) Signal.Set();
}
else
{
retries.Add(message);
message.RetriesRemaining--;
Task.Delay(retryDelay)
.ContinueWith(_ => target.Post(message));
}
return null;
}
}, dataflowBlockOptions);
source.LinkTo(target);
source.Completion.ContinueWith(async _ =>
{
//Blocks the current thread until the current WaitHandle receives a signal.
target.Signal.WaitOne();
target.Complete();
});
Я не уверен, где установлен ваш target.InputCount
. Итак, в месте, где вы меняете target.InputCount
, вы можете добавить следующий код:
if(InputCount == 0) Signal.Set();