Kafka的本质是日志消息代理 日志的特点就是append-only和不可变 它能带来的显而易见的好处是强大的局部性 内存中可以抽象为buffer 内核态里它又是page cache 磁盘上它会集中在同一磁道 从上至下利于软件和操作系统进行快速写入 这也是为什么大量知名系统 不论是MySQL Server的binlog还是redis的aof 都是使用类似的方式
它是典型的IO密集型应用 所以它并不是线程池, Kafka的大量技术细节都在解决IO性能 包括但不限于零拷贝
一:实际问题总结
卡夫卡的消息存储在日志内的,以append-only的方式顺序追加在日志后面。
Kafka 主题(Topic)被分成多个分区(Partitions),每个分区是一个有序的、不可变的消息序列。消息在分区内是有序的,但不同分区之间没有顺序保证。这种分区机制允许 Kafka 在多个服务器上并行处理消息。并且使用单线程模型进行每个写操作,也就是所有操作任务都在单线程模型内处理,避免了多线程高并发场景下日志记录无序错乱。在消费时通过ack机制,也就是提交偏移量来确认消息已被处理,确保 Kafka 知道哪些消息已经被消费。
所以当出现消息丢失,消息重复消费等问题时,我们就要从原理出发。
-
消息丢失就去查看日志,可以重新调整位移偏量offset来重新消费。
-
重复消费就大概率是位移偏量和消费拉取有问题。
- 比如偏移量未及时提交,消费时崩溃或者重启了等,或者网络中断了
- 手动提交偏移量时由于延迟等原因,导致下一批拉取时又拉取一遍重复消费
- 消费者组group内各个topic的消费者数量发生变化,导致底下分区重新分配,可能会导致部分重复消费
-
既然问题如此,那么规避和解决也就对症下药。
-
比如自动提交偏移量 auto-commit .但是通常不建议使用,假如消息处理失败了,偏移量却提交了,就无法正确重试了。
-
通常手动提交,确保消息处理成功时才提交偏移量,consumer.commitSync()
-
使用幂等性来保障同一条消息不会重复处理,或者哪怕重复消费,结果一致,也不会产生影响。比如消息具有唯一的标识ID,记录已经处理的标识,消息处理前判断是否已经处理过
-
启用事务来解决。不同于幂等性处理方式着重于消费者端,使用业务逻辑来避免重消费。事务的处理方式是贯穿生产者和消费者的,使得生产者消费者在事务中进行原子性操作。事务支持需要开启生产者端的enable.idempotence幂等性配置( 该幂等性配置让消息在每个分区内有一个序列号,在各自分区内确保唯一性和顺序性 ,且提交时对消费者不可见)。消费者端开启read_committed只读已经提交的事务消息配置
比如生产者代码:
-
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;public class TransactionalProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("enable.idempotence", "true"); // 启用幂等性props.put("transactional.id", "my-transactional-id");KafkaProducer<String, String> producer = new KafkaProducer<>(props);producer.initTransactions();try {producer.beginTransaction();producer.send(new ProducerRecord<>("orders", "orderId1", "orderValue1"));// 其他生产逻辑producer.commitTransaction();} catch (Exception e) {producer.abortTransaction();} finally {producer.close();}}
}
消费者代码:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class TransactionalConsumer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "order-consumer-group");props.put("enable.auto.commit", "false");props.put("isolation.level", "read_committed"); // 只读取已提交的事务消息props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("orders"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息processOrder(record.value());// 提交偏移量consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1)));}}}private static void processOrder(String order) {// 订单处理逻辑}
}
- 在这里幂等性处理为何不开启幂等性配置idempotence呢,第一是因为 幂等性的处理主要是消费端的处理,是业务代码逻辑规避重复消费,使用的是业务意义上的唯一标识ID,对整个系统都是唯一的,而不是生产者的序列号,只是在各自分区内唯一。所以即便开启,对于幂等性处理来说也没有什么意义,还是要在消费者端进行逻辑处理; 第二是因为 idempotence的序列号对消费者不可见,只是确保在各自分区内每个消息的提交是唯一的,不会重复提交。而跨分区就可能会重复序列号,所以即便可见,也不能作为消费端的唯一标识,它不是整个业务系统内唯一的。
二:高并发具体支持
1. 分区(Partitioning)
Kafka 主题(Topic)被分成多个分区(Partitions),每个分区是一个有序的、不可变的消息序列。消息在分区内是有序的,但不同分区之间没有顺序保证。这种分区机制允许 Kafka 在多个服务器上并行处理消息,从而实现高吞吐量。
分区的顺序保证
- 在单个分区内,消息是按顺序写入和读取的。
- 消费者在读取消息时会按偏移量顺序读取,确保消息的顺序性。
2. 生产者端的顺序保证
Kafka 生产者在向分区发送消息时,通过以下方式确保消息的顺序性:
- 单个生产者实例:单个生产者实例发送到同一分区的消息是按顺序发送的。
- 幂等性生产者:通过
enable.idempotence
配置,Kafka 生产者可以确保消息不会重复发送,并且消息的顺序不会错乱。 - 批量发送:生产者可以将多个消息打包成一个批次发送,从而减少网络开销,提高吞吐量。
3. 消费者端的顺序保证
Kafka 消费者通过以下方式确保消息的顺序性:
- 单线程消费:单个消费者实例从一个分区读取消息时,按偏移量顺序读取,确保消息的顺序性。
- 偏移量管理:消费者在处理完一条消息后,提交该消息的偏移量,确保下次从正确的位置继续消费。
- 再均衡机制:Kafka 的消费者组允许多个消费者实例共同消费一个主题。再均衡机制确保每个分区在任意时刻只被一个消费者实例消费,避免消息错乱。
4. 高吞吐量的实现机制
Kafka 通过以下机制实现高吞吐量:
- 零拷贝(Zero Copy):Kafka 使用零拷贝技术直接在磁盘和网络之间传输数据,减少了 CPU 和内存的开销。
- 批量处理:生产者和消费者都可以批量发送和接收消息,从而减少网络往返,提高吞吐量。
- 数据压缩:支持多种压缩格式(如 Gzip、Snappy、LZ4),减少传输数据量,提高网络利用率。
- 异步 I/O:Kafka 使用异步 I/O 操作,提高了磁盘和网络 I/O 的效率。
- 日志分段:Kafka 的分区日志被分成多个段,定期滚动新段,旧段可以被压缩或删除,提高了磁盘 I/O 性能。
5. 副本机制(Replication)
Kafka 通过副本机制确保数据的高可用性和容错性:
- 多副本:每个分区可以有多个副本,分布在不同的 broker 上。
- 领导者和追随者:每个分区有一个领导者副本和多个追随者副本。生产者和消费者只与领导者副本交互,确保消息的顺序性。
- ISR(In-Sync Replicas):Kafka 维护一个同步副本集合(ISR),只有同步副本集合中的副本才被认为是最新的副本,确保数据的一致性。
6. 控制器的角色
Kafka 集群中的控制器(Controller)负责管理分区的领导者选举和再均衡过程,确保在 broker 故障时快速恢复,并保持分区数据的一致性和顺序性。
结论
Kafka 通过分区机制、生产者和消费者的顺序保证、批量处理、零拷贝技术、副本机制以及控制器的管理等多种设计和机制,实现了高并发吞吐量,同时确保消息的顺序性和一致性。这些特性使 Kafka 成为一个高性能、可靠的分布式消息系统。