[译]RabbitMQ教程四(PHP版)--routing

  • 作者:卡牌
  • 时间:2019-03-05
  • 阅读量:87

routing

(基于php-amqplib)

在上一个教程中,我们构建了一个简单的日志系统 我们能够向许多接收者广播日志消息

在本教程中,我们将为其添加一个功能 - 我们将只能订阅一部分消息。例如,我们只能将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

绑定

在前面的例子中,我们已经创建了绑定。 您可能会记得以下代码:

$channel->queue_bind($queue_name, 'logs');

绑定是交换和队列之间的关系。这可以简单地理解为:队列对来自特定的交换器的消息感兴趣。

绑定可以采用额外的routing_key参数。 为了避免与$ channel :: basic_publish参数混淆,我们将其称为绑定密钥。 这就是我们如何使用键创建绑定:

$binding_key = 'black';
$channel->queue_bind($queue_name, $exchange_name, $binding_key);

绑定密钥的含义取决于交换器类型。 我们之前使用的fanout交换器只是忽略了它的价值。

Direct 交换器

我们上一个教程中的日志记录系统向所有消费者广播所有消息。我们希望扩展它以允许根据消息的严重性过滤消息。例如,我们可能希望将日志消息写入磁盘的脚本仅接收严重错误,而不是在警告或信息日志消息上浪费磁盘空间。

我们使用的是fanout交换器,它没有给我们太大的灵活性 - 它只能进行无意识的广播。我们将使用Direct交换器。 直接交换器背后的路由算法很简单 - 消息进入队列,其绑定密钥与消息的路由密钥完全匹配。

为了说明这一点,请参考下图:

sending.png

在此设置中,我们可以看到直接交换X与两个绑定到它的队列。 第一个队列绑定orange,第二个绑定有两个绑定,一个绑定密钥为black,另一个绑定为green。

在这样的设置中,使用路由密钥orange发布到交换器的消息将被路由到队列Q1。 路由键为black或绿green的消息将转到Q2。 所有其他消息将被丢弃。

多个绑定

图解多个绑定

direct-exchange-multiple.png

使用相同的绑定密钥绑定多个队列是完全合法的。 在我们的示例中,我们可以在X和Q1之间添加绑定键黑black的绑定。 在这种情况下,直接交换将表现得像fanout一样,并将消息广播到所有匹配的队列。 路由键为black的消息将传送到Q1和Q2。

提交日志

我们将此模型用于我们的日志系统。 我们会将消息发送给direct交换器,而不是fanout。 我们将提供日志严重性作为routing_key。 这样接收脚本将能够选择它想要接收的严重性。 让我们首先关注发送日志。

一如既往,我们需要先创建一个交换器:

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

我们已准备好发送消息:

$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->basic_publish($msg, 'direct_logs', $severity);

为简化起见,我们假设“严重性”可以是'info', 'warning', 'error'之一。

订阅

接收消息将像上一个教程一样工作,但有一个例外 - 我们将为我们感兴趣的每个严重性创建一个新的绑定。

foreach ($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

代码集合

示意图

python-four.png

emit_log_direct.php代码如下:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'direct_logs', $severity);

echo ' [x] Sent ', $severity, ':', $data, "\n";

$channel->close();
$connection->close();

receive_logs_direct.php代码如下:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$severities = array_slice($argv, 1);
if (empty($severities)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

foreach ($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

如果您只想将“warning”和“error”(而非“info”)日志消息保存到文件中,只需打开控制台并键入:

php receive_logs_direct.php warning error > logs_from_rabbit.log

如果您想在屏幕上看到所有日志消息,请打开一个新终端并执行以下操作:

php receive_logs_direct.php info warning error
# => [*] Waiting for logs. To exit press CTRL+C

并且,例如,要发出error日志消息,只需键入:

php emit_log_direct.php error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

文章评论

共有0条评论来说两句吧...

提交
Top