redis实现消息队列redis发布订阅redis监听key

文章目录

  • Redis消息队列实现异步秒杀
    • 1. jvm阻塞队列问题
    • 2. 什么是消息队列
    • 3. Redis实现消息队列
      • 1. 基于List结构模拟消息队列
        • 操作
        • 优缺点
      • 2. 基于PubSub发布订阅的消息队列
        • 操作
        • 优缺点
        • spring 结合redis的pubsub使用示例
          • 1. 引入依赖
          • 2. 配置文件
          • 3. RedisConfig
          • 4. CustomizeMessageListener
          • 5. RedisMessageReceiver
          • 6. 监听原理简析
          • 7. 监听redis的key
            • 修改redis.conf
            • KeyspaceEventMessageListener
            • KeyExpirationEventMessageListener
            • 修改RedisConfig
      • 3. 基于Stream的消息队列
        • 1. 单消费者
          • xadd
          • xread
          • 操作示例
          • XREAD命令特点
        • 2. 消费者组
          • 特点
          • 要点
          • 创建消费者组
          • 从消费者组读取消息
          • ==图示操作过程==
          • 消费者监听消息的基本思路
          • XREADGROUP命令特点

Redis消息队列实现异步秒杀

1. jvm阻塞队列问题

java使用阻塞队列实现异步秒杀存在问题:

  1. jvm内存限制问题:jvm内存不是无限的,在高并发的情况下,当有大量的订单需要创建时,就有可能超出jvm阻塞队列的上限。
  2. 数据安全问题:jvm的内存没有持久化机制,当服务重启或宕机时,阻塞队列中的订单都会丢失。或者,当我们从阻塞队列中拿到订单任务,但是尚未处理时,如果此时发生了异常,这个订单任务就没有机会处理了,也就丢失了。

2. 什么是消息队列

消息队列Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

在这里插入图片描述

(正常下单,我们需要将订单消息写入数据库。但由于秒杀并发访问量大,数据库本身并发处理能力不强,因此,在处理秒杀业务时,可以将部分业务在生产者这边做校验,然后将消息写入消息队列,而消费者处理该消息队列中的消息,从而实现双方解耦,更快的处理秒杀业务)

3. Redis实现消息队列

我们可以使用一些现成的mq,比如kafka,rabbitmq等等,但是呢,如果没有安装mq,我们也可以直接使用redis提供的mq方案,降低我们的部署和学习成本。Redis提供了三种不同的方式来实现消息队列:

  • list结构:基于List结构模拟消息队列
  • PubSub:基本的点对点消息模型
  • Stream:比较完善的消息队列模型

1. 基于List结构模拟消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。

队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现

不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果

在这里插入图片描述

操作

命令介绍如下

在这里插入图片描述

在这里插入图片描述

优缺点

优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失(如果消费者获取消息后,然后立马就宕机了,这个消息就得不到处理,等同于丢失了)
  • 只支持单消费者(1个消息只能被1个消费者取走,其它消费者会收不到此消息)

2. 基于PubSub发布订阅的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

  • SUBSCRIBE channel [channel] :订阅一个或多个频道
  • PUBLISH channel msg :向一个频道发送消息
  • PSUBSCRIBE pattern [pattern] :订阅与pattern格式匹配的所有频道
    • ?匹配1个字符:h?llo subscribes to hello, hallo and hxllo
    • *匹配0个或多个字符:h*llo subscribes to hllo and heeeello
    • []指定字符:h[ae]llo subscribes to hello and hallo, but not hillo

在这里插入图片描述

操作

在这里插入图片描述

优缺点

优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化(如果发送消息时,这个消息的频道没有被任何人订阅,那这个消息就丢失了,也消息就是不会被保存)
  • 无法避免消息丢失(发完了,没人收,直接就丢了)
  • 消息堆积有上限,超出时数据丢失(当我们发送消息时,如果有消费者在监听,消费者会有1个缓存区去缓存这个消息数据,如果消费者处理的慢,那么客户端的缓存区中的消息会不断堆积,而这个缓存区是有大小限制的,如果超出了就会丢失)
