RocketMQ实战—7.生产集群部署和生产参数

news/2025/2/8 21:59:19/文章来源:https://www.cnblogs.com/mjunz/p/18705472

大纲

1.RocketMQ生产集群部署和生产参数分析

2.RocketMQ生产集群10wTPS压测

3.RocketMQ生产级故障案例

 

1.RocketMQ生产集群部署和生产参数分析

(1)服务器数量

4C8G阿⾥云⾼配服务器共四台,公⽹IP假设如下:

139.224.217.92,106.15.250.248,47.102.152.14,139.224.212.58

(2)集群规划

采取2台NameServer,2组MasterSlave Broker的部署结构。

(3)环境搭建

一.下载安装包

https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip

或者直接在服务器:

$ wget https://archive.apache.org/dist/rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip
$ wget https://mirror.bit.edu.cn/apache/rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip

以上两个链接都可,后⾯的链接是在中国的镜像服务,速度会快⼀些。

 

二.解压安装包

这⾥下载的包解压路径是:

$ ll
usr/local/rocketmq/rocketmq-4.9.2

解压后的包结构如下:

$ ls
benchmark  bin  conf  lib  LICENCE  NOTICE  README.md

接下来要选择⼀套集群模式来部署,官⽅提供配置⽂件内容中,集群模式有以下⼏种:2m-2s-async(2主2从异步)、2m-2s-sync(2主2从同步)、2m-noslave(2主)。这里采取官⽅推荐的第⼀种模式,2m-2s-async(2主2从异步)的模式。

 

三.前置⼯作:修改/etc/hosts⽂件,修改IP映射

$ vi /etc/hosts
139.224.217.92 rocketmq-nameserver1 
139.224.217.92 rocketmq-master1-slave 
106.15.250.248 rocketmq-nameserver2 
106.15.250.248 rocketmq-master2-slave 
47.102.152.14 rocketmq-master1 
139.224.212.58 rocketmq

四.修改Broker-Master配置和Broker-slave配置

步骤⼀:进⼊Broker-Master的conf/2m-2s-async这个配置⽂件⽬录,执⾏vim broker-a.properties命令编辑配置⽂件,把以下内容粘贴进配置⽂件中:

# 集群名称,如果公司有多组MQ集群,建议起不同名称 
brokerClusterName=rocketmq-cluster 
# Broker名称 
brokerName=broker-a 
# 0表示的是当前是Master节点,⼤于0,则表示是Slave节点 
brokerId=0 
# 删除⽂件时间点,默认凌晨是4点 
deleteWhen=04 
# ⽂件保留时间,默认保留48⼩时,我们这⾥设置为168⼩时也就是⼀周时间 
fileReservedTime=168 
# ASYNC_MASTER 异步复制Master 
# SYNC_MASTER 同步双写Master 
brokerRole=ASYNC_MASTER 
# 刷盘⽅式,ASYNC_FLUSH异步刷盘 
# SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH 
# Broker 对外服务的监听端⼝ 
listenPort=10911 
# 是否允许 Broker ⾃动创建Topic,⼀般线下可以开启,线上最好关闭 
autoCreateTopicEnable=true 
# 是否允许 Broker ⾃动创建订阅组,⼀般线下可以开启,线上最好关闭 
autoCreateSubscriptionGroup=true 
# nameServer地址;分号分割 
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 
# 存储路径 
storePathRootDir=/usr/local/rocketmq/rocketmq-4.9.2/store/broker-a 
# commitLog 存储路径 
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.9.2/store/broker-a/commitlog 
# 消费队列存储路径存储路径 
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.9.2/store/broker-a/consumequeue 
# 消息索引存储路径 
storePathIndex=/usr/local/rocketmq/rocketmq-4.9.2/store/broker-a/index 
# checkpoint 检查点⽂件存储路径 
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.9.2/store/checkpoint 
# abort ⽂件存储路径 
abortFile=/usr/local/rocketmq/rocketmq-4.9.2/store/abort

步骤二:进⼊Broker-Slave的conf/2m-2s-async这个配置⽂件⽬录,执⾏vim broker-a.properties命令编辑配置⽂件,把以下内容粘贴进配置⽂件中:

# 集群名称,如果公司有多组MQ集群,建议起不同名称
brokerClusterName=rocketmq-cluster
# Broker名称
brokerName=broker-a-s
# 0表示的是当前是Master节点,⼤于0,则表示是Slave节点
brokerId=1
# 删除⽂件时间点,默认凌晨是4点
deleteWhen=04
# ⽂件保留时间,默认保留48⼩时,我们这⾥设置为168⼩时也就是⼀周时间
fileReservedTime=168
# ASYNC_MASTER 异步复制Master
# SYNC_MASTER 同步双写Master
brokerRole=SLAVE
# 刷盘⽅式,ASYNC_FLUSH异步刷盘
# SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# Broker 对外服务的监听端⼝
listenPort=10911
# 是否允许 Broker ⾃动创建Topic,⼀般线下可以开启,线上最好关闭
autoCreateTopicEnable=true
# 是否允许 Broker ⾃动创建订阅组,⼀般线下可以开启,线上最好关闭
autoCreateSubscriptionGroup=true
# nameServer地址;分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 这个参数代表当前Broker监听的IP,如果不指定的话,会默认选⼀个IP,但是在阿⾥云服务器上,⼀般是内外⽹两个⽹卡,两个IP,此时就可能会出错,造成⽆法通讯
brokerIP1=47.102.152.14
# 存在Broker主从时,在Broker主节点上配置了brokerIP2的话,Broker从节点会连接主节点配置的brokerIP2来同步,所以从节点可以配置这个IP做主从同步brokerIP2 
# 存储路径
storePathRootDir=/usr/local/rocketmq/rocketmq-4.9.2/store/broker-a-s
# commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.9.2/store/broker-a-s/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.9.2/store/broker-a-s/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/rocketmq-4.9.2/store/broker-a-s/index
#checkpoint 检查点⽂件存储路径
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.9.2/store/checkpoint
#abort ⽂件存储路径
abortFile=/usr/local/rocketmq/rocketmq-4.9.2/store/abort

