RocketMQ:安装配置单节点RocketMQ、rocketmq-console,使用Java简单生产消费

news/2025/3/11 15:08:10/文章来源:https://www.cnblogs.com/casoli/p/18765091

一、安装配置RocketMQ

1.1 安装配置Java环境

RocketMQ是一个纯Java的开源消息中间件,所以运行依赖Java环境。

配置java环境参考
Java:CentOS 7 离线安装 jdk
Java:CentOS 7 联网安装 jdk(yum方式)

1.2 安装配置RocketMQ

1.2.1 下载与解压

  1. 在/home/data/下创建rocketmq目录并进入该目录
    mkdir -p /home/data/rocketmq && cd /home/data/rocketmq

  2. 在新建的目录下下载压缩包
    wget https://mirrors.bfsu.edu.cn/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip
    如果找不到包,就去https://mirrors.bfsu.edu.cn/apache/rocketmq 自己看下吧,更新会删除历史的版本

  3. 解压
    unzip rocketmq-all-4.8.0-bin-release.zip

1.2.2 创建软连接与存储、日志目录

  1. 在/usr/local下创建rocketmq软链接
    cd /usr/local && ln -s /home/data/rocketmq/rocketmq-all-4.8.0-bin-release rocketmq

  2. 创建存储的路径和日志路径
    mkdir -p /usr/local/rocketmq/{store/{commitlog,consumequeue,index},logs}

1.2.3 修改配置文件与脚本配置

  1. 更改rocketmq配置文件(主启动文件,主要修改namesrvAddr和brokerIP1)
    vim /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties

    修改内容:

    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker 名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-a
    #0 表示 Master, >0 表示 Slave
    brokerId=0
    #nameServer 地址,分号分割
    namesrvAddr=124.70.143.9:9876
    # 设置对外ip,这样程序才能访问此服务器下的mq
    brokerIP1=124.70.143.9
    #在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4 点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog 每个文件的大小默认 1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/rocketmq/store
    #commitLog 存储路径
    storePathCommitLog=/usr/local/rocketmq/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
    #消息索引存储路径
    storePathIndex=/usr/local/rocketmq/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/rocketmq/store/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制 Master
    #- SYNC_MASTER 同步双写 Master
    #- SLAVE
    brokerRole=ASYNC_MASTER
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    
  2. 修改日志配置文件

    cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' \*.xml

  3. 修改启动脚本参数

    vim /usr/local/rocketmq/bin/runbroker.sh

    vim /usr/local/rocketmq/bin/runserver.sh

    # Xms Xmx Xmn都改成1g,我的云服务器内存2g,所以我改成了512m,不然服务器内存占用太满,512m只能做测试用,实际这些内存也是不够用的
    JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn128m
    # JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g ..."
    

    参数解析:
    -Xms JVM初始分配的内存
    -Xmx JVM最大分配的内存,若程序使用内存达到该值,会报错OutOfMemory
    -Xmn 设置年轻代大小

1.2.4 启动MQ、查看结果

  1. 启动mq(先启动NameServer,后启动BrokerServer)

    启动NameServer
    nohup sh /usr/local/rocketmq/bin/mqnamesrv >/dev/null 2>&1 &

    启动BorkerServer
    nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties -n localhost:9876 autoCreateTopicEnable=true >/dev/null 2>&1 &

    参数解析
    -c 指定配置文件
    -n 指定
    autoCreateTopicEnable=true 用户(非broker)能够创建topic

  2. 查看结果(有没有 NamesStartup和BrokerStartup)

    jps

  3. 验证是否成功注册

    sh /usr/local/rocketmq/bin/mqadmin clusterList -n localhost:9876

    出现下方字符,则注册成功;查看Addr列对应的ip,是否为公网ip,若为内网ip,后续java程序无法访问

    Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE

    rocketmq-cluster broker-a 0 124.70.143.9:10911 V4_8_0 0.00(0,0ms) 0.00(0,0ms) 0 449190.63 0.0037

  4. 关闭mq(先关broker,然后names)

    sh /usr/local/rocketmq/bin/mqshutdown broker
    sh /usr/local/rocketmq/bin/mqshutdown namesrv

二、安装配置rocketmq-console

PS:可以本地编译成jar包放上去的,不用在服务器上进行打包,(╬ ̄皿 ̄)=○

2.1 安装配置maven

