RocketMQ 源码分析——Producer

文章目录

  • 消息发送代码实现
  • 消息发送者启动流程
    • 检查配置
    • 获得MQ客户端实例
    • 启动实例
    • 定时任务
  • Producer 消息发送流程
    • 选择队列
      • 默认选择队列策略
      • 故障延迟机制策略*
      • 两种策略的选择
  • 技术亮点:ThreadLocal

消息发送代码实现

下面是一个生产者发送消息的demo(同步发送)

image.png

主要做了几件事:

  • 初始化一个生产者(DefaultMQProducer)对象
  • 设置 NameServer 的地址
  • 启动生产者
  • 发送消息

消息发送者启动流程

image.png

DefaultMQProducerImpl类start()

image.png

检查配置

DefaultMQProducerImpl

image.png

获得MQ客户端实例

整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表。DefaultMQProducerImpl类start()

image.png

一个clientId只会创建一个MQClientInstance

image.png

clientId生成规则:IP@instanceName@unitName

image.png

RocketMQ中消息发送者、消息消费者都属于”客户端“。每一个客户端就是一个MQClientInstance,每一个ClientConfig对应一个实例。

不同的生产者、消费端如果引用同一个客户端配置(ClientConfig),则它们共享一个MQClientInstance实例。所以我们在定义的的时候要注意这种问题(生产者和消费者如果分组名相同容易导致这个问题)

image.png

启动实例

MQClientInstance类start()

image.png

定时任务

MQClientInstance类startScheduledTask()

image.png

Producer 消息发送流程

我们从一个生产者案例的代码进入代码可知:DefaultMQProducerImpl中的sendDefaultImpl()是生产者消息发送的核心方法

image.png

image.png

从核心方法可知消息发送就是4个步骤:验证消息、查找路由、选择队列、消息发送。

image.png

image.png

选择队列

默认选择队列策略

采用了最简单的轮询算法,这种算法有个很好的特性就是,保证每一个Queue队列的消息投递数量尽可能均匀。这种算法只要消息投递过程中没有发生重试的话,基本上可以保证每一个Queue队列的消息投递数量尽可能均匀。当然如果投递中发生问题,比如第一次投递就失败,那么很大的可能性是集群状态下的一台Broker挂了,所以在重试发送中进行规避。这样设置也是比较合理的。

故障延迟机制策略*

采用此策略后,每次向Broker成功或者异常的发送,RocketMQ都会计算出该Borker的可用时间(发送结束时间-发送开始时间,失败的按照30S计算),并且保存,方便下次发送时做筛选。

image.png

除了记录Broker的发送消息时长之外,还要计算一个Broker的不可用时长。这里采用一个经验值:

如果发送时长在550ms之内,不可用时长为0。

达到550ms,不可用时长为30S

达到1000ms,不可用时长为60S

达到2000ms,不可用时长为120S

达到3000ms,不可用时长为180S

达到15000ms,不可用时长为600S

image.png

image.png

有了以上的Broker规避信息后发送消息就非常简单了。

在开启故障延迟机制策略步骤如下:

  1. 根据消息队列表时做轮训
  2. 选好一个队列
  3. 判断该队列所在Broker是否可用
  4. 如果是可用则返回该队列,队列选择逻辑结束
  5. 如果不可用,则接着步骤2继续
  6. 如果都不可用,则随机选一个

代码如下:

image.png

两种策略的选择

从这种策略上可以很明显看到,默认队列选择是轮训策略,而故障延迟选择队列则是优先考虑消息的发送时长短的队列。那么如何选择呢?

首先RocketMQ默认的发送失败有重试策略,默认是2,也就是如果向不同的Broker发送三次都失败了那么这条消息的发送就失败了,作为RocketMQ肯定是尽力要确保消息发送成功。所以给出以下建议。

如果是网络比较好的环境,推荐默认策略,毕竟网络问题导致的发送失败几率比较小。

如果是网络不太好的环境,推荐故障延迟机制,消息队列选择时,会在一段时间内过滤掉RocketMQ认为不可用的broker,以此来避免不断向宕机的broker发送消息,从而实现消息发送高可用。

当然以上成立的条件是一个Topic创建在2个Broker以上的的基础上。

技术亮点:ThreadLocal

image.png

image.png

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/115193.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2023华为杯E题:出血性脑卒中临床智能诊疗建模(不断更新)

