定义路由键(routing key)的例子:'stock.usd.nyse',nyse.vmw,quick.orange.rabbit
绑定键(binding key)也必须是同样的格式。topic交换机的逻辑与direct交换机是相似的——一条携带特定路由键(routing key)发送的消息将会被分发到所有以与之匹配的绑定键(binding key)绑定的Queue(队列)。
如图:创建了三个绑定:Q1队列以
'*.orange.*
路由键绑定,Q2以'*.*.rabbit
与lazy.#
(两个路由键)绑定。
这些绑定可以概括为:
例:
'quick.orange.rabbit
的消息将会被交付到这两个队列'lazy.orange.elephant'
的消息也会被交付到这两个队列'quick.orange.fox'
的消息只会去到第一个(Q1)队列'lazy.brown.fox'
只会发送到第二个(Q2)队列lazy.pink.rabbit
的消息只会被发送到第二个(Q2)队列一次'quick.brown.fox'
不匹配任何一个绑定,所以它会被丢弃'orange'
或者'quick.orange.male.rabbit'
这些消息将不会匹配到任何绑定上并且将会被丢失。'lazy.orange.male.rabbit'
拥有四个词,它将会匹配最后一个绑定(lazy.#)
,并将会发送到第二个队列话题交换机(Topic Exchange)
话题交换机是十分强大的,同时它能表现得想其他的交换机一样。
当一个Queue(队列)以"#"(井号)绑定键(binding key)绑定——它将会忽视路由键(routing key)去接收所有消息就像是一个fanout类型的交换机一样。
当特殊字符"*"(星号)与"#"(井号)没有被用在绑定(键)上,这时候topic交换机由表现得像direct交换机一样。
我们将从一个工作假设开始,即日志的路由键将有两个单词:"
emit_log_topic.php的代码如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 声明一个topic交换机
$channel->exchange_declare('topic_logs', 'topic', false, false, false);
// 从命令行获取路由键
$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'topic_logs', $routing_key);
echo ' [x] Sent ', $routing_key, ':', $data, "\n";
$channel->close();
$connection->close();
receive_logs_topic.php的代码如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 声明topic交换机
$channel->exchange_declare('topic_logs', 'topic', false, false, false);
// 获取非持久化队列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$binding_keys = array_slice($argv, 1);
if( empty($binding_keys )) {
file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
exit(1);
}
// 绑定多个绑定键到队列
foreach($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function($msg){
echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
如果想接收所有日志:
php receive_logs_topic.php "#"
接收"kern"设备的所有日志:
php receive_logs_topice.php "kern.*"
或者你只想监听"critical"的日志:
php receive_logs_topic.php "*.critical"
你可以创建多个绑定:
php receive_logs_topic.php "kern.*" "*.critical"
以及,发送一个带有"kern.critical"路由键的日志请输入:
php emit_log_topic.php "kern.critical" "A critical kernel error"
最新评论