rabbitMQ关于消息确认、消息持久化、公平调度

$callback = function ($msg) {
  echo ' [x] Received ', $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done\n";
  $msg->ack(); //消息确认
};

$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
一、消息确认和消息持久化

第四个参数(消息确认后):如ack回应前它的channel(频道) 关闭了,连接关闭了,或者TCP连接丢失了。 消息确认默认是关闭的。当设置 basic_consume函数的第四个参数为 false(true代表没有消息确认,确认后才会删除该条消息)并且当工作程序在完成一个任务的时候返回一个适当的应答信号将会开启消息确认。

第三个参数(消息持久化):当 RabbitMQ 退出或者崩溃,它会把队列与消息统统忘掉; queue_declare 置为 true可以消息持久化: Producer(生产者)和Consumer(消费者)的代码中这个标记都要被设为 true. 这时候我们就能确保即使 RabbitMQ 重启, task_queue 也不会丢失。现在我们需要通过设置 AMQPMessage 参数数组中的 delivery_mode = 2 这一消息属性来确保我们的消息是持久化的。

$channel->queue_declare('task_queue', false, true, false, false);
$msg = new AMQPMessage($data,
       array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
       );

二、公平调度
$channel->basic_qos(null, 1, null);

第二个参数(公平调度):asic_qos 函数并设置该函数的 (第二个参数)prefetch_count = 1;不要在处理程序处理并应答了上一条消息前再分发一个新的消息给它。相反地,它会把这条新的消息分发到下一个并不繁忙的处理程序。(只有这个消费者处理完并确认了,才能在分发一个消息给他)

三、示例:

1、new_task.php:

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "Hello World!";
}
$msg = new AMQPMessage(
    $data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) //消息持久化
);

$channel->basic_publish($msg, '', 'task_queue');

echo ' [x] Sent ', $data, "\n";

$channel->close();
$connection->close();
?>

new_task.php官方源码

2.worker.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done\n";
    $msg->ack();
};

$channel->basic_qos(null, 1, null); //参数二:1 公平调度
//参数四:false 消息确认
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();
?>

work.php官方源码

zed
请先登录后发表评论
  • latest comments
  • 总共0条评论