RabbitMQ学习笔记

1、什么是MQ?

MQ全称message queue(消息队列),本质是一个队列,FIFO先进先出,是消息传送过程中保存消息的容器,多 用于分布式系统之间进行通信。

在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通讯服务,使用了MQ后,消息发送上游只需要依赖MQ,不需要依赖其他的服务。

2、为什么使用MQ?
  1. 流量削峰
  2. 应用解耦
    1. 比如电商系统中分为订单系统,支付系统,库存系统,物流系统,如果订单系统直接调用三种系统,其中一个系统出现了短暂的故障,订单系统就属于不可用的状态,
    2. 如果使用mq,订单系统生成的订单直接存放在MQ中,即便其余某个系统短暂故障,订单系统不感知,系统可用性增强
  3. 异步处理
    1. 假设A调用B ,B是异步处理并且需要很长时间来处理,但是A需要知道B的处理结果,通常做法是
      1. A每隔一段时间去调用B的查询函数,
      2. 或者A提供一个回调函数让B调用完成之后通知A。
    2. MQ提供了一种新的处理思路,即B处理完之后,发送一条消息给MQ,MQ将消息给A进行处理。
3、实现MQ的两种主流方式?

两种,AMQP和JMS

  1. AMQP即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计的。
  2. JMS 即 Java 消息服务(Java Message Service)应用程序接口,是一个java平台中关于面向消息中间件的API。是javaEE规范的一种。

区别?

  • JMS定义了统一的接口,来对消息操作进行统一,而AMQP是通过协议规定了数据交互格式
  • JMS限制了必须使用java,AMQP只是协议,不规定实现方式,是跨语言的
  • JMS规定了两种消息模式,AMQP的消息模式更加丰富

4、MQ的选择?
  1. kafka:主要特点是基于pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的数据收集业务,如果有日志采集功能,首选kafka
  2. rocketMQ: 适用于金融互联网领域,对可靠性要求很高的场景适用(阿里双11),尤其是电商里面的订单扣款、业务削峰等,稳定性好,适用于并发场景。
  3. rabbitMQ:性能好,时效性微妙级别,社区活跃度高,功能完备,管理界面使用方便,适合数据量没有那么大的中小型公司。

5、rabbitMQ中的四大核心概念?
  1. 生产者:产生数据,发送消息的程序
  2. 交换机:rabbitMQ中非常重要的一个部件,一方面接收来自生产者的消息,另一方面将消息推送到队列中。(交换机必须要确切知道如何处理接收到的消息)
  3. 队列:存放消息的数据结构,本质是一个大的消息缓冲区
  4. 消费者:大多数情况是一个等待接收消息的程序

6、rabbitMQ的基本结构?

  • producer:消息生产者,即生产消息的客户端
  • consumer:消息消费者,即消费消息的客户端,接收MQ转发的消息
  • connection:producer/consumer 和broker之间的TCP连接
  • channel:如果每一次访问rabbitMQ都建立一个connection,在消息量大的时候建立TCP连接的开销是巨大的,效率也很低,channel是在connection内部建立的逻辑连接,一个连接内包含多个信道,每次发消息只占用一个信道,这样就极大的减少了建立connection的开销。
  • broker:接收和分法消息的应用,消息队列的服务进程,包括两个部分,exchange和queue
  • exchange:消息队列交换机,按照一定的规则将消息路由转发给到某个队列,对消息进行过滤
  • queue:消息队列,存储消息
  • binding:exchange和queue之间的虚拟连接

生产者生产消息的过程:

  1. producer先连接到broker,这个步骤需要先建立connection连接,并开启一个信道channel
  2. producer声明一个交换器,并设置相关属性 (交换器写空字符串,会使用默认的交换器)
  3. producer声明一个队列,并设置相关属性
  4. producer通过绑定将交换器和队列进行绑定
  5. producer发送消息到broker,其中包含路由键,交换器等信息
  6. 交换器根据收到的路由键查找对应的队列
  7. 如果找到,就会将消息存入相应的队列,如果没有找到,会根据producer的配置选择丢弃或者是退回给生产者
  8. 关闭信道

