RocketMQ实践与原理分析(Docker安装RocketMQ)

前言

QBM之前使用的消息中间件是ActiveMQ,后续需要升级为RocketMQ。

MQ广泛应用于很多业务场景中,主要的作用

  • 异步解耦
  • 削峰

常用MQ中间件对比,参考官方文档:https://rocketmq.apache.org/zh/docs/4.x/introduction/03whatis

协议和特点消息有序性定时消息批量消息广播消息消息过滤服务器触发的重新投递消息存储
ActiveMQPush model, support OpenWire, STOMP, AMQP, MQTT, JMSExclusive (独自)Consumer or Exclusive Queues can ensure orderingSupportedNot SupporedSupportedSupportedNot SupportedSupports very fast persistence using JDBC along with a high performance journal,such as levelDB, kahaDB
KafkaPull model, support TCPEnsure ordering of messages within a partitionNot SupportedSupported, with async producerNot SupportedSupported, you can use Kafka Streams to filter messagesNot SupportedHigh performance file storage
RocketMQPull model, support TCP, JMS, OpenMessagingEnsure strict ordering of messages,and can scale out gracefullySupportedSupported, with sync mode to avoid message lossSupportedSupported, property filter expressions based on SQL92SupportedHigh performance and low latency file storage

通过学习并结合业务知识,重点思考的问题:

  • 顺序性消费?顺序消费场景某个消息失败导致消息挤压?
  • 消息的挤压?如何根据业务划分topic和tag?相同tag分group?,同一业务消息有相同的key?
  • 消息消费的多线程问题?某个业务场景比如财务需要单线程消费?

目录

  • 概述
  • 实践
  • 原理分析

一、概述

RocketMQ是由阿里捐赠给Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历了淘宝双十一的洗礼。RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

基于最基础的发布订阅模型,而在实际的应用中,结构会更复杂。例如为了支持高并发和水平扩展,中间的消息主题需要进行分区(对应Message Queue),同一个Topic会有多个生产者,同一个信息会有多个消费者,消费者之间要进行负载均衡等。
image.png

ps:存储消息Topic的 代理服务器( Broker ),是实际部署过程对应的代理服务器。

核心概念

  • Group:一类生产者或者消费者,每个包含GroupID
  • Producer:消息发布者,RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。
  • Consumer:消息订阅者,从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费
  • Message Queue:每个Topic下会由一个到多个队列来存储消息,RocketMQ 对 Topic 进行了分区,这种操作被称为队列(MessageQueue)。
    • 死信队列:用于处理无法被正常消息的消息,当一条信息初次消费失败,消息队列RocketMQ会自动进行消息重试,达到重试最大次数仍然失败的话,若消费仍然失败,该消息不会被丢弃,而是直接发到设置的该Consumer对应的死信队列里面。
  • Topic:消息主题
  • Message:生产者向Topic发送并最终传给消费者的数据消息载体,生产者为消息定义的属性成为消息属性,包含Message Key和Tag,MQ本身会生成一个Message ID。
    • Message Key:消息的业务标识,由消息生产者设置,唯一标识某个业务逻辑。
    • Message ID:消息的全局唯一标识,由RocketMQ系统自动生成。
    • Tag:消息标签,Topic下的进一步区分。
  • Topic Partition:分区,物理上的概念,每个Topic包含一个或者多个分区。
  • 消费位点:每个Topic会有很多分区,每个分区会统计当前消息的总条数,这个称为最大位点MaxOffset,分区开始的消费位点为MinOffset。
    • 重置消费位点:以时间轴为坐标,在消息持久化存储的时间范围内(默认3天),重新设置Consumer对已订阅Topic的消费进度,设置完成后Consumer将接受设定时间点之后由Producer发送到消息队列服务端的消息。