注意:上⾯是⼀对Master-Slave的配置⽂件。由于⼀共有四台机器,所以需要配置两组;第⼆组配置⽂件中的配置保持不变,只需要把:

brokerName=broker-a 改为 brokerName=broker-b
brokerName=broker-a-s 改为 brokerName=broker-b-s

五.创建⽂件存储路径

在47.102.152.14机器上执⾏:

$ mkdir -p /usr/local/rocketmq/rocketmq-4.9.2/store/broker-a
$ mkdir -p /usr/local/rocketmq/rocketmq-4.9.2/store/broker-a/consumequeue
$ mkdir -p /usr/local/rocketmq/rocketmq-4.9.2/store/broker-a/commitlog
$ mkdir -p /usr/local/rocketmq/rocketmq-4.9.2/store/broker-a/index

在139.224.212.58机器上执⾏:

$ mkdir -p /usr/local/rocketmq/rocketmq-4.9.2/store/broker-b
$ mkdir -p /usr/local/rocketmq/rocketmq-4.9.2/store/broker-b/consumequeue
$ mkdir -p /usr/local/rocketmq/rocketmq-4.9.2/store/broker-b/commitlog
$ mkdir -p /usr/local/rocketmq/rocketmq-4.9.2/store/broker-b/index

在139.224.217.92机器上执⾏:

$ mkdir -p /usr/local/rocketmq/rocketmq-4.9.2/store/broker-a-s
$ mkdir -p /usr/local/rocketmq/rocketmq-4.9.2/store/broker-a-s/consumequeue
$ mkdir -p /usr/local/rocketmq/rocketmq-4.9.2/store/broker-a-s/commitlog
$ mkdir -p /usr/local/rocketmq/rocketmq-4.9.2store/broker-a-s/index

在106.15.250.248机器上执⾏:

$ mkdir -p /usr/local/rocketmq/rocketmq-4.9.2/store/broker-b-s
$ mkdir -p /usr/local/rocketmq/rocketmq-4.9.2/store/broker-b-s
$ mkdir -p /consumequeue /usr/local/rocketmq/rocketmq-4.9.2store/broker-b-s/commitlog
$ mkdir -p /usr/local/rocketmq/rocketmq-4.9.2store/broker-b-s/index

六.启动NameServer

步骤一:编辑NameServer的配置⽂件

JDK堆内存默认给的是4G,年轻代2G,我们机器配置有限,就调⼩⼀倍,设置成2g。

$ vi /usr/local/rocketmq/rocketmq-4.9.2/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" 
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC" 
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
else 
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" 
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0" 
JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M" 

步骤二:修改完成后,后台启动两台NameServer

$ nohup sh /usr/local/rocketmq/rocketmq-4.9.2/bin/mqnamesrv >/usr/local/rocketmq/rocketmq-4.9.2/logs/mqnamesrv.log 2>&1 &

启动完成后,输⼊jps,出现如下内容,即说明启动成功:

$ jps
14416 NamesrvStartup
14435 Jps

七.启动四台Broker

步骤一:编辑Broker的配置文件

JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_broker_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext"

步骤二:按照上⾯步骤中的Broker节点规划,分别启动四台Broker。

注意在指定配置⽂件的时候,要修改机器上配置好的那个⽂件,分别如下:

47.102.152.14
$ nohup sh /usr/local/rocketmq/rocketmq-4.9.2/bin/mqbroker -c /usr/local/rocketmq/rocketmq-4.9.2/conf/2m-2s-async/broker-a.properties > /usr/local/rocketmq/rocketmq-4.9.2/logs/broker-a.log 2>&1 &139.224.212.58 
$ nohup sh /usr/local/rocketmq/rocketmq-4.9.2/bin/mqbroker -c /usr/local/rocketmq/rocketmq-4.9.2/conf/2m-2s-async/broker-b.properties > /usr/local/rocketmq/rocketmq-4.9.2/logs/broker-b.log 2>&1 &139.224.217.92 
$ nohup sh /usr/local/rocketmq/rocketmq-4.9.2/bin/mqbroker -c /usr/local/rocketmq/rocketmq-4.9.2/conf/2m-2s-async/broker-a-s.properties > /usr/local/rocketmq/rocketmq-4.9.2/logs/broker-a-s.log 2>&1 &106.15.250.248 
$ nohup sh /usr/local/rocketmq/rocketmq-4.9.2/bin/mqbroker -c /usr/local/rocketmq/rocketmq-4.9.2/conf/2m-2s-async/broker-b-s.properties > /usr/local/rocketmq/rocketmq-4.9.2/logs/broker-b-s.log 2>&1 &

输⼊jps,出现如下内容,即说明启动成功:

$ jps
14258 Jps
14196 BrokerStartup

八.启动RocketMQ控制台,查看集群信息

这个rocketmq-all-4.9.2-bin-release.zip包在本地解压后,⾥⾯有⼀个console项⽬,文件名为rocketmq-console。打开之后把NameServer配置成我们⾃⼰的NameServer地址即可启动console查看mq集群的情况。使用IDEA打开console项⽬,会⾃动加载依赖环境,加载完成后修改配置⽂件application.properties里的nameserver地址。接着对console项目进行打包,打包命令是:

$ mvn clean package -Dmaven.test.skip=true

打包完成后,在target⽬录下,找到rocketmq-console-ng-1.0.0.jar放到服务器上即可运⾏。这⾥是将这个console项⽬放到了集群环境的⼀台服务器中:47.102.152.14。在服务区中启动console项⽬的命令是:

