[译]RabbitMQ教程二(PHP版)--工作队列

  • 作者:卡牌
  • 时间:2019-03-05
  • 阅读量:79

工作队列

(基于 php-amqplib)

官方文档

下图表示工作队列

sending.png

在上一章的教程中,我们写了一个从命名队列中发送消息和接收消息的程序。在本节中,我们将会创建一个工作队列:将耗时的任务分发给多个工作者。

工作队列背后主要的思想是避免直接做资源密集型任务,并且不得不等待其完成。我们将任务放到之后再做来取代这种做法。我们把任务封装为一个 消息,放入队列中。一个工作进程会在一致运行在后台,取出任务并且完成这个任务。当你启动多个工作者的时候,任务将会在多个工作者之间分享。

这个概念在WEB应用中是非常有用的。在这些应用程序中,短的HTTP请求无法处理复杂的任务。

准备工作

在之前的一章教程,我们发送了一个包含"HELLO WORLD"的消息,现在我们将发送代表复杂任务的字符串。我们没有真实的任务,比如调整图片的大小或者渲染PDF文件。所以我们使用sleep()函数来伪造耗时任务。我们用字符串中的.来表示任务的复杂性,一个.表示耗时1秒。比如,一个伪造任务描述为hello...表示耗时3秒。

我们将微调之前send.php的代码,为了允许特定的信息被发送到命令行。这个程序将调度任务到工作队列,将它命名为new_task.php

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "Hello World!";
}
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, '', 'hello');

echo ' [x] Sent ', $data, "\n";

完整代码如下:

<?php
require_once __DIR__ . "/vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;

use PhpAmqpLib\Message\AMQPMessage;

// 建立连接,如果是非本机,需要创立一个拥有管理员标签的用户
$connection =
    new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

$data = implode(' ', array_slice($argv, 1));

if (empty($data)) {
    $data = 'hello world!';
}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, '', 'hello');

echo "[x] sent'".$data."\n";

$channel->close();
$connection->close();

receive脚本也需要做出一些改变:它需要伪造在消息体中为每个.耗时一秒的工作。它将从队列中取出消息并执行任务,所以我们将它叫做worker.php

$callback = function ($msg) {
    echo "[x]Received". $msg->body ."\n";
    sleep(substr_count($msg->body, '.'));
    echo "[x] Done\n";
};

注意,我们是模仿耗时任务。 执行如下命令

# shell 1
php worker.php

# shell 2
php new_task.php "A very hard task which takes two seconds.."

循环分发

使用任务队列的优点之一就是并行处理的能力,如果我们积累了大量的工作,我们需要增加更多的工作者,很容易的进行拓展。

首先,我们同时运营两个worker.php脚本,他们都将会从队列中获取获取消息,但是它是怎么执行的呢?让我们来看看。

你需要打开三个控制台,两个运行worker.php脚本,我们称之为消费者c1和消费者c2。

# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C

# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C

在第三个控制台中,我们将发布心的任务,一旦你以及开启了消费者,你就可以发布一些信息。

# shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....

我们看一下对工作者分发了什么:

# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'

# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

默认情况下,RabbitMQ会轮流的分发每一个消息给下一个消费者。平均每个消费者接收到的消息数量是相等的。这种分发消息是机制叫做:循环。尝试建立三个工作者或者更多。

消息确认

执行一个任务需要耗时几秒,你也许会疑问:一个消费者开始执行一个长任务,并且只完成了一部分就崩溃了,将会发生什么。基于当前的代码,一旦RabbitMQ向消费者发送信息,消息就会立即被标记删除。在这个例子中,如果你杀死一个工作者,其正在处理的消息将会丢失,并且分发给这个工作者的所有未处理的消息都会丢失。

但是我们并不想丢失任何任务,如果一个工作者崩溃了,我们更想将他的任务发送给其他工作者。

为了确认消息不曾丢失,RabbitMQ支持消息确认功能。消费者将发送一个确认信息来告诉RabbitMQ,当前的消息以及被接收、处理完成,RabbitMQ可以随意删除它了。