其他消息类型相关概念

  • 事务性消息:Exactly-Once:Consumer消费一次仅能消费一次。
  • 集群消息:相同的ConsumerGroup下的消息消费,每个Consumer按照GroupID均分消费。
  • 广播消息:相同的ConsumerGroup下的消息消费,每个Consumer按照GroupID全量消费。
  • 定时消息:Producer将消息发送到RocketMQ服务端,不期望消息被马上投递,而是推迟到当前时间点之后某一个时间点才发给Consumer。
  • 延时消息:Producer将消息发送到RocketMQ服务端,不期望消息被马上投递,而是推迟一定时间才发给Consumer。
  • 事务消息:类似X/Open XA的分布式事务功能。
  • 顺序消息:一种按照顺序进行发布和消费的消息模型,分为全局顺序消息和分区顺序消息。
    • 全局顺序消息:对于指定的一个Topic,所有的消息按照严格的FIFO的顺序进行发布和消费。
    • 分区顺序消息:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一分区内的消息按照严格的FIFO顺序进行发布的消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Message Key是完全不同的概念。

其他消息相关概念

  • 消息堆积:消息被堆积在了RocketMQ的服务端,Consumer没有能力消费来不及消费。
  • 消息过滤:Consumer可以根据Tag对消息进行过滤,确保Consumer最终只能接受到被过滤后的消息,过滤操作是在服务端完成的。
  • 消息轨迹:消息从生产到消费过程的链路追钟,方便定位排查问题。

参考:

  • https://rocketmq.apache.org/docs/quickStart/01quickstart
  • https://developer.aliyun.com/article/780968

二、实践

安装RocketMQ参考:https://rocketmq.apache.org/docs/quickStart/01quickstart

容器安装RocketMQ,需要分开安装Nameserver容器和Broker容器以及控制台Console容器,其中Nameserver和Broker的连接通过broker.conf

这样做是为了解耦和方便管理:https://juejin.cn/post/7218438764100108325

开发测试直接使用docker安装

# 拉取镜像
docker pull rocketmqinc/rocketmq# 一、启动NameServer容器,创建一个新的容器并指定 RocketMQ 的镜像
docker run -d \
--name rmqnamesrv \
-p 9876:9876 \
-v /home/docker/mydata/rocketmq/conf:/root/config \
-v /home/docker/mydata/rocketmq/logs:/root/logs \
-e "JAVA_OPTS=-Duser.home=/opt" \
rocketmqinc/rocketmq \
sh mqnamesrv # 参数说明:
-d 以守护线程方式启动
--name rmqnamesrv 设置容器名称
-p 9876:9876 端口映射
-v 把容器内的/root/logs日志路径挂载到宿主机的自定义路径中(需根据自己的路径自行创建)
-v 把容器内的/root/store数据存储目录挂载到宿主机的自定义目录(需根据自己的路径自行创建)
rocketmqinc/rocketmq 使用镜像的名称
sh mqnamesrv 执行name server脚本# 进入容器
docker exec -it d60b /bin/bash# 修改broker.conf文件,设置通信的brokerIP
vi ... /conf/broker.conf,然后添加brokerIP1 = xxx.xxx.xxx.xxx,内容为宿主机的IP# broker.conf的其他配置项# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH# 回到宿主机,将broker.conf拷贝到宿主机
# nameserver容器内配置文件/opt/rocketmq-4.4.0/confdocker cp d60b:/opt/rocketmq-4.4.0/conf/broker.conf /home/docker/mydata/rocketmq/conf/broker.conf# 二、启动Broker容器docker run -d  \
--name rmqbroker \
--link rmqnamesrv:namesrv \
-p 10911:10911 \
-p 10909:10909 \
-v  /home/docker/mydata/rocketmq/broker/logs:/root/logs \
-v  /home/docker/mydata/rocketmq/broker/store:/root/store \
-v /home/docker/mydata/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf \
-e "NAMESRV_ADDR=namesrv:9876" \
-e "MAX_POSSIBLE_HEAP=200000000" \
rocketmqinc/rocketmq \
sh mqbroker -c  ../conf/broker.conf # 参数说明
--link rmqnamesrv:namesrv  和rmqnamesrv容器通信
-p 10911:10911 把容器的非vip通道端口挂载到宿主机
-p 10909:10909 把容器的vip通道端口挂载到宿主机
-e “NAMESRV_ADDR=namesrv:9876”  指定namesrv的地址为本机namesrv的ip地址:9876
-e “MAX_POSSIBLE_HEAP=200000000” rocketmqinc/rocketmq sh mqbroker 指定broker服务的最大堆内存(暂未配置)
sh mqbroker -c  ../conf/broker.conf  读取../conf/broker.conf配置并启动broker# 三、安装控制台
docker pull styletang/rocketmq-console-ngdocker run -d \
-p 8081:8080 \
-e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=120.46.82.131:9876 -Drocketmq.config.isVIPChannel=false" \
styletang/rocketmq-console-ng# 四、访问控制台(别忘了开8081防火墙)
xxx.xxx.xxx.xxx:8081