消费者接收消息的过程:

  1. consumer连接到broker,建立connection连接,开启一个信道
  2. consumer请求消费相应队列中的消息,可以设置响应的回调函数
  3. 等待broker回应并投递相应队列中的消息,接收消息
  4. consumer确认收到消息,ack响应
  5. rabbitMQ接收到ack,将队列中的消息删除
  6. 关闭信道。

7、rabbitMQ的消息应答机制ack?

rabbitMQ向消费者传递完消息后,会删除该条消息(kafka中是不删除的,这个是一点差异)

为了保证消息在发送过程中不丢失,rabbitMQ引入了消息应答机制:消费者在接收消息并处理该消息后,告诉rabbitMQ他已经处理了,此时,rabbitMQ就可以把该消息删除了

  1. 自动应答:消息一旦被消费者接收,自动发送ack
  2. 手动应答:消息接收后,不会发送ack,需要手动调用

如何选择应答方式呢?

  1. 如果消息不太重要,丢失也没有影响,那么选择自动ack会比较好--- 性能高,可能丢失数据
  2. 如果不允许消息丢失,那么需要选择在消费完成后手动ack --- 可靠性高,性能稍差
8、rabbitMQ消息的重新入列?

如果消费者由于某些原因失去连接,导致消费者未成功发送ACK确认应答,RabbitMQ将会对未完全处理完的消息重新入队,如果其他消费者可以处理,则该消息将被分配到另一个消费者,从而保证消息未丢失。

9、rabbitMQ的持久化?
  1. 队列持久化
  2. 消息持久化
  3. exchange持久化

持久化只是告诉rabbitMQ将消息保存到磁盘,但是并不能真正的保证数据不丢失(准备从内存往磁盘写的时候rabbitMQ挂掉了)

队列持久化是在定义队列的时候,由durable参数决定的,设置为true的时候,才会持久化队列。

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//第二个餐胡设置为true,代表队列持久化
channel.queueDeclare("queue.persistent.name", true, false, false, null);

消息持久化是在发布消息的时候设置的,

//通过传入MessageProperties.PERSISTENT_PLAIN就可以实现消息持久化
channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());

exchange的持久化:如果不设置exchange的持久化对消息的可靠性来说没有什么影响,**但是同样如果exchange不设置持久化,那么当broker服务重启之后,exchange将不复存在,那么既而发送方rabbitmq producer就无法正常发送消息。因此建议同样设置exchange的持久化。

一般只需要:``channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true); 就是在声明的时候讲durable字段设置为true就行了。

10、rabbitMQ的分发机制,以及如何修改?

rabbitMQ默认的分发机制是轮询模式,是公平的,但是实际场景中并不适用,比如consumerA处理消息很快,consumerB处理消息很满,那么轮询的机制就会导致consumerA有很多时间处于空闲,因此,需要修改成能者多劳的模式。

如何实现?

对信道进行设置,通过 BasicQos 方法设置prefetchCount = 1,需要注意的是,不公平分发只在手动ack的时候才会生效。

 

11、rabbitMQ中的预取值?

预取值是消费者信道最大传输信息数。上面说了如何设置rabbitMQ 的不公平分发,即设置prefetchCount = 1,其实这个值是可以设置更大的数字的,这个设置的值,就是预取值。

我们将慢的消费者preCount取值为5,快的消费者预取值为2,然后发送7条消息,实际慢的服务器会收到5条消息(第一条处理的时候,其余四条会堆积),快的服务器只会收到2条消息。

这是因为快的消费者信道满了,不能再发送消息,所以消息只能发送给慢的服务器,这就是basicQos用法。

12、rabbitMQ的发布确认机制?

发布确认机制有三种方式:

  1. 单个确认发布:一种简单的同步确认发布的方式,也即是只有前一个发布的消息确认发布之后,后续的消息才可以继续发布。 缺点就是发布速度慢,没有确认发布的消息会阻塞后续消息发布,适用于每秒数百条消息吞吐量的环境。
  2. 批量确认发布:也是同步确认的方式,一样会阻塞后续消息的发布,但是可以先发布一批消息,然后一起确认,提高吞吐量,缺点就是发生故障导致发布失败后,不知道那个消息有问题,必须将整个批处理保存在内存中,来记录重要的信息,然后重新发布消息
  3. 异步确认发布:效率和可靠性都比较高,利用回调函数来达到消息的可靠性传递(这种情况下,所有在该信道上发布的消息都会被指派一个唯一的ID ,一旦消息被投递到所有匹配的队列后,rabbitMQ就会发送一个确认给生产者(包含这个消息的唯一id),这样生产者就知道消息已经正确的到达目的队列了,如果rabbitMQ没能处理这个消息,也会发送一个NACK 的消息给producer,这时就可以进行重试操作。)