文章目录 一、 背景介绍二、 数据集介绍及建模目标第一题:血肿扩张风险相关因素探索建模。第一问要求第一问解题思路第二问第二问解体思路 第二题:血肿周围水肿的发生及进展建模,并探索治疗干预和水肿进展的关联关系第一问第一问思路第二问第…

Mojo编程语言是AI人工智能的新的编程语言

Mojo是Chris Lattner的创业公司Modular开发的一种新的编程语言,旨在统一AI基建和异构计算。Mojo被认为是Python的超集,兼容Python生态,但添加了系统编程和编译期优化的特性,以提高性能和部署效率。Mojo基于MLIR,可以支…

2、Elasticsearch 基础功能

第3章 Elasticsearch 基础功能 以 8.X 版本为基础通过 Kibana 软件给大家演示基本操作。 3.1 索引操作 3.1.1 创建索引 ES 软件的索引可以类比为 MySQL 中表的概念,创建一个索引,类似于创建一个表。 查询完成后,Kibana 右侧会返回响应结果及请…

【C语言】结构体内存对齐机制详解

目录 一、前言二、结构体内存对齐规则三、实例解析 一、前言 在讲解结构体内存对齐机制之前,我们先来看1个例子: typedef struct {char sex; // 性别int id; // 学号char name[20]; // 姓名float score; // 成绩char addr[30]; …

基于HTML5架构的综合管廊系统网络结构设计

摘 要:从网络拓扑结构、开放式实时以太网协议、控制层系统配置方面介绍了综合管廊的系统网络架构设计,分析了无线网络特性,阐述了基于HTML5架构所能实现的功能的初步构想,以便于综合管廊运维人员巡检,确保管廊本体安全…

基于TensorFlow+CNN+协同过滤算法的智能电影推荐系统——深度学习算法应用(含微信小程序、ipynb工程源码)+MovieLens数据集(七)

目录 前言总体设计系统整体结构图系统流程图 运行环境模块实现1. 模型训练1)数据集分析2)数据预处理3)模型创建4)模型训练5)获取特征矩阵 2. 后端Django3. 前端微信小程序1)小程序全局配置文件2&#xff09…

怎样找到NPM里面开源库下载地址

场景 最近帮忙找一个开源库地址。这里以vue/language-core为例子。 解决 https://registry.npmmirror.com/vue/language-core/1.8.13这里就是如下格式: https://registry.npmmirror.com/{包名}/{版本号}打开这个页面后,得到开源库下载地址&#xff0c…

使用Postman如何在接口测试前将请求的参数进行自定义处理

1、前言 当我们使用 Postman 进行接口测试时,对于简单的不需要处理的接口,直接请求即可,但是对于需要处理的接口,如需要转码、替换值等,则就麻烦一些,一般我们都是先手动把修改好的值拷贝到请求里再进行请…

C++【个人笔记1】

1.C的初识 1.1 简单入门 #include<iostream> using namespace std; int main() {cout << "hello world" << endl;return 0; } #include<iostream>; 预编译指令&#xff0c;引入头文件iostream.using namespace std; 使用标准命名空间cout …

IC芯片测试:如何对芯片静态功耗进行测试?

静态功耗也叫静态电流&#xff0c;是指芯片在静止状态下的电流或者是指芯片在不受外界因素影响下自身所消耗的电流。静态功耗对于芯片来说是衡量一款芯片的功耗与效率非常重要的指标。 传统手动测试静态功耗只需在芯片的输入端串上一台万用表&#xff0c;然后对芯片各个端口添加…

Vue 安装与创建第一Docker的项目

1. 下载nodejs 并且安装 Node.js 2. 打开命令窗口&#xff0c;验证是否安装成功 C:\Users\Harry>node -v v18.16.0C:\Users\Harry>npm -v 9.5.1 3. 安装Vue CLI C:\Users\Harry>npm install -g vue/cli 经过不算漫长的等待&#xff0c;你的Vue CLI就装好了。确认一下…

嵌入式Linux驱动开发(I2C专题)(七)

使用GPIO操作I2C设备_IMX6ULL 参考资料&#xff1a; Linux文档 Linux-5.4\Documentation\devicetree\bindings\i2c\i2c-gpio.yamlLinux-4.9.88\Documentation\devicetree\bindings\i2c\i2c-gpio.txt Linux驱动源码 Linux-5.4\drivers\i2c\busses\i2c-gpio.cLinux-4.9.88\driv…