kafka生产者

1.原理

2.普通异步发送

引入pom:

    <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency></dependencies>
package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = newKafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(newProducerRecord<>("first","atguigu " + i));}// 5. 关闭资源kafkaProducer.close();}
}

测试效果:

3.带回调的异步发送

回调的信息实际是从队列返回的

 

4.同步发送 

只需在异步发送的基础上,再调用一下 get()方法即可。

5.分区

6.分区策略 

指定key的值:对key的hashcode做分配

希望将订单表的数据全部发到kafka的一个分区上,怎么处理?

将该表的名称作为key值然后发送即可 
如:

7.自定义分区 (脏数据的处理)

如果研发人员可以根据企业需求,自己重新实现分区器。 1)需求 例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区, 不包含 atguigu,就发往 1 号分区。 

自定义分区器:

package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String s = value.toString();int partion;if(s.contains("atguigu")) {partion = 1;}else {partion=0;}return partion;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

 拷贝全类名,产生关联

测试结论:

8.如何让提高生产者的吞吐量

 

properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);properties.put(ProducerConfig.LINGER_MS_CONFIG,1);// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/491859.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【wails】(6):使用wails做桌面应用开发,使用gin+go-chatglm.cpp进行本地模型运行,在windows上运行成功

1&#xff0c;整体架构说明 主要使用&#xff0c;参考的开源项目是&#xff1a; https://github.com/wailsapp/wails 前端项目&#xff1a; https://github.com/Chanzhaoyu/chatgpt-web 运行模型&#xff1a; https://github.com/Weaxs/go-chatglm.cpp 参考代码&#xff1a; h…

Python算法题集_实现 Trie [前缀树]

Python算法题集_实现 Trie [前缀树] 题208&#xff1a;实现 Trie (前缀树)1. 示例说明2. 题目解析- 题意分解- 优化思路- 测量工具 3. 代码展开1) 标准求解【定义数据类默认字典】2) 改进版一【初始化字典无额外类】3) 改进版二【字典保存结尾信息无额外类】 4. 最优算法5. 相关…

JAVA集合进阶(Set、Map集合)

一、Set系列集合 1.1 认识Set集合的特点 Set集合是属于Collection体系下的另一个分支&#xff0c;它的特点如下图所示 下面我们用代码简单演示一下&#xff0c;每一种Set集合的特点。 //Set<Integer> set new HashSet<>(); //无序、无索引、不重复 //Set<…

Vulnhub靶机:Hacker_Kid

一、介绍 运行环境&#xff1a;Virtualbox 攻击机&#xff1a;kali&#xff08;10.0.2.15&#xff09; 靶机&#xff1a;Hacker_Kid&#xff08;10.0.2.42&#xff09; 目标&#xff1a;获取靶机root权限和flag 靶机下载地址&#xff1a;https://download.vulnhub.com/hac…

信息抽取(UIE):使用自然语言处理技术提升证券投资决策效率

一、引言 在当今快速变化的证券市场中&#xff0c;信息的价值不言而喻。作为一名资深项目经理&#xff0c;我曾领导一个关键项目&#xff0c;旨在通过先进的信息抽取技术&#xff0c;从海量的文本数据中提取关键事件&#xff0c;如企业并购、新产品发布以及政策环境的变动。这些…

【openGL教程08】基于C++的着色器(02)

LearnOpenGL - Shaders 一、说明 着色器是openGL渲染的重要内容&#xff0c;客户如果想自我实现渲染灵活性&#xff0c;可以用着色器进行编程&#xff0c;这种程序小脚本被传送到GPU的显卡内部&#xff0c;起到动态灵活的着色作用。 二、着色器简述 正如“Hello Triangle”一章…

常用实验室器皿耐硝酸盐酸进口PFA材质容量瓶螺纹盖密封效果好

PFA容量瓶规格参考&#xff1a;10ml、25ml、50ml、100ml、250ml、500ml、1000ml。 别名可溶性聚四氟乙烯容量瓶、特氟龙容量瓶。常用于ICP-MS、ICP-OES等痕量分析以及同位素分析等实验&#xff0c;也可在地质、电子化学品、半导体分析测试、疾控中心、制药厂、环境检测中心等机…

Linux之JAVA环境配置jdkTomcatMySQL

目录 一. 安装jdk 1.1 查询是否有jdk 1.2 解压 1.3 配置环境变量 二. 安装Tomcat&#xff08;开机自启动&#xff09; 2.1 解压 2.2 启动tomcat 2.3 防火墙设置 2.4 创建启动脚本&#xff08;设置自启动&#xff0c;服务器开启即启动&#xff09; 三. MySQL安装&#xff08;…

力扣思路题:丑数

此题的思路非常奇妙&#xff0c;可以借鉴一下 bool isUgly(int num){if(num0)return false;while(num%20)num/2;while(num%30)num/3;while(num%50)num/5;return num1; }

α-酮戊二酸钙(CaAKG)应用领域广泛 食品级CaAKG市场需求旺盛

α-酮戊二酸钙&#xff08;CaAKG&#xff09;应用领域广泛 食品级CaAKG市场需求旺盛 α-酮戊二酸钙&#xff08;CaAKG&#xff09;是α-酮戊二酸&#xff08;AKG&#xff09;的钙盐&#xff0c;外观呈白色结晶粉末。   α-酮戊二酸是三羧酸能量代谢&#xff08;TCA&#xff0…

Unity3D 使用 Proto

一. 下载与安装 这里下载Google Protobuff下载 1. 源码用来编译CSharp 相关配置 2. win64 用于编译 proto 文件 二. 编译 1. 使用VS 打开 2. 点击最上面菜单栏 工具>NuGet 包管理器>管理解决方案的NuGet 管理包 版本一定要选择咱们一开始下载的对应版本否则不兼容&am…

vue基础操作(vue基础)

想到多少写多少把&#xff0c;其他的想起来了在写。也写了一些css的 input框的双向数据绑定 html <input value"123456" type"text" v-model"account" input"accou" class"bottom-line bottom" placeholder"请输入…