如果一个工作者崩溃(通道被关闭,连接被关闭,或者tcp连接丢失),且没有发送消息确认,RabbitMQ会知道这个消息没有被完全处理,并且将它重新放入队列。如果有其他同时在线的消费者,这个消息会被快速的分发到其他消费者。这样你就可以确认没有消息丢失,即使消费者意外崩溃。

对于RabbitMQ而言,没有超时这一说,如果消费者在执行任务的时候崩溃,RabbitMQ会重新分发消息,即使消息需要花费很久很久的时间也没有关系。

默认情况下,消息确认已关闭。 是时候通过将第四个参数设置为basic_consume来打开它们(true表示没有确认)并在完成任务后从工作人员发送正确的确认。

$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

使用这部分代码,尽管使用了ctrl+c杀掉一个在执行任务工作者,任何消息都不会丢失。尽管后来工作者崩溃,所有未确认的消息都将会被重新发送。

消息确认必须在同一个信道发送,试图在不同信道发送将导致channel-level的协议异常。

消息持久化

我们已经学习了如何确保在消费者死亡的情况下任务不会丢失。但是当RabbitMQ服务关闭的时候,任务也会丢失。

当RabbitMQ退出或者崩溃,它都会丢失队列与消息,除非你告诉它不要这么做。信息不会丢失需要我们确认两个事情:持久化队列和持久化消息。

首先,我们需要确认RabbitMQ不会将我们的队列丢失。为了达到这个目的,我们需要申明一个队列。将queue_declare的第三个参数设置为true

$channel->queue_declare('hello', false, true, false, false);

尽管命令是正确的,但是它启动的时候不会像我们想的那样。因为我们已经定义了一个hello的非持久化的队列。RabbitMQ不允许重复定义参数不同的已经存在的队列,并且返回一个错误。但是我们有一个快速的解决办法,申明一个不同名称的队列task_queue。

$channel->queue_declare('task_queue', false, true, false, false);

设置true的标识需要被同时应用于生产者和消费者的代码中。

此时我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。 现在我们需要将消息标记为持久性 - 通过设置delivery_mode = 2消息属性,AMQPMessage将其作为属性数组的一部分。

$msg = new AMQPMessage(
    $data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

PS:将消息标记为持久性并不能完全保证消息不会丢失。 虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受消息并且尚未保存消息时,仍然有一个短时间窗口。 此外,RabbitMQ不会为每条消息执行fsync(2) - 它可能只是保存到缓存而不是真正写入磁盘。 持久性保证不强,但对于我们简单的任务队列来说已经足够了。 如果您需要更强的保证,那么您可以使用发布者确认。

公平分发

你或许已经注意到:分发任务并不是我们想的那样。例如,在两个工作者的条件下,当所有的技术消息耗时较长,偶数消息耗时较短。一个工作者一直很忙,而另外一个几乎不工作。而RabbitMQ对此一无所知,仍然均匀的发送消息。

因为当消息进入队列的时候,RabbitMQ只是分发消息。它不关注消费者未确认消息的数量。它只是盲目的将第n个消息发送个第n个消费者。

下图表示RabbitMQ分发消息

prefetch-count.png

为了避免此种情况的出现,我们可以设置basic_qos方法,设置参数 prefetch_count = 1。它告诉RabbitMQ不要在同时给一个工作者多个消息。也就是说,不要在工作者处理并且返回消息确认之前分发给它一个新的消息。而是分发给不忙的下一个工作者。

$channel->basic_qos(null, 1, null);

关于队列大小的说明 如果所有工作人员都很忙,您的队列就会填满。 您将需要关注这一点,并可能添加更多工作人员,或者采取其他策略。

最终源码:

new_task.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "Hello World!";
}
$msg = new AMQPMessage(
    $data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

$channel->basic_publish($msg, '', 'task_queue');

echo ' [x] Sent ', $data, "\n";

$channel->close();
$connection->close();

worker.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

消息确认和读取,你可以启动一个队列,持久化的设置选项会使任务存活尽管RabbitMQ重启。

文章评论

共有0条评论来说两句吧...

提交
Top