rabbitMQ交换机之Topic类型

Topic交换机

定义路由键(routing key)的例子:'stock.usd.nyse',nyse.vmw,quick.orange.rabbit

绑定键(binding key)也必须是同样的格式。topic交换机的逻辑与direct交换机是相似的——一条携带特定路由键(routing key)发送的消息将会被分发到所有以与之匹配的绑定键(binding key)绑定的Queue(队列)。

  • (星号)可以代替(任意)一个词
  • (井号)可以代替零个或者更多的词

如图:创建了三个绑定:Q1队列以'*.orange.*路由键绑定,Q2以'*.*.rabbitlazy.#(两个路由键)绑定。 这些绑定可以概括为:

  • Q1对所有orange(橙色)的动物感兴趣。
  • Q2想监听rabbit(兔子)的所有消息与lazy(慢吞吞的)动物的所有消息。

例:

  1. 路由键为'quick.orange.rabbit的消息将会被交付到这两个队列
  2. 路由键为'lazy.orange.elephant'的消息也会被交付到这两个队列
  3. 路由键为'quick.orange.fox'的消息只会去到第一个(Q1)队列
  4. 路由键为'lazy.brown.fox'只会发送到第二个(Q2)队列
  5. 路由键为lazy.pink.rabbit的消息只会被发送到第二个(Q2)队列一次
  6. 路由键为'quick.brown.fox'不匹配任何一个绑定,所以它会被丢弃
  7. 路由键为'orange'或者'quick.orange.male.rabbit'这些消息将不会匹配到任何绑定上并且将会被丢失。
  8. 路由键为'lazy.orange.male.rabbit'拥有四个词,它将会匹配最后一个绑定(lazy.#),并将会发送到第二个队列
话题交换机(Topic Exchange)
话题交换机是十分强大的,同时它能表现得想其他的交换机一样。
当一个Queue(队列)以"#"(井号)绑定键(binding key)绑定——它将会忽视路由键(routing key)去接收所有消息就像是一个fanout类型的交换机一样。
当特殊字符"*"(星号)与"#"(井号)没有被用在绑定(键)上,这时候topic交换机由表现得像direct交换机一样。

我们将从一个工作假设开始,即日志的路由键将有两个单词:"."。

emit_log_topic.php的代码如下:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明一个topic交换机
$channel->exchange_declare('topic_logs', 'topic', false, false, false);

// 从命令行获取路由键
$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'topic_logs', $routing_key);

echo ' [x] Sent ', $routing_key, ':', $data, "\n";

$channel->close();
$connection->close();

receive_logs_topic.php的代码如下:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明topic交换机
$channel->exchange_declare('topic_logs', 'topic', false, false, false);

// 获取非持久化队列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if( empty($binding_keys )) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

// 绑定多个绑定键到队列
foreach($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
?>

如果想接收所有日志:

php receive_logs_topic.php "#"

接收"kern"设备的所有日志:

php receive_logs_topice.php "kern.*"

或者你只想监听"critical"的日志:

php receive_logs_topic.php "*.critical"

你可以创建多个绑定:

php receive_logs_topic.php "kern.*" "*.critical"

以及,发送一个带有"kern.critical"路由键的日志请输入:

php emit_log_topic.php "kern.critical" "A critical kernel error"

zed
请先登录后发表评论
  • latest comments
  • 总共0条评论