简介
Spring Cloud Stream是一个用于构建基于事件驱动的微服务应用程序的框架,其核心目标是简化开发过程,降低消息通信的复杂性,从而使开发人员能够专注于编写业务逻辑。Spring Cloud Stream通过提供Binder抽象,将应用程序与消息中间件解耦,让开发人员无需关心底层通信细节。同时,它还提供了一套丰富的API和特性,如消息分组、分区和错误处理,使得构建强大、可扩展的事件驱动应用程序变得更加简单。
应用程序可以通过inputs或者 outputs 来与Spring Cloud Stream中binder对象交互。
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
目前仅支持RabbitMQ、 Kafka。
一句话:屏蔽消息中间件之间的区别,提供统一API接口。类似JDBC
处理架构
Binder:绑定器对应了两端,其中INPUT对应于消费者、OUTPUT对应于生产者
相关注解
@Input 注解标识输入通道,通过该输乎通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指信道channel和exchange绑定在一起
生产者
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
application.yml:
spring:application:name: cloud-stream-servicedatasource:type: com.alibaba.druid.pool.DruidDataSource # 当前数据源操作类型driver-class-name: com.mysql.cj.jdbc.Driver # mysql驱动包url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=trueusername: rootpassword: 123456cloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: yipassword: 123456bindings: # 服务的整合处理output: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置
service层:向消息队列发送消息
public interface IMessageProvider {public String send();
}//通过rabbitMQ发送消息
@EnableBinding(Source.class) //定义消息的推送管道
public class IMessageProviderImp implements IMessageProvider {@Resourceprivate MessageChannel output;//消息发送管道@Overridepublic String send() {UUID uuid = UUID.randomUUID();output.send(MessageBuilder.withPayload(uuid.toString()).build());System.out.println("***8****发送的消息为:"+uuid.toString());return null;}
}
controller层:访问/sendMessage时会自动向消息队列发送UUID消息。
@RestController
public class SendMessageController {@Resourceprivate IMessageProvider iMessageProvider;@GetMapping("/sendMessage")public String sendMessage(){return iMessageProvider.send();}
}
消费者
pom.xml与生产者一样
application.yml:把生产者的input改成output。
controller层:
@RestController
@EnableBinding(Sink.class)
public class ReceiverMessageListenerController {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("消费者1号接收到的消息:"+message.getPayload()+" port:"+serverPort);}
}
消费组配置
同一个消费组中的成员只会有一个去进行消息的处理。
只需要在yml文件里配置group名。
配置消费组后,消费者会含有持久化属性,宕机重启后会重新获得未被消费的消息。