Kafka的幂等性,事务等配置

news/2025/2/5 23:32:16/文章来源:https://www.cnblogs.com/jhfnewstart/p/18700324

Kafka的本质是日志消息代理 日志的特点就是append-only和不可变 它能带来的显而易见的好处是强大的局部性 内存中可以抽象为buffer 内核态里它又是page cache 磁盘上它会集中在同一磁道 从上至下利于软件和操作系统进行快速写入 这也是为什么大量知名系统 不论是MySQL Server的binlog还是redis的aof 都是使用类似的方式

它是典型的IO密集型应用 所以它并不是线程池, Kafka的大量技术细节都在解决IO性能 包括但不限于零拷贝

一:实际问题总结

卡夫卡的消息存储在日志内的,以append-only的方式顺序追加在日志后面。

Kafka 主题(Topic)被分成多个分区(Partitions),每个分区是一个有序的、不可变的消息序列。消息在分区内是有序的,但不同分区之间没有顺序保证。这种分区机制允许 Kafka 在多个服务器上并行处理消息。并且使用单线程模型进行每个写操作,也就是所有操作任务都在单线程模型内处理,避免了多线程高并发场景下日志记录无序错乱。在消费时通过ack机制,也就是提交偏移量来确认消息已被处理,确保 Kafka 知道哪些消息已经被消费。

所以当出现消息丢失,消息重复消费等问题时,我们就要从原理出发。

  1. 消息丢失就去查看日志,可以重新调整位移偏量offset来重新消费。

  2. 重复消费就大概率是位移偏量和消费拉取有问题。

    • 比如偏移量未及时提交,消费时崩溃或者重启了等,或者网络中断了
    • 手动提交偏移量时由于延迟等原因,导致下一批拉取时又拉取一遍重复消费
    • 消费者组group内各个topic的消费者数量发生变化,导致底下分区重新分配,可能会导致部分重复消费
  3. 既然问题如此,那么规避和解决也就对症下药。

    • 比如自动提交偏移量 auto-commit .但是通常不建议使用,假如消息处理失败了,偏移量却提交了,就无法正确重试了。

    • 通常手动提交,确保消息处理成功时才提交偏移量,consumer.commitSync()

    • 使用幂等性来保障同一条消息不会重复处理,或者哪怕重复消费,结果一致,也不会产生影响。比如消息具有唯一的标识ID,记录已经处理的标识,消息处理前判断是否已经处理过

    • 启用事务来解决。不同于幂等性处理方式着重于消费者端,使用业务逻辑来避免重消费。事务的处理方式是贯穿生产者和消费者的,使得生产者消费者在事务中进行原子性操作。事务支持需要开启生产者端的enable.idempotence幂等性配置( 该幂等性配置让消息在每个分区内有一个序列号,在各自分区内确保唯一性和顺序性 ,且提交时对消费者不可见)。消费者端开启read_committed只读已经提交的事务消息配置

      比如生产者代码:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;public class TransactionalProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("enable.idempotence", "true"); // 启用幂等性props.put("transactional.id", "my-transactional-id");KafkaProducer<String, String> producer = new KafkaProducer<>(props);producer.initTransactions();try {producer.beginTransaction();producer.send(new ProducerRecord<>("orders", "orderId1", "orderValue1"));// 其他生产逻辑producer.commitTransaction();} catch (Exception e) {producer.abortTransaction();} finally {producer.close();}}
}

消费者代码:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class TransactionalConsumer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "order-consumer-group");props.put("enable.auto.commit", "false");props.put("isolation.level", "read_committed"); // 只读取已提交的事务消息props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("orders"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息processOrder(record.value());// 提交偏移量consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1)));}}}private static void processOrder(String order) {// 订单处理逻辑}
}
  1. 在这里幂等性处理为何不开启幂等性配置idempotence呢,第一是因为 幂等性的处理主要是消费端的处理,是业务代码逻辑规避重复消费,使用的是业务意义上的唯一标识ID,对整个系统都是唯一的,而不是生产者的序列号,只是在各自分区内唯一。所以即便开启,对于幂等性处理来说也没有什么意义,还是要在消费者端进行逻辑处理; 第二是因为 idempotence的序列号对消费者不可见,只是确保在各自分区内每个消息的提交是唯一的,不会重复提交。而跨分区就可能会重复序列号,所以即便可见,也不能作为消费端的唯一标识,它不是整个业务系统内唯一的。