spring 结合redis的pubsub使用示例
1. 引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.8.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.zzhua</groupId><artifactId>demo-redis-pubsub</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- 如果使用lettuce-core作为连接redis的实现, 不引入此依赖会报错: Caused by: java.lang.ClassNotFoundException:org.apache.commons.pool2.impl.GenericObjectPoolConfig --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
2. 配置文件
spring:redis:host: 127.0.0.1port: 6379database: 0password:lettuce:pool:min-idle: 2max-active: 8max-idle: 8
3. RedisConfig

spring-data-redis提供了2种处理redis消息的方法:

  • 自己实现MessageListener接口

    public interface MessageListener {// 处理消息的方法// 第1个参数封装了: 消息发布到哪1个具体频道 和 消息的内容// 第2个参数封装了: //     1. 如果当前是通过普通模式去订阅的频道, 那么收到消息时该pattern就是消息发送的具体频道//     2. 如果当前是通过pattern通配符匹配去订阅的频道, 那么收到消息时, 该pattern就是订阅的频道void onMessage(Message message, @Nullable byte[] pattern);
    }
    
  • 指定MessageListenerAdapter适配器,该适配器指定特定对象的特定方法来处理消息(对特定的方法有参数方面的要求)

@Slf4j
@Configuration
public class RedisConfig {@Autowiredprivate RedisMessageReceiver redisMessageReceiver;@Autowiredprivate CustomizeMessageListener customizeMessageListener;@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 监听order.q通道(不带通配符匹配channel)container.addMessageListener(customizeMessageListener, new ChannelTopic("order.q"));// 监听order.*通道(带通配符匹配channel)container.addMessageListener(listenerAdapter(), new PatternTopic("order.*"));return container;}@Beanpublic MessageListenerAdapter listenerAdapter() {// 交给receiver的receiveMessage方法, 对于这个方法的参数有如下要求:// (2个参数: 第一个参数是Object-即消息内容(默认由RedisSerializer#deserialize处理,见MessageListenerAdapter#onMessage), //           第二个参数是String-即订阅的通道, 详细看上面MessageListener接口中第二个参数的解释)// (1个参数: 参数是Object-即消息内容)return new MessageListenerAdapter(redisMessageReceiver, "receiveMessage");}}
4. CustomizeMessageListener
@Slf4j
@Component
public class CustomizeMessageListener implements MessageListener {@Overridepublic void onMessage(Message message, byte[] pattern) {byte[] bodyBytes = message.getBody();byte[] channelBytes = message.getChannel();log.info("order.q - 消息订阅频道: {}", new String(channelBytes));log.info("order.q - 消息内容: {}", new String(bodyBytes));log.info("order.q - 监听频道: {}", new String(channelBytes));}
}
5. RedisMessageReceiver
@Slf4j
@Component
public class RedisMessageReceiver {public void receiveMessage(String msg, String topic) {log.info("order.* - 消息的订阅频道: {}", topic);log.info("order.* - 消息的内容: {}", msg);}}
6. 监听原理简析

spring-data-redis的lettuce-core是基于netty的,消息监听处理过程如下:
PubSubCommandHandler(netty中的ChannelHandler处理器)->PubSubEndpoint(根据消息类型调用LettuceMessageListener 的不同方法)->LettuceMessageListener -> RedisMessageListenerContainer$DispatchMessageListener(如果是pattern,则从patternMapping中获取所有的listener;如果不是pattern,则从channelMapping中获取所有的listener。至于怎么判断是不是pattern?)->使用异步线程池对上一步获取的所有listener执行onMessage方法

至于怎么判断是不是pattern?这个是根据订阅关系来的,如果订阅的是pattern,那么如果这个向这个pattern中发送了消息,那么就会收到1次消息,并且是pattern。如果订阅的是普通channel,那么如果向这个普通channel发送了消息,那么又会收到1次消息不是pattern。如果向1个channel中发送消息,这个channel既符合订阅的pattern,也符合订阅的普通channel,那么会收到2次消息,并且这2次消息1次是pattern,1次不是pattern的

7. 监听redis的key

既然已经说到了监听redis发布消息了,那么也补充一下监听redis的key过期。因为监听redis的key过期也是通过redis的发布订阅实现的。

修改redis.conf
############################# EVENT NOTIFICATION ############################### Redis能够将在keyspace中发生的事件通知给 发布/订阅 客户端# Redis can notify Pub/Sub clients about events happening in the key space. 
# This feature is documented at http://redis.io/topics/notifications# 例如:如果开启了keyspace事件通知(注意了,必须是开启了keyspace事件通知才可以,开启的方式就是添加参数K),
#      一个客户端在数据库0对一个叫'foo'的key执行了删除操作,
#      那么redis将会通过 发布订阅 机制发布2条消息 
#        PUBLISH __keyspace@0__:foo del 
#        PUBLISH __keyevent@0__:del foo# For instance if keyspace events notification is enabled, and a client
# performs a DEL operation on key "foo" stored in the Database 0, two
# messages will be published via Pub/Sub:
#
# PUBLISH __keyspace@0__:foo del
# PUBLISH __keyevent@0__:del foo#  也可以指定一组 类名 来选择 Redis 会通知的一类事件。
#  每类事件 都通过一个字符定义# It is possible to select the events that Redis will notify among a set
# of classes. Every class is identified by a single character:#        keySpace事件   以 __keyspace@<数据库序号>__ 为前缀 发布事件
#  K     Keyspace events, published with __keyspace@<db>__ prefix.  #        Keyevent事件   以 __keyevent@<数据库序号>__ 为前缀 发布事件
#  E     Keyevent events, published with __keyevent@<db>__ prefix.        #        执行常规命令,比如del、expire、rename           
#  g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...      #        执行 String 命令
#  $     String commands  #        执行 List   命令                                                          
#  l     List commands  #        执行 Set    命令                                                         
#  s     Set commands  #        执行 Hash   命令                                                         
#  h     Hash commands                                                          执行 Hash   命令#        执行 ZSet   命令
#  z     Sorted set commands #        key过期事件(每个key失效都会触发这类事件)                                                   
#  x     Expired events (events generated every time a key expires) #        key驱逐事件(当key在内存满了被清除时生成)
#  e     Evicted events (events generated when a key is evicted for maxmemory)  #        A是g$lshzxe的别名,因此AKE就意味着所有的事件
#  A     Alias for g$lshzxe, so that the "AKE" string means all the events.     
##  配置中的notify-keyspace-events这个参数由0个或多个字符组成,
#  如果配置为空字符串表示禁用通知
#  The "notify-keyspace-events" takes as argument a string that is composed     
#  of zero or multiple characters. The empty string means that notifications
#  are disabled.
##  比如,要开启list命令和generic常规命令的事件通知,
#  应该配置成 notify-keyspace-events Elg
#  Example: to enable list and generic events, from the point of view of the    
#           event name, use:
#
#  notify-keyspace-events Elg
#
#  比如,订阅了__keyevent@0__:expired频道的客户端要收到key失效的时间,
#  应该配置成 notify-keyspace-events Ex
#  Example 2: to get the stream of the expired keys subscribing to channel   name __keyevent@0__:expired use:
#
#  notify-keyspace-events Ex
##  默认情况下,所有的通知都被禁用了,并且这个特性有性能上的开销。
#  注意,K和E必须至少指定其中一个,否则,将收不到任何事件。
#  By default all notifications are disabled because most users don't need      
#  this feature and the feature has some overhead. Note that if you don't       
#  specify at least one of K or E, no events will be delivered.
notify-keyspace-events "Ex"############################### ADVANCED CONFIG ###############################
KeyspaceEventMessageListener
  1. 通过实现InitializingBean接口,在afterPropertiesSet方法中,调用初始化init方法,从redis中获取notify-keyspace-events配置项对应的值,如果未设置任何值,则改为EA,结合上面的redis.conf节选可知,表示的是开启所有的事件通知
  2. 使用redisMessageListenerContainer,通过pattern通配符匹配的方式订阅__keyevent@*频道
  3. 它是个抽象类,实现了MessageListener接口,处理消息的方法是个抽象方法
  4. 它有1个子类KeyExpirationEventMessageListener,订阅的pattern的频道是:__keyevent@*__:expired,通过重写doRegister修改了订阅的频道。并且重写了处理消息的方法,通过将消息内容包装成RedisKeyExpiredEvent事件对象,然后通过事件发布器将事件发布出去。
public abstract class KeyspaceEventMessageListener implements MessageListener, InitializingBean, DisposableBean {private static final Topic TOPIC_ALL_KEYEVENTS = new PatternTopic("__keyevent@*");private final RedisMessageListenerContainer listenerContainer;private String keyspaceNotificationsConfigParameter = "EA";/*** Creates new {@link KeyspaceEventMessageListener}.** @param listenerContainer must not be {@literal null}.*/public KeyspaceEventMessageListener(RedisMessageListenerContainer listenerContainer) {Assert.notNull(listenerContainer, "RedisMessageListenerContainer to run in must not be null!");this.listenerContainer = listenerContainer;}/** (non-Javadoc)* @see org.springframework.data.redis.connection.MessageListener#onMessage(org.springframework.data.redis.connection.Message, byte[])*/@Overridepublic void onMessage(Message message, @Nullable byte[] pattern) {if (message == null || ObjectUtils.isEmpty(message.getChannel()) || ObjectUtils.isEmpty(message.getBody())) {return;}doHandleMessage(message);}/*** Handle the actual message** @param message never {@literal null}.*/protected abstract void doHandleMessage(Message message);/*** Initialize the message listener by writing requried redis config for {@literal notify-keyspace-events} and* registering the listener within the container.*/public void init() {if (StringUtils.hasText(keyspaceNotificationsConfigParameter)) {RedisConnection connection = listenerContainer.getConnectionFactory().getConnection();try {Properties config = connection.getConfig("notify-keyspace-events");if (!StringUtils.hasText(config.getProperty("notify-keyspace-events"))) {connection.setConfig("notify-keyspace-events", keyspaceNotificationsConfigParameter);}} finally {connection.close();}}doRegister(listenerContainer);}/*** Register instance within the container.** @param container never {@literal null}.*/protected void doRegister(RedisMessageListenerContainer container) {listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);}/** (non-Javadoc)* @see org.springframework.beans.factory.DisposableBean#destroy()*/@Overridepublic void destroy() throws Exception {listenerContainer.removeMessageListener(this);}/*** Set the configuration string to use for {@literal notify-keyspace-events}.** @param keyspaceNotificationsConfigParameter can be {@literal null}.* @since 1.8*/public void setKeyspaceNotificationsConfigParameter(String keyspaceNotificationsConfigParameter) {this.keyspaceNotificationsConfigParameter = keyspaceNotificationsConfigParameter;}/** (non-Javadoc)* @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()*/@Overridepublic void afterPropertiesSet() throws Exception {init();}
}
KeyExpirationEventMessageListener
public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implementsApplicationEventPublisherAware {private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");private @Nullable ApplicationEventPublisher publisher;/*** Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.** @param listenerContainer must not be {@literal null}.*/public KeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}/** (non-Javadoc)* @see org.springframework.data.redis.listener.KeyspaceEventMessageListener#doRegister(org.springframework.data.redis.listener.RedisMessageListenerContainer)*/@Overrideprotected void doRegister(RedisMessageListenerContainer listenerContainer) {listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);}/** (non-Javadoc)* @see org.springframework.data.redis.listener.KeyspaceEventMessageListener#doHandleMessage(org.springframework.data.redis.connection.Message)*/@Overrideprotected void doHandleMessage(Message message) {publishEvent(new RedisKeyExpiredEvent(message.getBody()));}/*** Publish the event in case an {@link ApplicationEventPublisher} is set.** @param event can be {@literal null}.*/protected void publishEvent(RedisKeyExpiredEvent event) {if (publisher != null) {this.publisher.publishEvent(event);}}/** (non-Javadoc)* @see org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher)*/@Overridepublic void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {this.publisher = applicationEventPublisher;}
}
修改RedisConfig
@Slf4j
@Configuration
public class RedisConfig {@Autowiredprivate RedisMessageReceiver redisMessageReceiver;@Autowiredprivate CustomizeMessageListener customizeMessageListener;@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 注意以下测试在redis.confi配置文件中设置了: notify-keyspace-events 为 AKE, // 也可以参照KeyspaceEventMessageListener在代码中设置这个配置项/*redis提供的事件通知发布消息示例如下:K =>  PUBLISH __keyspace@0__:foo delE =>  PUBLISH __keyevent@0__:del foo参照上述示例去写这个topic即可*/// 监听key删除事件container.addMessageListener(new MessageListener() {/*执行命令: del order:1234输出如下:监听key删除事件 - 消息的发布频道: __keyevent@0__:del监听key删除事件 - 消息内容: order:1234监听key删除事件 - 消息的订阅频道: __keyevent@*__:del*/@Overridepublic void onMessage(Message message, byte[] pattern) {byte[] bodyBytes = message.getBody();byte[] channelBytes = message.getChannel();log.info("监听key删除事件 - 消息的发布频道: {}", new String(channelBytes));log.info("监听key删除事件 - 消息内容: {}", new String(bodyBytes));log.info("监听key删除事件 - 消息的订阅频道: {}", new String(pattern));}}, new PatternTopic("__keyevent@*__:del"));// 监听指定前缀的keycontainer.addMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message, byte[] pattern) {byte[] bodyBytes = message.getBody();byte[] channelBytes = message.getChannel();/*执行命令: set order:1234 a输出如下:监听指定前缀的key - 消息的发布频道: __keyspace@0__:order:1234监听指定前缀的key - 消息内容: set监听指定前缀的key - 消息的订阅频道: __keyspace@0__:order:**/log.info("监听指定前缀的key - 消息的发布频道: {}", new String(channelBytes));log.info("监听指定前缀的key - 消息内容: {}", new String(bodyBytes));log.info("监听指定前缀的key - 消息的订阅频道: {}", new String(pattern));}}, new PatternTopic("__keyspace@0__:order:*"));return container;}/* 借助了1. 这个KeyspaceEventMessageListener的bean中的对redis的配置修改2. 监听patter的topic*/@Beanpublic KeyspaceEventMessageListener keyspaceEventMessageListener(RedisMessageListenerContainer container) {return new KeyspaceEventMessageListener(container){/* __keyevent@* */@Overrideprotected void doHandleMessage(Message message) {log.info("监听所有key命令事件, 消息内容:{}, {}",// set name zzhua; expire name 5;// 消息内容就是key的名称, 比如: namenew String(message.getBody()),// 消息所发布的频道, 比如: __keyevent@0__:set, __keyevent@0__:expire等new String(message.getChannel()));}};}@Beanpublic KeyExpirationEventMessageListener keyExpirationEventMessageListener(RedisMessageListenerContainer container) {return new KeyExpirationEventMessageListener(container){/* __keyevent@*__:expired */@Overrideprotected void doHandleMessage(Message message) {log.info("监听所有key失效, 消息内容:{}, {}",// 消息内容就是key的名称, 比如: namenew String(message.getBody()),// 消息所发布的频道, 比如: __keyevent@0__:expirednew String(message.getChannel()));}};}}

3. 基于Stream的消息队列

Stream 是 Redis 5.0 引入的一种新的数据类型(因此支持持久化),可以实现一个功能非常完善的消息队列(专门为消息队列设计的),Redis streams官网介绍

1. 单消费者
xadd

发送消息的命令:

在这里插入图片描述