三、原理分析

参卡:官网文档:https://rocketmq.apache.org/zh/docs/4.x/introduction/03whatis

3.1、RocketMQ部署模型

Producer、Consumer又是如何找到Topic和Broker的地址呢?消息的具体发送和接收又是怎么进行的呢?

RocketMQ部署架构上主要分为四个部分
image.png

  • 生产者Producer:通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败和重试。
  • 消费者Consumer:消费消息角色,支持推Push、拉Pull两种模式对消息消费,同时也支持集群方式和广播方式的消费。
  • 域名服务器NameServer:一个简单的Topic路由注册中心,支持Topic、Broker的动态注册与发现。一般集群部署,各实例间相互不进行信息通讯,集群中的每个NameServer都全量的保存完整的路由信息,某个NameServer下线也不影响可用性。主要包括两个功能
    • Broker管理:接受Broker集群的注册信息并保存下来作为路由信息,然后提供心跳检测机制,检测Broker是否还存活。
    • 路由信息管理:保存关于Broker集群的整个路由信息 和 用于客户端查询的队列信息。
  • **代理服务器Broker:**主要负责消息的存储、投递和查询以及服务高可用保证。因为各个Broker中的信息不一样,不能简单像NameServer那样直接集群部署,而是需要采取主从模式集群架构。Broker采取Master-Slave结构,通过指定相同的BrokerName、不同的BrokerId来区分主(BrokerId = 0)、从(BrokerId = 1)

小结

  • Broker注册:每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
  • Producer注册:Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态。
  • Consumer注册:Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从Slave订阅消息。

// TODO

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/110334.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql课堂笔记 mac

目录 启动mac上的mysql 进入mysql mac windows 创建数据库 创建表 修改字段数据类型 修改字段名 增加字段 删除字段 启动mac上的mysql sudo /usr/local/mysql/support-files/mysql.server start 直接输入你的开机密码即可。 编辑 进入mysql mac sudo /usr/local…

博客系统(升级(Spring))(四)(完)基本功能(阅读,修改,添加,删除文章)(附带项目)

博客系统 (三) 博客系统博客主页前端后端个人博客前端后端显示个人文章删除文章 修改文章前端后端提取文章修改文章 显示正文内容前端后端文章阅读量功能 添加文章前端后端 如何使用Redis项目地点: 博客系统 博客系统是干什么的? CSDN就是一…

去耦电路设计应用指南(三)磁珠/电感的噪声抑制

(三)磁珠/电感的噪声抑制 1. 电感1.1 电感频率特性 2. 铁氧体磁珠3. LC 型和 PI 型滤波 当去耦电容器不足以抑制电源噪声时,电感器&磁珠/ LC 滤波器的结合使用是很有效的。扼流线圈与铁氧体磁珠 是用于电源去耦电路很常见的电感器。 1. …

