Шаблон ZeroMQ PUB/SUB с многопоточным отключением Poller

У меня есть два приложения: С++-сервер и С# WPF UI. Код С++ принимает запросы (из любого места/любого) через службу обмена сообщениями ZeroMQ [PUB/SUB]. Я использую свой код С# для повторного тестирования и создания "обратных тестов" и их выполнения. Эти обратные тесты могут состоять из множества "единичных тестов" и каждой из них отправлять/получать тысячи сообщений с сервера С++.

В настоящее время индивидуальные задние тесты хорошо работают, могут отправлять N единичных тестов каждый с тысячами запросов и захватов. Моя проблема - архитектура; когда я отправляю другой тест обратно (после первого), я получаю проблему с подпиской на события, выполняемой второй раз из-за того, что поток опроса не отменяется и не удаляется. Это приводит к ошибочному выводу. Это может показаться тривиальной проблемой (возможно, это для некоторых из вас), но аннулирование этой задачи опроса в моей текущей конфигурации оказывается затруднительным. Некоторый код...

Мой класс брокера сообщений прост и выглядит как

public class MessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
    private Task pollingTask;
    private NetMQContext context;
    private PublisherSocket pubSocket;

    private CancellationTokenSource source;
    private CancellationToken token;
    private ManualResetEvent pollerCancelled;

    public MessageBroker()
    {
        this.source = new CancellationTokenSource();
        this.token = source.Token;

        StartPolling();
        context = NetMQContext.Create();
        pubSocket = context.CreatePublisherSocket();
        pubSocket.Connect(PublisherAddress);
    }

    public void Dispatch(Taurus.FeedMux message)
    {
        pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
    }

    private void StartPolling()
    {
        pollerCancelled = new ManualResetEvent(false);
        pollingTask = Task.Run(() =>
        {
            try
            {
                using (var context = NetMQContext.Create())
                using (var subSocket = context.CreateSubscriberSocket())
                {
                    byte[] buffer = null;
                    subSocket.Options.ReceiveHighWatermark = 1000;
                    subSocket.Connect(SubscriberAddress);
                    subSocket.Subscribe(String.Empty);
                    while (true)
                    {
                        buffer = subSocket.Receive();
                        MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
                        if (this.token.IsCancellationRequested)
                            this.token.ThrowIfCancellationRequested();
                    }
                }
            }
            catch (OperationCanceledException)
            {
                pollerCancelled.Set();
            }
        }, this.token);
    }

    private void CancelPolling()
    {
        source.Cancel();
        pollerCancelled.WaitOne();
        pollerCancelled.Close();
    }

    public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }
    public string PublisherAddress { get { return "tcp://127.X.X.X:6500"; } }
    public string SubscriberAddress { get { return "tcp://127.X.X.X:6501"; } }

    private bool disposed = false;

    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                if (this.pollingTask != null)
                {
                    CancelPolling();
                    if (this.pollingTask.Status == TaskStatus.RanToCompletion ||
                         this.pollingTask.Status == TaskStatus.Faulted ||
                         this.pollingTask.Status == TaskStatus.Canceled)
                    {
                        this.pollingTask.Dispose();
                        this.pollingTask = null;
                    }
                }
                if (this.context != null)
                {
                    this.context.Dispose();
                    this.context = null;
                }
                if (this.pubSocket != null)
                {
                    this.pubSocket.Dispose();
                    this.pubSocket = null;
                }
                if (this.source != null)
                {
                  this.source.Dispose();
                  this.source = null;
                }
            }
            disposed = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    ~MessageBroker()
    {
        Dispose(false);
    }
}

Проигрыватель "backtesting" использует для выполнения каждого теста обратной связи, сначала создает Dictionary, содержащий каждый Test (unit test), и сообщения для отправки в приложение С++ для каждого теста.

Метод DispatchTests, здесь

private void DispatchTests(ConcurrentDictionary<Test, List<Taurus.FeedMux>> feedMuxCollection)
{
    broker = new MessageBroker();
    broker.MessageRecieved = new Progress<Taurus.FeedMux>(OnMessageRecieved);
    testCompleted = new ManualResetEvent(false);

    try
    {
        // Loop through the tests. 
        foreach (var kvp in feedMuxCollection)
        {
            testCompleted.Reset();
            Test t = kvp.Key;
            t.Bets = new List<Taurus.Bet>();
            foreach (Taurus.FeedMux mux in kvp.Value)
            {
                token.ThrowIfCancellationRequested();
                broker.Dispatch(mux);
            }
            broker.Dispatch(new Taurus.FeedMux()
            {
                type = Taurus.FeedMux.Type.PING,
                ping = new Taurus.Ping() { event_id = t.EventID }
            });
            testCompleted.WaitOne(); // Wait until all messages are received for this test. 
        }
        testCompleted.Close();
    }
    finally
    {
        broker.Dispose(); // Dispose the broker.
    }
}

Сообщение PING в конце, чтобы сообщить С++, что мы закончили. Затем мы заставляем ждать, так что следующий [unit] тест не отправляется до того, как все возвраты будут получены из кода С++ - мы делаем это с помощью ManualResetEvent.