rocketmq-console 需要 mvn进行 打包。

  1. 创建maven目录

    mkdir -p /home/data/maven && cd /home/data/maven

  2. 下载maven

    wget http://archive.apache.org/dist/maven/maven-3/3.6.0/binaries/apache-maven-3.6.0-bin.zip

  3. 解压

    unzip apache-maven-3.6.0-bin.zip

  4. 创建软链接

    ln -s /home/data/maven/apache-maven-3.6.0 /usr/local/maven

  5. 配置环境变量

    vim /etc/profile

    # set maven enviroment
    export MAVEN_HOME=/usr/local/maven
    # 注意这个PATH JAVA已经配置了,所以只需要在PATH后边加上 :$MAVEN_HOME/bin即可,而且注意MAVEN_HOME要在PATH引用之前
    export PATH=$MAVEN_HOME/bin:$PATH 
    
  6. 更新文件并执行(使配置生效)

    source /etc/profile

  7. 查看结果

    mvn -v

2.2 安装配置rocketmq-console

  1. 下载rocketmq-console

    地址:https://gitee.com/ralph81/rocketmq-console/repository/archive/master.zip

  2. 创建目录上传压缩包

    mkdir -p /home/data/rocketmq-console && cd /home/data/rocketmq-console && rz

  3. 解压

    unzip ralph81-rocketmq-console-master.zip

  4. 修改配置文件

    cd rocketmq-console && vim ./src/main/resources/application.properties

    
    # 注释这个,不然会报错-端口占用
    # server.address=124.70.143.9
    # 修改访问上下文
    server.contextPath=/rocketmq
    # 修改访问端口号,默认8080 这个端口太常见了
    server.port=8099# name server地址
    rocketmq.config.namesrvAddr=124.70.143.9:9876
    

    保存并退出

  5. mvn命令打包

    mvn clean package -Dmaven.test.skip=true

  6. 运行java包

    java -jar target/rocketmq-console-ng-2.0.0.jar

  7. 验证

    浏览器登录http://124.70.143.9:8099/rocketmq

    PS:账号密码在src/main/resources/users.properties中配置,管理员是admin/admin。

三、Java代码生产消费RocketMQ

使用java代码连接RocketMQ,发布消费主题内容;
我使用的SpringBoot,所以通过实现ApplicationRunner,使程序启动时运行方法

pom.xml

<!-- rocketmq 因为我的服务器部署的是4.8.0的RocketMQ-client -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version>
</dependency>

