RabbitMQ 工作队列(Work queues)模式示例

总结自:BV15k4y1k7Ep

模式说明

1556009144848

Work queues与简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。消费者之间是竞争的关系。

应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

代码

Work queues简单模式的代码是几乎一样的,可以完全复制,并多复制一个消费者进行多个消费者同时消费消息的测试。

1)生产者

package com.zhangmingge.rabbitmq.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhangmingge.rabbitmq.ConnectionUtil;public class Producer {static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {// 创建连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();// 声明(创建)队列/** 参数 1:队列名称* 参数 2:是否定义持久化队列* 参数 3:是否独占本次连接* 参数 4:是否在不使用的时候自动删除队列* 参数 5:队列其它参数*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);for (int i = 1; i <= 30; i++) {// 发送信息String message = "你好;小兔子!work 模式--" + i;/** 参数 1:交换机名称,如果没有指定则使用默认 Default Exchange* 参数 2:路由 key,简单模式可以传递队列名称* 参数 3:消息其它属性* 参数 4:消息内容*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("已发送消息:" + message);}// 关闭资源channel.close();connection.close();}
}

2)消费者 1

package com.zhangmingge.rabbitmq.work;import com.rabbitmq.client.*;
import com.zhangmingge.rabbitmq.ConnectionUtil;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道final Channel channel = connection.createChannel();// 声明(创建)队列/** 参数 1:队列名称* 参数 2:是否定义持久化队列* 参数 3:是否独占本次连接* 参数 4:是否在不使用的时候自动删除队列* 参数 5:队列其它参数*/channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);// 一次只能接收并处理一个消息channel.basicQos(1);// 创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {/** consumerTag 消息者标签,在 channel.basicConsume 时候可以指定* envelope 消息包的内容,可从中获取消息 id,消息 routingKey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {try {// 路由 keySystem.out.println("路由 key 为:" + envelope.getRoutingKey());// 交换机System.out.println("交换机为:" + envelope.getExchange());// 消息 idSystem.out.println("消息 id 为:" + envelope.getDeliveryTag());// 收到的消息System.out.println("消费者 1-接收到的消息为:" + new String(body, "utf-8"));Thread.sleep(1000);// 确认消息channel.basicAck(envelope.getDeliveryTag(), false);} catch (InterruptedException e) {e.printStackTrace();}}};// 监听消息/** 参数 1:队列名称* 参数 2:是否自动确认,设置为 true 为表示消息接收到自动向 mq 回复接收到了,mq 接收到回复会删除消息,设置为 false 则需要手动确认* 参数 3:消息接收到后回调*/channel.basicConsume(Producer.QUEUE_NAME, false, consumer);}
}

3)消费者 2

消费者 2 和消费者 1 代码除打印日志不同外,没有区别其他。

package com.zhangmingge.rabbitmq.work;import com.rabbitmq.client.*;
import com.zhangmingge.rabbitmq.ConnectionUtil;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {...DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {try {...System.out.println("消费者 2-接收到的消息为:" + new String(body, "utf-8"));...} catch (InterruptedException e) {e.printStackTrace();}}};...}
}

测试

启动两个消费者,然后再启动生产者发送消息。到 IDEA 的两个消费者对应的控制台查看是否竞争性地接收到消息。

1556014310859

1556014318106

小结

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息是竞争的关系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/817165.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

定时中断基本结构