  • 不指定消息队列的的最大消息数量就是不限制消息数量

  • 消息唯一id建议使用*,让redis自动生成消息唯一id

  • (上面命令介绍中的:大写表示照着抄就行;小写的是需要我们自己提供的参数;中括号表示可选参数)

示例

## 创建名为 users 的队列,并向其中发送一个消息,内容是:{name=jack,age=21},并且使用Redis自动生成ID
127.0.0.1:6379> XADD users * name jack age 21
"1644805700523-0"
xread

读取消息的方式之一:

在这里插入图片描述

  • 不指定阻塞时间,就是直接返回(不阻塞);设置为0表示阻塞到有值为止;
  • stream中消息读取之后,不会被删除;
  • $ 表示读取最新的消息,但是如果之前消息都已经被读过了,那么当前继续去读的话,是读不到的(尽管当前stream中仍然有消息)

示例

## 从users的队列中读取1条消息, 从第1条开始读
127.0.0.1:6379> XREAD COUNT 1 STREAMS users 0
1) 1) "users"2) 1) 1) "1708522812423-0"2) 1) "name"2) "jack"3) "age"4) "21"
操作示例

查看当前redis版本是否支持stream数据结构

在这里插入图片描述

xadd与xread使用示例

在这里插入图片描述

在上面,还有1点没有体现出来:在stream中的每1个消息,被当前客户端读了1遍,还可以被当前客户端读1遍,然后,这个消息还可以被其它客户端读1遍。

