消息队列使用场景
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,削峰填谷等问题。实现高性能、高可用、可伸缩和最终一致性架构。
- 解耦:多个服务监听、处理同一条消息,避免多次 rpc 调用。
- 异步消息:消息发布者不用等待消息处理的的结果。
- 削峰填谷:较大流量、写入场景,为下游 I/O 服务抗流量。当然大流量下就需要使用其他方案了。
- 消息驱动框架:在事件总线中,服务通过监听事件消息驱动服务完成相应动作。
消息队列模式
点对点模式,不可重复消费
多个生产者可以向同一个消息队列发送消息,一个消息在被一个消息者消费成功后,这条消息会被移除,其他消费者无法处理该消息。如果消费者处理一个消息失败了,那么这条消息会重新被消费。
发布/订阅模式
发布订阅模式需要进行注册、订阅,根据注册消费对应的消息。多个生产者可以将消息写到同一个 Topic 中,多种消息可以被同一个消费者消费。一个生产者生产的消息,同样也可以被多个消费者消费,只要他们进行过消息订阅。
选型参考
- 消息顺序:发送到队列的消息,消费时是否可以保证消费的顺序;
- 伸缩:当消息队列性能有问题,比如消费太慢,是否可以快速支持扩容;当消费队列过多,浪费系统资源,是否可以支持缩容。
- 消息留存:消息消费成功后,是否还会继续保留在消息队列;
- 容错性:当一条消息消费失败后,是否有一些机制,保证这条消息一定能成功,比如异步第三方退款消息,需要保证这条消息消费掉,才能确定给用户退款成功,所以必须保证这条消息消费成功的准确性;
- 消息可靠性:是否会存在丢消息的情况,比如有 A/B 两个消息,最后只有 B 消息能消费,A 消息丢失;
- 消息时序:主要包括“消息存活时间”和“延迟消息”;
- 吞吐量:支持的最高并发数;
- 消息路由:根据路由规则,只订阅匹配路由规则的消息,比如有 A/B 两者规则的消息,消费者可以只订阅 A 消息,B 消息不会消费。
Kafka
Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。(维基百科)
基本术语
Producer:消息生产者。一般情况下,一条消息会被发送到特定的主题上。通常情况下,写入的消息会通过轮询将消息写入各分区。生产者也可以通过设定消息 key 值将消息写入指定分区。写入分区的数据越均匀 Kafka 的性能才能更好发挥。
Topic:Topic 是个抽象的虚拟概念,一个集群可以有多个 Topic,作为一类消息的标识。一个生产者将消息发送到 topic,消费者通过订阅 Topic 获取分区消息。
Partition:Partition 是个物理概念,一个 Topic 对应一个或多个 Partition。新消息会以追加的方式写入分区里,在同一个 Partition 里消息是有序的。Kafka 通过分区,实现消息的冗余和伸缩性,以及支持物理上的并发读、写,大大提高了吞吐量。
Replicas:一个 Partition 有多个 Replicas 副本。这些副本保存在 broker,每个 broker 存储着成百上千个不同主题和分区的副本,存储的内容分为两种:master 副本,每个 Partition 都有一个 master 副本,所有内容的写入和消费都会经过 master 副本;follower 副本不处理任何客户端的请求,只同步 master 的内容进行复制。如果 master 发生了异常,很快会有一个 follower 成为新的 master。
Consumer:消息读取者。消费者订阅主题,并按照一定顺序读取消息。Kafka 保证每个分区只能被一个消费者使用。
Offset:偏移量是一种元数据,是不断递增的整数。在消息写入时 Kafka 会把它添加到消息里。在分区内偏移量是唯一的。消费过程中,会将最后读取的偏移量存储在 Kafka 中,消费者关闭偏移量不会丢失,重启会继续从上次位置开始消费。
Broker:独立的 Kafka 服务器。一个 Topic 有 N 个 Partition,一个集群有 N 个 Broker,那么每个 Broker 都会存储一个这个 Topic 的 Partition。如果某 topic 有 N 个 partition,集群有(N+M)个 broker,那么其中有 N 个 broker 存储该 topic 的一个 partition,剩下的 M 个 broker 不存储该 topic 的 partition 数据。如果某 topic 有 N 个 partition,集群中 broker 数目少于 N 个,那么一个 broker 存储该 topic 的一个或多个 partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致 Kafka 集群数据不均衡。
系统框架
第一个 topic 有两个生产,新消息被写入到 partition 1 或者 partition 2,两个分区在 broker1、broker2 都有备份。有新消息写入后,两个 follower 分区会从两个 master 分区同步变更。对应的 consumer 会从两个 master 分区根据现在 offset 获取消息,并更新 offset。第二个 topic 只有一个生产者,同样对应两个 partition,分散在 Kafka 集群的两个 broker 上。有新消息写入,两个 follower 分区会同步 master 变更。两个 Consumer 分别从不同的 master 分区获取消息。
优点
高吞吐量、低延迟:kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒;
可扩展性:kafka 集群支持热扩展;
持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
容错性:允许集群中节点故障,一个数据多个副本,少数机器宕机,不会丢失数据;
高并发:支持数千个客户端同时读写。
缺点
分区有序:仅在同一分区内保证有序,无法实现全局有序;
无延时消息:消费顺序是按照写入时的顺序,不支持延时消息
重复消费:消费系统宕机、重启导致 offset 未提交;
Rebalance:Rebalance 的过程中 consumer group 下的所有消费者实例都会停止工作,等待 Rebalance 过程完成。
使用场景
日志收集:大量的日志消息先写入 kafka,数据服务通过消费 kafka 消息将数据落地;
消息系统:解耦生产者和消费者、缓存消息等;
用户活动跟踪:kafka 经常被用来记录 web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后消费者通过订阅这些 topic 来做实时的监控分析,亦可保存到数据库;
运营指标:记录运营、监控数据,包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;
流式处理:比如 spark streaming。
RabbitMQ
RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件(英语:Message-oriented middleware)。RabbitMQ 服务器是用 Erlang 语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端函式库。(维基百科)
基本术语
Broker:接收客户端链接实体,实现 AMQP 消息队列和路由功能;
Virtual Host:是一个虚拟概念,权限控制的最小单位。一个 Virtual Host 里包含多个 Exchange 和 Queue;
Exchange:接收消息生产者的消息并将消息转发到队列。发送消息时根据不同 ExchangeType 的决定路由规则,ExchangeType 常用的有:direct、fanout 和 topic 三种;
Message Queue:消息队列,存储为被消费的消息;
Message:由 Header 和 Body 组成,Header 是生产者添加的各种属性,包含 Message 是否持久化、哪个 MessageQueue 接收、优先级。Body 是具体的消息内容;
Binding:Binding 连接起了 Exchange 和 Message Queue。在服务器运行时,会生成一张路由表,这张路由表上记录着 MessageQueue 的条件和 BindingKey 值。当 Exchange 收到消息后,会解析消息中的 Header 得到 BindingKey,并根据路由表和 ExchangeType 将消息发送到对应的 MessageQueue。最终的匹配模式是由 ExchangeType 决定;
Connection:在 Broker 和客户端之间的 TCP 连接;
Channel:信道。Broker 和客户端只有 tcp 连接是不能发送消息的,必须创建信道。AMQP 协议规定只有通过 Channel 才能执行 AMQP 命令。一个 Connection 可以包含多个 Channel。之所以需要建立 Channel,是因为每个 TCP 连接都是很宝贵的。如果每个客户端、每个线程都需要和 Broker 交互,都需要维护一个 TCP 连接的话是机器耗费资源的,一般建议共享 Connection。RabbitMQ 不建议客户端线程之前共享 Channel,至少保证同一 Channel 发小消息是穿行的;
Command:AMQP 命令,客户端通过 Command 来完成和 AMQP 服务器的交互。
资料直通车:Linux内核源码技术学习路线+视频教程内核源码
学习直通车:Linux内核源码内存调优文件系统进程管理设备驱动/网络协议栈
系统框架
一条 Message 经过信道到达对应的 Exchange,Exchange 收到消息后解析出消息 Header 内容,获取消息 BindingKey 并根据 Binding 和 ExchangeType 将消息转发到对应的 MessageQueue,最后通过 Connection 将消息传送的客户端。
ExchangeType
Direct:精确匹配
- 只有 RoutingKey 和 BindingKey 完全匹配的时候,消息队列才可以获取消息;
- Broker 默认提供一个 Exchange,类型是 Direct 名字是空字符串,绑定到所有的 Queue(这里通过 Queue 名字来区分)。
Fanout:订阅、广播
- 这个模式会将消息转发到所有的路由的 Queue 中
Topic:通配符模式
- RoutingKey 为一个句点号“. ”分隔的字符串(将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“quick.orange.rabbit”。BindingKey 与 RoutingKey 一样;
- Bindingkey 中的两个特殊字符"#"和“_”用于模糊匹配,“#”用于匹配多个单次,“_”用来匹配单个单词(包含零个)。
优点
- 基于 AMQP 协议:除了 Qpid,RabbitMQ 是唯一一个实现了 AMQP 标准的消息服务器;
- 健壮、稳定、易用;
- 社区活跃,文档完善;
- 支持定时消息;
- 可插入的身份验证,授权,支持 TLS 和 LDAP;
- 支持根据消息标识查询消息,也支持根据消息内容查询消息。
缺点
- erlang 开发源码难懂,不利于做二次开发和维护;
- 接口和协议复杂,学习和维护成本较高。
总结
- erlang 有并发优势,性能较好。虽然源码复杂,但是社区活跃度高,可以解决开发中遇到的问题;
- 业务流量不大的话可以选择功能比较完备的 RabbitMQ。
Pulsar
Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。Pulsar 是一个 pub-sub (发布-订阅)模型的消息队列系统。(百科)
基本术语
Property:代表租户,每个 property 都可以代表一个团队、一个功能、一个产品线。一个 property 可包含多个 namesapce,多租户是一种资源隔离手段,可以提高资源利用率;
Namespace:Pulsar 的基本管理单元,在 namaspace 级别可设置权限、消息 TTL、Retention 策略等。一个 namaspace 里的所有 topic 都继承相同的设置。命名空间分为两种:本地命名空间,只在集群内可见、全局命名空间对多个集群可见集群命名空间;
Producer:数据生产方,负责创建消息并将消息投递到 Pulsar 中;
Consumer:数据消费方,连接到 Pulsar 接收消息并进行相应的处理;
Broker:无状态 Proxy 服务,负责接收消息、传递消息、集群负载均衡等操作,它对 client 屏蔽了服务端读写流程的复杂性,是保证数据一致性与数据负载均衡的重要角色。Broker 不会持久化保存元数据。可以扩容但不能缩容;
BookKeeper:有状态,负责持久化存储消息。当集群扩容时,Pulsar 会在新增 BookKeeper 和 Segment(即 Bookeeper 的 Ledger),不需要像 kafka 一样在扩容时进行 Rebalance。扩容结果是 Fragments 跨多个 Bookies 以带状分布,同一个 Ledger 的 Fragments 分布在多个 Bookie 上,导致读取和写入会在多个 Bookies 之间跳跃;
ZooKeeper:存储 Pulsar 、 BookKeeper 的元数据,集群配置等信息,负责集群间的协调、服务发现等;
Topic:用作从 producer 到 consumer 传输消息。Pulsar 在 Topic 级别拥有一个 leader Broker,称之为拥有 Topic 的所有权,针对该 Topic 所有的 R/W 都经过该 Broker 完成。Topic 的 Ledger 和 Fragment 之间映射关系等元数据存储在 Zookeeper 中,Pulsar Broker 需要实时跟踪这些关系进行读写流程;
Ledger:即 Segment,Pulsar 底层数据以 Ledger 的形式存储在 BookKeeper 上。是 Pulsar 删除的最小单位;
Fragment :每个 Ledger 由若干 Fragment 组成。
系统框架
上面框架图分别演示了扩容、故障转移两种情况。扩容:因业务量增大扩容新增 Bookie N,后续写入的数据 segment x、segment y 写入新增 Bookie 中,为保持均衡扩容结果如上图绿色模块所示。故障转移:Bookie 2 的 segment 4 发生故障,Pulasr 的 Topic 会立马从新选择 Bookie 1 作为处理读写的服务。
Broker 是无状态的服务,只服务数据计算不存储,所以 Pulsar 可以认为是一种基于 Proxy 的分布式系统。
优点
- 灵活扩容
- 无缝故障恢复
- 支持延时消息
- 内置的复制功能,用于跨地域复制如灾备
- 支持两种消费模型:流(独享模式)、队列(共享模式)
RocketMQ
RocketMQ 是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。RocketMQ 是 2012 年阿里巴巴开源的第三代分布式消息中间件。(维基百科)
基本术语
Topic:一个 Topic 可以有 0 个、1 个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。一个 Topic 也可以被 0 个、1 个、多个消费者订阅;
Tag:消息二级类型,可以为用户提供额外的灵活度,一条消息可以没有 tag;
Producer:消息生产者;
Broker:存储消息,以 Topic 为纬度轻量级的队列;转发消息,单个 Broker 节点与所有的 NameServer 节点保持长连接及心跳,会定时将 Topic 信息注册到 NameServer;
Consumer:消息消费者,负责接收并消费消息;
MessageQueue:消息的物理管理单位,一个 Topic 可以有多个 Queue,Queue 的引入实现了水平扩展的能力;
NameServer:负责对原数据的管理,包括 Topic 和路由信息,每个 NameServer 之间是没有通信的;
Group:一个组可以订阅多个 Topic,ProducerGroup、ConsumerGroup 分别是一类生产者和一类消费者;
Offset:通过 Offset 访问存储单元,RocketMQ 中所有消息都是持久化的,且存储单元定长。Offset 为 Java Long 类型,理论上 100 年内不会溢出,所以认为 Message Queue 是无限长的数据,Offset 是下标;
Consumer:支持 PUSH 和 PULL 两种消费模式,支持集群消费和广播消费。
系统框架
优点
支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型:
- 顺序队列:在一个队列中可靠的先进先出(FIFO)和严格的顺序传递;支持拉(pull)和推(push)两种消息模式;
- 单一队列百万消息的堆积能力;
- 支持多种消息协议,如 JMS、MQTT 等;
- 分布式横向扩展架构;
- 满足至少一次消息传递语义;
- 提供丰富的 Dashboard,包含配置、指标和监控等;
- 支持的客户端,目前是 java、c++及 golang
缺点
- 社区活跃度一般;
- 延时消息:开源版不支持任意时间精度,仅支持特定的 level。
使用场景
- 为金融互联网领域而生,对于可靠性要求很高的场景。
原文作者:极客重生