数据传递语义
至少一次:ACK级别设置为-1+分区副本大于等于2+ISR里应答的最小副本数量大于等于2
最多一次:ACK级别设置为0
总结:
At Least Once:可以保证数据不丢失,但是不能保证数据不重复
At Most Once:可以保证数据不重复,但是不能保证数据不丢失
精确一次:对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失,Kafka 0.11版本以后,引入了重大特性:幂等性和事务
幂等性
幂等性就是值Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证不重复。
精确一次=幂等性+至少一次(ack=-1+分区副本数>=2+ISR最小副本数量>=2)
重复数据的判断标准:具有<PID,partition,SeqNumber>相同主键的信息提交时,Broker只会持久化一条,其中PID是Kafka每次重启都会分配一个新的;partition表示分区号;Sequence Number是单调自增的。
所以幂等性只能保证再单分区会话内不回重复
开启幂等性参数
enable.idempotence默认为true,false关闭
生产者事务
开启事务,必须开启幂等性
producer在使用事务功能前,必须先自定义一个唯一的transactional.id,有了transactional.id,即使客户端挂掉了,它重启后也能继续处理未完成的事务
1、请求producer id 幂等性需要
2、返回producer id
3、发送消息到TopicA
4、发送commit请求
5、持久化commit请求
6、返回成功
7、后台发送commit请求
_transaction_state-分区-leader存储事务信息的特殊主题
默认又50个分区,每个分区负载一部分事务,事务划分根据transactional,id的hashcode值%50,计算出该分区事务属于哪个分区。该分区leader副本所在的broker节点即为这个transactional.id对应的Transaction Coordinator节点
例子
package com.longer.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducerTransactions {public static void main(String[] args) {//创建kafka配置对象Properties properties = new Properties();// 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092");//序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//设置事务idproperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_0");//创建kafka生产对象KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);//初始化事务producer.initTransactions();//开启事务producer.beginTransaction();//调用 send 方法,发送消息try {for (int i = 0; i < 5; i++) {producer.send(new ProducerRecord<>("first", "hi," + i));}System.out.println(1/0);//提交事务producer.commitTransaction();}catch (Exception e){e.printStackTrace();//终止事务producer.abortTransaction();}finally {//关闭资源producer.close();}}
}