工作队列
使用 php-amqplib
在第一个教程中,我们编写了从命名队列发送和接收消息的程序。在本例中,我们将创建一个工作队列,用于在多个工作人员之间分配耗时的任务。
工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,并必须等待其完成。相反,我们把任务安排在以后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当你运行多个worker时,任务将在它们之间共享。
这个概念在web应用程序中特别有用,因为在短HTTP请求窗口内无法处理复杂的任务。
准备工作
在本教程的前一部分中,我们发送了一条包含“Hello World!”的消息。现在,我们将发送代表复杂任务的字符串。我们没有现实世界的任务,比如要调整大小的图像或要渲染的pdf文件,所以让我们假装很忙——使用sleep()函数。我们将把字符串中的点数作为其复杂性;每个点将占一秒钟的“工作”。例如,Hello描述的一个假任务。。。需要三秒钟
我们将稍微修改前面示例中的send.php代码,以允许从命令行发送任意消息。这个程序将把任务安排到我们的工作队列中,所以我们把它命名为new_task.php
$data = implode(' ', array_slice($argv, 1)); if (empty($data)) {$data = "Hello World!"; } $msg = new AMQPMessage($data);$channel->basic_publish($msg, '', 'hello');echo ' [x] Sent ', $data, "\n";
我们的旧receive.php脚本也需要一些更改:它需要为消息体中的每个点伪造一秒钟的工作。它将从队列中弹出消息并执行任务,因此我们称之为worker.php
$callback = function ($msg) {echo ' [x] Received ', $msg->getBody(), "\n";sleep(substr_count($msg->getBody(), '.'));echo " [x] Done\n"; };$channel->basic_consume('hello', '', false, true, false, false, $callback);
请注意,我们的假任务模拟了执行时间。
按照教程一中的方式运行它们:
# shell 1 php worker.php
# shell 2 php new_task.php "A very hard task which takes two seconds.."
循环调度
使用任务队列的优点之一是能够轻松并行工作。如果我们正在建立积压的工作,我们可以添加更多的工作人员,这样就可以轻松扩展:
首先,让我们尝试同时运行两个worker.php脚本。它们都会从队列中收到消息,但具体是如何得到的呢?让我们看看。
你需要打开三个控制台。两个将运行worker.php脚本。这些游戏机将是我们的两个消费者——C1和C2
# shell 1 php worker.php # => [*] Waiting for messages. To exit press CTRL+C
# shell 2 php worker.php # => [*] Waiting for messages. To exit press CTRL+C
在第三部分中,我们将发布新任务。一旦你启动了消费者,你就可以发布一些消息:
# shell 3 php new_task.php First message. php new_task.php Second message.. php new_task.php Third message... php new_task.php Fourth message.... php new_task.php Fifth message.....
让我们看看我们的工人得到了什么:
# shell 1 php worker.php # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....'
# shell 2 php worker.php # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。请尝试使用三个或更多工作线程。
消息确认
执行一个任务可能需要几秒钟的时间,你可能会想知道,如果消费者启动了一个长任务,但在完成之前就终止了,会发生什么。使用我们当前的代码,一旦RabbitMQ向消费者发送消息,它就会立即将其标记为删除。在这种情况下,如果终止一个worker,它刚刚处理的消息就会丢失。发送给此特定工作程序但尚未处理的消息也会丢失。
但我们不想失去任何任务。如果一名工人死亡,我们希望将任务交给另一名工人。
为了确保消息永远不会丢失,RabbitMQ支持消息确认。ack(nowledgement)由消费者发回,告诉RabbitMQ已收到、处理了特定消息,并且RabbitMQ可以自由删除它。
如果一个消费者在没有发送ack的情况下死亡(其通道关闭、连接关闭或TCP连接丢失),RabbitMQ将理解消息未完全处理,并将其重新排队。如果同时有其他消费者在线,它将迅速将其重新传递给另一个消费者。这样,即使工人偶尔死亡,你也可以确保没有信息丢失。
消费者交付确认时强制执行超时(默认为30分钟)。这有助于检测从不确认交付的有缺陷(卡住)的消费者。您可以按照“交货确认超时”中的说明增加此超时时间。
消息确认之前是由我们自己关闭的。是时候通过将basic_consume的第四个参数设置为false(true表示没有ack)来打开它们,并在我们完成任务后从worker发送适当的确认。
$callback = function ($msg) {echo ' [x] Received ', $msg->getBody(), "\n";sleep(substr_count($msg->getBody(), '.'));echo " [x] Done\n";$msg->ack(); };$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
使用此代码,您可以确保即使在处理消息时使用CTRL+C终止工作进程,也不会丢失任何内容。在worker终止后不久,所有未确认的消息都会重新传递。
确认必须在收到交付的同一渠道上发送。尝试使用不同的通道进行确认将导致通道级协议异常。请参阅确认文档指南以了解更多信息
错过ack是一个常见的错误。这是一个容易犯的错误,但后果是严重的。当您的客户端退出时,消息将被重新传递(这可能看起来像是随机重新传递),但RabbitMQ将占用越来越多的内存,因为它无法释放任何未标记的消息。
为了调试这种错误,您可以使用 rabbitmqctl 打印 messages_unsacknowledged 字段
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged在Windows上,删除sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久性
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但如果RabbitMQ服务器停止,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你告诉它不要这样做。需要做两件事来确保消息不会丢失:我们需要将队列和消息都标记为持久。
首先,我们需要确保队列在RabbitMQ节点重启后能够存活。为了做到这一点,我们需要宣布它是持久的。为此,我们将第三个参数传递给 queue_declare 为 true :
$channel->queue_declare('hello', false, true, false, false);
虽然这个命令本身是正确的,但它在我们目前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列,它是不持久的。RabbitMQ不允许您使用不同的参数重新定义现有队列,并将向任何试图这样做的程序返回错误。但有一个快速的解决方法——让我们用不同的名称声明一个队列,例如task_queue:
$channel->queue_declare('task_queue', false, true, false, false);
设置为true的此标志需要应用于生产者和消费者代码。
此时,我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将消息标记为持久。
- 通过设置 delivery_mode=2 消息属性,AMQPMessage将其作为属性数组的一部分
$msg = new AMQPMessage($data,array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) );
关于消息持久性的说明
将消息标记为持久并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘,但RabbitMQ接受消息但尚未保存消息的时间窗口仍然很短。此外,RabbitMQ不会对每条消息执行fsync(2)——它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证并不强,但对于我们的简单任务队列来说已经足够了。如果你需要更强有力的保证,那么你可以使用出版商确认
公平调度
官方链接:https://www.rabbitmq.com/tutorials/tutorial-two-php