在前面的教程中,我们改进了日志系统。我们没有使用只能进行虚拟广播的扇出交换机,而是使用了直接交换机,从而有可能选择性地接收日志。
虽然使用直接交换改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。
在我们的日志系统中,我们可能不仅要根据严重性订阅日志,还要根据发出日志的源订阅日志。您可能从syslogunix工具中知道这个概念,该工具根据严重性(info/warn/crit…)和便利性(auth/cron/kern…)路由日志。
这将为我们提供很大的灵活性——我们可能只想监听来自“cron”的关键错误,也要监听来自“kern”的所有日志,为了在我们的日志系统中实现这一点,我们需要了解更复杂的主题交换。
主题交换
发送到主题交换的消息不能有任意的路由密钥——它必须是一个由点分隔的单词列表。单词可以是任何东西,但通常它们指定了与消息相关的一些特征。几个有效的路由密钥示例:
stock.usd.nyse, nyse.vmw, quick.orange.rabbit 路由密钥中可以有任意数量的单词,最多255个字节。
绑定密钥也必须采用相同的形式。主题交换背后的逻辑类似于直接交换——使用特定路由密钥发送的消息将被传递到使用匹配绑定密钥绑定的所有队列。但是,绑定密钥有两个重要的特殊情况:
- *(星号)只能代替一个单词
#(hash)可以替代零个或多个单词
用一个例子来解释这一点最容易:
在这个例子中,我们将发送所有描述动物的消息。消息将使用由三个单词(两点)组成的路由密钥发送。路由关键字中的第一个词将描述速度,第二个词是颜色,第三个词是物种: <speed>.<colour>.<species>
我们创建了三个绑定:Q1与绑定键绑定 *.orange.* Q2与 *.*.rabbit 和 lazy.#
这些绑定可以概括为:
- Q1对所有橙色的动物都感兴趣
- Q2想听听关于兔子的一切,以及关于懒惰动物的一切
路由密钥设置为 quick.orange.rabbit 的消息将同时传递到两个队列。消息 lazy.ornge.elephant 也会去他们俩那里。另一方面, quick.orange.fox 只会进入第一个队列, lazy.brown.ox 只会到达第二个队列。 lazy.pink.rabbit 只会被传递到第二个队列一次,即使它匹配两个绑定。 quick.bown.fox 与任何绑定都不匹配,因此将被丢弃
如果我们违反合同,发送一条包含一到四个单词的消息,比如橙色或 quick.orange.new-rabit ,会发生什么?好吧,这些消息将不匹配任何绑定,并将丢失
另一方面, lazy.ornge.new.rambit ,即使它有四个单词,也会匹配最后一个绑定,并被传递到第二个队列。
主题交换
主题交换功能强大,可以像其他交换一样运行。
当队列使用#(哈希)绑定键绑定时,它将接收所有消息,而不管路由键如何,就像扇出交换一样。
当绑定中不使用特殊字符*(星号)和#(哈希)时,主题交换的行为就像直接交换一样
放一起执行
我们将在日志系统中使用主题交换。我们将从一个可行的假设开始,即日志的路由密钥将有两个单词: <facility>.<severity>
代码几乎与上一教程中的代码相同。
emit_log_topic.php的代码:
1 <?php 2 3 require_once __DIR__ . '/vendor/autoload.php'; 4 use PhpAmqpLib\Connection\AMQPStreamConnection; 5 use PhpAmqpLib\Message\AMQPMessage; 6 7 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 8 $channel = $connection->channel(); 9 10 $channel->exchange_declare('topic_logs', 'topic', false, false, false); 11 12 $routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info'; 13 $data = implode(' ', array_slice($argv, 2)); 14 if (empty($data)) { 15 $data = "Hello World!"; 16 } 17 18 $msg = new AMQPMessage($data); 19 20 $channel->basic_publish($msg, 'topic_logs', $routing_key); 21 22 echo ' [x] Sent ', $routing_key, ':', $data, "\n"; 23 24 $channel->close(); 25 $connection->close();
receive_logs_topic.php的代码:
1 <?php 2 3 require_once __DIR__ . '/vendor/autoload.php'; 4 use PhpAmqpLib\Connection\AMQPStreamConnection; 5 6 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 7 $channel = $connection->channel(); 8 9 $channel->exchange_declare('topic_logs', 'topic', false, false, false); 10 11 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); 12 13 $binding_keys = array_slice($argv, 1); 14 if (empty($binding_keys)) { 15 file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n"); 16 exit(1); 17 } 18 19 foreach ($binding_keys as $binding_key) { 20 $channel->queue_bind($queue_name, 'topic_logs', $binding_key); 21 } 22 23 echo " [*] Waiting for logs. To exit press CTRL+C\n"; 24 25 $callback = function ($msg) { 26 echo ' [x] ', $msg->getRoutingKey(), ':', $msg->getBody(), "\n"; 27 }; 28 29 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); 30 31 try { 32 $channel->consume(); 33 } catch (\Throwable $exception) { 34 echo $exception->getMessage(); 35 } 36 37 $channel->close(); 38 $connection->close();
要接收所有日志,请执行以下操作:
php receive_logs_topic.php "#"
要从设施kern接收所有日志,请执行以下操作:
php receive_logs_topic.php "kern.*"
或者,如果您只想了解关键日志:
php receive_logs_topic.php "*.critical"
您可以创建多个绑定:
php receive_logs_topic.php "kern.*" "*.critical"
并发出带有路由密钥kern.critical类型的日志:
php emit_log_topic.php "kern.critical" "A critical kernel error"
玩这些程序玩得开心。请注意,该代码没有对路由或绑定密钥做出任何假设,您可能希望使用两个以上的路由密钥参数。
接下来,在教程6中了解如何将往返消息作为远程过程调用执行。
官方链接:https://www.rabbitmq.com/tutorials/tutorial-five-php