[译]RabbitMQ教程五(PHP版)--topic

  • 作者:卡牌
  • 时间:2019-03-05
  • 阅读量:72

TOPIC

(基于 php-amqplib)

在上一个教程中,我们改进了日志系统。 我们使用的是direct交换器,而不是使用只能进行虚拟广播的fanout交换器,并且有可能选择性地接收日志。

虽然使用直接交换改进了我们的系统,但它仍然有局限性 - 它不能基于多个标准进行路由。

在我们的日志记录系统中,我们可能不仅要根据严重性订阅日志,还要根据发出日志的源来订阅日志。您可能从syslog unix工具中了解这个概念,该工具根据严重性(info / warn / crit ...)和facility(auth / cron / kern ...)来路由日志。

这会给我们带来很大的灵活性 - 我们可能想要监听来自'cron'的关键错误以及来自'kern'的所有日志。

为了改善这个功能,我们需要学习一些复杂的 topic交换器。

Topic交换器

发送到topic交换器的信息不能具有任意的routing_key-它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些功能。一些有效的路由键示例:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由密钥中可以包含任意数量的单词,最多可达255个字节。

绑定密钥也必须采用相同的形式。topic交换器背后的逻辑类似于direct交换器。使用特定路由密钥发送的消息将被传递到与匹配绑定密钥绑定的所有队列。 但是绑定键有两个重要的特殊情况:

  • *(星号)可以替代一个单词。
  • #(hash)可以替换零个或多个单词。

在一个例子中解释这个是最容易的:

python-four.png

在这个例子中,我们将发送所有描述动物的消息。 消息将与包含三个单词(两个点)的路由键一起发送。 路由键中的第一个单词将描述速度,第二个是颜色,第三个是物种:“<speed>。<color>。<species>”。

我们创建了三个绑定:Q1绑定了绑定键“* .orange。”,Q2绑定了“。*。rabbit”和“lazy。#”。

这些绑定可以概括为:

  • Q1对所有orange色动物感兴趣。
  • Q2希望监听听到关于rabbit的一切,以及关于lazy动物的一切。

路由密钥设置为“quick.orange.rabbit”的消息将传递到两个队列。 消息“lazy.orange.elephant”也将同时发送给他们。另一方面,“quick.orange.fox”只会转到第一个队列,而“lazy.brown.fox”只会转到第二个队列。 “lazy.pink.rabbit”将仅传递到第二个队列一次,即使它匹配两个绑定。 “quick.brown.fox”与任何绑定都不匹配,因此它将被丢弃。

如果我们违反合同并发送带有一个或四个单词的消息,例如“orange”或“quick.orange.male.rabbit”,会发生什么? 好吧,这些消息将不匹配任何绑定,并将丢失。

另一方面,“lazy.orange.male.rabbit”,即使它有四个单词,也会匹配最后一个绑定,并将被传递到第二个队列。

topic交换器

topic交换器功能强大,可以像其他交换器一样运行。

当队列绑定“#”(哈希)绑定密钥时 - 它将接收所有消息,而不管路由密钥 - 如fanout交换。

当特殊字符“*”(星号)和“#”(哈希)未在绑定中使用时,topic交换器的行为就像direct交换器一样。

代码集合: 我们将在我们的日志记录系统中使用topic交换器。 我们将首先假设日志的路由键有两个词:“<facility>。<severity>”。

emit_log_topic.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'topic_logs', $routing_key);

echo ' [x] Sent ', $routing_key, ':', $data, "\n";

$channel->close();
$connection->close();

receive_logs_topic.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

foreach ($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

要接收所有日志:

php receive_logs_topic.php "#"

要接收“kern”的所有日志:

php receive_logs_topic.php "kern.*"

或者,如果您只想听到“critical”日志:

php receive_logs_topic.php "*.critical"

您可以创建多个绑定:

php receive_logs_topic.php "kern.*" "*.critical"

并使用路由键“kern.critical”类型发出日志:

php emit_log_topic.php "kern.critical" "A critical kernel error"

玩这些程序玩得开心。 请注意,代码不会对路由或绑定密钥做出任何假设,您可能希望使用两个以上的路由密钥参数。

文章评论

共有0条评论来说两句吧...

提交
Top