$ nohup java -jar rocketmq-console-ng-1.0.0.jar >/usr/local/rocketmq/rocketmq-4.9.2/logs/mq-console.log 2>&1 &

启动后访问如下链接,即可查看集群情况。

http://47.102.152.14:8080/#/cluster

(4)集群参数调优

一.JVM参数调优

-server -Xms4g -Xmx4g -Xmn2g

Broker堆内存与新⽣代⼤⼩,因为Broker是很吃内存的,所以尽可能给多⼀点内存。这里因为服务器是4C8G,所以设置堆内存4G,新⽣代2G,并且不允许堆内存⾃动扩展。虽然RocketMQ使⽤的是G1垃圾回收器,但直接给⼀个固定⼤内存,与给⼀个范围相比,可让G1⾃动调整的效率更⾼。

 

-XX:+AlwaysPreTouch

RocketMQ是使⽤⻚缓存的,这个参数可以在RocketMQ开启时,预先把每⼀⻚都分配好。但坏处就是,会导致RocketMQ启动速度变慢。如果不关心启动速度,可以开启这个参数,让RocektMQ在运⾏初期提升性能,虽然运⾏⼀段时间后性能没有差异。

 

-XX:-UseBiasedLocking

官⽅推荐关闭偏向锁以减少GC停顿时间。

 

-XX:+UseG1GC

 

-XX:G1HeapRegionSize=16m

 

-XX:G1ReservePercent=25

通过-XX:G1ReservePercent可以指定G1为分配担保预留的空间⽐例,默认10%。也就是⽼年代会预留10%的空间来给新⽣代的对象晋升。这个值设置为25,表示会预留⽐较多的空间来给新⽣代的对象晋升,这个参数设定也是官⽅推荐的参数。

 

-XX:InitiatingHeapOccupancyPercent=30

通过-XX:InitiatingHeapOccupancyPercent指定触发全局并发标记的⽼年代使⽤占⽐。默认值45%,表示的是⽼年代占堆的⽐例超过45%。如果Mixed GC周期结束后⽼年代使⽤率还是超过45%,那么会再次触发全局并发标记过程。这样就会导致频繁的⽼年代GC,影响应⽤吞吐量,这个参数调整成30性能会更好。

 

以上这些参数都是经过RocketMQ官⽅测试后的参数设置,在Broker启动⽂件⾥⾯这些参数也都是设置好的,所以基本上不需要我们⾃⼰来调整。

 

-XX:MaxGCPauseMillis = 20

这个参数的含义是,每次GC时期望GC停顿的时间是多久,20代表20ms。这个参数对于RocketMQ运⾏还是很关键的,因为RocektMQ需要很⾼的性能,很低的延迟。所以很多⼈会有⼀个误区,认为这个参数需要调整的⽐较⼩,这样可以提⾼RocketMQ的响应速度。真实的情况是,如果这个-XX:MaxGCPauseMillis = 20参数设置过⼩,G1垃圾回收器会因为这个⽐较⼩的停顿时间,⽽给新⽣代⼀个⾮常⼩的空间,这就导致频繁的触发YGC。⽽RocketMQ⼜是⼀个QPS⾮常⾼的系统,那么就很可能会造成对象晋升⾄⽼年代过快,造成频繁Full GC。所以MaxGCPauseMillis值尽可能不要设置太⼩,⼀般来说可以不设置保持默认即可。

 

二.OS操作系统内核参数调优

调优参数列表:

$ export PATH=$PATH:/sbin
$ sudo sysctl -w vm.extra_free_kbytes=512000
$ sudo sysctl -w vm.min_free_kbytes=512000
$ sudo sysctl -w vm.overcommit_memory=1
$ sudo sysctl -w vm.drop_caches=1
$ sudo sysctl -w vm.zone_reclaim_mode=0
$ sudo sysctl -w vm.max_map_count=65536
$ sudo sysctl -w vm.dirty_background_ratio=50
$ sudo sysctl -w vm.dirty_ratio=50
$ sudo sysctl -w vm.dirty_writeback_centisecs=36000
$ sudo sysctl -w vm.page-cluster=3
$ sudo sysctl -w vm.swappiness=1
$ echo 'ulimit -n 655350' >> /etc/profile
$ echo '* hard nofile 655350' >> /etc/security/limits.conf
$ echo '* hard memlock unlimited' >> /etc/security/limits.conf
$ echo '* soft memlock unlimited' >> /etc/security/limits.conf

三.调优前的单Producer压测

步骤一:首先看调优前的单Producer压测效果(即⽆消费者)。压测机的配置是4C8G两台,两台机器使⽤RocketMQ⾃带的benckmark压测组件进⾏压测。进⼊rocketmq-4.9.2/benchmark⽬录下,执⾏如下命令:

$ cd /usr/local/rocketmq/rocketmq-4.9.2/benchmark
$ sh producer.sh -n 139.224.217.92:9876;106.15.250.248:9876

可以看到这样的结果:第⼀台机器,发送的TPS是1.5-1.6w左右,最⼤响应时间456ms,平均响应时间4ms左右。第⼆台机器,发送的tps是2.4w左右,最⼤响应时间485ms,平均响应时间2.7ms左右。⽽我们此时观察RocketMQ客户端数据,两台Master节点的TPS总计在4w+。总体来说,单Producer的QPS在4w+的时候还是有很好的响应能⼒的。

 

步骤二:接下来再继续加⼀点机器,把本地的Producer也启动起来。也就是打开源码包,找到example这个项⽬。找到org.apache.rocketmq.example.benchmark.Producer,然后把它启动起来,但要注意修改⼀下Producer的NameServer地址:

final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer", rpcHook, msgTraceEnable, null);
producer.setInstanceName(Long.toString(System.currentTimeMillis()));if (commandLine.hasOption('n')) {String ns = commandLine.getOptionValue('n');producer.setNamesrvAddr(ns);
}
producer.serNamesrvAddr("139.224.217.92:9876;106.15.250.248:9876");producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);producer.start();

可以看到,我们本地也能模拟出2k+的TPS。而此时两台压测机明显TPS也是下降了⼀些的,所以基本上这个集群抗住4w+TPS是差不多的,⽽且性能也还不错。此外注意看此时的NameServer系统指标: 可以看到CPU其实还没有到达极限,内存使⽤⽐较⾼了,⼤概在80%左右。⽽两台Broker-master机器的CPU,内存使⽤都⽐较⾼,但是还没有达到极限。

 

四.调优前的单Consumer压测

接着看看调优前的单consumer压测效果(即⽆⽣产者),进⼊rocketmq-4.9.2/benchmark⽬录下执⾏如下命令:

$ cd /usr/local/rocketmq/rocketmq-4.9.2/benchmark
$ sh consumer.sh -t BenchmarkTest -n 139.224.217.92:9876;106.15.250.248:9876 -g test2

可以看到,消费者消费的速度明显要⽐写⼊速度要快很多,两台都能达到4w左右。注意:其中RT这个时间⾮常久,原因是前面测试了单独的Producer,而Consumer⼀直没有开启。直到开启测试Consumer时已经过了很久了,这就导致这个时间计算的结果是从数据到MQ后就开始计算,所以值⽐较⼤。两台NameServer的压⼒看起来⾮常⼩,CPU和内存都基本没⽤。

 

五.调优后单Producer的压测效果

从TPS上看,整体的TPS略有提升,但是提升并不明显,总体TPS提升了⼤概1-3k左右。系统指标:两台NameServer(包含Broker-Slave节点)的CPU和内存使⽤,也略有上升。两台Broker-Master节点的CPU,内存使⽤率也略有上升。

 

六.调优后单Consumer的压测效果

很奇怪的现像发⽣了:这个总体的消费TPS只有总计不到两万了。其实在观测的过程中,最⾼达到过总计4w,但是依然和最开始测试的过程差距⽐较⼤,这是什么原因呢?从CPU、内存这两个关键指标来看,没什么问题,因为总体负载都不是很⾼。但是注意,磁盘IO⾮常⾼,说明现在影响到Consumer效率的是磁盘IO负载过于⾼了。

 

之所以产⽣磁盘IO负载过高这个现象,是因为调整参数时,⽂件句柄调整的过⼤655350,导致IO压⼒飙升,反⽽对性能产⽣了较⼤影响。另外就是,⽂件落⼊磁盘,消费者消费数据时从磁盘读取数据到内存。最终恢复Linux内核参数65535后,压测结果回归正常。所以调整参数时,要进行前后压测对比,不能直接按照文档复制。

 

把参数还原后,发送消息的TPS仍然是4w左右。可以说前⾯优化之后的1-3k从数量级来说,没有什么太⼤的性能提升。所以没有必要为了这⼀点写⼊效率,去调优参数,并且对消费者端造成巨⼤影响。

 

2.RocketMQ生产集群10wTPS压测

两台4核8G的Master机器,通过压测可以扛下4w写TPS。单台4核8G的Master可以扛下2w写TPS,但CPU负载已经很高了。单台4核8G的Master可以扛下4W读TPS。所以如果要扛10wTPS,可以用5台4核8G的Master机器。

 

3.RocketMQ生产级故障案例

(1)RocketMQ的VIP端⼝故障

(2)completbleFuture不规范使⽤导致消费速率低

(3)Producer发送消息失败问题

 

(1)RocketMQ的VIP端⼝故障

在打开Console控制台,打开Cluster时,看到如下所示的报错:

org.apache.rocketmq.remoting.exception.RemotingConnectException:
connect to <139.224.212.58:10909> failed

⽽Cluster⻚⾯也没有什么TPS数据什么的,这是什么原因?查看这个节点的Broker⽇志发现是⼀个远程链接问题:

2022-01-15 04:04:20 ERROR BrokerControllerScheduledThread1 - SyncTopicConfig Exception, 47.102.152.14:10911 
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to 47.102.152.14:10909 failed at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:3 94) ~ [rocketmq-remoting-4.9.2.jar:4.9.2] at org.apache.rocketmq.broker.out.BrokerOuterAPI.getAllTopicConfig(BrokerOuterAPI.java:330) ~ [rocketmq-broker-4.9.2.jar:4.9.2] at org.apache.rocketmq.broker.slave.SlaveSynchronize.syncTopicConfig(SlaveSynchronize.java:60) [rocketmq-broker-4.9.2.jar:4.9.2] at org.apache.rocketmq.broker.slave.SlaveSynchronize.syncAll(SlaveSynchronize.java:49) [rocketmq-broker-4.9.2.jar:4.9.2] at org.apache.rocketmq.broker.BrokerController$12.run(BrokerController.java:1144) [rocketmq-broker-4.9.2.jar:4.9.2] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_202] at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_202] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Scheduled ThreadPoolExecutor.java:180) [na:1.8.0_202] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadP oolExecutor.java:294) [na:1.8.0_202] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_202] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_202] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_202] 2022-01-15 04:04:20 INFO BrokerControllerScheduledThread1 - Update slave consumer offset from master, 47.102.152.14:10911 2022-01-15 04:04:20 INFO BrokerControllerScheduledThread1 - Update slave delay offset from master, 47.102.152.14:10911

很奇怪,看了异常信息,发现是主从同步的过程没有成功,原因是同步Topic配置信息失败:

SyncTopicConfig Exception, 47.102.152.14:10911
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to 47.102.152.14:10909 failed

⽐较奇怪的是,有这么⼀句话"47.102.152.14:10909 failed",就是这么⼀句话容易让⼈疑惑。因为当前Broker是47.102.152.14节点上Broker的Slave节点,所以就先去查了⼀下47.102.152.14的配置如下:

# 集群名称,如果公司有多组MQ集群,建议起不同名称
brokerClusterName=rocketmq-cluster
# Broker名称
brokerName=broker-a
# 0表示的是当前是Master节点,⼤于0,则表示是Slave节点
brokerId=0
# 删除⽂件时间点,默认凌晨是4点
deleteWhen=04
# ⽂件保留时间,默认保留48⼩时,我们这⾥设置为168⼩时也就是⼀周时间
fileReservedTime=168
# ASYNC_MASTER 异步复制Master
# SYNC_MASTER 同步双写Master
brokerRole=ASYNC_MASTER
# 刷盘⽅式,ASYNC_FLUSH异步刷盘
# SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# Broker 对外服务的监听端⼝
listenPort=10911
# 是否允许 Broker ⾃动创建Topic,⼀般线下可以开启,线上最好关闭
autoCreateTopicEnable=true
# 是否允许 Broker ⾃动创建订阅组,⼀般线下可以开启,线上最好关闭
autoCreateSubscriptionGroup=true
# nameServer地址;分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 这个参数代表当前Broker监听的IP,如果不指定的话,会默认选⼀个IP,但是在阿⾥云服务器上,⼀般是内外⽹两个⽹卡,两个IP,此时就可能会出错,造成⽆法通讯
brokerIP1=47.102.152.14
# 存在Broker主从时,在Broker主节点上配置了brokerIP2的话,Broker从节点会连接主节点配置的brokerIP2来同步,所以从节点可以配置这个IP做主从同步brokerIP2 
# 存储路径
storePathRootDir=/usr/local/rocketmq/rocketmq-4.9.2/store/broker-a
# commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.9.2/store/broker-a/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.9.2/store/broker-a/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/rocketmq-4.9.2/store/broker-a/index
#checkpoint 检查点⽂件存储路径
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.9.2/store/checkpoint
#abort ⽂件存储路径
abortFile=/usr/local/rocketmq/rocketmq-4.9.2/store/abort

这个端⼝确实是10911,那为什么会出现10909这个端⼝呢?再去查Slave的端⼝,发现也是配置的是10911这个端⼝。所以不存在配置错误的问题,那此时就只能确定为,是RocketMQ本身的什么机制造成的。通过在官⽹、社区的搜索,最终发现,这个问题是存在类似情况的。

https://github.com/maihaoche/rocketmq-spring-boot-starter/issues/6

这个issue和这里情况还不完全⼀样,因为这个issue说的是Producer Consumer和服务端Broker的链接,⽽这里是Broker和Slave之间的连接。

 

不过本质上应该类似,因为VIP端⼝默认就是Broker监听端⼝减去2,所以这个问题基本已经清晰了,是因为主从同步过程中,端⼝-2之后⽆法通讯造成的。所以接下来,要么配置⼀下不启⽤VIP端⼝,要么开启10909这个端⼝。基于专⻔接⼝做专⻔的事情这⼀考虑,还是设置开启10909这个端⼝。

 

(2)completbleFuture不规范使⽤导致消费速率低

前⾯介绍过,消费者由于需要调⽤第三⽅平台接口等待响应结果,造成消费速率较低,所以使用了completbleFuture,但其实completbleFuture的使⽤也是有很⼤讲究的。

 

在使⽤了completbleFuture之后,本来以为消费的速率会⽐正常消费要快很多,因为避免了等待响应的这种情况,⽽实际的结果却并不是如此。

 

我们来准备⼀下参数,PostMan发起⼀个活动通知消息推送:

postman.setGlobalVariable("startTime",Date.parse(new Date("2022/01/13 15:07:26")));
postman.setGlobalVariable("endTime",Date.parse(new Date("2022/01/15 15:07:26")));
{"name": "促销活动","informType": 2,"startTime": {{ startTime }},"endTime": {{ endTime }},"remark": "促销活动说明","status": 1,"type": 1,"rule": {"key": "满减","value": {"满减": "200,30"}},"createUser": 0 
}

调⽤接⼝:

http://localhost:8080/demo/promotion/send

调⽤成功后,会持续不断的发送消息到RocketMQ。然后pushService中,会不断的去消费消息,紧接着发送消息出去。我们可以在Consumer消费者⾥⾯通过不同的⽅式来调⽤不同的消费者。

 

通过两者消费速率的对⽐发现,速率有⾮常⼤的差异,最⾼能差5倍以上,以下是实验数据:

 

数据一:正常的消费速率,⼀直持续稳定在600-800之间,最⾼甚⾄能达到1k的消费速率。

 

数据二:completableFuture,最开始启动的时候能达到⽐较⾼的600速率,运⾏⼀定时间后,就降到200速率了。

 

很明显在运⾏⼀段时间后,completableFuture这种⽅式的消费速率下降了,并且消费效率明显⽐普通的消费回调⽅式要慢许多,那这究竟是怎么回事呢?

 

Consumer端消费很慢,基本上主要的原因就是Consumer端线程太忙或者不够,这个时候就要回到completableFuture本身了,我们在代码中使⽤的是completableFuture.Async()⽅法:

List<CompletableFuture<AltResult>> futureList = msgList.stream().map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e))).collect(Collectors.toList());

在CompletableFuture方法内部:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {...private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);}...static final class ThreadPerTaskExecutor implements Executor {public void execute(Runnable r) { new Thread(r).start(); }}
}

而ForkJoinPool.commonPool()如下:

private static ForkJoinPool makeCommonPool() {int parallelism = -1;ForkJoinWorkerThreadFactory factory = null;UncaughtExceptionHandler handler = null;try {  // ignore exceptions in accessing/parsing propertiesString pp = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");String fp = System.getProperty("java.util.concurrent.ForkJoinPool.common.threadFactory");String hp = System.getProperty("java.util.concurrent.ForkJoinPool.common.exceptionHandler");if (pp != null)parallelism = Integer.parseInt(pp);if (fp != null)factory = ((ForkJoinWorkerThreadFactory)ClassLoader.getSystemClassLoader().loadClass(fp).newInstance());if (hp != null)handler = ((UncaughtExceptionHandler)ClassLoader.getSystemClassLoader().loadClass(hp).newInstance());} catch (Exception ignore) {}if (factory == null) {if (System.getSecurityManager() == null)factory = defaultForkJoinWorkerThreadFactory;else // use security-managed defaultfactory = new InnocuousForkJoinWorkerThreadFactory();}if (parallelism < 0 && (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)parallelism = 1;if (parallelism > MAX_CAP)parallelism = MAX_CAP;return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, "ForkJoinPool.commonPool-worker-");
}

如果使⽤supplyAsync,那么就会有⼀个默认的线程池,⽽线程池⾥⾯的线程数⼤⼩是根据CPU核数 - 1来决定的。例如是4核那么线程数就是3,如果⾃定义⼀个公⽤线程池,所有消费者都使⽤这个线程池,假如开启了80个线程,但是真正处理消费后的核⼼⽅法逻辑的,其实只有3个线程。其他的77个任务相当于都在队列⾥⾯等待ForkJoinPool⾥的线程完成,所以导致整个消费慢了。

 

如果其中⼀个ForkJoin线程慢了会拖累整个消费,最终可能影响到当前服务⾥订阅的所有Topic,影响整个MQ。这种场景⼀般不太会遇到,都是实战中开发时才会出现千奇百怪的问题。就这么个问题,如果不是对completableFuture.supplyAsync()源码极其了解,⼀般也想不到会出现这样的问题。所以,其实我们本意是想提升效率,最终却导致整体消费效率降低了。

 

(3)Producer发送消息失败问题

一.问题描述

在修改batch推送消息,并使⽤线程池来并发推送消息⾄MQ后(在未改造线程池和batch之前没有出现下述异常),针对百万⽤户活动消息推送⾄MQ的压测中,Producer出现了⾮常频繁的RemotingTooMuchRequestException。

 

⾸先看报错⽇志的第⼀段:

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeoutat org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:683) ~ [rocketmq-client-4.9.2.jar:4.9.2]at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1391) ~ [rocketmq-client-4.9.2.jar:4.9.2]at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:916) ~ [rocketmq-client-4.9.2.jar:4.9.2]at com.demo.eshop.promotion.mq.producer.DefaultProducer.sendMessages(DefaultProducer.java:155) [classes/:na]at com.demo.eshop.promotion.mq.producer.DefaultProducer.sendMessages(DefaultProducer.java:107) [classes/:na]at com.demo.eshop.promotion.mq.consumer.listener.PlatFormPromotionUserBucketListener.lambd a$consumeMessage$0(PlatFormPromotionUserBucketListener.java:116) [classes/:na]at com.demo.eshop.promotion.mq.consumer.listener.PlatFormPromotionUserBucketListener$$Lam bda$1092/806913152.run(Unknown Source) [classes/:na] at com.demo.eshop.common.concurrent.SafeThreadPool.lambda$execute$0(SafeThreadPool.java: 42) [classes/:na] at com.demo.eshop.common.concurrent.SafeThreadPool$$Lambda$1091/1786949609.run(Unkno wn Source) [classes/:na] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_40] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_40] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_40] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_40] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_40] INFO 22512 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[47.102.152.14:10911] result: true INFO 22512 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[47.102.152.14:10911] result: true

接着看报错⽇志的第⼆段:

org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [5037]ms, Topic: platform_promotion_send_topic, BrokersSent: [broker-b, broker-a, broker-b] See http://rocketmq.apache.org/docs/faq/ for further details. at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:681) ~[rocketmq-client-4.9.2.jar:4.9.2] at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.j ava:1391) ~ [rocketmq-client-4.9.2.jar:4.9.2] at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:916) ~ [rocketmq-client-4.9.2.jar:4.9.2] at com.demo.eshop.promotion.mq.producer.DefaultProducer.sendMessages(DefaultProducer.java: 155) [classes/:na]at com.demo.eshop.promotion.mq.producer.DefaultProducer.sendMessages(DefaultProducer.java: 107) [classes/:na] at com.demo.eshop.promotion.mq.consumer.listener.PlatFormPromotionUserBucketListener.lambd a$consumeMessage$0(PlatFormPromotionUserBucketListener.java:116) [classes/:na] at com.demo.eshop.promotion.mq.consumer.listener.PlatFormPromotionUserBucketListener$$Lam bda$1092/806913152.run(Unknown Source) [classes/:na] at com.demo.eshop.common.concurrent.SafeThreadPool.lambda$execute$0(SafeThreadPool.java: 42) [classes/:na] at com.demo.eshop.common.concurrent.SafeThreadPool$$Lambda$1091/1786949609.run(Unkno wn Source) [classes/:na] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_40] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_40] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_40] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_40] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_40] Caused by: org.apache.rocketmq.remoting.exception.RemotingTimeoutException: wait response on the channel <139.224.212.58:10911> timeout, 3084(ms) at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeSyncImpl(NettyRemotingAbstr act.java:438) ~[rocketmq-remoting-4.9.2.jar:4.9.2] at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:3 77) ~ [rocketmq-remoting-4.9.2.jar:4.9.2] at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:505) ~ [rocketmq-client-4.9.2.jar:4.9.2] at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:489) ~ [rocketmq-client-4.9.2.jar:4.9.2]at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:433) ~ [rocketmq-client-4.9.2.jar:4.9.2] at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQPro ducerImpl.java:870) ~ [rocketmq-client-4.9.2.jar:4.9.2] at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQPr oducerImpl.java:606) ~ [rocketmq-client-4.9.2.jar:4.9.2] ... 13 common frames omitted 2022-01-23 15:37:49.092 ERROR 22512 --- [sgThreadPool-27] c.r.e.p.mq.producer.DefaultProducer :发送MQ消息失败: type:平台优惠活动消息