Когда С++ получает сообщение PING, он отправляет сообщение прямо назад. Мы обрабатываем полученные сообщения через OnMessageRecieved, а PING сообщает нам установить ManualResetEvent.Set(), чтобы мы могли продолжить тестирование модуля; "Далее пожалуйста"...

private async void OnMessageRecieved(Taurus.FeedMux mux)
{
    string errorMsg = String.Empty;
    if (mux.type == Taurus.FeedMux.Type.MSG)
    {
        // Do stuff.
    }
    else if (mux.type == Taurus.FeedMux.Type.PING)
    {
        // Do stuff.

        // We are finished reciving messages for this "unit test"
        testCompleted.Set(); 
    }
}

Моя проблема заключается в том, что broker.Dispose() в последнем выше никогда не попадает. Я понимаю, что окончательные блоки, выполняемые по фоновым потокам, не гарантируются для выполнения.

Перечеркнутый текст выше был из-за того, что я возился с кодом; Я прекратил родительский поток до того, как ребенок закончил. Однако есть еще проблемы...

Теперь broker.Dispose() вызывается правильно и вызывается broker.Dispose(), в этом методе я пытаюсь отменить поток poller и правильно распоряжаться Task, чтобы избежать нескольких подписок.

Чтобы отменить поток, я использую метод CancelPolling()

private void CancelPolling()
{
    source.Cancel();
    pollerCancelled.WaitOne(); <- Blocks here waiting for cancellation.
    pollerCancelled.Close();
}

но в методе StartPolling()

while (true)
{
    buffer = subSocket.Receive();
    MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
    if (this.token.IsCancellationRequested)
        this.token.ThrowIfCancellationRequested();
}

ThrowIfCancellationRequested() никогда не вызывается, и нить никогда не отменяется, поэтому никогда не удаляется должным образом. Полит-поток блокируется методом subSocket.Receive().

Теперь мне не ясно, как добиться того, чего я хочу, мне нужно вызвать broker.Dispose()/PollerCancel() в потоке, отличном от того, который использовался для опроса сообщений, а некоторые - как принудительного отмены. Thread abort не то, что я хочу получить любой ценой.

По сути, я хочу правильно распорядиться broker перед выполнением следующего теста обратной связи, как я могу правильно это обработать, отделить опрос и запустить его в отдельном домене приложений?

Я попробовал, располагая внутри обработчика OnMessageRecived, но это явно выполняется в том же потоке, что и poller, и это не способ сделать это, не вызывая дополнительных потоков, он блокирует.

Каков наилучший способ добиться того, что я хочу и , есть ли шаблон для такого рода случаев, за которым я могу следовать?

Спасибо за ваше время.

Ответы

Ответ 1

Вот как я в конце концов обошел это (хотя я открыт для лучшего решения!)

public class FeedMuxMessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
    // Vars.
    private NetMQContext context;
    private PublisherSocket pubSocket;
    private Poller poller;

    private CancellationTokenSource source;
    private CancellationToken token;
    private ManualResetEvent pollerCancelled;

    /// <summary>
    /// Default ctor.
    /// </summary>
    public FeedMuxMessageBroker()
    {
        context = NetMQContext.Create();

        pubSocket = context.CreatePublisherSocket();
        pubSocket.Connect(PublisherAddress);

        pollerCancelled = new ManualResetEvent(false);
        source = new CancellationTokenSource();
        token = source.Token;
        StartPolling();
    }

    #region Methods.
    /// <summary>
    /// Send the mux message to listners.
    /// </summary>
    /// <param name="message">The message to dispatch.</param>
    public void Dispatch(Taurus.FeedMux message)
    {
        pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
    }

    /// <summary>
    /// Start polling for messages.
    /// </summary>
    private void StartPolling()
    {
        Task.Run(() =>
            {
                using (var subSocket = context.CreateSubscriberSocket())
                {
                    byte[] buffer = null;
                    subSocket.Options.ReceiveHighWatermark = 1000;
                    subSocket.Connect(SubscriberAddress);
                    subSocket.Subscribe(String.Empty);
                    subSocket.ReceiveReady += (s, a) =>
                    {
                        buffer = subSocket.Receive();
                        if (MessageRecieved != null)
                            MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
                    };

                    // Poll.
                    poller = new Poller();
                    poller.AddSocket(subSocket);
                    poller.PollTillCancelled();
                    token.ThrowIfCancellationRequested();
                }
            }, token).ContinueWith(ant => 
                {
                    pollerCancelled.Set();
                }, TaskContinuationOptions.OnlyOnCanceled);
    }

    /// <summary>
    /// Cancel polling to allow the broker to be disposed.
    /// </summary>
    private void CancelPolling()
    {
        source.Cancel();
        poller.Cancel();

        pollerCancelled.WaitOne();
        pollerCancelled.Close();
    }
    #endregion // Methods.

    #region Properties.
    /// <summary>
    /// Event that is raised when a message is recived. 
    /// </summary>
    public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }

    /// <summary>
    /// The address to use for the publisher socket.
    /// </summary>
    public string PublisherAddress { get { return "tcp://127.0.0.1:6500"; } }

    /// <summary>
    /// The address to use for the subscriber socket.
    /// </summary>
    public string SubscriberAddress { get { return "tcp://127.0.0.1:6501"; } }
    #endregion // Properties.

    #region IDisposable Members.
    private bool disposed = false;

    /// <summary>
    /// Dispose managed resources.
    /// </summary>
    /// <param name="disposing">Is desposing.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                CancelPolling();
                if (pubSocket != null)
                {
                    pubSocket.Disconnect(PublisherAddress);
                    pubSocket.Dispose();
                    pubSocket = null;
                }
                if (poller != null)
                {
                    poller.Dispose();
                    poller = null;
                }
                if (context != null)
                {
                    context.Terminate();
                    context.Dispose();
                    context = null;
                }
                if (source != null)
                {
                    source.Dispose();
                    source = null;
                }
            }

            // Shared cleanup logic.
            disposed = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Finalizer.
    /// </summary>
    ~FeedMuxMessageBroker()
    {
        Dispose(false);
    }
    #endregion // IDisposable Members.
}