xread读取最新数据要使用阻塞的方法才可以

在这里插入图片描述

我们发现,只有在阻塞期间,使用$才能读取到最新消息;如果不使用阻塞,想要读取最新数据是不可能的。

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:

在这里插入图片描述

但是这会存在消息漏读的问题,由于:只有在阻塞期间,使用$才能读取到最新消息,假设在处理消息的时候,此时消息队列中发来了消息,那么这些消息就会被错过,只有当执行XREAD COUNT 1 BLOCK 2000 STREAMS users $开始时收到的第1个消息,才会被处理。

XREAD命令特点

STREAM类型消息队列的XREAD命令特点

  • 消息可回溯(消息读取完之后,不会消失,永久的保留在我们的队列当中,随时想看都可以回去读)
  • 一个消息可以被多个消费者读取(因为消息读取之后,不会消失)
  • 可以阻塞读取
  • 有消息漏读的风险(在处理消息的过程中,如果来了多条消息,则只能看到最后一条消息,即最新的那1条)
2. 消费者组

上面,我们知道通过xread命令你阻塞读取最新消息,有消息漏读的风险,下面,我们看看消费者组是如何解决这个问题的。

特点

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

在这里插入图片描述

消息分流

队列中的消息会分流给组内不同消费者,而重复消费,从而加快消息处理的速度

