Ответ 1
Если сообщение не было подтверждено и приложение не работает, оно будет автоматически обновлено, а свойство redelivered
на конверте будет установлено на true
(если вы не используете их с флагом no-ack = true
).
UPD:
У вас должно быть сообщение nack
с флагом пересылки в вашем блоке catch
try {
//Do some business logic
} catch (Exception $ex) {
//Log exception
return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
}
Остерегайтесь бесконечно ненагруженных сообщений, в то время как количество повторной доставки не выполняется в RabbitMQ и в протоколе AMQP вообще.
Если вы не хотите связываться с такими сообщениями и просто хотите добавить некоторую задержку, вы можете захотеть добавить метод sleep()
или usleep()
перед вызовом метода nack
, но это совсем не хорошая идея.
Существует множество методов решения проблемы повторного набора циклов:
1. Полагайтесь на Мертвые обмены письмами
- плюсы: надежные, стандартные, четкие
- cons: требуется дополнительная логика
2. Используйте для каждого сообщения или для TTL очереди в очереди
- плюсы: просты в реализации, также стандартные, понятные
- минус: с длинными очередями вы можете потерять некоторое сообщение
Примеры (обратите внимание, что для очереди ttl мы передаем только номер и для сообщения ttl - все, что будет числовой строкой):
2.1 За сообщение ttl:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish(
'message at ' . microtime(true),
null,
AMQP_NOPARAM,
array(
'expiration' => '1000'
)
);
2.2. В очереди ttl:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl', 1000);
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish('message at ' . microtime(true));
3. Удерживайте число повторных наборов или количество оставшихся повторных наборов (иначе ограничение на скачок или ttl в стеке IP) в теле сообщения или заголовках
- профи: дает вам дополнительный контроль над временем жизни сообщений на уровне приложений.
- минусы: значительные накладные расходы, пока вам нужно изменить сообщение и опубликовать его снова, специфично для приложения, не понятно
код:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish(
'message at ' . microtime(true),
null,
AMQP_NOPARAM,
array(
'headers' => array(
'ttl' => 100
)
)
);
$queue->consume(
function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
$headers = $msg->getHeaders();
echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
echo $msg->getDeliveryTag(), ' ';
echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
echo $msg->getBody(), PHP_EOL;
try {
//Do some business logic
throw new Exception('business logic failed');
} catch (Exception $ex) {
//Log exception
if (isset($headers['ttl'])) {
// with ttl logic
if ($headers['ttl'] > 0) {
$headers['ttl']--;
$exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
}
return $queue->ack($msg->getDeliveryTag());
} else {
// without ttl logic
return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
}
}
return $queue->ack($msg->getDeliveryTag());
}
);
Возможно, существуют и другие способы улучшения управления потоками сообщений.
Заключение: нет серебряного пулевого решения. Вы должны решить, какое решение соответствует вашим потребностям наилучшим образом или узнать что-то другое, но не забудьте поделиться им здесь;)