二:高并发具体支持

1. 分区(Partitioning)

Kafka 主题(Topic)被分成多个分区(Partitions),每个分区是一个有序的、不可变的消息序列。消息在分区内是有序的,但不同分区之间没有顺序保证。这种分区机制允许 Kafka 在多个服务器上并行处理消息,从而实现高吞吐量。

分区的顺序保证

  • 在单个分区内,消息是按顺序写入和读取的。
  • 消费者在读取消息时会按偏移量顺序读取,确保消息的顺序性。

2. 生产者端的顺序保证

Kafka 生产者在向分区发送消息时,通过以下方式确保消息的顺序性:

  • 单个生产者实例:单个生产者实例发送到同一分区的消息是按顺序发送的。
  • 幂等性生产者:通过 enable.idempotence 配置,Kafka 生产者可以确保消息不会重复发送,并且消息的顺序不会错乱。
  • 批量发送:生产者可以将多个消息打包成一个批次发送,从而减少网络开销,提高吞吐量。

3. 消费者端的顺序保证

Kafka 消费者通过以下方式确保消息的顺序性:

  • 单线程消费:单个消费者实例从一个分区读取消息时,按偏移量顺序读取,确保消息的顺序性。
  • 偏移量管理:消费者在处理完一条消息后,提交该消息的偏移量,确保下次从正确的位置继续消费。
  • 再均衡机制:Kafka 的消费者组允许多个消费者实例共同消费一个主题。再均衡机制确保每个分区在任意时刻只被一个消费者实例消费,避免消息错乱。

4. 高吞吐量的实现机制

Kafka 通过以下机制实现高吞吐量:

  • 零拷贝(Zero Copy):Kafka 使用零拷贝技术直接在磁盘和网络之间传输数据,减少了 CPU 和内存的开销。
  • 批量处理:生产者和消费者都可以批量发送和接收消息,从而减少网络往返,提高吞吐量。
  • 数据压缩:支持多种压缩格式(如 Gzip、Snappy、LZ4),减少传输数据量,提高网络利用率。
  • 异步 I/O:Kafka 使用异步 I/O 操作,提高了磁盘和网络 I/O 的效率。
  • 日志分段:Kafka 的分区日志被分成多个段,定期滚动新段,旧段可以被压缩或删除,提高了磁盘 I/O 性能。

5. 副本机制(Replication)

Kafka 通过副本机制确保数据的高可用性和容错性:

  • 多副本:每个分区可以有多个副本,分布在不同的 broker 上。
  • 领导者和追随者:每个分区有一个领导者副本和多个追随者副本。生产者和消费者只与领导者副本交互,确保消息的顺序性。
  • ISR(In-Sync Replicas):Kafka 维护一个同步副本集合(ISR),只有同步副本集合中的副本才被认为是最新的副本,确保数据的一致性。

6. 控制器的角色

Kafka 集群中的控制器(Controller)负责管理分区的领导者选举和再均衡过程,确保在 broker 故障时快速恢复,并保持分区数据的一致性和顺序性。

结论

Kafka 通过分区机制、生产者和消费者的顺序保证、批量处理、零拷贝技术、副本机制以及控制器的管理等多种设计和机制,实现了高并发吞吐量,同时确保消息的顺序性和一致性。这些特性使 Kafka 成为一个高性能、可靠的分布式消息系统。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/879320.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GNURadio模块学习——Source and Sink类

介绍GNU Radio中常见的 Source 与 Sink 模块,包括流程图端口、音频输入输出、虚拟连接、文件读写、ZMQ跨流程图通信,以及随机信号源、固定信号源、噪声源等常见信号源和时域、频域、星座图等信号展示工具。Source and Sink Pad(流程图端口) 当该流程图是hierarchical block…

【C++】gflag使用指南

一、什么是gflags? gflags 是一个用于定义命令行参数的 C++ 库,它由 Google 开发并开源。通过 gflags,你可以轻松地在你的程序中添加各种类型的命令行选项,包括整数、布尔值、字符串等,并且可以为这些选项设置默认值。此外,gflags 还提供了强大的帮助信息生成功能,使得用…