MC-4/11/01/400 ELAU 软件允许用户完全访问相机设置

MC-4/11/01/400 ELAU 软件允许用户完全访问相机设置 一个完整的Sentinel模具保护解决方案包括一到四台冲击式摄像机、专用红外LED照明和镜头、Sentinel软件以及所有与模压机连接的必要互连组件。摄像机支架基于磁性,可快速、安全、灵活地部署。此外,一个…

Scanner类用法(学习笔记)

Scanner类用法(学习笔记,后续会补充) 1.next()用法 package com.yushifu.scanner; import java.util.Scanner;//util java工具包 //Scanner类(获取用户的输入) Scanner s new Scanner&#…

【深度学习】 Python 和 NumPy 系列教程(十):NumPy详解:2、数组操作(索引和切片、形状操作、转置操作、拼接操作)

目录 一、前言 二、实验环境 三、NumPy 0、多维数组对象(ndarray) 1. 多维数组的属性 1、创建数组 2、数组操作 1. 索引和切片 a. 索引 b. 切片 2. 形状操作 a. 获取数组形状 b. 改变数组形状 c. 展平数组 3. 转置操作 a. 使用.T属性 b…

VIRTIO-SCSI代码分析(1)VIRTIO SCSI设备模拟

VIRTIO SCSI设备的模拟是通过QEMU实现的,除了呈现SCSI设备外,它同样也是PCIE设备。QEMU中定义了VIRTIO SCSI设备如下所示: TYPE_DEVICE -> TYPE_VIRTIO_DEVICE -> TYPE_VIRTIO_SCSI_COMMON ->TYPE_VIRTIO_SCSI 其中前面为父设备&am…

【SpringCloud微服务项目学习-mall4cloud项目(1)】——环境部署,构建与运行

环境部署,构建与运行 mall4cloud项目介绍源码地址 开发环境搭建pom搭建项目运行前端运行 mall4cloud项目介绍 mall4j商城系统 首先介绍一下mall4j,是一个基于spring boot、spring oauth2.0、mybatis、redis的轻量级、前后端分离、防范xss攻击、拥有分布…

数据库开发-MySQL基础DQL和多表设计

1. 数据库操作-DQL DQL英文全称是Data Query Language(数据查询语言),用来查询数据库表中的记录。 1.1 介绍 查询关键字:SELECT 查询操作是所有SQL语句当中最为常见,也是最为重要的操作。在一个正常的业务系统中,查询操作的使…

怎么实现一个登录时需要输入验证码的功能

最近给项目换了一个登录页面,而这个登录页面设计了验证码,于是想着把这个验证码功能实现一下吧。 这篇文章就如何实现登录时的验证码验证功能进行详细地介绍。 目录 页面效果 实现思路 生成验证码的控制器类 前端页面代码 后端登录代码 UserLoginD…

JL-A/41 JL-A/42 JL-A/43 集成电路电流继电器 过负荷或短路 JOSEF约瑟

JL-A、B集成电路电流继电器 JL-A/11 JL-A/31 JL-A/12 JL-A/32 JL-A/13 JL-A/33 JL-A/21 JL-A/22 JL-A/23 JL-A/34 JL-A/35 JL-B/41 JL-A/42 JL-B/43 JL-B/11 JL-B/31 JL-B/12 JL-B/32 JL-B/13 JL-B/33 JL-B/21 JL-B/22 JL-B/23 JL-B/34 JL-B/35 JL-B/41 JL-B/42 …

[NLP] LLM---<训练中文LLama2(一)>训练一个中文LLama2的步骤

一 数据集 【Awesome-Chinese-LLM中文数据集】 【awesome-instruction-dataset】【awesome-instruction-datasets】【LLaMA-Efficient-Tuning-数据集】Wiki中文百科(25w词条)wikipedia-cn-20230720-filteredBaiduBaiKe(563w词条) …