二.问题原因分析

那么这个问题出现的原因是什么呢?先进⼊源码看看情况:在rocketmq-client-4.9.2.jar这个依赖中,找到DefaultMQProducerImpl这个类。在DefaultMQProducerImpl这个类中第683⾏,发现有异常信息抛出。

//在rocketmq-client-4.9.2.jar的org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl类中683行:
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);MQClientException mqClientException = new MQClientException(info, exception);
if (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}

这⾥有⼀个callTimeout参数,判断最后是否抛出了异常。那么这个参数是在哪⾥定义、更新的?

boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}

beginTimestampFirst,beginTimestampPrev⽐较好理解,就是sendmessage开始执⾏的时间,以及尝试发送的时间。或者说,尝试和MQ建⽴链接到得到⼀个反馈结果之后的时间。两个时间相减,如果⼤于了timeout这个时间,就说明超时了。

 

那么这timeout是什么?我们在producer.send()发送的时候,实际上是调⽤了下⾯这个⽅法的:

public class DefaultMQProducer extends ClientConfig implements MQProducer {...@Overridepublic SendResult send(Collection<Message> msgs, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.defaultMQProducerImpl.send(batch(msgs), timeout);}...
}

⽽往下⼀层,实际上是调⽤了defaultMQProducerImpl.send()发出的消息。可以看到有⼀个参数,叫做timeout,defaultMQProducerImpl.send()消息的调⽤代码位置如下:

public class DefaultMQProducerImpl implements MQProducerInner {...public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);}...
}

也就是实际执⾏的send操作,是defaultMQProducerImpl的内部方法"private SendResult sendDefaultImpl"发出的。⽽初始化这么⼀个SendResult对象,会传⼊我们调⽤send的时候的timeout参数。

public class DefaultMQProducerImpl implements MQProducerInner {...private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {......}...
}

如果我们调⽤send()的时候,没有传⼊这个timeout参数,那么这个参数有⼀个默认值,是3000ms。没有传⼊超时参数的代码如下:

public class DefaultMQProducer extends ClientConfig implements MQProducer {...@Overridepublic SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.defaultMQProducerImpl.send(batch(msgs));}...
}

点进去send()⽅法,发现它会get⼀个默认的Timeout时间:

public class DefaultMQProducerImpl implements MQProducerInner {...public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return send(msg, this.defaultMQProducer.getSendMsgTimeout());}...
}

进⼊这个获取timeOut的⽅法:

public class DefaultMQProducer extends ClientConfig implements MQProducer {...//Timeout for sending messages.private int sendMsgTimeout = 3000;public int getSendMsgTimeout() {return sendMsgTimeout;}...
}

看到这⾥可知:发送消息的时间如果超过了3000ms还没有发送成功, 就会导致发送失败。

 

三.发送失败的几种原因

Broker侧的原因:

原因1:并发太⾼导致发送给阻塞。
原因2:消息积压。
原因3:内存不够⽤,导致⼤量消息写磁盘效率极速下降。
原因4:频繁GC,导致send()操作等待过久。
原因5:发送的消息的⽹络带宽超过了最⼤⽹络带宽。
原因6:Chanel过于繁忙导致等待时间⽐较⻓,消息过⼤导致写⼊RocketMQ时间过⻓,其他消息等待时间过⻓。 

Client侧的原因:

原因1:设置的timeout时间不合理导致消息还没有发送成功久直接就失败了。
原因2:设置的batch数量过⼤,导致⼀次batch中的数据太多,发送时间过⻓导致超时。 
原因3:设置的发送线程池线程个数太多,导致RocketMQ连接压⼒过⼤。

经过排查发现:Broker的⽇志⼀切正常、没有什么报错信息。内存、CPU、GC情况也⽐较正常,其top信息除了内存其他的都⽐较正常。从列出的GC信息也可看到,从启动到现在,RocketMQ总计11次YGC,0次FGC。从⽹络带宽信息⾥⾯也可以看到,在发送⼤量消息前和发送⼤量消息后,⽹络使⽤量虽然很⼤(最⾼达到了20MB+),但还远未达到100MB的程度,因为⽹卡是100MB,所以还⾮常富裕。所以从RocketMQ服务本身来看,没有什么问题。

 

到⽬前为⽌,这个问题的排查,还是没有头绪,因为从RocketMQ本身来说,服务⾮常健康,那么问题出在哪⾥呢?

 

我们回过头看⽇志,⽇志中有这么⼀句话:Send [3] times, still failed, cost [5037]ms。也就是说,花费了5037ms发送消息,尝试了3次最终还是发送失败,那是不是说,是超时时间的问题?超时时间调得⼤⼀点,会不会就解决了?

org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [5037]ms

于是在代码中加⼊了发送消息,添加⼀个超时时间,设置为10000ms:

sharedSendMsgThreadPool.execute(() -> {defaultProducer.sendMessages(RocketMqConstant.PLATFORM_PROMOTION_SEND_TOPIC, sendBatch, "平台优惠活动消息", 10000);
});

重新运⾏代码,执⾏推送全员优惠活动,惊喜的发现不报错了!但是这种改法有个问题,就是如果对于时效性要求特别⾼的场景很显然是不合适的。因为超时时间10000ms,意味着最慢要10s才能发送成功。加上消费者端识别到消息,拉取消息,最终处理,还是会耗时⽐较久的。那么还有没有其他的⽅式?

 

