系列五、Java操作RocketMQ简单消息之同步消息

一、概述

        同步消息的特征是消息发出后会有一个返回值,即RocketMQ服务器收到消息后的一个确认,这种方式非常安全,但是性能上却没有那么高,而且在集群模式下,也是要等到所有的从机都复制了消息以后才会返回,适用于重要的消息传递,例如:短信通知

二、案例代码

2.1、pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.star</groupId><artifactId>rocketmq-example</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>rocketmq-example</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.5.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.25</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.16</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-collections4</artifactId><version>4.4</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency><!-- 普通maven项目中使用Sl4j注解 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.32</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.10</version></dependency></dependencies><!-- 锁定Java编译的版本 --><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
</project>

2.2、RocketMQConstant

package org.star.constants;/*** @Author: 一叶浮萍归大海* @Date: 2023/7/27 16:42* @Description:*/
public class RocketMQConstant {/*** 配置RocketMQ NameSrv的地址*/public static final String NAME_SERVER_ADDR = "192.168.173.219:9876";}

2.3、SimpleConsumer

package org.star.simple.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/25 10:20* @Description: 简单消息消费者*/
@Slf4j
public class SimpleConsumer {public static void main(String[] args) throws Exception {/*** 消费消息分两种* (1)拉模式:消费者主动去Broker上拉消息* (2)推模式:消费者等待Broker把消息推送过来*/DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SimpleMessageGroup");consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe("SimpleTopic", "*");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {if (CollectionUtils.isNotEmpty(list)) {String body = StrUtil.utf8Str(list.get(0).getBody());log.info("收到消息 body:{}",body);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("consumer started!");}}

2.4、SyncProducer

package org.star.simple.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.star.constants.RocketMQConstant;import java.nio.charset.StandardCharsets;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/25 10:12* @Description: 同步发送:等待消息返回后再继续执行下面的操作*/
@Slf4j
public class SyncProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("SyncProducerGroup");producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);producer.start();for (int i = 0; i < 3; i++) {Message message = new Message("SimpleTopic", ("我是第[" + i + "]个简单消息").getBytes(StandardCharsets.UTF_8));SendResult sendResult = producer.send(message);log.info("第[" + i + "]个简单消息发送成功 sendStatus:{},msgId:{},topic:{}", sendResult.getSendStatus(),sendResult.getMsgId(),sendResult.getMessageQueue().getTopic());}producer.shutdown();}}

2.5、控制台打印结果

# 生产者端
12:14:32.339 [main] INFO org.star.simple.producer.SyncProducer - 第[0]个简单消息发送成功 sendStatus:SEND_OK,msgId:C0A81FB25D9418B4AAC207C6D9850000,topic:SimpleTopic
12:14:32.343 [main] INFO org.star.simple.producer.SyncProducer - 第[1]个简单消息发送成功 sendStatus:SEND_OK,msgId:C0A81FB25D9418B4AAC207C6D9940001,topic:SimpleTopic
12:14:32.348 [main] INFO org.star.simple.producer.SyncProducer - 第[2]个简单消息发送成功 sendStatus:SEND_OK,msgId:C0A81FB25D9418B4AAC207C6D9970002,topic:SimpleTopic# 消费者端
12:14:32.337 [ConsumeMessageThread_10] INFO org.star.simple.consumer.SimpleConsumer - 收到消息 body:我是第[0]个简单消息
12:14:32.345 [ConsumeMessageThread_11] INFO org.star.simple.consumer.SimpleConsumer - 收到消息 body:我是第[1]个简单消息
12:14:32.349 [ConsumeMessageThread_12] INFO org.star.simple.consumer.SimpleConsumer - 收到消息 body:我是第[2]个简单消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/95858.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构(Java实现)-排序

排序的概念 排序&#xff1a;所谓排序&#xff0c;就是使一串记录&#xff0c;按照其中的某个或某些关键字的大小&#xff0c;递增或递减的排列起来的操稳定性&#xff1a;假定在待排序的记录序列中&#xff0c;存在多个具有相同的关键字的记录&#xff0c;若经过排序&#xff…