Таким образом, мы проводим опрос таким же образом, но с использованием класса Poller из NetMQ. В продолжении задачи мы устанавливаем так, что мы уверены, что и Poller, и Task отменены. Мы тогда безопасны для распоряжения...

Ответ 2

Представление более высокого уровня по объекту

Ваш фокус и усилия, направленные на создание основы тестирования, свидетельствуют о том, что ваша цель направлена ​​на разработку строгого и профессионального подхода, который заставил меня сначала поднять мою шляпу в приветствии восхищения таким смелым делом.

В то время как тестирование является важным мероприятием для предоставления разумных количественных доказательств, что системный тест соответствует определенным ожиданиям, успех в этом зависит от того, насколько тесная среда тестирования соответствует условиям реального развертывания.

Можно согласиться, что тестирование на других, разных основах не доказывает, что реальное развертывание будет выполняться, как ожидалось, в среде, которая принципиально отличается от тестируемой.


Элементный контроль или просто контроль состояния, что вопрос.

Ваши усилия (по крайней мере, во время публикации OP) сосредоточены на архитектуре кода, которая пытается удержать экземпляры на месте и пытается повторно установить внутреннее состояние экземпляра Poller перед началом следующей тестовой батареи.

На мой взгляд, тестирование требует нескольких принципов, если вы будете стремиться к профессиональному тестированию:

  • Принцип повторяемости теста (повторные тесты тестов должны служить одинаковыми результатами, что позволяет избежать квази-тестирования, которое дает только результат - "лотерею" )

  • Принцип неинтересного тестирования (повторы тестов не должны подвергаться "внешнему" вмешательству, не контролируемому тестовым сценарием)

Сказав это, позвольте мне привести несколько заметок, вдохновленных Гарри Марковицем, нобелевцем, награжденным за его замечательные количественные исследования по оптимизации портфеля.

Скорее перейдите на один шаг назад, чтобы получить полный контроль жизненного цикла элементов

CACI Simulations, Inc. (одна из компаний Harry Markowitz) разработала в начале 90-х годов их флагманскую программную инфраструктуру COMET III - исключительно мощный симулятор для большого, сложного проектирования и прототипирования процессов, масштабируемыми сетями/сетями/телекоммуникационными сетями.

Наибольшее впечатление от COMET III заключалось в возможности генерации тестовых сценариев, включая настраиваемую предварительную тестовую предварительную нагрузку, которая заставила испытуемые элементы войти в состояние, похожее на то, что "усталость", означает в механических экспериментах по испытанию пыток или о том, что хрупкость водородной диффузии означает для металлургических предприятий атомной электростанции.

Да, как только вы перейдете к низкоуровневым сведениям о том, как алгоритмы, node -буферы, распределения памяти, выбор архитектуры на основе трубопроводов/балансировки нагрузки/grid-обработки, сбои в отказоустойчивости, политики сбора мусора и ограниченные алгоритмы совместного использования ресурсов и их влияние (при использовании "рабочих нагрузок" в режиме реального времени ") сквозная производительность/задержки, эта функция просто незаменима.

Это означает, что простой контроль состояния, связанный с отдельным экземпляром, недостаточно, поскольку он не предоставляет средств как для тестовой повторяемости, так и для тестовой изоляции/без вмешательства. Проще говоря, даже если вы найдете способ "reset" экземпляра Poller, это не даст вам реалистичного тестирования с гарантированной повторяемостью теста с возможностью предварительного тестирования.

Требуется отступ и более высокий уровень абстракции и контроля тестового сценария.

Как это относится к проблеме OP?

  • Вместо простого управления состоянием
  • Создать многоуровневую архитектуру/плоскость (-ы) управления/отдельную сигнализацию

Способ ZeroMQ для поддержки этой цели

  • Создание суперструктур как нетривиальных шаблонов
  • Использовать полный контроль жизненного цикла экземпляров, используемых в сценариях тестирования.
  • Сохранять ZeroMQ-maxims: Zero-sharing, Zero-blocking,...
  • Преимущества Multi-Context()