13、如何处理异步未确认的消息?

简陋版本:

将未确认的消息放到一个基于内存的,能够被发布线程访问的队列中,能够在confirm callbacks线程和发布线程之间进行消息传递。

比如使用ConcurrentSkipListMap,这个是基于并发的有序map集合。(ConcurrentHashMap是无序的)

1、RabbitMQ的消息确认机制确保了消息的可靠抵达,其中ConfirmCallback是其中一种实现方式
  1. ConfirmCallback是一个回调函数,用于在消息被确认时进行回调,以确保消息已经被正确地发送到RabbitMQ Broker并被处理。当生产者发送消息时,可以通过调用channel的confirmSelect()方法将channel设置为confirm模式,然后通过添加ConfirmCallback回调函数来处理消息确认。
  2. 当消息被发送到Broker后,如果Broker成功地将消息路由到目标队列,则会调用ConfirmCallback回调函数的handleAck()方法,表示消息已被确认。如果Broker无法将消息路由到目标队列,则会调用handleNack()方法,表示消息未被确认
  3. 使用ConfirmCallback可以确保消息已经被正确地发送到RabbitMQ Broker并被处理,从而避免了消息丢失或重复发送的情况同时,ConfirmCallback还可以在消息未被确认时进行重试或记录日志等操作,以确保消息的可靠性和稳定性
2、RabbitMQ的ReturnCallback机制是为了解决消息无法路由到指定队列的问题
  1. 当发送的消息无法被路由到指定队列时,RabbitMQ会将消息返回给生产者,这时候如果生产者设置了ReturnCallback回调函数,就可以在回调函数中处理这种情况
  2. ReturnCallback机制的使用场景一般是在消息发送时,指定了mandatory参数为true,表示如果消息无法被路由到指定队列,则将消息返回给生产者。如果mandatory参数为false,则消息会被直接丢弃。
  3. 当生产者设置了ReturnCallback回调函数后,RabbitMQ在将消息返回给生产者时,会触发该回调函数。在ReturnCallback回调函数中,可以处理消息无法路由的情况,例如重发消息、记录日志等。
  4. 需要注意的是,ReturnCallback机制只有在消息被发送到交换机后,才会触发。如果消息发送的交换机不存在,或者路由键不符合任何绑定规则,消息会被直接丢弃,不会触发ReturnCallback回调函数
3、备份交换机

通过mandatory参数和消息回退机制,可以处理交换机投递失败的消息,但是消息回退给生产者后,有时候并不知道如何处理这些消息,最多就是打印一个日志,存在缓存中,然后定时重试投递,还要考虑多次投递失败后的告警等等。如果生产者多了的话,每个生产者都要写这些逻辑代码,无疑大大增加了生产者的复杂性。

rabbitMQ中有死信队列可以处理消费失败的信息,但是当前所说的这些消息根本就没有进入队列,因此死信队列也没有用。在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。

备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。 当然,还可以建立一个报警队列,用独立的消费者来进行监测和报警。

mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,备份交换机的优先级更高。

在这里插入图片描述

 

14、rabbitMQ中的交换机exchange

rabbitMQ消息传递的模型核心思想是:生产者生产的消息不直接发送到队列,而是通过交换机。(事实上,生产者压根不知道发送到了哪些队列),交换机的功能十分简单,一方面接收来自生产者的消息,另一方面将消息推入队列。

交换机必须知道如何处理接收到的消息,是放入特定的队列,还是放入很多队列,还是丢弃,这些是由交换机的类型决定的。

bindings:binding其实就是exchange和queue之间的桥梁,即是绑定关系

