消息队列-kafka-消息发送流程(源码跟踪) 与消息可靠性

官方网址

源码:https://kafka.apache.org/downloads
快速开始:https://kafka.apache.org/documentation/#gettingStarted
springcloud整合

发送消息流程

在这里插入图片描述
主线程:主线程只负责组织消息,如果是同步发送会阻塞,如果是异步发送需要传入一个回调函数。
Map集合:存储了主线程的消息。
Sender线程:真正的发送其实是sender去发送到broker中。

源码阅读

1 首先打开Producer.send()可以看到里面的内容

// 返回值是一个 Future 参数为ProducerRecord
Future<RecordMetadata> send(ProducerRecord<K, V> record);
// ProducerRecord定义了这些信息
// 主题
private final String topic;
// 分区
private final Integer partition;
// header
private final Headers headers;
private final K key;
private final V value;
// 时间戳
private final Long timestamp;

2 发送之前的前置处理

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// intercept the record, which can be potentially modified; this method does not throw exceptions// 这里给开发者提供了前置处理的勾子ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);// 我们最终发送的是经过处理后的消息 并且如果是异步发送会有callback 这个是用户定义的return doSend(interceptedRecord, callback);}

3 进入真正的发送逻辑Future doSend()

  • 由于是网络通信,所以我们要序列化,在这个函数里面就做了序列化的内容。
try {serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException cce) {throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +" specified in key.serializer", cce);}byte[] serializedValue;try {serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +" specified in value.serializer", cce);}
  • 然后我们获取分区
// 然后这里又是一个策略者模式 也是由用户可以配置的  DefaultPartitioner UniformStickyPartitioner RoundRobinPartitioner 提供了这样三个分区器
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();return partition != null ?partition :partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

4 到了我们的RecordAccumulator,也就是先由主线程发送到了RecordAccumulator

// 也就是对图中的Map集合
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

我们发现里面是用一个MAP存储的一个分区和ProducerBatch 是讲这个消息写到内存里面MemoryRecordsBuilder 通过这个进行写入

// 可以看到是一个链表实现的双向队列,也就是消息会按append的顺序写到 内存记录中去
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

5 接着我们看,我们append了以后,会有一个判断去唤醒sender线程,见下面的注释

// 如果说哦我们当前的 这个batch满了或者 我们创建了一个新的batch 这个时候唤醒 sender线程去发送数据
if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);// 唤醒sender 去发送数据this.sender.wakeup();}
// 实现了Runnable 所以我们去看一下RUN方法的逻辑
public class Sender implements Runnable 

好上来就是一个循环

while (running) {try {runOnce();} catch (Exception e) {log.error("Uncaught error in kafka producer I/O thread: ", e);}
}

接着进入runOnece方法,直接看核心逻辑

// 从RecordAccumulator 拿数据 然后发送
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);addToInflightBatches(batches);
// 中间省去了非核心逻辑
sendProduceRequests(batches, now);

如果继续跟踪的话最终是走到了selector.send()里面:

Send send = request.toSend(destination, header);InFlightRequest inFlightRequest = new InFlightRequest(clientRequest,header,isInternalRequest,request,send,now);this.inFlightRequests.add(inFlightRequest);selector.send(send);

6 接着我们就要看返回逻辑了,可以看到在sendRequest里面sendProduceRequest方法是通过传入了一个回调函数处理返回的。

RequestCompletionHandler callback = new RequestCompletionHandler() {public void onComplete(ClientResponse response) {handleProduceResponse(response, recordsByPartition, time.milliseconds());}};
// 如果有返回
if (response.hasResponse()) {ProduceResponse produceResponse = (ProduceResponse) response.responseBody();for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {TopicPartition tp = entry.getKey();ProduceResponse.PartitionResponse partResp = entry.getValue();ProducerBatch batch = batches.get(tp);completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());}this.sensors.recordLatency(response.destination(), response.requestLatencyMs());} 

追踪到ProducerBatch

