RocketMQ5-03RocketMQ-Dashboard和Java客户端访问示例

接上篇02快速部署RocketMQ5.x(手动和容器部署)
已经完成 RocketMQ5.0 环境的部署,就需要对这个环境进行测试,查看集群、写入消息、读取消息等

本篇教你如何使用和查看部署的服务:

  • Docker部署 Dashboard
    • 获取镜像并下载
    • 部署服务
  • 客户端连接
    • pom文件
    • 生产者代码
    • 消费者代码
    • 接口测试
    • 问题: broker资源不足无法提供服务

Docker部署 Dashboard

以上通过可执行文件部署或者容器部署的形式,都需要有一个可以查看的集群的地方,对于官方自己配备的有 rocketmq-dashboard, 可以使用docker快速部署,便于测试

获取镜像并下载

docker search rocketmq-dashboard & docker pull apacherocketmq/rocketmq-dashboard

部署服务

docker run -d --name rmqdashboard -e "JAVA_OPTS=-Xmx256M -Xms256M -Xmn128M -Drocketmq.namesrv.addr=192.168.2.92:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8088:8080 apacherocketmq/rocketmq-dashboard

这边将端口映射到了8088,所以访问 localhost:8088,就可以查看到集群,如果有数据正在写入与读取,就能够大概看到数据量
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

客户端连接

手动创建 topic: sh bin/mqadmin updatetopic -n 192.168.2.92:9876 -t dataTopic2 -c DefaultCluster

pom文件

 <properties><java.version>17</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>3.0.2</spring-boot.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.5</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>17</source><target>17</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring-boot.version}</version><configuration><mainClass>com.learning.springbootrmq5.SpringbootRmq5Application</mainClass><skip>true</skip></configuration><executions><execution><id>repackage</id><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build>

生产者代码

 @GetMapping("/sendSync")public String sendSync() throws ClientException, IOException {String endpoint = "192.168.2.92:8081";String topic = "dataTopic2";ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.enableSsl(true).build();Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build();Message message = provider.newMessageBuilder().setTopic(topic).setKeys("messageKey").setTag("messageTag").setBody("messageBodySync".getBytes()).build();try {SendReceipt sendReceipt = producer.send(message);log.info("Send sync message successfully, messageId={}", sendReceipt.getMessageId());} catch (ClientException e) {log.error("Failed to send message", e);}producer.close();return "success";}@GetMapping("/sendAsync")public String sendAsync() throws ClientException, InterruptedException, IOException {String endpoint = "192.168.2.92:8081";String topic = "dataTopic2";ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().enableSsl(true).setEndpoints(endpoint);ClientConfiguration configuration = builder.build();Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build();Message message = provider.newMessageBuilder().setTopic(topic).setKeys("messageKey").setTag("messageTag").setBody("messageBodyASync".getBytes()).build();producer.sendAsync(message);log.info("Send async message successfully, messageId");return "success";}

消费者代码

@Slf4j
@Component
public class MessageConsumerRunner implements CommandLineRunner {@Overridepublic void run(final String... args) throws Exception {final ClientServiceProvider provider = ClientServiceProvider.loadService();String endpoints = "192.168.2.92:8081";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();String tag = "*";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);String consumerGroup = "YourConsumerGroup";String topic = "dataTopic2";PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(consumerGroup).setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).setMessageListener(messageView -> {log.info("Consume message successfully, messageId={}", messageView.getMessageId());return ConsumeResult.SUCCESS;}).build();Thread.sleep(Long.MAX_VALUE);}
}

接口测试

请求接口 /msg/sendAsync
在这里插入图片描述

能够正常收发

问题: broker资源不足无法提供服务

可能出现的客户端报错为:

org.apache.rocketmq.client.java.exception.InternalErrorException: [request-id=e3f9dxxxx1aa872, response-code=50001] org.apache.rocketmq.proxy.common.ProxyException: service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL:  0.96 CQ:  0.96 INDEX: -1.00], messages are put to the slave, message store has been shut down, etc.java.util.concurrent.RejectedExecutionException: Task org.apache.rocketmq.shaded.io.grpc.internal.DelayedStream$4@72ba34c2 rejected from java.util.concurrent.ThreadPoolExecutor@7deb0119[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 13]at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065) ~[na:na]at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833) ~[na:na]at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365) ~[na:na]

以上大体就是描述资源不足无法进行接入、服务不可达等,通常就是因为环境的资源不足,可能是内存、可能是硬盘

.../broker/logs/rocketmqlogs/store.log 中可以看出端倪,是磁盘存储不够了

2024-01-08 13:34:24 ERROR StoreScheduledThread1 - physic disk maybe full soon 0.95, so mark disk full, storePathPhysic=/home/rocketmq/store/commitlog

可以通过清除以下数据暂时缓解 .../broker/store/commitlog,可以发现没怎么用也有好多G。不过确实需要使用的话尽早考虑扩容啊

