spring-cloud-stream

系列文章目录

第一章 Java线程池技术应用
第二章 CountDownLatch和Semaphone的应用
第三章 Spring Cloud 简介
第四章 Spring Cloud Netflix 之 Eureka
第五章 Spring Cloud Netflix 之 Ribbon
第六章 Spring Cloud 之 OpenFeign
第七章 Spring Cloud 之 GateWay
第八章 Spring Cloud Netflix 之 Hystrix
第九章 代码管理gitlab 使用
第十章 SpringCloud Alibaba 之 Nacos discovery
第十一章 SpringCloud Alibaba 之 Nacos Config
第十二章 Spring Cloud Alibaba 之 Sentinel
第十三章 JWT
第十四章 RabbitMQ应用
第十五章 RabbitMQ 延迟队列
第十六章 spring-cloud-stream

在这里插入图片描述


文章目录

  • 系列文章目录
    • @[TOC](文章目录)
  • 前言
  • 1、stream设计思想
  • 2、编码常用的注解
  • 3、编码步骤
    • 3.1、添加依赖
    • 3.2、修改配置文件
    • 3.3、生产
    • 3.4、消费
    • 3.5、延迟队列
      • 3.5.1、修改配置文件
      • 3.5.2、生产端
      • 3.5.2、消息确认机制 消费端
  • 总结

前言

https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。

SpringCloud stream通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

Stream让我们不再关注具体MQ的细节我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换,总的来说Stream能够屏蔽底层消息中间件的差异、降低切换成本,是统一消息的编程模型。

1、stream设计思想

在这里插入图片描述
在这里插入图片描述

  • Binder:很方便的连接中间件,屏蔽差异
  • Channel:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

2、编码常用的注解

在这里插入图片描述

组成说明
Middleware中间件,目前只支持RabbitMQ和Kafka
BinderBinder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过BInder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
@Input注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener监听队列,用于消费者的队列的消息接收
@EnableBinding指信道channel和exchange绑定在一起

3、编码步骤

3.1、添加依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

3.2、修改配置文件

server:port: 8088spring:cloud:stream:binders: #需要绑定的rabbitmq的服务信息defaultRabbit:  #定义的名称,用于bidding整合type: rabbit  #消息组件类型environment:  #配置rabbimq连接环境spring:rabbitmq:host: localhost   #rabbitmq 服务器的地址port: 5672           #rabbitmq 服务器端口username: tiger       #rabbitmq 用户名password: tiger       #rabbitmq 密码virtual-host: tiger_vh  #虚拟路径bindings:        #服务的整合处理saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json      #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup               #分组saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json      #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup               #分组

3.3、生产

/*** 订单消息输出通道处理器*/
@Component
public interface OrderOutputChannelProcesor {@Output("saveOrderOutput")MessageChannel saveOrderOutput();
}
@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {@Autowired@Output("saveOrderOutput")private MessageChannel messageChannel;public void sentMsg(UserInfo userInfo){messageChannel.send(MessageBuilder.withPayload(userInfo).build());log.info("消息发送成功:" + userInfo);}
}

3.4、消费

/*** 订单消息输入通道处理器*/
@Component
public interface OrderInputChannelProcesor {@Input("saveOrderInput")SubscribableChannel saveOrderInput();
}
@Slf4j
@EnableBinding(OrderInputChannelProcesor.class)
public class OrderMessageConsumer {@StreamListener("saveOrderInput")public void receiveMsg(Message<UserInfo> userInfoMessage){log.info("接收消息成功:" + userInfoMessage.getPayload());}
}

3.5、延迟队列

安装延迟队列插件:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
下载解压,到plugins目录,执行以下的命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.5.1、修改配置文件

server:port: 8088spring:cloud:stream:binders: #需要绑定的rabbitmq的服务信息defaultRabbit:  #定义的名称,用于bidding整合type: rabbit  #消息组件类型environment:  #配置rabbimq连接环境spring:rabbitmq:host: localhost   #rabbitmq 服务器的地址port: 5672           #rabbitmq 服务器端口username: tiger       #rabbitmq 用户名password: tiger       #rabbitmq 密码virtual-host: tiger_vh  #虚拟路径bindings:        #服务的整合处理saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json      #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup               #分组saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json      #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup               #分组rabbit:bindings: #服务的整合处理saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道producer:delayed-exchange: truesaveOrderInput:consumer:delayed-exchange: true

3.5.2、生产端

@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {@Autowired@Output("saveOrderOutput")private MessageChannel messageChannel;public void sentMsg(UserInfo userInfo){messageChannel.send(MessageBuilder.withPayload(userInfo).setHeader("x-delay", 5000).build());log.info("消息发送成功:" + userInfo);}
}

3.5.2、消息确认机制 消费端

rabbit:bindings: #服务的整合处理saveOrderInput:consumer:acknowledge-mode: MANUAL #手动确认
@StreamListener("saveOrderInput")
public void receiveMsg(Message<UserInfo> userInfoMessage){log.info("接收消息成功:" + userInfoMessage.getPayload());Channel channel = (Channel) userInfoMessage.getHeaders().get(AmqpHeaders.CHANNEL);Long delieverTag = (Long) userInfoMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG);/** deliveryTag:Channel的消息投递的唯一标识符。* multiple:是否否定应答多条消息。如果设置为true,则否定应答带指定deliveryTag的消息及该deliveryTag之前的多条消息;* 如果设置为false,则仅否定应答带指定deliveryTag的单条消息。* requeue:被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;* 如果设置为false,则消息被丢弃或发送到死信Exchange。*/try {channel.basicAck(delieverTag,true);} catch (IOException e) {e.printStackTrace();}
}

