消息队列-kafka-消息发送流程(源码跟踪)

官方网址

源码:https://kafka.apache.org/downloads
快速开始:https://kafka.apache.org/documentation/#gettingStarted
springcloud整合

发送消息流程

在这里插入图片描述
主线程:主线程只负责组织消息,如果是同步发送会阻塞,如果是异步发送需要传入一个回调函数。
Map集合:存储了主线程的消息。
Sender线程:真正的发送其实是sender去发送到broker中。

源码阅读

1 首先打开Producer.send()可以看到里面的内容

// 返回值是一个 Future 参数为ProducerRecord
Future<RecordMetadata> send(ProducerRecord<K, V> record);
// ProducerRecord定义了这些信息
// 主题
private final String topic;
// 分区
private final Integer partition;
// header
private final Headers headers;
private final K key;
private final V value;
// 时间戳
private final Long timestamp;

2 发送之前的前置处理

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// intercept the record, which can be potentially modified; this method does not throw exceptions// 这里给开发者提供了前置处理的勾子ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);// 我们最终发送的是经过处理后的消息 并且如果是异步发送会有callback 这个是用户定义的return doSend(interceptedRecord, callback);}

3 进入真正的发送逻辑Future doSend()

  • 由于是网络通信,所以我们要序列化,在这个函数里面就做了序列化的内容。
try {serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException cce) {throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +" specified in key.serializer", cce);}byte[] serializedValue;try {serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +" specified in value.serializer", cce);}
  • 然后我们获取分区
// 然后这里又是一个策略者模式 也是由用户可以配置的  DefaultPartitioner UniformStickyPartitioner RoundRobinPartitioner 提供了这样三个分区器
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();return partition != null ?partition :partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

4 到了我们的RecordAccumulator,也就是先由主线程发送到了RecordAccumulator

// 也就是对图中的Map集合
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

我们发现里面是用一个MAP存储的一个分区和ProducerBatch 是讲这个消息写到内存里面MemoryRecordsBuilder 通过这个进行写入

// 可以看到是一个链表实现的双向队列,也就是消息会按append的顺序写到 内存记录中去
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

5 接着我们看,我们append了以后,会有一个判断去唤醒sender线程,见下面的注释

// 如果说哦我们当前的 这个batch满了或者 我们创建了一个新的batch 这个时候唤醒 sender线程去发送数据
if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);// 唤醒sender 去发送数据this.sender.wakeup();}
// 实现了Runnable 所以我们去看一下RUN方法的逻辑
public class Sender implements Runnable 

好上来就是一个循环

while (running) {try {runOnce();} catch (Exception e) {log.error("Uncaught error in kafka producer I/O thread: ", e);}
}

接着进入runOnece方法,直接看核心逻辑

// 从RecordAccumulator 拿数据 然后发送
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);addToInflightBatches(batches);
// 中间省去了非核心逻辑
sendProduceRequests(batches, now);

如果继续跟踪的话最终是走到了selector.send()里面:

Send send = request.toSend(destination, header);InFlightRequest inFlightRequest = new InFlightRequest(clientRequest,header,isInternalRequest,request,send,now);this.inFlightRequests.add(inFlightRequest);selector.send(send);

6 接着我们就要看返回逻辑了,可以看到在sendRequest里面sendProduceRequest方法是通过传入了一个回调函数处理返回的。

RequestCompletionHandler callback = new RequestCompletionHandler() {public void onComplete(ClientResponse response) {handleProduceResponse(response, recordsByPartition, time.milliseconds());}};
// 如果有返回
if (response.hasResponse()) {ProduceResponse produceResponse = (ProduceResponse) response.responseBody();for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {TopicPartition tp = entry.getKey();ProduceResponse.PartitionResponse partResp = entry.getValue();ProducerBatch batch = batches.get(tp);completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());}this.sensors.recordLatency(response.destination(), response.requestLatencyMs());} 

追踪到ProducerBatch