交换机的类型:

  1. 无名exchange(默认exchange),声明的时候就是一个空字符串,但是通过routingkey绑定queue
  2. 扇出交换机 fanout: 就是将受到的所有消息广播到他知道的所有队列中,routingkey可以是空字符串
  3. 直接交换机 direct:消息只到交换机绑定的队列中,通过routingkey 来绑定,如果所有队列的routingkey都一样,那么就相当于是fanout 交换机了
  4. 主题交换机 topic:topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开,类似于正则表达式,*(星号)可以代替一个单词 ;#(井号)可以替代零个或多个单词
  5. 头部交换机 header:不通过RoutingKey进行分发消息,而时通过消息中内容的headers的 key/value(键值对)匹配队列 (性能不高,用的少?)

15、死信和死信队列?

什么是死信?

在rabbitMQ中,消息可能有不同的表现,死信,顾名思义,就是dead message。死信消息通常包括以下几种:

  • 消息被拒绝,即rabbitMQ返回了一个nack信号
  • 消息的TTL过期了
  • 消息队列达到最大长度,后续消息无法入列
  • 消息不符合要求等。。

什么是死信队列?

死信队列就是用于存储死信的队列,死信队列中,有且只有死信构成,不会存在其余类型的消息。

死信队列在rabbitMQ中并不会单独存在,通常死信队列都会绑定一个普通的消息队列,当绑定的消息队列中有消息变成死信了,那么这个消息就会重新被交换机路由到指定的死信队列中,我们可以通过对这个死信队列进行监听,从而手动去对这些消息进行补偿。

如何使用死信队列?

在 RabbitMQ 中,死信队列的标识为 x-dead-letter-exchange ,通过观察死信队列的标识,我们不难发现,其标识最后为 exchange ,即 RabbitMQ 中的交换机,RabbitMQ 中的死信队列就是由死信交换机而得出的,要想使用死信队列,我们需要首先声明一个普通的消息队列,并将死信队列的标识绑定到这个普通的消息队列上。

16,rabbitMQ中处理消息失败了怎么办?

生产环境中,使用MQ的时候设计两个队列:一个是业务队列,专门用来处理消息,另外一个死信队列,用来处理异常情况。

比如,消费者消费消息时,数据库等发生了故障,无法将数据写入数据库,这时,消费者就可以将该条消息返回一个nack:

  • 一旦返回nack,MQ就会将这条消息转入提前设置好的死信队列中
  • 数据库故障期间,处理的所有失败消息都会转入死信队列
  • 消费者设置一个后台线程,监控数据库是否正常
  • 一旦发现数据库正常后,这个线程就把死信队列中的消息取出来,重新消费

在这里插入图片描述

 

17、rabbitMQ的延迟队列?

延迟队列的内部是有序的,最重要的特性就是体现在它的延迟属性上,延迟队列中的元素就是希望在指定的时间到了之后,将他取出来消费

延迟队列的使用场景(在某个事件发生之后或者之前的指定时间内要做的任务)

  • 订单在十分钟内未支付自动取消
  • 新用户注册后,三天没有登录短信提醒
  • 用户退款,三天内没有 处理通知相关运营人员
  • 预定会议后,提前十分钟通知与会人员

 18、rabbitMQ的延迟队列怎么实现?

18.1:死信队列 +TTL 过期时间

rabbitMQ并没有直接提供延迟队列功能,但是可以通过 死信队列 +TTL 过期时间进行实现:TTL就是消息或者队列的过期功能。当消息过期就会进到死信队列,死信队列和普通队列没啥区别,然后我们只需要配置一个消费者来消费死信队列里面的消息就可以了

注意: RabbitMQ只会对队列头部的消息进行过期淘汰,消息是否过期是在即将投递消息到消费者之前判定的,如果队列出现消息堆积情况,则已过期的消息还是会继续存活的,比如过期时间设置在消息内,由于消息队列是先进先出的,假设第一个消息过期时间是10s,第二个消息过期时间是1s,一前一后几乎同时发消息,1s的已经过期了,但是10s的还没有过期,那么第二个消息也不会从队列中剔除转到死信队列,从而导致消息不断积压。

 

18.2:基于插件实现延迟队列

rabbitMQ还可以通过安装插件来实现延迟队列,安装过程略。

使用延迟插件的情况下,延迟时间短的消息会被优先消费,解决了死信队列+TTL过期时间导致的消息积压问题。(通过交换机延迟消息的方式来实现消息的延迟)

 