【C++】Google benchmark理解与应用

一、介绍 Google Benchmark 是一个用于 C++ 的微基准测试库。它旨在帮助开发者编写出更高效、更具表现力的基准测试代码。通过使用 Google Benchmark,可以方便地测量函数或代码片段的性能,并且能够生成详细的报告。 二、安装与配置 2.1 安装 在Ubuntu环境中安装Google Benchm…

LRU浅析

LRU算法LRU是Least Recently Used的缩写,即最近最少使用,是一种常用的页面置换算法,选择最近最久未使用的页面予以淘汰。该算法赋予每个页面一个访问字段,用来记录一个页面自上次被访问以来所经历的时间 t,当须淘汰一个页面时,选择现有页面中其 t 值最大的,即最近最少使…

20250205 省选模拟赛 T3

20250205 省选模拟赛 T3 Description 设计一个 \(n\times n\) 的 01 矩阵,使得从 \((1,1)\) 走到 \((n,n)\) 且只能向右或下走且只经过为 \(1\) 的格子的方案数为 \(X\)。 \(n \leq 24\) 时得满分。\(X \leq 10^9\)。 Solution 基于 \(2\) 进制的构造方法我们称从左上到右下的…

Automa:自动化浏览器工作流

🏷️仓库名称:AutomaApp/automa 🌟截止发稿星数: 14340 (今日新增:33) 🇨🇳仓库语言: Vue 🤝仓库开源协议:Other 🔗仓库地址:https://github.com/AutomaApp/automa引言 Automa是一个浏览器扩展,允许用户通过连接模块来自动化浏览器任务。它消除了重复性任务的需…

本地部署DeepSeek教程

本地部署DeepSeek教程 步骤 本地部署DeepSeek教程步骤 1 安装Ollama 2 下载DeepSeek模型 3 可视化图文交互界面Chatbox(可选)1 安装Ollama 访问Ollama官网下载Ollama,默认安装即可。安装完成后打开终端(我这里是windows系统),输入: ollama help即可查看ollama选项,且可…

OpenLDAP篇-安装OpenLDAP服务01

1、OpenLDAP统⼀⽤户认证系统 1.1 为什么需要OpenLDAP 在没有OpenLDAP统⼀⽤户认证系统的环境中,往往会⾯临如下问题:1、当⽤户需要访问多台服务器时,管理员需要在每台服务器上⼿动创建账户。如果员⼯离职,还需逐台删除账户,整体操作繁琐且容易出现遗漏的情况,因此存在较…

集训3 20240127

集训3 20240127 牛客竞赛_ACM/NOI/CSP/CCPC/ICPC算法编程高难度练习赛_牛客竞赛OJ A: 题目大意:给定 \(n\) ,两个人轮流可以使 \(n\) 减去一个任意小于它且与它互质的数,求最后甲能否取胜 #include<bits/stdc++.h>using namespace std;int main() {long long n;cin&g…

RocketMQ实战—4.消息零丢失的方案

大纲 1.全链路分析为什么用户支付完成后却没有收到红包 2.RocketMQ的事务消息机制实现发送消息零丢失 3.RocketMQ事务消息机制的底层实现原理 4.是否可以通过同步重试方案来代替事务消息方案来实现发送消息零丢失 5.使用RocketMQ事务消息的代码案例细节 6.同步刷盘+Raft协议同步…

qoj7301 AGC036D 题解

qoj7301 orz yxx 有一个很牛的状态设计 \(f_{i,j,0/1}\),\(0\) 为 \(a_{i-1}>a_i\),\(j\) 记录 \(a_{i-1}\) 的值,\(a_i\) 的值未定;\(1\) 为 \(a_{i-1}<a_i\),\(j\) 记录 \(a_i\) 的值 这样可以完美解决 \(a_{i-1}>a_i<a_{i+1}\) 的问题 转移和优化都是简单的…

【PyTorch】对比Torch和Numpy

该部分主要通过对比Torch和Numpy基础知识,方便大家了解PyTorch。Numpy是处理数据的模块,处理各种矩阵的形式来多核加速运算。 Torch自称为神经网络界的Numpy,因为它能将torch产生的tensor(张量)放在 GPU 中加速运算(前提是你有合适的 GPU),就像Numpy会把array放在CPU中…