Перевод async-wait С# кода в F # относительно планировщика
Интересно, слишком ли это вопрос, но недавно я заставил себя найти код, который я хотел бы узнать о том, как перевести с С# на правильный F #. Путешествие начинается с здесь (1) (исходная проблема с взаимодействием TPL-F #) и продолжается здесь (2) (некоторый пример кода, который я планирую перевести на F #).
Пример кода слишком длинный для воспроизведения здесь, но интересными функциями являются ActivateAsync
, RefreshHubs
и AddHub
. В частности, интересными являются
-
AddHub
имеет подпись private async Task AddHub(string address)
.
-
RefreshHubs
вызывает AddHub
в цикле и собирает список tasks
, который затем ожидает в самом конце await Task.WhenAll(tasks)
, и, следовательно, возвращаемое значение соответствует его сигнатуре private async Task RefreshHubs(object _)
.
-
RefreshHubs
вызывается ActivateAsync
точно так же, как await RefreshHubs(null)
, а затем в конце есть вызов await base.ActivateAsync()
, соответствующий сигнатуре функции public override async Task ActivateAsync()
.
Вопрос:
Каков был бы правильный перевод таких сигнатур функций на F #, который все еще поддерживает интерфейс и функциональность и уважает настраиваемый планировщик по умолчанию? И я не слишком уверен в этом "async/await in F #". Как в том, как это сделать "механически".:)
Причина в том, что в ссылке "здесь (1)", похоже, проблема (я не проверял это) в том, что операции aync F # не уважают настраиваемый планировщик совместной работы, установленный во время выполнения (Orleans). Кроме того, он заявил здесь, что операции TPL выходят из планировщика и переходят в пул задач, поэтому их использование запрещено.
Один из способов, я могу думать о том, чтобы иметь дело с функцией F # следующим образом
//Sorry for the inconvenience of shorterned code, for context see the link "here (1)"...
override this.ActivateAsync() =
this.RegisterTimer(new Func<obj, Task>(this.FlushQueue), null, TimeSpan.FromMilliseconds(100.0), TimeSpan.FromMilliseconds(100.0)) |> ignore
if RoleEnvironment.IsAvailable then
this.RefreshHubs(null) |> Async.awaitPlainTask |> Async.RunSynchronously
else
this.AddHub("http://localhost:48777/") |> Async.awaitPlainTask |> Async.RunSynchronously
//Return value comes from here.
base.ActivateAsync()
member private this.RefreshHubs(_) =
//Code omitted, in case mor context is needed, take a look at the link "here (2)", sorry for the inconvinience...
//The return value is Task.
//In the C# version the AddHub provided tasks are collected and then the
//on the last line there is return await Task.WhenAll(newHubAdditionTasks)
newHubs |> Array.map(fun i -> this.AddHub(i)) |> Task.WhenAll
member private this.AddHub(address) =
//Code omitted, in case mor context is needed, take a look at the link "here (2)", sorry for the inconvinience...
//In the C# version:
//...
//hubs.Add(address, new Tuple<HubConnection, IHubProxy>(hubConnection, hub))
//}
//so this is "void" and could perhaps be Async<void> in F#...
//The return value is Task.
hubConnection.Start() |> Async.awaitTaskVoid |> Async.RunSynchronously
TaskDone.Done
Функция startAsPlainTask
находится Sacha Barber из здесь. Другим интересным вариантом может быть здесь как
module Async =
let AwaitTaskVoid : (Task -> Async<unit>) =
Async.AwaitIAsyncResult >> Async.Ignore
< edit: Я просто заметил, что Task.WhenAll
тоже нужно будет ждать. Но что было бы правильным путем? Ух, время спать (плохой каламбур)...
< edit 2: В здесь (1) (исходная проблема с взаимодействием TPL-F #) в Codeplex было упомянуто, что F # использует контексты синхронизации, чтобы подталкивать работу к потокам, тогда как TPL этого не делает. Теперь, это правдоподобное объяснение, я чувствую (хотя у меня все еще есть проблемы при правильном переводе этих фрагментов независимо от пользовательского планировщика). Некоторая интересная дополнительная информация могла бы быть от
Я думаю, что в этом контексте я должен упомянуть Hopac в качестве интересного тангенциального, а также упомянуть, что я недоступен для следующие 50 нечетных часов или около того, если все мои перекрестные сообщения выйдут из-под контроля.
< edit 3: Daniel и svick дайте хороший совет в комментариях для использования настраиваемого построителя задач. Даниил предоставляет ссылку на тот, который уже определен в FSharpx.
Глядя на источник, я вижу, что интерфейс с параметрами определяется как
type TaskBuilder(?continuationOptions, ?scheduler, ?cancellationToken) =
let contOptions = defaultArg continuationOptions TaskContinuationOptions.None
let scheduler = defaultArg scheduler TaskScheduler.Default
let cancellationToken = defaultArg cancellationToken CancellationToken.None
Если кто-то должен был использовать это в Орлеане, это выглядит как TaskScheduler
должно быть TaskScheduler.Current
согласно документации здесь
Orleans имеет собственный планировщик задач, который обеспечивает однопоточный модель исполнения, используемая в зернах. Важно, чтобы при запуске задачи используется планировщик Орлеана, а не пул потоков .NET.
Если ваш код зерна требует создания подзадачи, вы должны использовать Task.Factory.StartNew:
ждет Task.Factory.StartNew(() = > {/* logic */});
Этот метод будет использовать текущий планировщик задач, который будет Орлеанский планировщик.
Вам следует избегать использования Task.Run, который всегда использует поток .NET пул, и поэтому не будет выполняться в однопоточном исполнении модель.
Здесь выглядит тонкое различие между TaskScheduler.Current и TaskScheduler.Default, Возможно, это требует вопроса, в каких примерах случаев будет нежелательная разница. Поскольку документация в Орлеане указывает на то, что вы не используете Task.Run
и вместо этого направляетесь к Task.Factory.StartNew
, мне интересно определить TaskCreationOptions.DenyAttachChild как рекомендовано такими властями, как Stephen Toub в Task.Run vs Task.Factory.StartNew и Стивен Клири на StartNew Dangerous. Хм, похоже, что .Default
будет .DenyAttachChilld
, если я не ошибаюсь.
Кроме того, поскольку существует проблема с Task.Run
viz Task.Factory.CreateNew
в отношении настраиваемого планировщика, я задаюсь вопросом, можно ли устранить эту конкретную проблему с помощью пользовательского TaskFactory, как описано в планировщике заданий (Task.Factory) и управлении количеством потоков и Как создать планировщик заданий, ограничивающий Concurrency.
Хм, это становится довольно долгим "размышлением". Интересно, как мне это закрыть? Возможно, если svick и Daniel могли бы сделать свои комментарии в качестве ответов, и я бы обогнал их и принял svick?
Ответы
Ответ 1
Вы можете использовать TaskBuilder
в FSharpx и передать TaskScheduler.Current
. Здесь моя попытка перевода RefreshHubs
. Обратите внимание, что Task<unit>
используется вместо Task
.
let RefreshHubs _ =
let task = TaskBuilder(scheduler = TaskScheduler.Current)
task {
let addresses =
RoleEnvironment.Roles.["GPSTracker.Web"].Instances
|> Seq.map (fun instance ->
let endpoint = instance.InstanceEndpoints.["InternalSignalR"]
sprintf "http://%O" endpoint.IPEndpoint
)
|> Seq.toList
let newHubs = addresses |> List.filter (not << hubs.ContainsKey)
let deadHubs = hubs.Keys |> Seq.filter (fun x ->
not (List.exists ((=) x) addresses))
// remove dead hubs
deadHubs |> Seq.iter (hubs.Remove >> ignore)
// add new hubs
let! _ = Task.WhenAll [| for hub in newHubs -> AddHub hub |]
return ()
}