上面介绍了rabbitMQ中的延迟队列实现方式,当然还有一些其他的选择,比如利用java自带的DelayQueue, 利用redis中的zset,利用kafka的时间轮等等,这些方式各有特点,可以根据不同的适用场景选择不同的实现方式。

1、DelayQueue

  • DelayQueue是java自带的一个BlockingQueue,用于放置实现了Delayed接口的对象。队列中的对象只能在其到期的时候才能从队列中取出。
  • 添加元素:触发Delayed接口中的compareTo方法按照时间进行排序,排在队列头部是最早到期的,越往后越晚到期
  • 查看元素:消费者线程查看元素,调用getDelay方法,如果方法返回值小于等于0,说明元素已经到期,则会取出,否则,返回wait的时间,wait时间之后,在从头部取出元素
  • 注意,不能将null放入DelayQueue中。

大数据必学Java基础(六十七):DelayQueue深入了解 - 知乎

2、redis中的zeset

  • redis 中,zset的存储结构是k-v,其中value包含了memmber和score,通过score可以进行排序
  • 生产者将需要延迟发送的数据存redis中的 zset
  • 消费者循环从redis的zset队列中获取数据,消费时间到了的数据,然后删除已经消费了的数据

3、kafka实现延迟队列

  • 创建一个专门的Topic用于存储延迟消息
  • 在消息的key中设置延迟时间戳。可以使用当前时间戳加上延迟时间作为key
  • 消费者进程不断检查消息的key中的时间戳是否已经过期。
    • 可以使用当前时间戳与消息的key中的时间戳进行比较。如果时间戳已经过期,则将消息重新发送到目标Topic中,例如"target-messages"
    • 如果时间戳还未过期,则将消息重新发送到"delayed-messages" Topic中,并设置一个新的延迟时间戳。

kafka实现延迟队列需要消费者定期从delayed-messages 中查看消息,消费者进程宕机就会影响延迟队列功能,轮询检查也会消耗资源,延迟精度只能达到毫秒级别。

需要注意的是,Kafka并不是专门为延迟队列设计的,因此在实现过程中需要考虑一些细节问题,比如消息的重复消费、消息的顺序等。

19、rabbitMQ的幂等性?

幂等性是指用户对统一操作发起的一次或者多次请求结果都是一致的,不会因为重复消费而导致结果不一样。

rabbitmq 把消息发给消费者进行消费,消费者消费成功后返回ack消息,但是这个时候网络中断等原因,rabbitMQ没有收到ack消息,让rabbitMQ误以为消息消费失败,然后rabbitMQ把消息重新发送给其他消费者,或者等网络重连后重新发给这个消费者,这个时候就会造成重复消费问题。

解决思路:

消费者解决幂等性的一般方法就是使用一个唯一标识ID,消费前先判断是否已经消费过。

  1. 唯一id,数据库主键去重
  2. redis原子性,利用setnx命令天然的幂等性。

20、优先级队列?

顾名思义,优先级队列可以对元素设置优先级,优先级高的消息具备优先消费的特权。

RabbitMQ支持优先级队列,在声明channel的时候添加 “x-max-priority”属性,RabbitMQ中优先级大小支持0-255,但是实际使用,我们可以根据需要设置最大的优先级值。

当然,在消费端速度大于生产端速度,且broker中没有消息堆积的话,对发送的消息设置优先级也没什么实际意义,因为发送端刚发送完一条消息就被消费端消费了,那么就相当于broker至多只有一条消息,那么对于单条消息来说优先级是没有什么意义的

21、惰性队列?

RabbitMQ从3.6版本引入了惰性队列这一概念,惰性队列会尽可能的将消息存入磁盘中,消费者消费到响应的消息时才会被加载到内存中,他的一个重要的目标是支持更多的消息存储。

  • 默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能地存储在内存之中,这样可以更加快速地将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然RabbitMQ的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在 消息量特别大的时候。
  • 惰性队列会将接收到的消息直接存入文件系统,而不管是持久化的或者是非持久化的,这样可以减少内存的消耗,但是会增加I/O的使用,如果消息是持久化的,那么这样的I/O操作不可避免,惰性队列和持久化的消息可谓是“最佳拍档”。
  • 注意如果惰性队列中存储的是非持久化的消息,内存的使用率会一直很稳定,但是重启之后消息一样会丢失。
  • 使用:"x-queue-mode"设置为 "lazy"

