由上图可以看出:KafkaProducer有两个基本线程:
主线程:
负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器
RecoderAccumulator中;
消息收集器RecoderAccumulator为每个分区都维护了一个Deque<ProducerBatch>
类型的双端队列。
ProducerBatch 可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低网络影响;
由于生产者客户端使用 java.io.ByteBuffer 在发送消息之前进行消息保存,并维护了一个 BufferPool 实现 ByteBuffer 的复用;该缓存池只针对特定大小(batch.size 指定)的 ByteBuffer进行管理,对于消息过大的缓存,不能做到重复利用。
每次追加一条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取一个ProducerBatch,判断当前消息的大小是否可以写入该批次中。若可以写入则写入;若不可以写入,则新建一个ProducerBatch,判断该消息大小是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size建立新的ProducerBatch,这样方便进行缓存重复利用;若超过,则以计算的消息大小建立对应的ProducerBatch ,缺点就是该内存不能被复用了。
Sender线程:
该线程从消息收集器获取缓存的消息,将其处理为 <Node, List<ProducerBatch>>
的形式, Node 表示集群的broker节点。
进一步将<Node, List<ProducerBatch>>
转化为<Node, Request>
形式,此时才可以向服务端发送数据。
在发送之前,Sender线程将消息以 Map<NodeId, Deque<Request>>
的形式保存到 InFlightRequests 中进行缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压力最小的一个,以实现消息的尽快发出。
生产者创建消息。
该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的 segment 文件中。
一般情况下,一个消息会被发布到一个特定的主题上。
- 默认情况下通过轮询把消息均衡地分布到主题的所有分区上。
- 在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。
- 生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。
消息写入的时候,每一个分区都有一个offset,这个offset就是生产者的offset,同时也是这个分区的最新最大的offset