打开时钟-->配置 时钟源-->配置 时基单元-->配置 中断输出-->配置 NVIC-->启动 定时器 程序 void Timer_Init(void) {RCC_APB1PeriphClockCmd(RCC_APB1Periph_TIM2,ENABLE);/*配置时钟*/TIM_InternalClockConfig(TIM2);TIM_TimeBaseInitTypeDef TIM_TimeBaseIni…

定时器-输出比较PWM

打开时钟-->配置 时钟源-->配置 时基单元-->配置 输出比较单元-->配置 GPIO口 代码 void PWM_Init(void) {RCC_APB1PeriphClockCmd(RCC_APB1Periph_TIM2,ENABLE);RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOA, ENABLE);/*配置 时钟*/TIM_InternalClockConfig(TIM2…

专题二:操作系统基本原理

1. 操作系统概述 操作系统:管理系统的硬件、软件、数据资源 控制程序运行 人机之间的接口 应用软件与硬件之间的接口进程管理 存储管理 文件管理 作业管理 设备管理 2. 进程管理 2.1. 进程状态(三态模型、五态模型) 2.2. ★★★信号量与PV操作★★★ 2.2.1. 前趋图 2.2.2.…

C++内存模型实践探索

C++对象模型是个常见、且复杂的话题,本文基于Itanium C++ ABI通过程序实践介绍了几种 简单C++继承 场景下对象模型,尤其是存在虚函数的场景,并通过图的方式直观表达内存布局。前言 C++对象模型是个常见、且复杂的话题,本文基于Itanium C++ ABI通过程序实践介绍了几种 简单C…

乘风破浪,扬帆出海,专门为英语学习者设计的在线学习平台之English Pod

什么是English Podhttps://learnenglishpod.comEnglish Pod是一个专门为英语学习者设计的在线学习平台,提供各种各样的英语学习播客(pod cast)和教学资源。其目标是帮助不同水平的学习者通过日常对话和实用内容提高英语听力、口语、词汇和语法能力。EnglishPod的课程通常包括对…

课堂练习

Complex.h中的代码:#include <iostream> #pragma once class Complex { public:Complex(double x=0, double y=0);Complex(const Complex& p);~Complex();void add(const Complex& p);double get_real() const;double get_imag() const;friend Complex add(cons…

乘风破浪,乘风出海,学习英语之English Pod

什么是English Podhttps://learnenglishpod.comEnglish Pod是一个专门为英语学习者设计的在线学习平台,提供各种各样的英语学习播客(podcast)和教学资源。其目标是帮助不同水平的学习者通过日常对话和实用内容提高英语听力、口语、词汇和语法能力。EnglishPod的课程通常包括对…

20222306 2024-2025-1 《网络与系统攻防技术》实验二实验报告

1.实验内容 1.1 实践目标 (1)使用netcat获取主机操作Shell,cron启动某项任务(任务自定) PS:cron是linux下用来周期性的执行某种任务或等待处理某些事件的一个守护进程 (2)使用socat获取主机操作Shell, 任务计划启动 (3)使用MSF meterpreter(或其他软件)生成可执行文件,利…

transformers 推理 Qwen2.5 等大模型技术细节详解(一)transformers 初始化和对象加载(文末免费送书)

本文详细讲解 transformers 推理大语言模型的初始化过程,包括 Python 包搜索、LazyModule 延迟模块、模块搜索和 Python 包 API 设计美学……上周收到一位网友的私信,希望老牛同学写一篇有关使用 transformers 框架推理大模型的技术细节的文章。 老牛同学刚开始以为这类的文章…

UI开发概述

★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★➤微信公众号:山青咏芝(MaoistLearning)➤博客园地址:为敢技术(https://www.cnblogs.com/strengthen/ )➤GitHub地址:https://github.com/strengthen➤原文地址:https://www.cnblogs…

MiGPT让你的小爱音响更聪明

大家好,我是晓凡。 今天要给大家带来一个超级有趣的开源项目MiGPT。 这个项目,简直就是给小爱音箱装上了超级大脑,让你的小爱音箱更聪明。 想象一下,当小爱音箱接入大模型后,上知天文,下知地理,从“人工智障”秒变学霸。 一、什么是MiGPTMiGPT是一个由idootop团队开发的…

基于amis后端低代码平台

写这个平台是为了解决多年对于项目的困扰,不想碰到新项目就重新来,通过业务模块的积累,能进行模块化安装。新的项目只需要安装模块就能搭建一套完整的业务系统。amis-api 的所有基础代码都以模块的形式组合在一起。这些模块可以随时从数据库中安装或卸载。这些模块有两大目的…