if (this.finalState.compareAndSet(null, tryFinalState)) {completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);return true;}
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {// Set the future before invoking the callbacks as we rely on its state for the `onCompletion` callproduceFuture.set(baseOffset, logAppendTime, exception);// execute callbacksfor (Thunk thunk : thunks) {try {if (exception == null) {RecordMetadata metadata = thunk.future.value();if (thunk.callback != null)thunk.callback.onCompletion(metadata, null);} else {if (thunk.callback != null)thunk.callback.onCompletion(null, exception);}} catch (Exception e) {log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);}}produceFuture.done();}

Thunk 这个其实就是我们在Append的时候的回调:
在这里插入图片描述
至此整个流程就完成了,从发送消息,到响应后回调我们的函数。

消息可靠性

// 所有消费者的配置都在ProducerConfig 里面
public static final String ACKS_CONFIG = "acks";

acks = 0:异步形式,单向发送,不会等待 broker 的响应
acks = 1:主分区保存成功,然后就响应了客户端,并不保证所有的副本分区保存成功
acks = all 或 -1:等待 broker 的响应,然后 broker 等待副本分区的响应,总之数据落地到所有的分区后,才能给到producer 一个响应

在可靠性的保证下,假设一些故障:

  • Broker 收到消息后,同步 ISR 异常:只要在 -1 的情况下,其实不会造成消息的丢失,因为有重试机制
  • Broker 收到消息,并同步 ISR 成功,但是响应超时:只要在 -1 的情况下,其实不会造成消息的丢失,因为有重试机制

可靠性能保证哪些,不能保障哪些?

  • 保证了消息不会丢失
  • 不保证消息一定不会重复(消息有重复的概率,需要消费者有幂等性控制机制)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/522561.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

力扣刷题Days13-101对称二叉树(js)

目录 1,题目 2&#xff0c;代码 2.1递归思想 2.2队列--迭代思想 3&#xff0c;学习与总结 1,题目 给你一个二叉树的根节点 root &#xff0c; 检查它是否轴对称。 2&#xff0c;代码 2.1递归思想 return dfs(left.left, right.right) && dfs(left.right, right.l…

Vue-Router路由介绍和使用

vue属于单页面应用&#xff0c;路由就是根据浏览器路径不同&#xff0c;用不同的试图组件替换这个页面内容 开启路由功能 如图在创建项目时候勾选rouler 这样创建好的项目就有路由功能 下一步 不同的访问路径 展示不同的页面内容 路由配置 路由连接组件 浏览器会解析为超链接 …

跨境电商趋势解析:社交电商携手私域流量运营,精准触达与转化

随着全球化的深入发展&#xff0c;跨境电商逐渐成为全球贸易的重要组成部分。在这一背景下&#xff0c;社交电商作为一种新兴的商业模式&#xff0c;正逐渐在跨境电商领域崭露头角&#xff0c;并对私域流量的运营产生了深远的影响。本文Nox聚星将和大家分析社交电商在跨境电商中…

CentOS7部署SonarQube 9.9.4 LTS

文章目录 下载地址前置条件安装sonarqube创建用户解压修改sonar.properties配置文件 启动sonarqube开启防火墙端口启动报错访问SonarQube安装汉化包 安装sonar-scanner 下载地址 社区稳定版本 版本依赖关系 Prerequisites and overview (sonarsource.com) 前置条件 JDK11安…

Java集合面试题(day 02)

&#x1f4d1;前言 本文主要是【JAVA】——Java集合面试题的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是听风与他&#x1f947; ☁️博客首页&#xff1a;CSDN主页听风与他 &#x1f304;每日一句&am…

鞋服品牌怎样合理把控订货深度和宽度

在鞋服品牌的运营管理中&#xff0c;订货深度和宽度是两个至关重要的概念。订货深度指的是某一款式或规格的产品数量&#xff0c;而订货宽度则代表品牌所涵盖的产品种类和款式。合理把控订货深度和宽度对于品牌的库存管理、销售情况以及顾客满意度都有着深远的影响。本文将探讨…

电脑小问题:Windows更新后黑屏

Windows 更新后黑屏解决方法 在 Windows 更新后&#xff0c;伴随了一个小问题&#xff0c;电脑启动后出现了桌面黑屏。原因可能是火绒把 explorer.exe 当病毒处理了。 下面讲解 Windows 更新后黑屏的解决方法&#xff0c;步骤如下&#xff1a; 1. 按 ctrl alt delete 组合键…

Linux centos 常用的网络负载和网速查看工具和命令

在 CentOS 上查看网络速度和网络负载&#xff0c;可以使用多种工具&#xff0c;以下是一些常用的命令行工具&#xff1a; iftop - 用于实时监视网络带宽使用情况。 安装命令&#xff1a; sudo yum install iftop 使用命令&#xff1a; sudo iftop nload - 一个简单的控制…

最新基于R语言lavaan结构方程模型(SEM)实践技术应用

基于R语言lavaan程序包&#xff0c;通过理论讲解和实际操作相结合的方式&#xff0c;由浅入深地系统介绍结构方程模型的建立、拟合、评估、筛选和结果展示的全过程。我们筛选大量经典案例&#xff0c;这些案例来自Nature、Ecology、Ecological Applications、Journal of Ecolog…

react-native使用react-native-reanimated报错

error信息&#xff1a; Error: [Reanimated] UpdatePropsManager is not available on non-native platform. 解决方案一&#xff1a; 简单粗暴不在浏览器里看打印&#xff0c;关闭debugger即可。 解决方案二&#xff1a; 找到这个ts文件然后注释掉相关代码 然后给这个包打上…

LeetCode 刷题 [C++] 第3题.无重复字符的最长子串

题目描述 给定一个字符串 s &#xff0c;请你找出其中不含有重复字符的 最长子串 的长度。 题目分析 可以使用滑动窗口加哈希表来实现&#xff1a; 使用start和end两个变脸来表示滑动窗口的头部位置和尾部位置&#xff0c;两者开始均为0&#xff1b;借助哈希表来记录已经遍…

【微信小程序】基本语法

目录 一、列表渲染&#xff08;包括wx:for改变默认&#xff09; 二、事件冒泡和事件捕获 三、生命周期 一、列表渲染&#xff08;包括wx:for改变默认&#xff09; 1、列表渲染(wx-for、block 改变默认wx:for item等) <view> {{msg}} </view> //渲染跟普通vu…