springCloud之Stream

1、简介

Spring Cloud Stream是一个用来为微服务应用构建 消息驱动 能力的框架。通过使用 Spring Cloud Strea m ,可以有效简化开发人员对消息中间件的使用复杂度,降低代码与消息中间件间的耦合度,屏蔽消息中间件 之 间的差异性,让开发人员可以有更多的精力关注于核心业务逻辑的处理。

主要有以下几个组件:

1)、目的地绑定器(Destination Binders):负责提供与外部消息系统集成的组件。

2)、固定器(Bindings):介于外部消息系统与应用程序间的桥梁 ,这个应用程序提供了生产者和消费者的消息 (由 Destination Binders 创建)。

3)、输入管道(Input Bindings):消费者通过Input Bindings 连接 Binder ,而 Binder 与 MQ 连接,即消费者通过 Input Bindings 从 MQ 读取数据。

4)、输出管道(Output Bindings):生产者通过Output Bindings 连接 Binder ,而 Binder 与 MQ 连接,即生产者通过 Output Bindings 向 MQ 写入数据。

5)、消息(Message):生产者和消费者使用的规范数据结构,用于与 Binders 通信(从而通过外部消息系统与其他应用程序通信)。

2、具体应用示例1(MQ使用kafka)

引入依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
2.1、生产者

配置文件

server:port: 8090
spring:cloud:stream:kafka:binder:brokers: 192.168.30.88:9092,192.168.30.89:9092bindings:producer-out-0:destination: topic1content-type: application/json

代码实现

@Autowired
private StreamBridge streamBridge;@GetMapping("/test/send")
public String sendMsg(@RequestParam("msg") String msg){Map<String , Object> map = new HashMap<>();map.put("tag", "tags");MessageHeaders headers = new MessageHeaders(map);// 封装消息Message<String> message = MessageBuilder.createMessage(msg, headers);//发送消息streamBridge.send("producer-out-0", message);return msg;
}
2.2、消费者

配置文件

server:port: 8091
spring:cloud:stream:kafka:binder:brokers: 192.168.30.88:9092,192.168.30.89:9092function:definition: consumer # 这个名称要和下面bindings的consumer-in-0第一个单词一样bindings:consumer-in-0:destination: topic1content-type: application/json

代码实现

// 向容器中添加Consumer<Message<String>>类型的bean即可
@Bean
public Consumer<Message<String>> consumer(){return msg -> {System.out.println("接收到消息:" + msg.getPayload());};
}
3、具体应用示例2(MQ使用Rocketmq)

引入依赖

<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

代码实现

@Autowired
private StreamBridge streamBridge;@GetMapping("/test/send")
public String sendMsg(@RequestParam("msg") String msg){Map<String , Object> map = new HashMap<>();map.put(MessageConst.PROPERTY_TAGS, "tags");MessageHeaders headers = new MessageHeaders(map);// 封装消息Message<String> message = MessageBuilder.createMessage(msg, headers);//发送消息streamBridge.send("producer-out-0", message);return JSON.toJSONString(message);
}
3.2、消费者

配置文件:

server:port: 8091
spring:cloud:stream:rocketmq:binder:name-server: 192.168.30.88:9876function:definition: consumer # 这个名称要和下面bindings的consumer-in-0第一个单词一样bindings:consumer-in-0:destination: topic1content-type: application/json

代码实现:

// 向容器中添加Consumer<Message<String>>类型的bean即可
@Bean
public Consumer<Message<String>> consumer(){return msg -> {System.out.println("接收到消息:" + msg.getPayload());};
}

注:

1、在spring-cloud-stream 3.1.0之前的版本,还有采用定义Source、Sink等方式编写消息生产者和消费者,在3.1.0以后的版本中弃用@StreamListener的方式,而采用函数式编程的方式接入,使用StreamBrige来进行发送。

2、注意binding的名称命名规则

例如:上面的代码中定义的consumer。

# 输入:    <方法名> + -in- + <index>
# 输出:    <方法名> + -out- + <index>

总结:本文介绍Stream统一消息中间件的模型,给出基于kafka和Rocketmq两种消息中间件模型下的使用案例,以及给出废弃使用老版本的Source、Sink模式解释。帮助大家快速上手Stream的使用。

       本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:上了年纪的小男孩。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/317920.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Paddle3D 2 雷达点云CenterPoint模型训练

