Kafka Producer Internals: How the Producer Sends Messages to the Cluster

Reference video (尚硅谷 Kafka series, Bilibili):

10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili

     1. Producer initialization: load the default configuration plus any user-supplied
        parameters, and start the network (Sender) thread.

     2. Interceptors process the record.

     3. The serializers serialize the record's key and value.

     4. The partition is determined.

     5. Metadata is fetched from the Kafka broker cluster.

     6. The record is buffered in the RecordAccumulator and appended to that
        partition's Deque (as a ProducerBatch).

     7. When batch.size is reached, or linger.ms expires, the Sender thread is woken
        up and a NetworkClient is instantiated:

         ProducerBatch ==> ClientRequest, the request that carries the records.
         (A minimal client-side sketch of these steps follows this list.)

      8. A network connection is established with the broker that leads the partition,
         and the request is sent to that broker.
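
A minimal sketch of how these steps surface in client code. The config keys are standard producer settings; the broker address and topic name are placeholder assumptions:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class ProducerFlowDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Step 1: configuration is loaded and the Sender network thread starts in the constructor
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            // Step 3: serializers for key and value
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // Step 7: a batch is sent when it reaches batch.size or linger.ms elapses
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16 KB per partition batch
            props.put(ProducerConfig.LINGER_MS_CONFIG, 5);      // wait up to 5 ms for more records

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Steps 2-6 run inside send(): interceptors, serialization, partitioning,
                // metadata lookup, and buffering in the RecordAccumulator
                producer.send(new ProducerRecord<>("demo-topic", "key-1", "value-1"));
            } // close() flushes remaining batches before the Sender thread shuts down
        }
    }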

 1. The ProducerRecord object passed to send():

    Partition selection:

      a. If a partition is specified, the record is sent to that partition.

      b. If no partition is specified and no key is supplied, the sticky partitioner
         (sticky partition, Kafka 2.4+) is used: a partition is chosen and reused until
         the current batch fills up or linger.ms expires, after which a new partition
         is chosen. (The older DefaultPartitioner behavior for key-less records was
         the commonly cited round-robin algorithm: generate a random integer on the
         first call, increment it on each subsequent call, and take it modulo the
         number of available partitions for the topic.)

      c. If no partition is specified but a key is supplied, the key is hashed first
         (Kafka applies murmur2 to the serialized key bytes), and the hash is taken
         modulo the topic's partition count to pick the partition (see the sketch
         after this list).

      For example: key hash = 5, topic has 2 partitions  =>  partition 1
                   key hash = 6, topic has 2 partitions  =>  partition 0
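
A simplified, self-contained sketch of the keyed case (c). Kafka's DefaultPartitioner actually applies murmur2 to the serialized key; here a plain non-negative hash stands in for illustration, so real partition numbers will differ:

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class KeyedPartitionDemo {
        // Hash the serialized key and take it modulo the partition count,
        // mirroring rule (c): same key => same partition (while the count is stable).
        static int partitionForKey(byte[] serializedKey, int numPartitions) {
            int hash = Arrays.hashCode(serializedKey) & 0x7fffffff; // force non-negative
            return hash % numPartitions;
        }

        public static void main(String[] args) {
            byte[] key = "order-42".getBytes(StandardCharsets.UTF_8);
            // e.g. hash = 5 with 2 partitions -> partition 1; hash = 6 -> partition 0
            System.out.println(partitionForKey(key, 2));
        }
    }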

2. KafkaProducer asynchronous and synchronous send APIs:

    Asynchronous send:

                    producer.send(producerRecord);

    Synchronous send: append .get() to the send() call, which blocks on the returned
    Future until the broker's acknowledgement arrives.
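
The two variants side by side, assuming the configured producer from the sketch above (note that .get() throws the checked InterruptedException and ExecutionException, so the caller must handle them):

    // Assumes a configured KafkaProducer<String, String> named producer.
    static void sendBothWays(KafkaProducer<String, String> producer) throws Exception {
        // Asynchronous: send() returns a Future immediately; the optional callback
        // fires on the Sender I/O thread once the broker responds or the send fails.
        producer.send(new ProducerRecord<>("demo-topic", "k1", "v1"), (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.printf("async: partition=%d offset=%d%n", metadata.partition(), metadata.offset());
            }
        });

        // Synchronous: block on the returned Future until the acknowledgement arrives.
        RecordMetadata md = producer.send(new ProducerRecord<>("demo-topic", "k2", "v2")).get();
        System.out.printf("sync: partition=%d offset=%d%n", md.partition(), md.offset());
    }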



Core logic of the producer's send() method (decompiled from kafka-clients):

    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return this.send(record, (Callback)null);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // Interceptor chain: each configured interceptor is applied in turn
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return this.doSend(interceptedRecord, callback);
    }

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        // Fetch cluster metadata
        try {
            this.throwIfProducerClosed();
            long nowMs = this.time.milliseconds();
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs, this.maxBlockTimeMs);
            } catch (KafkaException var22) {
                if (this.metadata.isClosed()) {
                    throw new KafkaException("Producer closed while send in progress", var22);
                }
                throw var22;
            }
            nowMs += clusterAndWaitTime.waitedOnMetadataMs;
            long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            // Serialize the key
            byte[] serializedKey;
            try {
                serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException var21) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var21);
            }
            // Serialize the value
            byte[] serializedValue;
            try {
                serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException var20) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var20);
            }
            // Determine the partition
            int partition = this.partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            this.setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();
            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
            this.ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
            if (this.log.isTraceEnabled()) {
                this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
            }
            Callback interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
            // RecordAccumulator.append(): buffer the record into a ProducerBatch
            RecordAccumulator.RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
            if (result.abortForNewBatch) {
                int prevPartition = partition;
                this.partitioner.onNewBatch(record.topic(), cluster, partition);
                partition = this.partition(record, serializedKey, serializedValue, cluster);
                tp = new TopicPartition(record.topic(), partition);
                if (this.log.isTraceEnabled()) {
                    this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
                }
                interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
                result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
            }
            if (this.transactionManager != null) {
                this.transactionManager.maybeAddPartition(tp);
            }
            // If the batch is full or a new batch was created, wake up the Sender
            // (Sender implements Runnable and runs as the I/O thread)
            if (result.batchIsFull || result.newBatchCreated) {
                this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
        } catch (ApiException var23) {
            this.log.debug("Exception occurred during message send:", var23);
            if (tp == null) {
                tp = ProducerInterceptors.extractTopicPartition(record);
            }
            Callback interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
            interceptCallback.onCompletion((RecordMetadata)null, var23);
            this.errors.record();
            this.interceptors.onSendError(record, tp, var23);
            return new FutureFailure(var23);
        } catch (InterruptedException var24) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, var24);
            throw new InterruptException(var24);
        } catch (KafkaException var25) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, var25);
            throw var25;
        } catch (Exception var26) {
            this.interceptors.onSendError(record, tp, var26);
            throw var26;
        }
    }
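
For context on the interceptors.onSend() call at the top of this chain, here is a minimal interceptor sketch. ProducerInterceptor is the real kafka-clients interface (registered via the interceptor.classes config); the class name and header key are made up for illustration:

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    import java.util.Map;

    // Hypothetical interceptor that stamps a timestamp header on every record.
    public class HeaderStampInterceptor implements ProducerInterceptor<String, String> {
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            // Runs on the send() path, before serialization and partitioning
            record.headers().add("sent-at", Long.toString(System.currentTimeMillis()).getBytes());
            return record;
        }

        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            // Runs when the broker acknowledges the record, or when the send fails
        }

        @Override
        public void close() {}

        @Override
        public void configure(Map<String, ?> configs) {}
    }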

  The Sender class's run() method:

    public void run() {
        this.log.debug("Starting Kafka producer I/O thread.");
        while(this.running) {
            try {
                this.runOnce();
            } catch (Exception var5) {
                this.log.error("Uncaught error in kafka producer I/O thread: ", var5);
            }
        }
        this.log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
        while(!this.forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0 || this.hasPendingTransactionalRequests())) {
            try {
                this.runOnce();
            } catch (Exception var4) {
                this.log.error("Uncaught error in kafka producer I/O thread: ", var4);
            }
        }
        while(!this.forceClose && this.transactionManager != null && this.transactionManager.hasOngoingTransaction()) {
            if (!this.transactionManager.isCompleting()) {
                this.log.info("Aborting incomplete transaction due to shutdown");
                this.transactionManager.beginAbort();
            }
            try {
                this.runOnce();
            } catch (Exception var3) {
                this.log.error("Uncaught error in kafka producer I/O thread: ", var3);
            }
        }
        if (this.forceClose) {
            if (this.transactionManager != null) {
                this.log.debug("Aborting incomplete transactional requests due to forced shutdown");
                this.transactionManager.close();
            }
            this.log.debug("Aborting incomplete batches due to forced shutdown");
            this.accumulator.abortIncompleteBatches();
        }
        try {
            this.client.close();
        } catch (Exception var2) {
            this.log.error("Failed to close network client", var2);
        }
        this.log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

    void runOnce() {
        if (this.transactionManager != null) {
            try {
                this.transactionManager.maybeResolveSequences();
                if (this.transactionManager.hasFatalError()) {
                    RuntimeException lastError = this.transactionManager.lastError();
                    if (lastError != null) {
                        this.maybeAbortBatches(lastError);
                    }
                    this.client.poll(this.retryBackoffMs, this.time.milliseconds());
                    return;
                }
                this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
                if (this.maybeSendAndPollTransactionalRequest()) {
                    return;
                }
            } catch (AuthenticationException var5) {
                this.log.trace("Authentication exception while processing transactional request", var5);
                this.transactionManager.authenticationFailed(var5);
            }
        }
        long currentTimeMs = this.time.milliseconds();
        // Drain the accumulator and send the buffered data
        long pollTimeout = this.sendProducerData(currentTimeMs);
        this.client.poll(pollTimeout, currentTimeMs);
    }

  sendProducerData():

      Ready batches are ultimately converted into a ClientRequest object:

         ClientRequest clientRequest = this.client.newClientRequest(nodeId, requestBuilder, now, acks != 0, this.requestTimeoutMs, callback);
         this.client.send(clientRequest, now);

    private long sendProducerData(long now) {
        Cluster cluster = this.metadata.fetch();
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
        Iterator iter;
        if (!result.unknownLeaderTopics.isEmpty()) {
            iter = result.unknownLeaderTopics.iterator();
            while(iter.hasNext()) {
                String topic = (String)iter.next();
                this.metadata.add(topic, now);
            }
            this.log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);
            this.metadata.requestUpdate();
        }
        iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while(iter.hasNext()) {
            Node node = (Node)iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        this.addToInflightBatches(batches);
        List expiredBatches;
        Iterator var11;
        ProducerBatch expiredBatch;
        if (this.guaranteeMessageOrder) {
            Iterator var9 = batches.values().iterator();
            while(var9.hasNext()) {
                expiredBatches = (List)var9.next();
                var11 = expiredBatches.iterator();
                while(var11.hasNext()) {
                    expiredBatch = (ProducerBatch)var11.next();
                    this.accumulator.mutePartition(expiredBatch.topicPartition);
                }
            }
        }
        this.accumulator.resetNextBatchExpiryTime();
        List<ProducerBatch> expiredInflightBatches = this.getExpiredInflightBatches(now);
        expiredBatches = this.accumulator.expiredBatches(now);
        expiredBatches.addAll(expiredInflightBatches);
        if (!expiredBatches.isEmpty()) {
            this.log.trace("Expired {} batches in accumulator", expiredBatches.size());
        }
        var11 = expiredBatches.iterator();
        while(var11.hasNext()) {
            expiredBatch = (ProducerBatch)var11.next();
            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
            this.failBatch(expiredBatch, (RuntimeException)(new TimeoutException(errorMessage)), false);
            if (this.transactionManager != null && expiredBatch.inRetry()) {
                this.transactionManager.markSequenceUnresolved(expiredBatch);
            }
        }
        this.sensors.updateProduceRequestMetrics(batches);
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
        pollTimeout = Math.max(pollTimeout, 0L);
        if (!result.readyNodes.isEmpty()) {
            this.log.trace("Nodes with data ready to send: {}", result.readyNodes);
            pollTimeout = 0L;
        }
        this.sendProduceRequests(batches, now);
        return pollTimeout;
    }

    private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
        Iterator var4 = collated.entrySet().iterator();
        while(var4.hasNext()) {
            Map.Entry<Integer, List<ProducerBatch>> entry = (Map.Entry)var4.next();
            this.sendProduceRequest(now, (Integer)entry.getKey(), this.acks, this.requestTimeoutMs, (List)entry.getValue());
        }
    }

    private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
        if (!batches.isEmpty()) {
            Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap(batches.size());
            byte minUsedMagic = this.apiVersions.maxUsableProduceMagic();
            Iterator var9 = batches.iterator();
            while(var9.hasNext()) {
                ProducerBatch batch = (ProducerBatch)var9.next();
                if (batch.magic() < minUsedMagic) {
                    minUsedMagic = batch.magic();
                }
            }
            ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
            Iterator var16 = batches.iterator();
            while(var16.hasNext()) {
                ProducerBatch batch = (ProducerBatch)var16.next();
                TopicPartition tp = batch.topicPartition;
                MemoryRecords records = batch.records();
                if (!records.hasMatchingMagic(minUsedMagic)) {
                    records = (MemoryRecords)batch.records().downConvert(minUsedMagic, 0L, this.time).records();
                }
                ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
                if (tpData == null) {
                    tpData = (new ProduceRequestData.TopicProduceData()).setName(tp.topic());
                    tpd.add(tpData);
                }
                tpData.partitionData().add((new ProduceRequestData.PartitionProduceData()).setIndex(tp.partition()).setRecords(records));
                recordsByPartition.put(tp, batch);
            }
            String transactionalId = null;
            if (this.transactionManager != null && this.transactionManager.isTransactional()) {
                transactionalId = this.transactionManager.transactionalId();
            }
            ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic, (new ProduceRequestData()).setAcks(acks).setTimeoutMs(timeout).setTransactionalId(transactionalId).setTopicData(tpd));
            RequestCompletionHandler callback = (response) -> {
                this.handleProduceResponse(response, recordsByPartition, this.time.milliseconds());
            };
            String nodeId = Integer.toString(destination);
            ClientRequest clientRequest = this.client.newClientRequest(nodeId, requestBuilder, now, acks != 0, this.requestTimeoutMs, callback);
            // this.client is the KafkaClient interface; the runtime implementation is NetworkClient
            this.client.send(clientRequest, now);
            this.log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
        }
    }
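
One detail worth noticing in the newClientRequest call above: acks != 0 is passed as the expectResponse flag, so with acks=0 the producer fires the request and never waits for a broker reply. The acknowledgement level is plain producer configuration (a fragment extending the Properties from the first sketch):

    // Acknowledgement level (values: "0", "1", "all"):
    // "0"   - fire and forget; no broker response expected (expectResponse is false)
    // "1"   - the partition leader acknowledges the write
    // "all" - the leader and all in-sync replicas acknowledge
    props.put(ProducerConfig.ACKS_CONFIG, "all");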

 The NetworkClient send() method:

    public void send(ClientRequest request, long now) {
        this.doSend(request, false, now);
    }

    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
        this.ensureActive();
        String nodeId = clientRequest.destination();
        if (!isInternalRequest && !this.canSendRequest(nodeId, now)) {
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
        } else {
            AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
            try {
                NodeApiVersions versionInfo = this.apiVersions.get(nodeId);
                short version;
                if (versionInfo == null) {
                    version = builder.latestAllowedVersion();
                    if (this.discoverBrokerVersions && this.log.isTraceEnabled()) {
                        this.log.trace("No version information found when sending {} with correlation id {} to node {}. Assuming version {}.", new Object[]{clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version});
                    }
                } else {
                    version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(), builder.latestAllowedVersion());
                }
                this.doSend(clientRequest, isInternalRequest, now, builder.build(version));
            } catch (UnsupportedVersionException var9) {
                this.log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", new Object[]{builder, clientRequest.correlationId(), clientRequest.destination(), var9});
                ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()), clientRequest.callback(), clientRequest.destination(), now, now, false, var9, (AuthenticationException)null, (AbstractResponse)null);
                if (!isInternalRequest) {
                    this.abortedSends.add(clientResponse);
                } else if (clientRequest.apiKey() == ApiKeys.METADATA) {
                    this.metadataUpdater.handleFailedRequest(now, Optional.of(var9));
                }
            }
        }
    }

    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        String destination = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
        if (this.log.isDebugEnabled()) {
            this.log.debug("Sending {} request with header {} and timeout {} to node {}: {}", new Object[]{clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request});
        }
        Send send = request.toSend(header);
        // Wrap the ClientRequest in an InFlightRequest so the response can be matched later
        InFlightRequest inFlightRequest = new InFlightRequest(clientRequest, header, isInternalRequest, request, send, now);
        this.inFlightRequests.add(inFlightRequest);
        // Hand the bytes to the NIO selector; this.selector is the Selectable interface,
        // whose Selector implementation writes through the node's KafkaChannel
        this.selector.send(new NetworkSend(clientRequest.destination(), send));
    }
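
The InFlightRequest bookkeeping above is bounded per connection by max.in.flight.requests.per.connection. With retries enabled and more than one request in flight (idempotence off), batches can be reordered on retry, which is why ordering-sensitive producers commonly cap it (another config fragment extending the earlier Properties):

    // Limit unacknowledged requests per broker connection to preserve ordering under retries.
    // (With enable.idempotence=true, Kafka preserves ordering with up to 5 in flight.)
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    props.put(ProducerConfig.RETRIES_CONFIG, 3);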

Summary: reading the source in kafka-clients directly makes the producer send logic
easy to follow. The core is the flow diagram at the top of this article
(steps 1-8 above).