消息标示

消费者组会维护一个标示记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费

消息确认

消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。

要点

redis服务器维护了多个消费者组

可以给1个stream指定多个消费者组

  • 把这里的消费者组当成上节中的消费者即可
  • 1个stream绑定的的多个消费者组都会收到消息

消息发给消费者组

  • 即多个消费者共同加入到消费者组中,形成1个消费者,而消息就分给消费者中中的消息来消费

消费者加入消费者组

消费者从消费者组中拉取消息,拉取到的消息进入消费者组中的pending-list

消费者消费完消息后,向消费者组确认消息已处理,已确认处理的消息会从pending-list中删除

消费者组总会用1个标识,来记录最后1个被处理的消息

创建消费者组
XGROUP CREATE  key groupName ID [MKSTREAM]
  • key:队列名称
  • groupName:消费者组名称
  • ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
    • 建议:如果不想处理队列中已存在的消息,就可以使用$;如果要处理已存在的消息,就是用0)
  • MKSTREAM:队列不存在时自动创建队列;不指定的话,当不存在时,不会创建

其它常见命令:

# 删除指定的消费者组
XGROUP DESTORY key groupName# 给指定的消费者组添加消费者
#(一般情况下,我们并不需要自己添加消费者,因为当我们从这个消费者组当中指定1个消费者,
#                                   并且监听消息的时候,如果这个消费者不存在,则会自动创建消费者)
XGROUP CREATECONSUMER key groupname consumername# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
从消费者组读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间(若未指定,则不阻塞)
  • NOACK:无需手动ACK,即获取到消息后自动确认(一般不建议使用)
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID:
    • “>”:从消费者组的标记找到最后1个处理的消息(注意:不是已处理的消息,是处理的消息,也就是说它有可能被消费者获取了,但还没被消费者确认掉),的下一个未处理的消息开始
    • 其它(除了">"以外的所有):根据指定id从pending-list中获取已消费但未确认的消息。例如0,是从pending-list中的第一个消息开始(一直拿0,就是一直从pending-list中拿第1个消息)