2 Paddle3D 雷达点云CenterPoint模型训练–包含KITTI格式数据地址 2.0 数据集 百度DAIR-V2X开源路侧数据转kitti格式。 2.0.1 DAIR-V2X-I\velodyne中pcd格式的数据转为bin格式 参考源码&#xff1a;雷达点云数据.pcd格式转.bin格式 def pcd2bin():import numpy as npimport…

基于头脑风暴算法优化的Elman神经网络数据预测 - 附代码

基于头脑风暴算法优化的Elman神经网络数据预测 - 附代码 文章目录 基于头脑风暴算法优化的Elman神经网络数据预测 - 附代码1.Elman 神经网络结构2.Elman 神经用络学习过程3.电力负荷预测概述3.1 模型建立 4.基于头脑风暴优化的Elman网络5.测试结果6.参考文献7.Matlab代码 摘要&…

EDI 项目推进流程

EDI 需求确认 交易伙伴发来EDI对接邀请&#xff0c;企业应该如何应对&#xff1f; 首先需要确认EDI需求&#xff0c;通常包括传输协议和报文标准以及传输的业务单据类型。可以向交易伙伴发送以下内容&#xff1a; &#xff08;中文版&#xff09; 与贵司建立EDI连接需要使用…

中国计算机学会推荐国际学术会议及时间(计算机体系结构/高性能计算/存储系统)

中国计算机学会推荐国际学术会议及时间 (计算机体系结构/高性能计算/存储系统) 参考资料 参考链接: call4papers

网工内推 | 网络工程师,NP认证优先,上市公司,包吃,最高15薪

01 无锡先导智能装备股份有限公司 招聘岗位&#xff1a;高级网络工程师 职责描述&#xff1a; 1.依据项目规划方案提供硬件及网络方案设计&#xff1b; 2.面向客户提供网络技术支持&#xff0c;包括故障的解决、性能的优化、日常维护等&#xff1b; 3.和合作伙伴、供应商的技术…

(七)独立按键

文章目录 独立按键原理图三行代码法简单概述代码书写键码推算如何使用短按键长按键 状态机法简单概述代码书写键码推算如何使用短按键长按键 现象 独立按键原理图 三行代码法 简单概述 代码书写 u8 Trg 0x00;//短按键 u8 Cont 0x00;//长按键 void BtnThree(void) {u8 reada…

打造专业开发者指南:针对ShardingProxy分库分表解决策略的深度剖析 – 详解部署、使用、服务治理与优化技巧

一、 ShardingProxy快速使用 ShardingProxy的功能同样是分库分表&#xff0c;但是他是一个独立部署的服务端&#xff0c;提供 统一的数据库代理服务。注意&#xff0c;ShardingProxy目前只支持MySQL和PostgreSQL。并且&#xff0c;客户端连接ShardingProxy时&#xff0c;最好使…

Java反射机制和动态代理

反射和动态代理 反射前言获取class对象的方式反射获取构造方法反射获取成员变量反射获取成员方法实例 动态代理 反射 前言 什么是反射&#xff1f; 反射允许对成员变量&#xff0c;成员方法和构造方法的信息进行编程访问。 为什么用反射 / 反射的作用&#xff1f; 可以轻易地获…

图像分割-漫水填充法 floodFill (C#)

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 本文的VB版本请访问&#xff1a;图像分割-漫水填充法 floodFill-CSDN博客 FloodFill方法是一种图像处理算法&#xff0c;它的目的是…

【SpringBoot框架篇】34.使用Spring Retry完成任务的重试

文章目录 简要1.为什么需要重试&#xff1f;2.添加maven依赖3.使用Retryable注解实现重试4.基于RetryTemplate模板实现重试 简要 Spring实现了一套重试机制&#xff0c;功能简单实用。Spring Retry是从Spring Batch独立出来的一个功能&#xff0c;已经广泛应用于Spring Batch,…

哪些洗地机比较好?洗地机选购指南

随着社会生活水平的提高&#xff0c;人们对居家环境的卫生和清洁要求不断提升。家用洗地机作为一种先进的清洁工具&#xff0c;带来了许多便利和优势&#xff0c;特别是在解决一些特殊需求的家庭环境方面。 以下是一些家用洗地机的优势和适用场景&#xff1a; 1.高效清洁&…

Hive用户自定义函数之UDF开发

在进行大数据分析或者开发的时候&#xff0c;难免用到Hive进行数据查询分析&#xff0c;Hive内置很多函数&#xff0c;但是会有一部分需求需要自己开发&#xff0c;这个时候就需要自定义函数了&#xff0c;Hive的自定义函数开发非常方便&#xff0c;今天首先讲一下UDF的入门开发…