22、RabbitMQ集群?

RabbitMQ集群有两种模式:普通集群和镜像集群。

1、普通集群:

  • 就是将RabbitMQ部署到多台服务器上,每台服务器启动一个RabbitMQ实例,多个实例之间进行消息通信。在普通集群上,我们创建的队列queue,他的元数据(queue的一些配置信息)会在所有的RabbitMQ实例中进行同步,但是队列中的消息只会存在于一个RabbitMQ实例上,不会同步到其他队列。
  • 当消费消息的时候,如果连接到了另外一个实例,那么实例会通过元数据定位到queue所在的位置,然后访问queue所在的实例,拉取数据过来发送给消费者
  • 这种集群可以提高RabbitMQ的消费吞吐能力,但是无法保证高可用,因为一旦存消息的RabbitMQ挂了,消息就没办法访问了。

在这里插入图片描述

 

2、镜像集群

  • 和普通集群的最大区别就是queue数据不在单独存在一台机器上,而是同时存储在多台机器上。也就是说,每个RabbitMQ都至少有一份镜像数据(副本数据)。
  • 每次写入消息的时候都会自动把数据同步到多台实例上去,这样即便一台机器宕机,其他机器上还有副本数据可以继续提供服务,继而实现了高可用。

在这里插入图片描述

 

23、rabbitMQ中的federation exchange,联邦交换机?

应用场景:

有时候为了容灾等原因,会将rabbitMQ部署在不同的城市,当跨距离传输的时候,会有网络延迟等原因。federation exchange 提供了一个能力:可以让原本发送给上游交换器的消息路由到本地的某个队列中,联邦队列则允许一个本地消费者接收到来自上游队列的消息

federation的原理:

  • 联邦交换机首先需要创建出下游队列(广州的broker3),
  • federation插件会在北京(broker1)上建立一个同名的交换器,同时内部创建一个内部交换机,并通过路由将两个交换机绑定起来。
  • federation插件还会在broker1 上简历一个队列,并和broker3中的交换机之间建立一条AMQP连接来实时地消费队列federation: exchangeA.broker3中的数据
  • 对外而言,客户端只能看到federation连接是建立在broker1 exchangeA 和brokr3 exchangeA 之间。

 

24、rabbitMQ中的shovel?
  • shovel插件同样是为了解决数据的转发问题。它能够可靠地从源端broker中的队列中拉取数据并转发到目的端broker的交换机中
  • 作为源端的队列和作为目的端的交换机可以位于一个broker中(没理解),也可以位于不同的broker上
  • shovel的优点:松耦合,解决不同Broker、集群、用户、vhost、MQ和Erlang版本移动消息,支持广域网,可以容忍糟糕的网络,能保证消息的可靠性,高度定制,当Shovel成功连接后,可以配置。

 

拓展,实现一个定时任务的方法:

  1. 遍历所有的任务,根据时间来判断是否需要执行
    1. 优点:逻辑简单
    2. 缺点:每秒都要遍历所有的任务,很多距离到期时间还远的任务做了很多无用功,数据量大的时候,会导致任务执行延迟,占用CPU
  2. 根据执行时间采用小顶堆算法,每次都取最小的时间进行判读
    1. 优点:相比较全部遍历,比较次数变少
    2. 缺点:数据量大的时候,每次插入新数据,时间复杂度为Ologn, 但是还有可能导致任务延迟,(java中的Timer,ScheduledThreadPoolExcutor 就是这种做法)
  3. jdk自带的DelayQueue,每次插入都要重新排队,时间复杂度Onlogn
  4. 时间轮
    1. kafka时间轮的原理:秒懂 Kafka 时间轮(TimingWheel) - 知乎
    2. 避免时间轮的空转:从带圈数的时间轮改为多层时间轮:
      1. 其实就是从单纯小圈转改成:先大圈转,转到一定位置后,然后在小圈转
      2. 【第一层的跨度为1ms,第二层的跨度为20ms,第三层的跨度为400ms。那么例如我们放入的任务为501ms,则将会放入第三层的第一个节点(501%400=101),冗余了101ms,当第三层的指针转到第一个节点时,则将101ms的任务转移到第二层,再将任务放入到第二层的第5个节点(101%20=1)。当第二层的指针转移到低5个节点的时候发现冗余时间,则将任务转移到第一层的第一个节点,第一层转移一次就执行了。这么做的好处是避免了单轮空转的情况。】

