Ответ 1
РЕДАКТИРОВАТЬ: Поскольку я нахожусь на подоконнике, я должен указать, что клиент .NET RabbitMQ теперь имеет эту функциональность: https://www.rabbitmq.com/dotnet-api-guide.html#connection-recovery
В идеале вы должны иметь возможность использовать это и избегать ручного внедрения логики повторного подключения.
Недавно мне пришлось реализовать почти то же самое. Из того, что я могу сказать, большая часть доступной информации о RabbitMQ предполагает, что ваша сеть очень надежна или вы запускаете брокера RabbitMQ на том же компьютере, что и любой клиент, отправляющий или получающий сообщения, что позволяет Кролику решать любые проблемы с подключением.
На самом деле не так сложно настроить клиента Rabbit на надежную защиту от удаленных соединений, но есть несколько особенностей, с которыми вам нужно иметь дело.
Первое, что вам нужно сделать, включить heartbeat:
ConnectionFactory factory = new ConnectionFactory()
{
Uri = brokerUri,
RequestedHeartbeat = 30,
};
Установка "RequestedHeartbeat" на 30 заставит клиента проверять каждые 30 секунд, если соединение все еще живое. Без этого, абонент сообщения будет сидеть там счастливо, ожидая появления другого сообщения, не подозревая, что его соединение ухудшилось.
Включение биения также позволяет серверу проверить, все ли подключено соединение, что может быть очень важно. Если соединение ухудшилось после того, как сообщение было подхвачено подписчиком, но до его подтверждения сервер просто предполагает, что клиент занимает много времени, и сообщение "застревает" на мертвом соединении, пока оно не будет закрыто. При включенном сердцебиении сервер узнает, когда соединение ухудшится и закроет его, вернув сообщение в очередь, чтобы другой абонент мог его обработать. Без сердечного приступа я должен был вручную и закрыть соединение в интерфейсе управления кроликом, чтобы застрявшее сообщение могло быть передано подписчику.
Во-вторых, вам нужно будет обрабатывать OperationInterruptedException
. Как вы заметили, это, как правило, исключение, которое клиент кролика выбрасывает, когда он замечает, что соединение было прервано. Если IModel.QueueDeclare()
вызывается, когда соединение было прервано, это исключение вы получите. Обработайте это исключение, удалив свою подписку, канал и соединение и создав новые.
Наконец, вам придется обрабатывать то, что делает ваш потребитель, пытаясь использовать сообщения из закрытого соединения. К сожалению, каждый другой способ потребления сообщений из очереди в клиенте Rabbit, похоже, реагирует по-разному. QueueingBasicConsumer
выдает EndOfStreamException
, если вы вызываете QueueingBasicConsumer.Queue.Dequeue
в закрытом соединении. EventingBasicConsumer
ничего не делает, поскольку он просто ждет сообщения. Из того, что я могу сказать, попробовав его, класс Subscription
, который вы используете, возвращает true из вызова Subscription.Next
, но значение args
равно null. Еще раз, справитесь с этим, избавьтесь от своего соединения, канала и подписки и заново создайте их.
Значение connection.IsOpen
будет обновлено до False, когда соединение завершится неудачей с биением, поэтому вы можете проверить это, если хотите. Однако, поскольку сердцебиение работает в отдельном потоке, вам все равно придется обрабатывать случай, когда соединение открыто, когда вы его проверяете, но закрывается до вызова subscription.Next()
.
Одна последняя вещь, на которую нужно обратить внимание - это IConnection.Dispose()
. Этот вызов будет вызывать EndOfStreamException
, если вы вызываете dispose после того, как соединение было закрыто. Это кажется ошибкой для меня, и мне не нравится, что я не вызываю dispose на объект IDisposable
, поэтому я вызываю его и проглатываю исключение.
Объединяя все это в быстрый и грязный пример:
public bool Cancelled { get; set; }
IConnection _connection = null;
IModel _channel = null;
Subscription _subscription = null;
public void Run(string brokerUri, string queueName, Action<byte[]> handler)
{
ConnectionFactory factory = new ConnectionFactory()
{
Uri = brokerUri,
RequestedHeartbeat = 30,
};
while (!Cancelled)
{
try
{
if(_subscription == null)
{
try
{
_connection = factory.CreateConnection();
}
catch(BrokerUnreachableException)
{
//You probably want to log the error and cancel after N tries,
//otherwise start the loop over to try to connect again after a second or so.
continue;
}
_channel = _connection.CreateModel();
_channel.QueueDeclare(queueName, true, false, false, null);
_subscription = new Subscription(_channel, queueName, false);
}
BasicDeliverEventArgs args;
bool gotMessage = _subscription.Next(250, out args);
if (gotMessage)
{
if(args == null)
{
//This means the connection is closed.
DisposeAllConnectionObjects();
continue;
}
handler(args.Body);
_subscription.Ack(args);
}
}
catch(OperationInterruptedException ex)
{
DisposeAllConnectionObjects();
}
}
DisposeAllConnectionObjects();
}
private void DisposeAllConnectionObjects()
{
if(_subscription != null)
{
//IDisposable is implemented explicitly for some reason.
((IDisposable)_subscription).Dispose();
_subscription = null;
}
if(_channel != null)
{
_channel.Dispose();
_channel = null;
}
if(_connection != null)
{
try
{
_connection.Dispose();
}
catch(EndOfStreamException)
{
}
_connection = null;
}
}