图示操作过程

在这里插入图片描述

在这里插入图片描述

消费者监听消息的基本思路

在这里插入图片描述

XREADGROUP命令特点
  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次
  • (内存不受jvm限制,消息可做持久化,消息确认机制)

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/485225.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Hive【内部表、外部表、临时表、分区表、分桶表】【总结】

目录 Hive的物种表结构特性 一、内部表 建表 使用场景 二、外部表 建表:关键词【EXTERNAL】 场景&#xff1a; 外部表与内部表可互相转换 三、临时表 建表 临时表横向对比​编辑 四、分区表 建表&#xff1a;关键字【PARTITIONED BY】 场景&#xff1a; 五、分桶表 …

【vue vue-seamless-scroll】解决vue-seamless-scroll鼠标悬浮才滚动或者只滚动一次就失效的问题

解决问题&#xff1a;使用vue-seamless-scroll发现只有鼠标悬浮上去才滚动&#xff0c;而且滚动一次停止了 目标效果&#xff1a; 解决方案&#xff1a; 最后发现是因为数据需要在页面挂载好就赋值&#xff0c;否则页面在加载完成后&#xff0c;数据无法自动滚动。但因为数据…

opencv图像的本质

目的 OpenCV是一个跨平台的库&#xff0c;使用它我们可以开发实时的计算机视觉应用程序。 它主要集中在图像处理&#xff0c;视频采集和分析&#xff0c;包括人脸检测和物体检测等功能。 数字图像在计算机中是以矩阵形式存储的&#xff0c;矩阵中的每一个元素都描述一定的图像…

