Реализация очереди с задержкой для PHP AMQP
В последнее время я быстро реализовал систему производителей/потребителей.
<?php
namespace Queue;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class Amqp
{
private $connection;
private $queueName;
private $delayedQueueName;
private $channel;
private $callback;
public function __construct($host, $port, $login, $password, $queueName)
{
$this->connection = new AMQPStreamConnection($host, $port, $login, $password);
$this->queueName = $queueName;
$this->delayedQueueName = null;
$this->channel = $this->connection->channel();
// First, we need to make sure that RabbitMQ will never lose our queue.
// In order to do so, we need to declare it as durable. To do so we pass
// the third parameter to queue_declare as true.
$this->channel->queue_declare($queueName, false, true, false, false);
}
public function __destruct()
{
$this->close();
}
// Just in case : http://stackoverflow.com/info/151660/can-i-trust-php-destruct-method-to-be-called
// We should call close explicitly if possible.
public function close()
{
if (!is_null($this->channel)) {
$this->channel->close();
$this->channel = null;
}
if (!is_null($this->connection)) {
$this->connection->close();
$this->connection = null;
}
}
public function produceWithDelay($data, $delay)
{
if (is_null($this->delayedQueueName))
{
$delayedQueueName = $this->queueName . '.delayed';
// First, we need to make sure that RabbitMQ will never lose our queue.
// In order to do so, we need to declare it as durable. To do so we pass
// the third parameter to queue_declare as true.
$this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false,
new AMQPTable(array(
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => $this->queueName
))
);
$this->delayedQueueName = $delayedQueueName;
}
$msg = new AMQPMessage(
$data,
array(
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'expiration' => $delay
)
);
$this->channel->basic_publish($msg, '', $this->delayedQueueName);
}
public function produce($data)
{
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$this->channel->basic_publish($msg, '', $this->queueName);
}
public function consume($callback)
{
$this->callback = $callback;
// This tells RabbitMQ not to give more than one message to a worker at
// a time.
$this->channel->basic_qos(null, 1, null);
// Requires ack.
$this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'consumeCallback'));
while(count($this->channel->callbacks)) {
$this->channel->wait();
}
}
public function consumeCallback($msg)
{
call_user_func_array(
$this->callback,
array($msg)
);
// Very important to ack, in order to remove msg from queue. Ack after
// callback, as exception might happen in callback.
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
public function getQueueSize()
{
// three tuple containing (<queue name>, <message count>, <consumer count>)
$tuple = $this->channel->queue_declare($this->queueName, false, true, false, false);
if ($tuple != null && isset($tuple[1])) {
return $tuple[1];
}
return -1;
}
}
public function produce
и public function consume
работает, как и ожидалось.
Однако, когда он поставляется с системой с задержкой очереди
Пара
public function produceWithDelay
и public function consume
не работает должным образом. Потребитель, который вызывает consume
, не в состоянии получить какой-либо элемент, даже ожидая некоторого периода времени.
Я считаю, что что-то не так с моей реализацией produceWithDelay
. Могу ли я узнать, что случилось?
Ответы
Ответ 1
Для боковой заметки.
Я обнаружил, что это вызвано моей собственной ошибкой.
Вместо
if (is_null($this->delayedQueueName))
{
$delayedQueueName = $this->queueName . '.delayed';
$this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false,
...
$this->delayedQueueName = $delayedQueueName;
}
Я должен написать его в
if (is_null($this->delayedQueueName))
{
$delayedQueueName = $this->queueName . '.delayed';
$this->channel->queue_declare(delayedQueueName, false, true, false, false, false,
...
$this->delayedQueueName = $delayedQueueName;
}
Моя переменная-член еще не инициализирована должным образом.
Полностью работоспособный код приведен ниже для вашей ссылочной цели.
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class Amqp
{
private $connection;
private $queueName;
private $delayedQueueName;
private $channel;
private $callback;
public function __construct($host, $port, $login, $password, $queueName)
{
$this->connection = new AMQPStreamConnection($host, $port, $login, $password);
$this->queueName = $queueName;
$this->delayedQueueName = null;
$this->channel = $this->connection->channel();
$this->channel->queue_declare($queueName, false, true, false, false);
}
public function __destruct()
{
$this->close();
}
public function close()
{
if (!is_null($this->channel)) {
$this->channel->close();
$this->channel = null;
}
if (!is_null($this->connection)) {
$this->connection->close();
$this->connection = null;
}
}
public function produceWithDelay($data, $delay)
{
if (is_null($this->delayedQueueName))
{
$delayedQueueName = $this->queueName . '.delayed';
$this->channel->queue_declare($delayedQueueName, false, true, false, false, false,
new AMQPTable(array(
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => $this->queueName
))
);
$this->delayedQueueName = $delayedQueueName;
}
$msg = new AMQPMessage(
$data,
array(
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'expiration' => $delay
)
);
$this->channel->basic_publish($msg, '', $this->delayedQueueName);
}
public function produce($data)
{
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$this->channel->basic_publish($msg, '', $this->queueName);
}
public function consume($callback)
{
$this->callback = $callback;
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'callback'));
while (count($this->channel->callbacks)) {
$this->channel->wait();
}
}
public function callback($msg)
{
call_user_func_array(
$this->callback,
array($msg)
);
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
}
Ответ 2
Кулак всех проверяет, что ваш плагин rabbitmq_delayed_message_exchange
включен, выполнив команду: rabbitmq-plugins list
, If not - подробнее здесь.
И вам нужно обновить свой метод __construct
, потому что вам нужно объявить очередь немного по-другому. Я не претендую на обновление вашей конструкции, но хотел бы привести простой пример:
Объявить очередь:
<?php
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$args = new AMQPTable(['x-delayed-type' => 'fanout']);
$channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, $args);
$args = new AMQPTable(['x-dead-letter-exchange' => 'delayed']);
$channel->queue_declare('delayed_queue', false, true, false, false, false, $args);
$channel->queue_bind('delayed_queue', 'delayed_exchange');
Отправить сообщение:
$data = 'Hello World at ' . date('Y-m-d H:i:s');
$delay = 7000;
$message = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$headers = new AMQPTable(['x-delay' => $delay]);
$message->set('application_headers', $headers);
$channel->basic_publish($message, 'delayed_exchange');
printf(' [x] Message sent: %s %s', $data, PHP_EOL);
$channel->close();
$connection->close();
Получать сообщение:
$callback = function (AMQPMessage $message) {
printf(' [x] Message received: %s %s', $message->body, PHP_EOL);
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
};
$channel->basic_consume('delayed_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
Также вы можете найти исходные файлы здесь.
Надеюсь, это поможет вам!