一、消费方式
consumer采用pull(拉)的模式从broker中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快的速度传递消息,但是这样容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式可以根据consumer的消费能力以适当的速率消费消息。
pull模式的不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可消费,consumer会等待一段时间后再返回。
二、分区分配策略
一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定哪个partition由哪个consumer来消费。Kafka提供了3种消费者分区分配策略:RangeAssigor、RoundRobinAssignor、StickyAssignor。
PartitionAssignor接口用于用户定义实现分区分配算法,以实现Consumer之间的分区分配。消费组的成员订阅它们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的Broker。协调者选择其中的一个消费者来执行这个消费组的分区分配并将分配结果转发给消费组内所有的消费者。Kafka默认采用RangeAssignor的分配算法。
2.1 RangeAssignor
RangeAssignor对每个Topic进行独立的分区分配。对于每一个Topic,首先对分区按照分区ID进行排序,然后订阅这个Topic的消费组的消费者再进行排序,之后尽量均衡的将分区分配给消费者。这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,那么有一些消费者就会多分配到一些分区。分配示意图如下:
![](https://ask.qcloudimg.com/http-save/yehe-1420980/a840ef125cbfcb6424a0ddbb73f7cd18.png)
分区分配的算法如下:
@Override public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions) {Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);Map<String, List<TopicPartition>> assignment = new HashMap<>();for (String memberId : subscriptions.keySet())assignment.put(memberId, new ArrayList<TopicPartition>());//for循环对订阅的多个topic分别进行处理for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {String topic = topicEntry.getKey();List<String> consumersForTopic = topicEntry.getValue();Integer numPartitionsForTopic = partitionsPerTopic.get(topic);if (numPartitionsForTopic == null)continue;//对消费者进行排序Collections.sort(consumersForTopic);//计算平均每个消费者分配的分区数int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();//计算平均分配后多出的分区数int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);for (int i = 0, n = consumersForTopic.size(); i < n; i++) {//计算第i个消费者,分配分区的起始位置int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);//计算第i个消费者,分配到的分区数量int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));}}return assignment; }
这种分配方式明显的一个问题是随着消费者订阅的Topic的数量的增加,不均衡的问题会越来越严重,比如上图中4个分区3个消费者的场景,C0会多分配一个分区。如果此时再订阅一个分区数为4的Topic,那么C0又会比C1、C2多分配一个分区,这样C0总共就比C1、C2多分配两个分区了,而且随着Topic的增加,这个情况会越来越严重。分配结果:
![](https://ask.qcloudimg.com/http-save/yehe-1420980/0da0d256cf2538fb5e765b7ed8b7661e.png)
订阅2个Topic,每个Topic4个分区,共3个Consumer
- C0:[T0P0,T0P1,T1P0,T1P1]
- C1:[T0P2,T1P2]
- C2:[T0P3,T1P3]
2.2 RoundRobinAssignor
RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进行排序分配的)。如果消费组内,消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间分配到的分区数的差值不会超过1)。如果订阅的Topic列表是不同的,那么分配结果是不保证“尽量均衡”的,因为某些消费者不参与一些Topic的分配。
![](https://ask.qcloudimg.com/http-save/yehe-1420980/a8af86ed3058b5fe308cb976a39d7f14.png)
以上两个topic的情况,相比于之前RangeAssignor的分配策略,可以使分区分配的更均衡。不过考虑这种情况,假设有三个消费者分别为C0、C1、C2,有3个Topic T0、T1、T2,分别拥有1、2、3个分区,并且C0订阅T0,C1订阅T0和T1,C2订阅T0、T1、T2,那么RoundRobinAssignor的分配结果如下:
![](https://ask.qcloudimg.com/http-save/yehe-1420980/a4216737d755c068b3b20fb2fbf84175.png)
看上去分配已经尽量的保证均衡了,不过可以发现C2承担了4个分区的消费而C1订阅了T1,是不是把T1P1交给C1消费能更加的均衡呢?
2.3 StickyAssignor
StickyAssignor分区分配算法,目的是在执行一次新的分配时,能在上一次分配的结果的基础上,尽量少的调整分区分配的变动,节省因分区分配变化带来的开销。Sticky是“粘性的”,可以理解为分配结果是带“粘性的”——每一次分配变更相对上一次分配做最少的变动。其目标有两点:
- 分区的分配尽量的均衡。
- 每一次重分配的结果尽量与上一次分配结果保持一致。
当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个目标才真正体现出StickyAssignor特性的。
StickyAssignor算法比较复杂,下面举例来说明分配的效果(对比RoundRobinAssignor),前提条件:
- 有4个Topic:T0、T1、T2、T3,每个Topic有2个分区。
- 有3个Consumer:C0、C1、C2,所有Consumer都订阅了这4个分区。
![](https://ask.qcloudimg.com/http-save/yehe-1420980/ff983b6bcc49bdd67cff403ca4412de5.png)
上面红色的箭头代表的是有变动的分区分配,可以看出,StickyAssignor的分配策略,变动较小。
三.Offset
由于Consumer在消费过程中可能会出现断电宕机等故障,Consumer恢复后,需要从故障前的位置继续消费,所以Consumer需要实时记录自己消费到哪个位置,以便故障恢复后继续消费。Kafka0.9版本之前,Consumer默认将offset保存在zookeeper中,从0.9版本开始,Consumer默认将offset保存在Kafka一个内置的名字叫_consumeroffsets的topic中。默认是无法读取的,可以通过设置consumer.properties中的exclude.internal.topics=false来读取。