RocketMQProducer.java

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;/*** @author shuli* @create 2021-03-30 13:03* @description 生产者生产消息,topic是TopicTest,tags是TagA,key没设置,message是Hello RocketMQ i**/@Component
@Order(1)
public class RocketMQProducer implements ApplicationRunner {@Overridepublic void run(ApplicationArguments args) throws Exception {try{DefaultMQProducer producer = new DefaultMQProducer("groupProducer");producer.setNamesrvAddr("124.70.143.9:9876");//MQ服务器地址producer.setVipChannelEnabled(false);producer.start();for (int i = 0; i < 2; i++) {Message msg = new Message("TopicTest", "TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);System.out.println("--");}// 关闭生产者producer.shutdown();}catch (Exception e){e.printStackTrace();}}
}

RocketMQConsumer.java

package com.caosli.alltest.connectRocketMQ;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;import java.util.List;/*** @author shuli* @create 2021-03-31 13:13* @description 消费者消费数据**/@Component
@Order(2)
public class RokcetMQCunsumer implements ApplicationRunner {@Overridepublic void run(ApplicationArguments args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("FirstConsumerGroup");consumer.setNamesrvAddr("124.70.143.9:9876");consumer.setInstanceName("Consumber");// 订阅,可以订阅多个主题,多写几个// 订阅多个标签TagA、TagB、TagC:consumer.subscribe("TopicTest","TagA || TagB || TagC");// 订阅所有标签:consumer.subscribe("TopicTest","*");consumer.subscribe("TopicTest","TagA");consumer.registerMessageListener((List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext)->{System.out.println(Thread.currentThread().getName()+ " Receive New Messages: " + list.size());MessageExt msg = list.get(0);if("TopicTest".equals(msg.getTopic())){if("TagA".equals(msg.getTags())){System.out.println(new String(msg.getBody()));}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 启动consumerconsumer.start();// 暂时不关闭,消费者可以持续消费数据;后续可以通过rocketmq-console发布消息,测试消费者接收
//        consumer.shutdown();}
}

四、过程中出现的错误总结

4.1 报错:内存不足

Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 268435456, 0) failed; error='Cannot allocate memory' (errno=12)

原因在nameserver和brokerserver中设置的堆内存参数过大,改为512m

4.2 报错:No route info of this topic

org.apache.rocketmq.client.exception.MQClientException: No route info of this topic

原因pom.xml的rocketmq-client(4.3.0)和服务器上(4.8.0)版本不一致,修改pom.xml为 4.8.0

4.3 报错:sendDefaultImpl call timeout

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout

解决 修改启动命令中的ip为localhost,配置文件中的地址为公网地址。原因不清

第二天又出现这问题了

诊断结果是内存分配太少(以前设置的太少了,512还行),导致OOM,发布消息出现问题。解决办法:关闭rocketmq,然后再启动。

4.4 报错:端口占用

Identify and stop the process that's listening on port 8099 or configure this application to listen on another port.

注释掉 application.properties 中的 server.address

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/897279.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

高等数学笔记

唉...本蒟蒻也是要考研了, 目前目标是深圳大学, 想研究的方向偏算法多一点, 深度学习强化学习什么的, 我会尽最大努力了 9 做到一个新的问题,想起与过去某个问题类似。发现在解答中,对此类问题,以及工具和方法的理解是存在缺陷的,或者发现理解不够深刻。于是通过解决新的…

Scatter(A Distance-Guided Fuzzing For Heap-layout)

SCATTER Abstract 利用堆利用的方法为将受害者的chunk放在可以溢出的chunk之后。SCATTER使能够以无原始的方式以普通purpose程序中的堆溢出产生可剥削的堆布局。它先使用静态分析和动态检测来计算潜在的堆利用布局,然后设计由新操纵距离为指导的fuzz,该距离衡量了在堆布局空间…

使用 Pixi.js 插件实现探险者小游戏(二)

使用 Pixi.js 插件实现探险者小游戏(一)中我们学习了如何创建精灵图,这节我们要让精灵图动起来。 精灵图布局 游戏画面如下图所示,我们要生成一个围墙,探险者、恶魔、宝物都在这个围墙里面。探险者可以上下左右移动,恶魔只能上下移动,宝物是不动的。探险者与宝物被恶魔群…

Docker:CentOS 7 离线安装 docker-ce

0. 检查卸载已有docker 查看是否安装 docker yum list installed | grep docker 卸载docker yum remove docker docker-common container-selinux docker-selinux docker-engineyum remove -y docker-* 1. 下载安装包 要下载docker-18.06.x-ce版本,否则有些不支持 k8s。。请看…

dp泄露攻击

题目: from Crypto.Util.number import *flag = bNSSCTF{******} + b1*100p = getPrime(512) q = getPrime(512)n = p*q e = 65537 d = inverse(e, (p-1)*(q-1))dp = d % (p-1)m = bytes_to_long(flag)c = pow(m, e, n)print(fn = {n}) print(fc = {c}) print(fdp = {dp}) n = …

dpdq泄露攻击-没e_

题目: from Crypto.Util.number import * from gmpy2 import * from secret import flagp = getPrime(1024) q = getPrime(1024) d = inverse(65537,(p-1)*(q-1)) dp = d%(p-1) dq = d%(q-1) print(fc={pow(bytes_to_long(flag),e,p*q)}) print(fp={p}) print(fq={q}) print(fd…

Linux安装Ollama服务

背景 Ollama官方提供了一键式安装脚本,但因国内网络问题,效率太低,所以探索更为快捷方式。 我的系统信息如下 root@yan:/mnt/d/data# lsb_release -a No LSB modules are available. Distributor ID: Ubuntu Description: Ubuntu 22.04.5 LTS Release: 22.04 Code…

C学习笔记-311

多维数组和指针 为什么需要数组为了解决大量同类型数据的存储和使用问题。 用数组可以模拟现实世界。Int a[25]:一维数组,可以当做一个线性结构。 Int a[8][6]:可以当做一个平面,意思是8行6列。有48个元素。 Int a[3][4][5]:可以当做一个三维立体。 Int a[3][4][5][6]:可…

e与(p-1)或(q-1)均不互素

题目: from Crypto.Util.number import bytes_to_long from secret import flage = 0x14 p = 7330895897249035860738209657929637460767893905398244379628076799548083100726568174238286139385106848645676643457511649442694896479642275193079806880680590593771233914993…

e与(q-1)互素,但用上题方法求不出

题目: c = 2485360255306619684345131431867350432205477625621366642887752720125176463993839766742234027524 n = 0x2CAA9C09DC1061E507E5B7F39DDE3455FCFE127A2C69B621C83FD9D3D3EAA3AAC42147CD7188C53 e = 3解题思路:分解n得到分析得到e只与(r-1)互素,但用上题方法无法解出…

张高兴的大模型开发实战:(一)使用 Selenium 进行网页爬虫

目录什么是 Selenium环境搭建与配置安装 Selenium下载浏览器驱动基础操作启动浏览器并访问网页定位网页元素通过 ID 定位通过 CSS 选择器定位通过 XPath 定位与元素交互提取数据交互操作设置等待时间切换页面执行 JavaScript 代码关闭浏览器进阶技巧使用 ActionChains 模拟用户…