一、安装配置RocketMQ
1.1 安装配置Java环境
RocketMQ是一个纯Java的开源消息中间件,所以运行依赖Java环境。
配置java环境参考
Java:CentOS 7 离线安装 jdk
Java:CentOS 7 联网安装 jdk(yum方式)
1.2 安装配置RocketMQ
1.2.1 下载与解压
-
在/home/data/下创建rocketmq目录并进入该目录
mkdir -p /home/data/rocketmq && cd /home/data/rocketmq
-
在新建的目录下下载压缩包
wget https://mirrors.bfsu.edu.cn/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip
如果找不到包,就去https://mirrors.bfsu.edu.cn/apache/rocketmq 自己看下吧,更新会删除历史的版本 -
解压
unzip rocketmq-all-4.8.0-bin-release.zip
1.2.2 创建软连接与存储、日志目录
-
在/usr/local下创建rocketmq软链接
cd /usr/local && ln -s /home/data/rocketmq/rocketmq-all-4.8.0-bin-release rocketmq
-
创建存储的路径和日志路径
mkdir -p /usr/local/rocketmq/{store/{commitlog,consumequeue,index},logs}
1.2.3 修改配置文件与脚本配置
-
更改rocketmq配置文件(主启动文件,主要修改namesrvAddr和brokerIP1)
vim /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties
修改内容:
#所属集群名字 brokerClusterName=rocketmq-cluster #broker 名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a #0 表示 Master, >0 表示 Slave brokerId=0 #nameServer 地址,分号分割 namesrvAddr=124.70.143.9:9876 # 设置对外ip,这样程序才能访问此服务器下的mq brokerIP1=124.70.143.9 #在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4 点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog 每个文件的大小默认 1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/usr/local/rocketmq/store #commitLog 存储路径 storePathCommitLog=/usr/local/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/usr/local/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/usr/local/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/usr/local/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制 Master #- SYNC_MASTER 同步双写 Master #- SLAVE brokerRole=ASYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
-
修改日志配置文件
cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' \*.xml
-
修改启动脚本参数
vim /usr/local/rocketmq/bin/runbroker.sh
vim /usr/local/rocketmq/bin/runserver.sh
# Xms Xmx Xmn都改成1g,我的云服务器内存2g,所以我改成了512m,不然服务器内存占用太满,512m只能做测试用,实际这些内存也是不够用的 JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn128m # JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g ..."
参数解析:
-Xms JVM初始分配的内存
-Xmx JVM最大分配的内存,若程序使用内存达到该值,会报错OutOfMemory
-Xmn 设置年轻代大小
1.2.4 启动MQ、查看结果
-
启动mq(先启动NameServer,后启动BrokerServer)
启动NameServer
nohup sh /usr/local/rocketmq/bin/mqnamesrv >/dev/null 2>&1 &
启动BorkerServer
nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties -n localhost:9876 autoCreateTopicEnable=true >/dev/null 2>&1 &
参数解析
-c 指定配置文件
-n 指定
autoCreateTopicEnable=true 用户(非broker)能够创建topic -
查看结果(有没有 NamesStartup和BrokerStartup)
jps
-
验证是否成功注册
sh /usr/local/rocketmq/bin/mqadmin clusterList -n localhost:9876
出现下方字符,则注册成功;查看Addr列对应的ip,是否为公网ip,若为内网ip,后续java程序无法访问
Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
rocketmq-cluster broker-a 0 124.70.143.9:10911 V4_8_0 0.00(0,0ms) 0.00(0,0ms) 0 449190.63 0.0037
-
关闭mq(先关broker,然后names)
sh /usr/local/rocketmq/bin/mqshutdown broker
sh /usr/local/rocketmq/bin/mqshutdown namesrv
二、安装配置rocketmq-console
PS:可以本地编译成jar包放上去的,不用在服务器上进行打包,(╬ ̄皿 ̄)=○
2.1 安装配置maven
rocketmq-console 需要 mvn进行 打包。
-
创建maven目录
mkdir -p /home/data/maven && cd /home/data/maven
-
下载maven
wget http://archive.apache.org/dist/maven/maven-3/3.6.0/binaries/apache-maven-3.6.0-bin.zip
-
解压
unzip apache-maven-3.6.0-bin.zip
-
创建软链接
ln -s /home/data/maven/apache-maven-3.6.0 /usr/local/maven
-
配置环境变量
vim /etc/profile
# set maven enviroment export MAVEN_HOME=/usr/local/maven # 注意这个PATH JAVA已经配置了,所以只需要在PATH后边加上 :$MAVEN_HOME/bin即可,而且注意MAVEN_HOME要在PATH引用之前 export PATH=$MAVEN_HOME/bin:$PATH
-
更新文件并执行(使配置生效)
source /etc/profile
-
查看结果
mvn -v
2.2 安装配置rocketmq-console
-
下载rocketmq-console
地址:https://gitee.com/ralph81/rocketmq-console/repository/archive/master.zip
-
创建目录上传压缩包
mkdir -p /home/data/rocketmq-console && cd /home/data/rocketmq-console && rz
-
解压
unzip ralph81-rocketmq-console-master.zip
-
修改配置文件
cd rocketmq-console && vim ./src/main/resources/application.properties
# 注释这个,不然会报错-端口占用 # server.address=124.70.143.9 # 修改访问上下文 server.contextPath=/rocketmq # 修改访问端口号,默认8080 这个端口太常见了 server.port=8099# name server地址 rocketmq.config.namesrvAddr=124.70.143.9:9876
保存并退出
-
mvn命令打包
mvn clean package -Dmaven.test.skip=true
-
运行java包
java -jar target/rocketmq-console-ng-2.0.0.jar
-
验证
浏览器登录http://124.70.143.9:8099/rocketmq
PS:账号密码在src/main/resources/users.properties中配置,管理员是admin/admin。
三、Java代码生产消费RocketMQ
使用java代码连接RocketMQ,发布消费主题内容;
我使用的SpringBoot,所以通过实现ApplicationRunner,使程序启动时运行方法
pom.xml
<!-- rocketmq 因为我的服务器部署的是4.8.0的RocketMQ-client -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version>
</dependency>
RocketMQProducer.java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;/*** @author shuli* @create 2021-03-30 13:03* @description 生产者生产消息,topic是TopicTest,tags是TagA,key没设置,message是Hello RocketMQ i**/@Component
@Order(1)
public class RocketMQProducer implements ApplicationRunner {@Overridepublic void run(ApplicationArguments args) throws Exception {try{DefaultMQProducer producer = new DefaultMQProducer("groupProducer");producer.setNamesrvAddr("124.70.143.9:9876");//MQ服务器地址producer.setVipChannelEnabled(false);producer.start();for (int i = 0; i < 2; i++) {Message msg = new Message("TopicTest", "TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);System.out.println("--");}// 关闭生产者producer.shutdown();}catch (Exception e){e.printStackTrace();}}
}
RocketMQConsumer.java
package com.caosli.alltest.connectRocketMQ;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;import java.util.List;/*** @author shuli* @create 2021-03-31 13:13* @description 消费者消费数据**/@Component
@Order(2)
public class RokcetMQCunsumer implements ApplicationRunner {@Overridepublic void run(ApplicationArguments args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("FirstConsumerGroup");consumer.setNamesrvAddr("124.70.143.9:9876");consumer.setInstanceName("Consumber");// 订阅,可以订阅多个主题,多写几个// 订阅多个标签TagA、TagB、TagC:consumer.subscribe("TopicTest","TagA || TagB || TagC");// 订阅所有标签:consumer.subscribe("TopicTest","*");consumer.subscribe("TopicTest","TagA");consumer.registerMessageListener((List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext)->{System.out.println(Thread.currentThread().getName()+ " Receive New Messages: " + list.size());MessageExt msg = list.get(0);if("TopicTest".equals(msg.getTopic())){if("TagA".equals(msg.getTags())){System.out.println(new String(msg.getBody()));}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 启动consumerconsumer.start();// 暂时不关闭,消费者可以持续消费数据;后续可以通过rocketmq-console发布消息,测试消费者接收
// consumer.shutdown();}
}
四、过程中出现的错误总结
4.1 报错:内存不足
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 268435456, 0) failed; error='Cannot allocate memory' (errno=12)
原因在nameserver和brokerserver中设置的堆内存参数过大,改为512m
4.2 报错:No route info of this topic
org.apache.rocketmq.client.exception.MQClientException: No route info of this topic
原因pom.xml的rocketmq-client(4.3.0)和服务器上(4.8.0)版本不一致,修改pom.xml为 4.8.0
4.3 报错:sendDefaultImpl call timeout
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
解决 修改启动命令中的ip为localhost,配置文件中的地址为公网地址。原因不清
第二天又出现这问题了
诊断结果是内存分配太少(以前设置的太少了,512还行),导致OOM,发布消息出现问题。解决办法:关闭rocketmq,然后再启动。
4.4 报错:端口占用
Identify and stop the process that's listening on port 8099 or configure this application to listen on another port.
注释掉 application.properties 中的 server.address