扩大存储增加可用磁盘空间,就能够正常使用连接了

如果这篇文章对你有用的话,帮忙留个关注吧~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/334581.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python画房子

前言 今天&#xff0c;我们来用Python画房子。 一、第一种 第一种比较简单。 代码&#xff1a; import turtle as t import timedef go(x, y):t.penup()t.goto(x, y)t.pendown() def rangle(h,w):t.left(180)t.forward(h)t.right(90)t.forward(w)t.left(-90)t.forward(h) de…

Stable Diffusion 系列教程 - 6 Dreambooth及训练

Stable-Diffusion、Imagen等文生图大模型已经具备了强大的生成能力&#xff0c;假设我们的Prompt为 [Cyberpunk Style]&#xff0c;SD或许能很快画出赛博朋克风格的一幅画。但你作为一个不知名的人&#xff0c;不能奢求SD在训练的时候把你自己想要的风格也加进去吧&#xff1f;…

鸿蒙应用开发学习路线(OpenHarmony/HarmonyOS)

鸿蒙应用开发学习路线&#xff08;OpenHarmony/HarmonyOS&#xff09; HarmonyOS应用开发学习路线网站汇总社区汇总视频学习路线 OpenHarmony应用开发学习路线与资料网站汇总社区汇总学习路线 MarkDown工具推荐 HarmonyOS应用开发学习路线 作者&#xff1a;坚果 团队&#xff1…

Verilog 高级教程笔记——持续更新中

Verilog advanced tutorial 转换函数 调用系统任务任务描述int_val $rtoi( real_val ) ;实数 real_val 转换为整数 int_val 例如 3.14 -> 3real_val $itor( int_val ) ;整数 int_vla 转换为实数 real_val 例如 3 -> 3.0vec_val $realtobits( real_val ) ;实数转换为…

企业级快速开发平台可以用在什么行业?优点多吗?

应用专业的企业级快速开发平台可以带来什么效果&#xff1f;目前&#xff0c;低代码技术平台在很多领域都获得了广泛应用和推广&#xff0c;在实现高效率办公、流程化办公和数字化发展中扮演了非常重要的角色&#xff0c;具有举足轻重的作用。针对这个话题&#xff0c;现在将给…

react输入框检索树形(tree)结构

input搜索框搜索树形子级内容1. input框输入搜索内容2. 获取tree结构数据3. 与tree匹配输入的内容&#xff0c;tree是多维数组&#xff0c;一级一级的对比输入的内容是否匹配&#xff0c;用forEach循环遍历数据&#xff0c;匹配不到在往下找&#xff0c;直到找到为null &#x…

Golang中for和for range语句的使用技巧、对比及常见的避坑

前言 基础语法不再赘述&#xff0c;写这个原因是之前的某次面试被问道了&#xff0c;我知道会导致问题但具体答下来不是很通顺。再回想自己开发过程中&#xff0c;很多地方都是使用到了for/for range&#xff0c;但是却从没注意过一些细节&#xff0c;因此专门学习一下进行记录…

解决MySQL8.0本地服务器连接不上的问题

MySQL在同一个内网内&#xff0c;但是他人链接你的MySQL时候提示&#xff1a; Host xxx is not allowed to connect to this MySQL server 这通常是MySQL限制了用户允许访问的IP导致的&#xff0c;我们可以按照下面的步骤来接触这个限制。 办法一 进入mysql的bin目录&#xff…

Linux的网络设置

一.查看网络配置 1.查看网络接口信息 - ifconfig ① 直接使用 ifconfig 命令 默认显示活动的网卡 解析&#xff1a; ② ifconfig 具体网卡名称 只显示具体的网卡的信息 ③ ifconfig -a 显示所有的网卡 ④ ifconfig 网卡名称 down 关闭网卡 ifdown 关闭网卡 …

在黑马程序员大学的2023年终总结

起笔 时间真快&#xff0c;转眼又是年末。是时候给2023做个年终总结了&#xff0c;为这一年的学习、生活以及成长画上一个圆满的句号。 这一年相比去年经历了很多事情&#xff0c;接下来我会一一说起 全文大概4000字&#xff0c;可能会占用你15分钟左右的时间 经历 先来给大…

前端nginx配置指南

前端项目发布后&#xff0c;有些接口需要在服务器配置反向代理&#xff0c;资源配置gzip压缩&#xff0c;配置跨域允许访问等 配置文件模块概览 配置示例 反向代理 反向代理是Nginx的核心功能之一&#xff0c;是指客户端发送请求到代理服务器&#xff0c;代理服务器再将请求…

Unity 踩坑记录 AnyState 切换动画执行两次

AnySate 切换动画 Can Transition To Self 将这个勾选去掉&#xff01;&#xff01;&#xff01;