【微服务部署】08-监控与告警

文章目录 1. PrometheusOperator1.1 优势1.2 配置脚本1.3 部署脚本 2. Granfana实现监控看板2.1 Granfana核心特性2.2 部署文件 3. prometheus-net收集自定义指标3.1 组件包3.2 使用场景 目前Kubernetes中最流行的监控解决方案是使用Prometheus和AlertManager 1. PrometheusOpe…

(超简单)将图片转换为ASCII字符图像

将一张图片转换为ASCII字符图像 原图&#xff1a; 效果图&#xff1a; import javax.imageio.ImageIO; import java.awt.image.BufferedImage; import java.io.File; import java.io.FileWriter; import java.io.IOException;public class ImageToASCII {/*** 将图片转换为A…

React笔记(三)类组件(1)

一、组件的概念 使用组件方式进行编程&#xff0c;可以提高开发效率&#xff0c;提高组件的复用性、提高代码的可维护性和可扩展性 React定义组件的方式有两种 类组件&#xff1a;React16.8版本之前几乎React使用都是类组件 函数组件:React16.8之后&#xff0c;函数式组件使…

Linux CentOS7 系统中添加用户

在linux centOS7系统中&#xff0c;添加用户是管理员的基本操作。作为学习linux系统的基本操作&#xff0c;对添加用户应该多方面了解。 添加用户的命令useradd&#xff0c;跟上用户名&#xff0c;就可以快速创建一个用户。添加一些选项&#xff0c;可以设置更人性化的用户信息…

dbeaver离线安装clickhouse连接驱动

Clickhouse 数据库连接工具——DBeaver 主要介绍了Clickhouse 数据库连接工具——DBeaver相关的知识&#xff0c;希望对你有一定的参考价值。 Clickhouse 数据库连接工具——DBeaver 1.下载 DBeaver 和 连接驱动 https://dbeaver.io/files/dbeaver-ce-latest-x86_64-setup.…

清理docker镜像方法

首先stop ps -a里的容器&#xff0c;然后rm容器&#xff0c;最后再rmi镜像 先停止容器 rm容器 docker rmi 镜像 删除后可以发现已经不存在

【SpringSecurity】七、SpringSecurity集成thymeleaf

文章目录 1、thymeleaf2、依赖部分3、定义Controller4、创建静态页面5、WebSecurityConfigurerAdapter6、权限相关7、当用户没有某权限时&#xff0c;页面不展示该按钮 1、thymeleaf 查了下读音&#xff0c;leaf/li:f/&#xff0c;叶子&#xff0c;前面的单词发音和时间time一…

CNN 01(CNN简介)

一、卷积神经网络的发展 convolutional neural network 在计算机视觉领域&#xff0c;通常要做的就是指用机器程序替代人眼对目标图像进行识别等。那么神经网络也好还是卷积神经网络其实都是上个世纪就有的算法&#xff0c;只是近些年来电脑的计算能力已非当年的那种计算水平…

JavaScript -【第二周】

文章来源于网上收集和自己原创&#xff0c;若侵害到您的权利&#xff0c;请您及时联系并删除~~~ 理解什么是流程控制&#xff0c;知道条件控制的种类并掌握其对应的语法规则&#xff0c;具备利用循环编写简易ATM取款机程序能力 运算符语句综合案例 1. 运算符 算术运算符赋值运…

时间语义与窗口

时间语义 在Flink中&#xff0c;时间语义分为两种 &#xff1a; 处理时间和事件时间。时间语义与窗口函数是密不可分的。以窗口为单位进行某一段时间内指标统计&#xff0c;例如想要统计8点-9点的某个页面的访问量&#xff0c;此时就需要用到了窗口函数&#xff0c;这里的关键…

githubPage部署Vue项目

github中新建项目 my-web &#xff08;编写vue项目代码&#xff09; myWebOnline(存放Vue打包后的dist包里面的文件) 发布流程 &#xff08;假设my-web项目已经编写完成&#xff09;Vue-cli my-web vue.config.js文件中 const { defineConfig } require(vue/cli-service)…