3、RabbitMQ队列之工作队列【RabbitMQ官方教程】

news/2025/1/8 11:01:11/文章来源:https://www.cnblogs.com/dreamboycx/p/18641913

工作队列

使用 php-amqplib

 在第一个教程中,我们编写了从命名队列发送和接收消息的程序。在本例中,我们将创建一个工作队列,用于在多个工作人员之间分配耗时的任务。

工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,并必须等待其完成。相反,我们把任务安排在以后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当你运行多个worker时,任务将在它们之间共享。

这个概念在web应用程序中特别有用,因为在短HTTP请求窗口内无法处理复杂的任务。

准备工作

在本教程的前一部分中,我们发送了一条包含“Hello World!”的消息。现在,我们将发送代表复杂任务的字符串。我们没有现实世界的任务,比如要调整大小的图像或要渲染的pdf文件,所以让我们假装很忙——使用sleep()函数。我们将把字符串中的点数作为其复杂性;每个点将占一秒钟的“工作”。例如,Hello描述的一个假任务。。。需要三秒钟

我们将稍微修改前面示例中的send.php代码,以允许从命令行发送任意消息。这个程序将把任务安排到我们的工作队列中,所以我们把它命名为new_task.php

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {$data = "Hello World!";
}
$msg = new AMQPMessage($data);$channel->basic_publish($msg, '', 'hello');echo ' [x] Sent ', $data, "\n";

我们的旧receive.php脚本也需要一些更改:它需要为消息体中的每个点伪造一秒钟的工作。它将从队列中弹出消息并执行任务,因此我们称之为worker.php

$callback = function ($msg) {echo ' [x] Received ', $msg->getBody(), "\n";sleep(substr_count($msg->getBody(), '.'));echo " [x] Done\n";
};$channel->basic_consume('hello', '', false, true, false, false, $callback);

请注意,我们的假任务模拟了执行时间。

按照教程一中的方式运行它们:

# shell 1
php worker.php
# shell 2
php new_task.php "A very hard task which takes two seconds.."

循环调度

使用任务队列的优点之一是能够轻松并行工作。如果我们正在建立积压的工作,我们可以添加更多的工作人员,这样就可以轻松扩展:

首先,让我们尝试同时运行两个worker.php脚本。它们都会从队列中收到消息,但具体是如何得到的呢?让我们看看。

你需要打开三个控制台。两个将运行worker.php脚本。这些游戏机将是我们的两个消费者——C1和C2

# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C

在第三部分中,我们将发布新任务。一旦你启动了消费者,你就可以发布一些消息:

# shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....

让我们看看我们的工人得到了什么:

# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。请尝试使用三个或更多工作线程。

消息确认

执行一个任务可能需要几秒钟的时间,你可能会想知道,如果消费者启动了一个长任务,但在完成之前就终止了,会发生什么。使用我们当前的代码,一旦RabbitMQ向消费者发送消息,它就会立即将其标记为删除。在这种情况下,如果终止一个worker,它刚刚处理的消息就会丢失。发送给此特定工作程序但尚未处理的消息也会丢失。

但我们不想失去任何任务。如果一名工人死亡,我们希望将任务交给另一名工人。

为了确保消息永远不会丢失,RabbitMQ支持消息确认。ack(nowledgement)由消费者发回,告诉RabbitMQ已收到、处理了特定消息,并且RabbitMQ可以自由删除它。

如果一个消费者在没有发送ack的情况下死亡(其通道关闭、连接关闭或TCP连接丢失),RabbitMQ将理解消息未完全处理,并将其重新排队。如果同时有其他消费者在线,它将迅速将其重新传递给另一个消费者。这样,即使工人偶尔死亡,你也可以确保没有信息丢失。

消费者交付确认时强制执行超时(默认为30分钟)。这有助于检测从不确认交付的有缺陷(卡住)的消费者。您可以按照“交货确认超时”中的说明增加此超时时间。

消息确认之前是由我们自己关闭的。是时候通过将basic_consume的第四个参数设置为false(true表示没有ack)来打开它们,并在我们完成任务后从worker发送适当的确认。

$callback = function ($msg) {echo ' [x] Received ', $msg->getBody(), "\n";sleep(substr_count($msg->getBody(), '.'));echo " [x] Done\n";$msg->ack();
};$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

使用此代码,您可以确保即使在处理消息时使用CTRL+C终止工作进程,也不会丢失任何内容。在worker终止后不久,所有未确认的消息都会重新传递。

确认必须在收到交付的同一渠道上发送。尝试使用不同的通道进行确认将导致通道级协议异常。请参阅确认文档指南以了解更多信息

错过ack是一个常见的错误。这是一个容易犯的错误,但后果是严重的。当您的客户端退出时,消息将被重新传递(这可能看起来像是随机重新传递),但RabbitMQ将占用越来越多的内存,因为它无法释放任何未标记的消息。

为了调试这种错误,您可以使用 rabbitmqctl 打印 messages_unsacknowledged 字段

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在Windows上,删除sudo:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久性

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但如果RabbitMQ服务器停止,我们的任务仍然会丢失。

当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你告诉它不要这样做。需要做两件事来确保消息不会丢失:我们需要将队列和消息都标记为持久。