动图封面

 

img

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/102049.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【数据库】MySQL基础知识全解

系列综述: 💞目的:本系列是个人整理为了秋招面试的,整理期间苛求每个知识点,平衡理解简易度与深入程度。 🥰来源:材料主要源于拓跋阿秀、小林coding等大佬博客进行的,每个知识点的修…

一百七十三、Flume——Flume写入HDFS后的诸多小文件问题

一、目的 在用Flume采集Kafka中的数据写入HDFS后,发现写入HDFS的不是每天一个文件,而是一个文件夹,里面有很多小文件,浪费namenode的宝贵资源 二、Flume的配置文件优化(参考了其他博文) (一&a…

c语言初阶指针

目录 何为指针 地址大小 野指针 成因 如何规避 有效性 指针计算 -整数 ​编辑 指针比较运算 指针-指针 ​编辑 数组与指针关系 二级指针 指针数组 应用 何为指针 指针就是指针变量,用来存放内存空间的一个编号,将指针比作我们宾馆的客人&a…

java基础(三)

101.如何让写出去的数据能成功生效? flush()刷新数据 close()方法是关闭流,关闭包含刷新,关闭后流不可以继续使用了。 102.学会字节流完成文件的复制(支持一切的文件) public class CopyDemo05 { public static void main(St…

css transition属性

如果想实现一些效果:比如一个div容器宽高拉伸效果,或者一些好看的有过渡的效果可以使用 定义和用法 transition 属性是一个简写属性,用于设置四个过渡属性: transition-property transition-duration transition-timing-func…

sqlserver 各种集合、区间、 时间轴(持更)

1.所有有交集的区间 场景:在事件表里查找某年员工的岗位系数,并计算其加权平均数。case1:该员工是老员工,从2020年一直到2049年。case2:该员工是老员工,但是今年离职。case3:该员工是今年的新员…

使用共享 MVI 架构实现高效的 Kotlin Multiplatform Mobile (KMM) 开发

使用共享 MVI 架构实现高效的 Kotlin Multiplatform Mobile (KMM) 开发 文章中探讨了 Google 提供的应用架构指南在多平台上的实现。通过共享视图模型(View Models)和共享 UI 状态(UI States),我们可以专注于在原生端…

网络编程day5作业

1. 根据select TCP服务器流程图编写服务器&#xff08;上交&#xff09; #include <myhead.h> #define ERR_MSG(msg) do{\fprintf(stderr,"__%d__",__LINE__);\perror(msg);\ }while(0) #define PORT 6666 #define IP "192.168.114.50" int main(in…

内网渗透之凭据收集的各种方式

凭据收集是什么&#xff1f; 凭据收集是获取用户和系统凭据访问权限的术语。 这是一种查找或窃取存储的凭据的技术&#xff0c;包括网络嗅探&#xff0c;攻击者可以在网络嗅探中捕获传输的凭据。 凭证可以有多种不同的形式&#xff0c;例如&#xff1a; 帐户详细信息&#xf…

ESXI主机扩容(VCSA)

原因分析SCSI扩容 VMware为ESXI虚拟机硬盘扩容(需要先关闭ESXI) ESXI扩容前ESXI扩容 https://blog.csdn.net/tongxin_tongmeng/article/details/132652423 ESXI扩容后

2023年8大在线渗透测试工具介绍与分析

随着企业参与数字化运动&#xff0c;网络安全已成为大多数董事会讨论的一个重要方面。事实上&#xff0c;最近的一份报告显示&#xff0c;2022 年网络犯罪造成的损失总额达到惊人的 103 亿美元。 这就是在线渗透测试工具在网络安全中受到关注的地方。 今天&#xff0c;我们希…

安卓核心板的不同核心规格及架构介绍

安卓核心板是将核心功能封装的一块电子主板&#xff0c;集成芯片、存储器和功放器件等&#xff0c;并提供标准接口的芯片。 其特点&#xff1a; ● 能跑 Android 等操作系统 强大的功能及丰富的接口 支持 LCD/TP&#xff0c;Audio&#xff0c;Camera&#xff0c;Video&#…