day09-分库分表专题-今日指数

分库分表专题 学习目标 1、理解分库分表基础概念【垂直分库分表、水平分库分表】 2、能够说出sharding-jdbc为我们解决什么问题 3、理解sharding-jdbc中的关键名词 4、理解sharding-jdbc的整体架构及原理 5、掌握sharding-jdbc集成SpringBoot的方式 第一章 分库分表介绍 1、…

uniapp微信小程序解决上方刘海屏遮挡

问题 在有刘海屏的手机上&#xff0c;我们的文字和按钮等可能会被遮挡 应该避免这种情况 解决 const SYSTEM_INFO uni.getSystemInfoSync();export const getStatusBarHeight ()> SYSTEM_INFO.statusBarHeight || 15;export const getTitleBarHeight ()>{if(uni.get…

node 之 初步认识

思考&#xff1a;为什么JavaScript可以在浏览器中被执行 代执行的js代码——JavaScript解析引擎 不同的浏览器使用不同的JavaScript解析引擎 Chrome 浏览器 》 V8 Firefox浏览器 》OdinMonkey(奥丁猴&#xff09; Safri浏览器 》JSCore IE浏览器 》Chakra(查克拉&#xff09; e…

H5移动端文件预览pdf

H5移动端文件预览pdf 需求&#xff1a;H5页面嵌入浙政钉&#xff0c;需要文件预览Pdf。 试用了多个插件&#xff0c;踩了很多坑&#xff0c;如果小伙伴有类似填坑经历&#xff0c;并成功解决&#xff0c;感谢留言指点&#xff01;&#xff01;&#xff01; 先讲最终方案&#x…

qt 软件发布(Windows)

1. 开发环境 QtCreator MSVC编译器 2. 源码编译 生成release或者debug版本的exe可执行文件(x64或x86) 3. windeployqt 打包 ①左下角开始菜单栏找到QT的命令交互对话框&#xff0c;如下图MSVC 2017 64-bit(根据第二步编译的类型选择64位或者32位)。 ②cd 切换到第二步可…

性能测试的几个指标范围(CPU,内存,IO,网络)

性能测试中&#xff0c;对服务端的指标监控也是很重要的一个环节。通过对各项服务器性能指标的监控分析&#xff0c;可以定位到性能瓶颈。 后端性能指标有 CPU&#xff0c;内存&#xff0c;网络&#xff0c;jvm&#xff0c;I/O 等等 分析思路 整体系统 CPU 利用率 内存利用…

Flutter GLSL - 肆 | 从条纹到马赛克

theme: cyanosis Flutter & GLSL 系列文章&#xff1a; 《Flutter & GLSL - 壹 | Shader 让绘制无限强大》《Flutter & GLSL - 贰 | 从坐标到颜色》《Flutter & GLSL - 叁 | 变量传参》《Flutter & GLSL - 肆 | 从条纹到马赛克》 案例代码开源地址 【skele…

【高德地图】Android搭建3D高德地图详细教

&#x1f4d6;Android搭建3D高德地图详细教程 &#x1f4d6;第1章 高德地图介绍✅了解高德地图✅2D地图与3D地图 &#x1f4d6;第2章 搭建3D地图并显示✅第 1 步&#xff1a;创建 Android 项目✅第 2 步&#xff1a;获取高德Key✅第 3 步&#xff1a;下载地图SDK✅第 4 步&…

数据可视化在商业领域有哪些重要性?

数据可视化在商业领域的重要性体现在多个方面&#xff0c;它通过将复杂的数据集转化为直观、易于理解的图形和图表&#xff0c;帮助企业和组织做出更明智的决策。以下是数据可视化对商业的一些关键重要性&#xff1a; 提高决策效率&#xff1a;通过直观的图表和图形&#xff0c…