Перевод async-wait С# кода в F # относительно планировщика

Интересно, слишком ли это вопрос, но недавно я заставил себя найти код, который я хотел бы узнать о том, как перевести с С# на правильный F #. Путешествие начинается с здесь (1) (исходная проблема с взаимодействием TPL-F #) и продолжается здесь (2) (некоторый пример кода, который я планирую перевести на F #).

Пример кода слишком длинный для воспроизведения здесь, но интересными функциями являются ActivateAsync, RefreshHubs и AddHub. В частности, интересными являются

  • AddHub имеет подпись private async Task AddHub(string address).
  • RefreshHubs вызывает AddHub в цикле и собирает список tasks, который затем ожидает в самом конце await Task.WhenAll(tasks), и, следовательно, возвращаемое значение соответствует его сигнатуре private async Task RefreshHubs(object _).
  • RefreshHubs вызывается ActivateAsync точно так же, как await RefreshHubs(null), а затем в конце есть вызов await base.ActivateAsync(), соответствующий сигнатуре функции public override async Task ActivateAsync().

Вопрос:

Каков был бы правильный перевод таких сигнатур функций на F #, который все еще поддерживает интерфейс и функциональность и уважает настраиваемый планировщик по умолчанию? И я не слишком уверен в этом "async/await in F #". Как в том, как это сделать "механически".:)

Причина в том, что в ссылке "здесь (1)", похоже, проблема (я не проверял это) в том, что операции aync F # не уважают настраиваемый планировщик совместной работы, установленный во время выполнения (Orleans). Кроме того, он заявил здесь, что операции TPL выходят из планировщика и переходят в пул задач, поэтому их использование запрещено.

Один из способов, я могу думать о том, чтобы иметь дело с функцией F # следующим образом

//Sorry for the inconvenience of shorterned code, for context see the link "here (1)"...
override this.ActivateAsync() =
    this.RegisterTimer(new Func<obj, Task>(this.FlushQueue), null, TimeSpan.FromMilliseconds(100.0), TimeSpan.FromMilliseconds(100.0)) |> ignore

    if RoleEnvironment.IsAvailable then
        this.RefreshHubs(null) |> Async.awaitPlainTask |> Async.RunSynchronously
    else
        this.AddHub("http://localhost:48777/") |> Async.awaitPlainTask |> Async.RunSynchronously

    //Return value comes from here.
    base.ActivateAsync()

member private this.RefreshHubs(_) =
    //Code omitted, in case mor context is needed, take a look at the link "here (2)", sorry for the inconvinience...
    //The return value is Task.
    //In the C# version the AddHub provided tasks are collected and then the
    //on the last line there is return await Task.WhenAll(newHubAdditionTasks) 
    newHubs |> Array.map(fun i -> this.AddHub(i)) |> Task.WhenAll

member private this.AddHub(address) =
    //Code omitted, in case mor context is needed, take a look at the link "here (2)", sorry for the inconvinience...
    //In the C# version:
    //...
    //hubs.Add(address, new Tuple<HubConnection, IHubProxy>(hubConnection, hub))
    //} 
    //so this is "void" and could perhaps be Async<void> in F#... 
    //The return value is Task.
    hubConnection.Start() |> Async.awaitTaskVoid |> Async.RunSynchronously
    TaskDone.Done

Функция startAsPlainTask находится Sacha Barber из здесь. Другим интересным вариантом может быть здесь как

module Async =
    let AwaitTaskVoid : (Task -> Async<unit>) =
        Async.AwaitIAsyncResult >> Async.Ignore

< edit: Я просто заметил, что Task.WhenAll тоже нужно будет ждать. Но что было бы правильным путем? Ух, время спать (плохой каламбур)...

