RocketMQ使用

说明:本文介绍RocketMQ的消费模式&消息类型,RocketMQ的安装参考及简单使用,参考:http://t.csdn.cn/BKFPj

消费模式

RocketMQ与RabbitMQ最大的区别在于,RocketMQ是根据消息的Topic锁定消费者的,Topic属性设置为相同的消费者,可以看做是一个消费者集群。消息模式分为以下三种:

(1)一对一

最简单的一种方式,消息的Topic只被一个消费者消费,如下:

(生产者)

    @Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void simpleTest(){rocketMQTemplate.syncSend("simple","hello rocketmq!");}

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple")
public class ConsumerListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("s = " + s);}
}

执行结果

在这里插入图片描述

(2)一对多

当存在多个Topic相同的消费者时,这些消费者共同消费消息,如下:

(开启两个消费者,Topic相同)

在这里插入图片描述

(生产者)

    @Testpublic void oneToMany(){for (int i = 0; i < 10; i++) {rocketMQTemplate.syncSend("simple","one to many" + i);}}

执行结果,可以看到负载均衡策略是随机;

在这里插入图片描述

在这里插入图片描述

(3)多对多

参考一对多方式,发送多个Topic的消息,让多种Topic的消费者接收消息;

消息类型

根据消息的类型和对消息的处理,可以分为以下几种:

(1)同步消息

同步消息,消息发送到MQ,MQ保存成功后才会返回结果,在API中是以"sync"(synchronous,同步)开头的一些方法,可以看到这些方法都有返回值,可以通过返回结果判断是否发送成功;
在这里插入图片描述

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple")
public class ConsumerListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("接收到同步消息 = " + s);}
}

(生产者,可以通过返回结果判断发送是否成功)

	@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void simpleTest1(){SendResult sendResult = rocketMQTemplate.syncSend("simple", "这是一个异步消息");System.out.println("sendResult.getSendStatus() = " + sendResult.getSendStatus());}

在这里插入图片描述

(2)异步消息

异步消息,消息发送给MQ后代码就会立即向下执行,在API中是以“asyn”(asynchronous,异步),可以手动设置发送消息成功与否执行的方法;

(生产者)

    @Testpublic void simpleTest2() throws InterruptedException {rocketMQTemplate.asyncSend("simple", "这是一个异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("成功信息" + sendResult.toString());}@Overridepublic void onException(Throwable throwable) {System.out.println("异常信息" + throwable.getMessage());}});TimeUnit.SECONDS.sleep(2);}

(发送消息成功,执行成功的方法)

在这里插入图片描述

需要注意,这里是指发送消息成功与否,与消费者是否成功消费无关;

(3)单向消息

单向消息,是指只管发送消息,不关系MQ是否成功接收,没有返回值;

    @Testpublic void simpleTest3() {rocketMQTemplate.sendOneWay("simple", "这是一个单向消息");}

(4)延迟消息

延迟消息,指给消息设置一个延迟级别,达到指定时间后,消费者才能收到这个消息,延迟级别如下:

# 延迟级别,从1开始
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

(生产者)

    @Testpublic void simpleTest4() {// 设置超时为1秒,延迟等级为3,即10秒rocketMQTemplate.syncSend("simple", MessageBuilder.withPayload("这是一个延迟消息").build(),1000,3);}

(消费者,10秒后才收到消息)

在这里插入图片描述

延迟消息相较于RabbitMQ,使用起来更方便,但是只能设置时间等级,不能设置准确时间,非常难受;

(5)批量消息

RocketMQ可以发送一个集合,如下:

(消费者)

    @Testpublic void simpleTest5(){ArrayList<Message> list = new ArrayList<>();list.add(MessageBuilder.withPayload("aaa").build());list.add(MessageBuilder.withPayload("bbb").build());list.add(MessageBuilder.withPayload("ccc").build());rocketMQTemplate.syncSend("simple", list, 3000);}

(执行结果)

在这里插入图片描述

(6)消息过滤

消息过滤,是RocketMQ较与RabbitMQ独有的功能,指对发送的消息进行过滤,指接收限定条件的消息,对消息进行限制接收。有两种方式,如下:

a. 标签过滤

在发送消息时,指定topic的同时,加上一个标签,表示只发给有这个标签的消费者;

(生产者)

    @Testpublic void simpleTest6(){rocketMQTemplate.syncSend("simple:tag", "Tag Message");}

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple", selectorExpression = "tag1")
public class ConsumerListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("接收到标签过滤消息 = " + s);}
}

(执行结果)

在这里插入图片描述

b. SQL过滤

另一种是SQL过滤的方式,在消费者这边,写SQL语句对消息进行过滤消息;

(生产者,设置name = SQL)

    @Testpublic void simpleTest6(){// 标签方式rocketMQTemplate.syncSend("simple:tag", "Tag Message");// SQL语句方式rocketMQTemplate.syncSend("simple",MessageBuilder.withPayload("SQL Message").setHeader("name","SQL").build());}

(消费者,只接受name = SQL的消息)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple",selectorType = SelectorType.SQL92, selectorExpression = "name = 'SQL'")
public class ConsumerListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("接收到SQL语句过滤消息 = " + s);}
}