首先,我们需要确保队列在RabbitMQ节点重启后能够存活。为了做到这一点,我们需要宣布它是持久的。为此,我们将第三个参数传递给 queue_declare 为 true :

$channel->queue_declare('hello', false, true, false, false);

虽然这个命令本身是正确的,但它在我们目前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列,它是不持久的。RabbitMQ不允许您使用不同的参数重新定义现有队列,并将向任何试图这样做的程序返回错误。但有一个快速的解决方法——让我们用不同的名称声明一个队列,例如task_queue:

$channel->queue_declare('task_queue', false, true, false, false);

设置为true的此标志需要应用于生产者和消费者代码。

此时,我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将消息标记为持久。

  • 通过设置 delivery_mode=2 消息属性,AMQPMessage将其作为属性数组的一部分
$msg = new AMQPMessage($data,array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

关于消息持久性的说明

将消息标记为持久并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘,但RabbitMQ接受消息但尚未保存消息的时间窗口仍然很短。此外,RabbitMQ不会对每条消息执行fsync(2)——它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证并不强,但对于我们的简单任务队列来说已经足够了。如果你需要更强有力的保证,那么你可以使用出版商确认

公平调度

 

 

官方链接:https://www.rabbitmq.com/tutorials/tutorial-two-php

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/861491.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

静力学FEM12.30

1.静力学方程 考虑图所示变截面弹性杆的静态响应。这是线性应力分析或线弹性问题的一个例子,我们需要求杆内的应力分布σ(x)。 应力由物体的变形产生,而变形由物体内各点的位移u(x)表征。位移导致用ε(x)表示的应变;应变是一个无量纲变量。杆受到分布力b(x)或集中力作用。这…

软件工程个人总结作业

项目 详细信息这个作业属于哪个课程 https://edu.cnblogs.com/campus/fzu/SE2024这个作业要求在哪里 作业要求这个作业的目标 软工实践个人总结学号 102201233一、学期回顾 1.1 回顾你对于软件工程课程的想象 1.1.1 达到期待和目标的部分算法编写能力的提升目标:提高解决复杂算…

一袋米要抗几楼——软工学期回顾

这个作业属于哪个课程 https://edu.cnblogs.com/campus/fzu/SE2024这个作业要求在哪里 https://edu.cnblogs.com/campus/fzu/SE2024/homework/13315这个作业的目标 对整个学期的学习进行总结学号 102201130🎓 一、学期回顾 1.1 回顾你对于软件工程课程的想象在上这门课之前,…

java.sql.SQLException: ORA-00600: 内部错误代码, 参数: [kcbnew_3]的其中一个解决方法

ORA-00600 解决方案java.sql.SQLException: ORA-00600: 内部错误代码, 参数: [kcbnew_3]的其中一个解决方法 重启 重启 重启 oracle服务。 今天反馈添加数据库报错 。试了一下就几各别的表不能插入。别的表好好的 GPT一下并检查了表空间都没什么问题。 执行 INSERT INTO DEVIC…

库卡机器人KR240电源模块维修思路讲解

一、库卡机器人KR240电源模块故障诊断 故障诊断是维修过程中的关键步骤。使用库卡提供的诊断工具或软件,对库卡机器人KR240电源模块进行故障诊断。重点关注电源供应、输出电压、电流等关键参数。通过诊断结果,确定故障的具体位置和性质,为后续的维修工作提供明确方向。 二、…

【Airflow】入门笔记

前言 Airflow入门教程 正文 简介 任务管理、调度、监控工作流平台。 基于DAG(有向无环图)的任务管理系统。 基本架构组件scheduler: 以有向无环图(dag)的形式创建任务工作流,根据用户的配置将任务定时/定期进行调度 worker: 任务的执行单元,worker会从任务队列当中拉取任务…

[Airflow] 入门笔记

前言 Airflow入门教程 正文 简介 任务管理、调度、监控工作流平台。 基于DAG(有向无环图)的任务管理系统。 基本架构组件scheduler: 以有向无环图(dag)的形式创建任务工作流,根据用户的配置将任务定时/定期进行调度 worker: 任务的执行单元,worker会从任务队列当中拉取任务…

2024下学期加分项

软考中级设计师通过资格证书

直接调用文件设置qt可执行程序的图标,运行时的图标,exe本身的图标,以及固定到任务栏时的图标,窗口坐上角的图标

// 设置应用程序图标(窗口图标和任务栏图标)this->setWindowIcon(QIcon("./Icon/ReadADtool.ico")); // 从资源文件中加载图标 固定到任务栏上时的图标: 在pro文件添加如下指令:设置rc文件内容:IDI_ICON1 ICON DISCARDABLE "ReadADtool.ico…

Ajax入门以及Axios的详细使用(含Promise)

1. 概述 1.1 是什么Ajax = Asynchronous JavaScript and XML(异步的 JavaScript 和 XML)Ajax 不是新的编程语言,而是一种用于创建快速动态网页的技术Ajax 最大的优点是在不重新加载整个页面的情况下,可以与服务器交换数据并更新部分网页内容,使网页实现异步更新传统的网页…