Реализация очереди с задержкой для PHP AMQP

В последнее время я быстро реализовал систему производителей/потребителей.

<?php
namespace Queue;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;    

class Amqp
{
    private $connection;
    private $queueName;
    private $delayedQueueName;
    private $channel;
    private $callback;

    public function __construct($host, $port, $login, $password, $queueName)
    {
        $this->connection = new AMQPStreamConnection($host, $port, $login, $password);
        $this->queueName = $queueName;
        $this->delayedQueueName = null;
        $this->channel = $this->connection->channel();
        // First, we need to make sure that RabbitMQ will never lose our queue.
        // In order to do so, we need to declare it as durable. To do so we pass
        // the third parameter to queue_declare as true.
        $this->channel->queue_declare($queueName, false, true, false, false);
    }

    public function __destruct()
    {
        $this->close();
    }

    // Just in case : http://stackoverflow.com/info/151660/can-i-trust-php-destruct-method-to-be-called
    // We should call close explicitly if possible.
    public function close()
    {
        if (!is_null($this->channel)) {
            $this->channel->close();
            $this->channel = null;
        }

        if (!is_null($this->connection)) {
            $this->connection->close();
            $this->connection = null;
        }
    }

    public function produceWithDelay($data, $delay)
    {
        if (is_null($this->delayedQueueName))
        {
            $delayedQueueName = $this->queueName . '.delayed';

            // First, we need to make sure that RabbitMQ will never lose our queue.
            // In order to do so, we need to declare it as durable. To do so we pass
            // the third parameter to queue_declare as true.
            $this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false,
                new AMQPTable(array(
                    'x-dead-letter-exchange' => '',
                    'x-dead-letter-routing-key' => $this->queueName
                ))
            );

            $this->delayedQueueName = $delayedQueueName;
        }

        $msg = new AMQPMessage(
            $data,
            array(
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'expiration' => $delay
            )
        );

        $this->channel->basic_publish($msg, '', $this->delayedQueueName);
    }

    public function produce($data)
    {
        $msg = new AMQPMessage(
            $data,
            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
        );

        $this->channel->basic_publish($msg, '', $this->queueName);
    }

    public function consume($callback)
    {
        $this->callback = $callback;

        // This tells RabbitMQ not to give more than one message to a worker at
        // a time.
        $this->channel->basic_qos(null, 1, null);

        // Requires ack.
        $this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'consumeCallback'));

        while(count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    public function consumeCallback($msg)
    {
        call_user_func_array(
            $this->callback,
            array($msg)
        );

        // Very important to ack, in order to remove msg from queue. Ack after
        // callback, as exception might happen in callback.
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }

    public function getQueueSize()
    {
        // three tuple containing (<queue name>, <message count>, <consumer count>)
        $tuple = $this->channel->queue_declare($this->queueName, false, true, false, false);
        if ($tuple != null && isset($tuple[1])) {
            return $tuple[1];
        }
        return -1;
    }
}

public function produce и public function consume работает, как и ожидалось.

Однако, когда он поставляется с системой с задержкой очереди

Пара

public function produceWithDelay и public function consume не работает должным образом. Потребитель, который вызывает consume, не в состоянии получить какой-либо элемент, даже ожидая некоторого периода времени.

Я считаю, что что-то не так с моей реализацией produceWithDelay. Могу ли я узнать, что случилось?

Ответы

Ответ 1

Для боковой заметки.

Я обнаружил, что это вызвано моей собственной ошибкой.

Вместо

    if (is_null($this->delayedQueueName))
    {
        $delayedQueueName = $this->queueName . '.delayed';

        $this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false,
        ...

        $this->delayedQueueName = $delayedQueueName;
    }

Я должен написать его в

    if (is_null($this->delayedQueueName))
    {
        $delayedQueueName = $this->queueName . '.delayed';

        $this->channel->queue_declare(delayedQueueName, false, true, false, false, false,
        ...

        $this->delayedQueueName = $delayedQueueName;
    }

Моя переменная-член еще не инициализирована должным образом.

Полностью работоспособный код приведен ниже для вашей ссылочной цели.

<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class Amqp
{
    private $connection;
    private $queueName;
    private $delayedQueueName;
    private $channel;
    private $callback;

    public function __construct($host, $port, $login, $password, $queueName)
    {
        $this->connection = new AMQPStreamConnection($host, $port, $login, $password);
        $this->queueName = $queueName;
        $this->delayedQueueName = null;
        $this->channel = $this->connection->channel();
        $this->channel->queue_declare($queueName, false, true, false, false);
    }

    public function __destruct()
    {
        $this->close();
    }

    public function close()
    {
        if (!is_null($this->channel)) {
            $this->channel->close();
            $this->channel = null;
        }

        if (!is_null($this->connection)) {
            $this->connection->close();
            $this->connection = null;
        }
    }

    public function produceWithDelay($data, $delay)
    {
        if (is_null($this->delayedQueueName))
        {
            $delayedQueueName = $this->queueName . '.delayed';

            $this->channel->queue_declare($delayedQueueName, false, true, false, false, false,
                new AMQPTable(array(
                    'x-dead-letter-exchange' => '',
                    'x-dead-letter-routing-key' => $this->queueName
                ))
            );

            $this->delayedQueueName = $delayedQueueName;
        }

        $msg = new AMQPMessage(
            $data,
            array(
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'expiration' => $delay
            )
        );

        $this->channel->basic_publish($msg, '', $this->delayedQueueName);
    }

    public function produce($data)
    {
        $msg = new AMQPMessage(
            $data,
            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
        );

        $this->channel->basic_publish($msg, '', $this->queueName);
    }

    public function consume($callback)
    {
        $this->callback = $callback;

        $this->channel->basic_qos(null, 1, null);

        $this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'callback'));

        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    public function callback($msg)
    {
        call_user_func_array(
            $this->callback,
            array($msg)
        );

        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }
}

Ответ 2

Кулак всех проверяет, что ваш плагин rabbitmq_delayed_message_exchange включен, выполнив команду: rabbitmq-plugins list, If not - подробнее здесь.

И вам нужно обновить свой метод __construct, потому что вам нужно объявить очередь немного по-другому. Я не претендую на обновление вашей конструкции, но хотел бы привести простой пример:

Объявить очередь:

<?php

require_once __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$args = new AMQPTable(['x-delayed-type' => 'fanout']);
$channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, $args);
$args = new AMQPTable(['x-dead-letter-exchange' => 'delayed']);
$channel->queue_declare('delayed_queue', false, true, false, false, false, $args);
$channel->queue_bind('delayed_queue', 'delayed_exchange');

Отправить сообщение:

$data = 'Hello World at ' . date('Y-m-d H:i:s');
$delay = 7000;
$message = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$headers = new AMQPTable(['x-delay' => $delay]);
$message->set('application_headers', $headers);
$channel->basic_publish($message, 'delayed_exchange');
printf(' [x] Message sent: %s %s', $data, PHP_EOL);
$channel->close();
$connection->close();

Получать сообщение:

$callback = function (AMQPMessage $message) {
    printf(' [x] Message received: %s %s', $message->body, PHP_EOL);
    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
};
$channel->basic_consume('delayed_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
    $channel->wait();
}
$channel->close();
$connection->close();

Также вы можете найти исходные файлы здесь.
Надеюсь, это поможет вам!