(执行结果)

在这里插入图片描述

(7)对象消息

RocketMQ当然也可以发送对象作为消息,该对象应该要实现Serializable接口,如下:

import java.io.Serializable;public class User implements Serializable {private String username;private String password;public User() {}public User(String username, String password) {this.username = username;this.password = password;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}@Overridepublic String toString() {return "User{" +"username='" + username + '\'' +", password='" + password + '\'' +'}';}
}

(生产者)

    @Testpublic void simpleTest7(){User user = new User();user.setUsername("zhangsan");user.setPassword("123456");rocketMQTemplate.syncSend("simple", user);}

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple")
public class ConsumerListener implements RocketMQListener<User> {@Overridepublic void onMessage(User user) {System.out.println("user = " + user);}
}

(执行结果)

在这里插入图片描述

(8)顺序消息

顺序消息,是指消息从发送到被消费,需要始终保持前后顺序。如下,发送15次消息,可以看到消费者那边的消费顺序并不是一直的;

    @Testpublic void simpleTest1() {for (int i = 0; i < 15; i++) {rocketMQTemplate.syncSend("simple", "这是一个同步消息===>" + i);}}

在这里插入图片描述

顺序消息,需要保证以下两方面:

  • 所有的消息存入到MQ中的同一个队列中,因为RocketMQ默认有四个队列,消息会被负载均衡存储在这些队列里;

  • 该队列只能被一个线程消费,因为一个队列的消息在消费时会有多个线程同时进行消费;

前者可以通过,XxxOrderly()方法实现消息在队列中的顺序存储,如下:

(生产者:给对象设置一个ID,让它们按照ID顺序存储在MQ中)

    @Testpublic void simpleTest8(){ArrayList<User> users = new ArrayList<>();User user1 = new User("1","zhangsan","zs");User user2 = new User("2","lisi","ls");User user3 = new User("3","wangwu","ww");users.add(user1);users.add(user2);users.add(user3);for (User user : users) {rocketMQTemplate.syncSendOrderly("simple",user,user.getId());}}

后者,可以通过在消费者这边添加这个配置,保证消息被顺序消费,如下:

(消费者,设置消费模式 consumeMode = ConsumeMode.ORDERLY

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple",consumeMode = ConsumeMode.ORDERLY)
public class ConsumerListener implements RocketMQListener<User> {@Overridepublic void onMessage(User user) {System.out.println(user);}
}

执行结果,可以看到消息时顺序进行的

在这里插入图片描述

总结

RocketMQ的内容还有很多,可参考 http://t.csdn.cn/QXQNZ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/62884.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

什么是BitTorrent协议?

BitTorrent 是一个 P2P (Peer-to-Peer) 通信协议&#xff0c;它设计用于分发数据和电子文件在互联网上。BitTorrent 是目前世界上最流行的 P2P 文件共享协议之一&#xff0c;以下是对 BitTorrent 协议的详细解释&#xff1a; 种子和对等体&#xff1a; 种子&#xff08;Torrent…

Centos7.9安装lrzsz进行文件传输---Linux工作笔记059

这里咱们lrzsz命令,需要用来进行文件传输,因为如果不安装这个命令的话,那么 传输安装包什么的就不方便因为只有少数传输工具,才支持,直接拖拽的.没有的时候就可以用这个工具,用命令来传输 直接就是: sz 文件名 就可以把文件下载下来 rz 选择一个文件, 就可以把文件上传到当…

dingding机器人

“自定义机器人”只支持消息发送&#xff0c;自动回复需要“企业内部机器人” 消息发送 import requests import jsonres requests.post(https://oapi.dingtalk.com/robot/send?access_token036a339axxx,data json.dumps({"text": {"content":"h…

LVS-DR模式集群构建过程演示

一、工作原理 LVS的工作原理 1.当用户向负载均衡调度器&#xff08;Director Server&#xff09;发起请求&#xff0c;调度器将请求发往至内核空间 2.PREROUTING链首先会接收到用户请求&#xff0c;判断目标IP确定是本机IP&#xff0c;将数据包发往INPUT链 3.IPVS是工作在IN…

图像多目标跟踪

目标跟踪&#xff08;Object Tracking&#xff09;是自动驾驶中常见的任务&#xff0c;根据跟踪目标数量的不同&#xff0c;目标跟踪可分为&#xff1a; 单目标跟踪&#xff08;Single Object Tracking&#xff0c;SOT&#xff09;多目标跟踪&#xff08;Multi-Objects Tracki…

Ubuntu18.04版本安装ROS及出现错误的处理方法

前面的文章是在已安装的ROS基础上做的一些应用&#xff0c;这里我们从零开始安装ROS机器人操作系统。 机器人操作系统(Robot Operating System,ROS)是一个开发机器人软件的框架&#xff0c;里面包含了一系列的工具&#xff0c;库和惯例&#xff0c;目的在于简化在大量不同种类机…

系统架构设计专业技能 · 软件工程(一)【系统架构设计师】

系列文章目录 系统架构设计高级技能 软件架构概念、架构风格、ABSD、架构复用、DSSA&#xff08;一&#xff09;【系统架构设计师】 系统架构设计高级技能 系统质量属性与架构评估&#xff08;二&#xff09;【系统架构设计师】 系统架构设计高级技能 软件可靠性分析与设计…

修改第三方组件默认样式

深度选择器 修改el-input的样式&#xff1a; <el-input class"input-area"></el-input>查看DOM结构&#xff1a; 原本使用 /deep/ 但是可能不兼容 使用 :deep .input-area {:deep(.el-input__inner){background-color: blue;} }将 input 框背景色改为…

基于STM32微控制器的物联网(IoT)节点设计与实现

基于STM32微控制器的物联网(IoT)节点的设计和实现。我们讨论物联网节点的基本概念和功能,并详细介绍了STM32微控制器的特点和优势。然后,我们将探讨如何使用STM32开发环境和相关的硬件模块来设计和实现一个完整的物联网节点。最后,我们将提供一个示例代码,展示如何在STM3…

JAVA(一)

我的第一个JAVA程序 以下我们通过一个简单的实例来展示Java编程,创建文件HelloWorld.java&#xff08;文件名需与类名一致&#xff09;,代码如下 实例 public class HelloWorld{public static void main(String[] args){System.out.println(Hello World) } } 注:Srting ar…

ARP协议原理与应用

ARP协议原理与应用 一、ARP协议概述1.1、场景描述1.2、ARP协议概述 二、ARP协议工作原理2.1、ARP工作流程2.2、ARP工作原理2.3、ARP缓存表 三、ARP协议分类3.1、免费ARP&#xff08;Gratuitous ARP &#xff09;3.2、代理ARP&#xff08;Proxy ARP &#xff09;3.3、RARP与IARP…

9.2.2Socket(TCP)

一.过程: 1.建立连接(不是握手),虽然内核中的连接有很多,但是在应用程序中,要一个一个处理. 2. 获取任务:使用ServerSocket.accept()方法,作用是把内核中的连接获取到应用程序中,这个过程类似于生产者消费者模型. 3. 使用缓冲的时候,注意全缓冲和行缓冲. 4.注意关闭文件资源…