根据上面列举的可能原因,除了Broker本身外,还有⼀些其他的可能原因,就是batchSize过⼤,或者是Chanel接⼝太忙。根据代码,batchSize显然是不⼤的,我们特意控制在100左右。那是不是有可能是我们线程池 + batch推送,导致端⼝以及⽹络Chanel压⼒过⼤导致超时?如果能再开⼀个端⼝,来建⽴Chanel是不是就能够减轻单端⼝的压⼒?

 

注意,这是在我们服务器带宽、CPU、IO压⼒还⽐较⼩,就出现这个问题。所以接下来就简单了,直接设置⼀个参数,让Producer发送消息的时候能和Broker的其他端⼝建⽴Chanel。实际上还是和前⾯的那个案例相关,这个参数就是VipChannel参数。

 

在Producer端可以这么设置:

@Component
public class DefaultProducer {private final TransactionMQProducer producer;@Autowiredpublic DefaultProducer(RocketMQProperties rocketMQProperties) {producer = new TransactionMQProducer(RocketMqConstant.PUSH_DEFAULT_PRODUCER_GROUP);producer.setCompressMsgBodyOverHowmuch(Integer.Max_VALUE);producer.setNamesrvAddr(rocketMQProperties.getNameServer());producer.setVipChannelEnabled(true);start();}...
}

开启之后,把超时时间去掉,发现程序运⾏完全OK,没有任何报错。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/880995.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

十二、MyBatis分页插件

十二、MyBatis分页插件@目录十二、分页插件12.1 分页插件使用步骤12.2 分页插件的使用12.3 测试案例本人其他相关文章链接 十二、分页插件 12.1 分页插件使用步骤 1. 添加依赖 <dependency><groupId>com.github.pagehelper</groupId><artifactId>page…

htb Nunchucks walkthrough ssti + shebang绕过apparmor限制

注册发现注册失败扫描子域名 ffuf -u https://nunchucks.htb/ -w /usr/share/dirb/wordlists/common.txt -H "Host: FUZZ.nunchucks.htb" -fs 30589访问看看有啥随便输入个邮箱抓包看看 尝试ssti 注入发现确实存在在hacktrick上搜索payload https://book.hacktricks.…

P1551 亲戚

并查集还是不熟,还得练 #include<iostream> #include<set> #include<map> #include<algorithm> #include<vector> #define int long long const int N = 1e6; using namespace std; char* p1, * p2, buf[100000]; #define nc() (p1==p2 &&a…

推荐一些程序员常逛的开发者社区

前言 在信息技术日新月异的今天,程序员作为推动技术进步的重要力量,始终在探索、学习和交流的道路上不断前行。为了帮助程序员们更好地拓宽视野、提升技能,本文大姚将给大家推荐12个程序员常逛的开发者社区。 GitHub GitHub是一个功能强大、易于使用的代码托管平台。拥有庞大…

为飞牛OS基于FRP的内网穿透开启HTTPS加密

前言 玩NAS的朋友应该有比较多只是在家庭局域网使用,比如日常看看电影、备份手机照片什么的,这属于家庭局域网的使用场景。 当然了,如果你经常出差,或者过年回家不想把NAS也背回去,或者是想上班摸鱼,或者是NAS搭建游戏服务器之类的能公网访问就很有必要了。 公网访问我自…

0208《XEduHub + PySimpleGUI + PySimpleGUIWeb:在行空板上部署模型的全解析》【模型部署】

- 2月8日,晚上,19:30~21:00(主讲老师:邱奕盛)实验内容: 【模型部署】利用统一推理框架实现模型部署。 在训练好的模型基础上,设计简洁的体验界面, 最终尝试在行空板上实现完整效果的呈现,涉及 XEduHub、PySimpleGUI、PySimpleGUIWeb等工具。 import PySimpleGUI as …

DeepSeek 不再卡顿,从此告别服务器繁忙,请稍后再试(建议收藏!)

大家好,我是六哥。 由于DeepSeek真的太火了,也许你也跟我一样,常会遇到这样的情况:真的让人抓狂,10条回复里常常有9条是“服务器blabla,请稍后重试”,看到这话,就问你,谁能不崩溃? 其实仔细想想,DeepSeek的目标是AGI,算力资源更多用在探索模型上,很难兼顾几亿用户…

踩坑---注意芯片复位后引脚初始化功能

踩坑---注意复位后引脚初始化功能 背景 ​ 做综合案例训练的时候,把PB3和PB4当做了普通IO进行了配置,运行过程中发现很奇怪,怎么输出和配置的不一致。debug调试显示的IO输出和万用表量的都不一样。由于添加了其他功能代码,还把每个部分代码都抽出来单独调试,最后发现就是那…

【字符串、栈】单个char字符转为string表示

单个char转string char x = a; string c = string(1, x);string转单个char string s = "abc"; char x = s[0];string转char[](字符数组) string s = "abc"; s.c_str();(字符数组)char[]转string //直接赋值即可 char s1[4] = "abc"; string …

CF2008-Solution

【萌新场!】CF2008解析 今天带来的是 CF2008:Codeforces Round 970(Div 3)的前 8 题解析!非常入门,适合基础较弱的同学食用! Div3 的题目,往往思维含量较低,更多的是套路的教学,适合扩充知识面。 时间匆忙,难免有错漏之处,仅供娱乐! 时间限制均为 2s,内存限制均为…

WHUWC T3 极差划分

WHUWC T3 极差划分 汇集了优秀资源,懒得自己手写了 cf题目

哥斯拉流量JAVA_AES解密-日志分析

solar应急响应-日志流量分析-2 题目:新手运维小王的Geoserver遭到了攻击 小王拿到了当时被入侵时的流量,其中一个IP有访问webshell的流量,已提取部分放在了两个pcapng中了。请帮他解密该流量。 由上一题得到一个后门文件 String code="ZiFsXmEqZ3tBN2I0X1g5ektfMnY4Tl9…