定义交换机类型为direct

rabbit:bindings: #服务的整合处理saveOrderInput:consumer:bindingRoutingKey: orderRoutingKeybindQueue: trueexchangeType: directsaveOrderOutput:producer:routingKeyExpression: orderRoutingKeyexchangeType: direct

总结

spring-cloud-stream目前支持RabbitMQ和Kafka,与spring-cloud无缝集成,非常方便。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/174415.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql讲解2 之事务 索引 以及权限等

系列文章目录 mysql 讲解一 博客链接 点击此处即可 文章目录 系列文章目录一、事务1.1 事务的四个原则1.2 脏读 不可重复读 幻读 二、索引三,数据库用户管理四、mysql备份 一、事务 1.1 事务的四个原则 什么是事务 事务就是将一组SQL语句放在同一批次内去执行 如果一个SQ…

FTP网络问题排查

Linux探测路径MTU&#xff1a; ping大包&#xff1a; [test]$ ifconfig eth0: flags4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1280 [test]$ ping -M do -s 1252 172.18.98.3 PING 172.18.98.3 (172.18.98.3) 1252(1280) bytes of data. 1260 bytes from 172.18.98.3: …

C 语言数组

C 语言数组 在本教程中&#xff0c;您将学习如何使用数组。您将借助示例学习如何声明&#xff0c;初始化和访问数组的元素。 数组是可以存储多个值的变量。例如&#xff0c;如果要存储100个整数&#xff0c;则可以为其创建一个数组。 示例 cint data[100];如何声明数组&…

Centos8上部署Zabbix5.0

1.关闭Selinux及防火墙&#xff0c;避免Web页面无法访问。 setenforce 0 vim /etc/selinux/config 修改“SELINUX”等号后的内容为disabled SELINUXdisabled\\关闭并关闭开机自启 systemctl stop firewalld systemctl disable firewalld 2.配置Centos8本地yum源。 mkdir /mn…

【开源】基于JAVA的超市商品管理系统

目录 一、摘要1.1 简介1.2 项目详细录屏 二、研究内容2.1 数据中心模块2.2 超市区域模块2.3 超市货架模块2.4 商品类型模块2.5 商品档案模块 三、系统设计3.1 用例图3.2 时序图3.3 类图3.4 E-R图 四、系统实现4.1 登录4.2 注册4.3 主页4.4 超市区域管理4.5 超市货架管理4.6 商品…

Vue3 数据响应式原理:Proxy和Reflect

我们在Vue2中使用的是Object.defineProperty方法来实现数据响应式的&#xff0c;可以通过get和set方法来监听对象的访问和修改。 但是并不能响应对象中属性的增加和删除&#xff0c;只能使用Vue.$set 和Vue.$delete 来对对象中的属性进行增加和删除。 数组也不能直接通过下标…

TensorFlow2.0教程3-CNN

` 文章目录 基础CNN网络读取数据卷积层池化层全连接层模型配置模型训练CNN变体网络简单的深度网络添加了其它功能层的深度卷积NIN网络文本卷积基础CNN网络 读取数据 import numpy as np import tensorflow as tf import tensorflow.keras as keras import tensorflow.keras.la…

如何修复msvcr120.dll丢失问题,常用的5个解决方法分享

电脑在启动某个软件时&#xff0c;出现了一个错误提示&#xff0c;显示“msvcr120.dll丢失&#xff0c;无法启动软件”。这个错误通常意味着计算机上缺少了一个重要的动态链接库文件&#xff0c;即msvcr120.dll。 msvcr120.dll是什么 msvcr120.dll是Microsoft Visual C Redist…

element 周选择器el-date-picker

2023.11.13今天我学习了在使用element 周选择器的时候&#xff0c;我们会发现默认的时间选择为星期日到下一个星期一&#xff0c;如图&#xff1a; 我们需要改成显示星期一到星期天&#xff0c;只需要加一行代码&#xff1a;picker-options <el-date-pickertype"week&…

VUE Slot

在某些场景中&#xff0c;我们可能想要为子组件传递一些模板片段&#xff0c;让子组件在它们的组件中渲染这些片段. <template><h3>ComponentA</h3><ComponentB><h3>插槽传递视图内容</h3></ComponentB> </template> <scr…

Web APIs——正则表达式使用

1、什么是正则表达式 正则表达式&#xff08;Regular Expression&#xff09;是用于匹配字符串中字符组合的模式。在JavaScript中&#xff0c;正则表达式也是对象 通常用来查找、替换那些符合正则表达式的文本&#xff0c;许多语言都支持正则表达式 1.1 正则表达式使用场景 例如…

保序回归:拯救你的校准曲线(APP)

保序回归&#xff1a;拯救你的校准曲线&#xff08;APP&#xff09; 校准曲线之所以是评价模型效能的重要指标是因为&#xff0c;校准曲线衡量模型预测概率与实际发生概率之间的一致性&#xff0c;它可以帮助我们了解模型的预测结果是否可信。一个理想的模型应该能够准确地预测…