if (this.finalState.compareAndSet(null, tryFinalState)) {completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);return true;}
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {// Set the future before invoking the callbacks as we rely on its state for the `onCompletion` callproduceFuture.set(baseOffset, logAppendTime, exception);// execute callbacksfor (Thunk thunk : thunks) {try {if (exception == null) {RecordMetadata metadata = thunk.future.value();if (thunk.callback != null)thunk.callback.onCompletion(metadata, null);} else {if (thunk.callback != null)thunk.callback.onCompletion(null, exception);}} catch (Exception e) {log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);}}produceFuture.done();}

Thunk 这个其实就是我们在Append的时候的回调:
在这里插入图片描述
至此整个流程就完成了,从发送消息,到响应后回调我们的函数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/511517.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

机器学习的算法及原理

机器学习算法原理是一种让计算机通过数据学习并做出预测或决策的方法。这些算法可以分为几种主要类型&#xff0c;包括监督学习、无监督学习和强化学习。以下是这些类型中一些常见的算法及其基本原理&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业…

利用excel文件增量同步一个库的数据并自动校正两端数据库条数不一致

利用excel文件增量同步一个库的数据并自动校正两端数据库条数不一致 现在有sqlserver和mysql两个库上的表在进行同步&#xff0c;sqlserver上的是源表&#xff0c;mysql上是目标表。 我们就把sqlserver上的数据同步到mysql上 mysql 是没有数据的。 sqlserver的三个表只是创建了…

基于springboot+vue的周边游平台个人管理系统

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

计网面试题整理下

1. HTTP常见的状态码有哪些&#xff1f; 常见状态码&#xff1a; 200&#xff1a;服务器已成功处理了请求。 通常&#xff0c;这表示服务器提供了请求的网页。301 &#xff1a; (永久移动) 请求的网页已永久移动到新位置。 服务器返回此响应(对 GET 或 HEAD 请求的响应)时&am…

EXPLAIN PLAN FOR:在Oracle中生成执行计划

目录 案例 解析 Operation类型 在Oracle中&#xff0c;可以使用 EXPLAIN PLAN FOR 命令来生成执行计划&#xff0c;然后通过 SELECT plan_table_output FROM TABLE(DBMS_XPLAN.DISPLAY(PLAN_TABLE))来查看执行计划。需要注意的是&#xff0c;这两个命令需要在同一个窗口下运…

学习记录12-单片机代码几种常见命名规则

良好的编程习惯&#xff0c;决定了今后代码的质量。 有很多人平时不注意自己的代码规范&#xff0c;函数和变量命命随心所欲&#xff0c;造成一个星期就不认识自己的代码&#xff0c;于是今天就来分享一点关于软件代码常见的几种命名规则。 匈牙利命名法 匈牙利命名法广泛应用…

玩转地下管网三维建模:MagicPipe3D系统

地下管网是保障城市运行的基础设施和“生命线”。随着实景三维中国建设的推进&#xff0c;构建地下管网三维模型与地上融合的数字孪生场景&#xff0c;对于提升智慧城市管理至关重要&#xff01;针对现有三维管线建模数据差异大、建模交互弱、模型效果差、缺乏语义信息等缺陷&a…

易基因:NAR:RCMS编辑系统在特定细胞RNA位点的靶向m5C甲基化和去甲基化研究|项目文章

喜讯&#xff01;易基因表观转录组学RNA-BS技术服务见刊《核酸研究》 大家好&#xff0c;这里是专注表观组学十余年&#xff0c;领跑多组学科研服务的易基因。 2024年2月15日&#xff0c;吉林大学张涛、赵飞宇、李金泽为共同第一作者&#xff0c;吉林大学李占军、隋婷婷及赖良…

MySQL中有事务无法回滚的语句?

目录 0.从修改表结构语句开始 1.DDL(Data Definition Language) 数据定义语言 2.DCL(Data Control Language) 数据控制语言 3.在该事务还没提交时开启新事务 4.锁操作 5.行政声明语句 6.主从复制的从机操作 7.如何避免出现隐式提交导致的错误 0.从修改表结构语句开始 试…

windows下thinkphp使用php7.4.5版本链接oracle数据库

我用的php运行环境是PHPCUSTOM&#xff0c;感谢大佬Lccee的耐心指导。 大佬的博客https://blog.csdn.net/Lccee?typeblog 首先查看自己的oracle版本 查询语句: SELECT * FROM v$version;根据自己的版本下载对应的oracle客户端&#xff0c;及得版本运行环境与自己的环境位数要…

GIN与Echo:选择正确Go框架的指南

您是否在Go中构建Web应用&#xff1f;选择正确的框架至关重要&#xff01;GIN和Echo是两个热门选择&#xff0c;每个都有其优势和特点。本指南将详细介绍每个框架的特性、速度、社区热度以及它们各自擅长的项目类型。最后&#xff0c;您将能够为您的下一个Web项目选择完美的框架…

使用GRU进行天气变化的时间序列预测

本文基于最适合入门的100个深度学习项目的学习记录&#xff0c;同时在Google clolab上面是实现&#xff0c;文末有资源连接 天气变化的时间序列的难点 天气变化的时间序列预测涉及到了一系列复杂的挑战&#xff0c;主要是因为天气系统的高度动态性和非线性特征。以下是几个主…