Заставить задачу продолжить текущий поток?
Я создаю порт для рамки AKKA для .NET(не считайте это слишком серьезным сейчас, это выходная часть части Актера прямо сейчас)
У меня возникают проблемы с поддержкой "Будущее".
В Java/Scala Akka фьючерсы ожидаются синхронно с вызовом Await.
Как и .NET Task.Wait()
Моя цель - поддерживать true async.
Он работает прямо сейчас, но продолжение выполняется в неправильном потоке в моем текущем решении.
Это результат передачи сообщения одному из моих участников, которые содержат блок ожидания для будущего.
Как вы можете видеть, актер всегда выполняется в одном потоке, а блок ожидания выполняется на случайном потоке threadpool.
actor thread: 6
await thread 10
actor thread: 6
await thread 12
actor thread: 6
actor thread: 6
await thread 13
...
Актер получает сообщение с использованием DataFlow BufferBlock<Message>
Вернее, я использую RX над буферным блоком для подписки на сообщения.
Он настроен следующим образом:
var messages = new BufferBlock<Message>()
{
BoundedCapacity = 100,
TaskScheduler = TaskScheduler.Default,
};
messages.AsObservable().Subscribe(this);
Пока все хорошо.
Однако, когда я жду будущий результат.
так:
protected override void OnReceive(IMessage message)
{
....
var result = await Ask(logger, m);
// This is not executed on the same thread as the above code
result.Match()
.With<SomeMessage>(t => {
Console.WriteLine("await thread {0}",
System.Threading.Thread.CurrentThread.GetHashCode());
})
.Default(_ => Console.WriteLine("Unknown message"));
...
Я знаю, что это обычное поведение async ждет, но я действительно должен убедиться, что только один поток имеет доступ к моему игроку.
Я не хочу, чтобы будущее выполнялось синхронно, я хочу запустить async так же, как обычно, но я хочу, чтобы продолжение выполнялось в том же потоке, что и процессор/актер.
Мой код для будущей поддержки выглядит следующим образом:
public Task<IMessage> Ask(ActorRef actor, IMessage message)
{
TaskCompletionSource<IMessage> result =
new TaskCompletionSource<IMessage>();
var future = Context.ActorOf<FutureActor>(name : Guid.NewGuid().ToString());
// once this object gets a response,
// we set the result for the task completion source
var futureActorRef = new FutureActorRef(result);
future.Tell(new SetRespondTo(), futureActorRef);
actor.Tell(message, future);
return result.Task;
}
Любые идеи, что я могу сделать, чтобы заставить продолжение работать в том же потоке, который запустил вышеуказанный код?
Ответы
Ответ 1
Я создаю порт рамки AKKA для .NET.
Сладкое. Я пошел на разговор Akka на CodeMash '13, несмотря на то, что никогда не касался Java/ Scala/Akka. Я видел много возможностей для библиотеки .NET/framework. Microsoft работает над чем-то подобным, который, надеюсь, в конечном итоге станет общедоступным (он в настоящее время в ограниченном просмотре).
Я подозреваю, что пребывание в мире Dataflow/Rx в максимально возможной степени является более легким подходом; async
лучше всего, когда у вас есть асинхронные операции (с одним запуском и единственным результатом для каждой операции), тогда как Dataflow и Rx работают лучше с потоками и подписками (с одним запуском и несколькими результатами). Поэтому моя первая реакция на кисть - либо связать буферный блок с ActionBlock
с конкретным планировщиком, либо использовать ObserveOn
для перемещения уведомлений Rx определенному планировщику вместо того, чтобы пытаться сделать это на стороне async
. Конечно, я не очень хорошо знаком с дизайном API Akka, поэтому возьмите это с солью.
В любом случае, мой async
intro описывает только два надежных варианта планирования await
продолжений: SynchronizationContext.Current
и TaskScheduler.Current
. Если ваш порт Akka больше связан с картой (где ваш код делает хостинг, а код конечного пользователя всегда выполняется вашим кодом), тогда может возникнуть смысл SynchronizationContext
. Если ваш порт больше представляет собой библиотеку (где код конечного пользователя выполняет хостинг и называет ваш код по мере необходимости), тогда TaskScheduler
будет иметь больше смысла.
Существует не так много примеров пользовательского SynchronizationContext
, потому что это довольно редко. У меня есть AsyncContextThread
type в моей библиотеке AsyncEx который определяет как SynchronizationContext
, так и TaskScheduler
для этого потока. Существует несколько примеров пользовательских TaskScheduler
s, таких как Parallel Extensions Extras, который имеет планировщик STA и "текущий поток" планировщик.
Ответ 2
Планировщик задач решает, следует ли запускать задачу в новом потоке или в текущем потоке.
Существует возможность принудительно запустить его в новом потоке, но никто не заставляет его запускать текущий поток.
Но есть метод Task.RunSynchronously(), который запускает задачу синхронно в текущем TaskScheduler.
Также, если вы используете async/await, на этом уже есть аналогичный вопрос.