< edit 2: В здесь (1) (исходная проблема с взаимодействием TPL-F #) в Codeplex было упомянуто, что F # использует контексты синхронизации, чтобы подталкивать работу к потокам, тогда как TPL этого не делает. Теперь, это правдоподобное объяснение, я чувствую (хотя у меня все еще есть проблемы при правильном переводе этих фрагментов независимо от пользовательского планировщика). Некоторая интересная дополнительная информация могла бы быть от

Я думаю, что в этом контексте я должен упомянуть Hopac в качестве интересного тангенциального, а также упомянуть, что я недоступен для следующие 50 нечетных часов или около того, если все мои перекрестные сообщения выйдут из-под контроля.

< edit 3: Daniel и svick дайте хороший совет в комментариях для использования настраиваемого построителя задач. Даниил предоставляет ссылку на тот, который уже определен в FSharpx.

Глядя на источник, я вижу, что интерфейс с параметрами определяется как

type TaskBuilder(?continuationOptions, ?scheduler, ?cancellationToken) =
    let contOptions = defaultArg continuationOptions TaskContinuationOptions.None
    let scheduler = defaultArg scheduler TaskScheduler.Default
    let cancellationToken = defaultArg cancellationToken CancellationToken.None

Если кто-то должен был использовать это в Орлеане, это выглядит как TaskScheduler должно быть TaskScheduler.Current согласно документации здесь

Orleans имеет собственный планировщик задач, который обеспечивает однопоточный модель исполнения, используемая в зернах. Важно, чтобы при запуске задачи используется планировщик Орлеана, а не пул потоков .NET.

Если ваш код зерна требует создания подзадачи, вы должны использовать Task.Factory.StartNew:

ждет Task.Factory.StartNew(() = > {/* logic */});

Этот метод будет использовать текущий планировщик задач, который будет Орлеанский планировщик.

Вам следует избегать использования Task.Run, который всегда использует поток .NET пул, и поэтому не будет выполняться в однопоточном исполнении модель.

Здесь выглядит тонкое различие между TaskScheduler.Current и TaskScheduler.Default, Возможно, это требует вопроса, в каких примерах случаев будет нежелательная разница. Поскольку документация в Орлеане указывает на то, что вы не используете Task.Run и вместо этого направляетесь к Task.Factory.StartNew, мне интересно определить TaskCreationOptions.DenyAttachChild как рекомендовано такими властями, как Stephen Toub в Task.Run vs Task.Factory.StartNew и Стивен Клири на StartNew Dangerous. Хм, похоже, что .Default будет .DenyAttachChilld, если я не ошибаюсь.

Кроме того, поскольку существует проблема с Task.Run viz Task.Factory.CreateNew в отношении настраиваемого планировщика, я задаюсь вопросом, можно ли устранить эту конкретную проблему с помощью пользовательского TaskFactory, как описано в планировщике заданий (Task.Factory) и управлении количеством потоков и Как создать планировщик заданий, ограничивающий Concurrency.

Хм, это становится довольно долгим "размышлением". Интересно, как мне это закрыть? Возможно, если svick и Daniel могли бы сделать свои комментарии в качестве ответов, и я бы обогнал их и принял svick?

Ответы

Ответ 1

Вы можете использовать TaskBuilder в FSharpx и передать TaskScheduler.Current. Здесь моя попытка перевода RefreshHubs. Обратите внимание, что Task<unit> используется вместо Task.

let RefreshHubs _ =
    let task = TaskBuilder(scheduler = TaskScheduler.Current)
    task {
        let addresses = 
            RoleEnvironment.Roles.["GPSTracker.Web"].Instances
            |> Seq.map (fun instance ->
                let endpoint = instance.InstanceEndpoints.["InternalSignalR"]
                sprintf "http://%O" endpoint.IPEndpoint
            )
            |> Seq.toList

        let newHubs = addresses |> List.filter (not << hubs.ContainsKey)
        let deadHubs = hubs.Keys |> Seq.filter (fun x -> 
            not (List.exists ((=) x) addresses))

        // remove dead hubs
        deadHubs |> Seq.iter (hubs.Remove >> ignore)

        // add new hubs
        let! _ = Task.WhenAll [| for hub in newHubs -> AddHub hub |]
        return ()
    }