Kafka - 异步/同步发送API

文章目录

  • 异步发送
    • 普通异步发送
      • 异步发送流程
      • Code
    • 带回调函数的异步发送
      • 带回调函数的异步发送流程
      • Code
  • 同步发送API

在这里插入图片描述


异步发送

普通异步发送

需求:创建Kafka生产者,采用异步的方式发送到Kafka broker

异步发送流程

在这里插入图片描述

Code

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>
package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {RecordMetadata art = kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-" + i)).get();System.out.println(art.offset());System.out.println("over - " + i);}// 5. 关闭资源kafkaProducer.close();}}

输出

31
over - 0
32
over - 1
33
over - 2
34
over - 3
35
over - 4
36
over - 5
37
over - 6
38
over - 7
39
over - 8
40
over - 9

忽略我这个offset … 我都发了好多次了…

看控制台的吧

在这里插入图片描述


带回调函数的异步发送

回调函数callback()会在producer收到ack时调用,为异步调用。

该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。

  • 如果Exception为null,说明消息发送成功,
  • 如果Exception不为null,说明消息发送失败

带回调函数的异步发送流程

在这里插入图片描述

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

Code

package com.artisan.pc;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducerWithCallBack {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 添加回调// 该方法在Producer收到ack时调用,为异步调用kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-callback-" + i), (recordMetadata, e) -> {// 没有异常,输出信息到控制台System.out.println("主题" + recordMetadata.topic() + ", 分区:" + recordMetadata.partition() + ", 偏移量:" + recordMetadata.offset());});}// 5. 关闭资源kafkaProducer.close();}}

在这里插入图片描述

控制台

在这里插入图片描述


同步发送API

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可

在这里插入图片描述

package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 通过Future接口的get实现同步阻塞kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-get-" + i)).get() ;}// 5. 关闭资源kafkaProducer.close();}}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/151083.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

重复控制器的性能优化

前言 重复控制器在控制系统中是比较优秀的控制器&#xff0c;在整流逆变等周期性输入信号时&#xff0c;会有很好的跟随行&#xff0c;通常可以单独使用&#xff0c;也可以与其他补偿器串联并联使用。 这里我来分析一下重复控制器的重复控制器的应用工况以及其的优缺点。 分析…

UVa1354,ACM/ICPC Tokyo 2005,Mobile Computing(天平难题)

1、题目 2、题意 给出房间的宽度 r r r 和 s s s 个挂坠的重量 w i w_i wi​。设计一个尽量宽&#xff08;但宽度不能超过房间宽度 r r r&#xff09;的天平&#xff0c;挂着所有挂坠。 天平由一些长度为1的木棍组成。木棍的每一端要么挂一个挂坠&#xff0c;要么挂另外一…

【云原生】portainer管理多个独立docker服务器

目录 一、portainer简介 二、安装Portainer 1.1 内网环境下&#xff1a; 1.1.1 方式1&#xff1a;命令行运行 1.1.2 方式2&#xff1a;通过compose-file来启动 2.1 配置本地主机&#xff08;node-1&#xff09; 3.1 配置其他主机&#xff08;被node-1管理的节点服务器&…

MobPush厂商通道回执配置指南(Vivo,荣耀)

MobPush作为一款好用、可靠的智能推送开发者工具&#xff0c;为APP开发者提供了推送后用户行为的全链路数据分析&#xff0c;从而开发者可以更好地了解用户行为&#xff0c;优化推送策略&#xff0c;提高消息送达率&#xff0c;从而提升用户体验。 但这需要通过在后台配置厂商…

私有化部署企业IM即时通讯app,群聊多样化管控

随着企业内部沟通和协作的重要性不断增长&#xff0c;私有化部署企业即时通讯&#xff08;IM&#xff09;app成为了企业保护内部信息安全的一种重要手段。在这个领域&#xff0c;安全专属的移动数字化平台WorkPlus&#xff0c;支持私有化部署&#xff0c;涵盖即时通讯和办公应用…

Leo赠书活动-03期 【ChatGPT 驱动软件开发:AI 在软件研发全流程中的革新与实践 】

✅作者简介&#xff1a;大家好&#xff0c;我是Leo&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;Leo的博客 &#x1f49e;当前专栏&#xff1a; 赠书活动专栏 ✨特色专栏&#xff1a;…

C语言数据结构之数据结构入门

目录 数据结构介绍 数据结构发展史 何为算法 数据结构基础 基本概念和术语 四大逻辑结构&#xff08;Logic Structure&#xff09; 数据类型 理解复杂度概念 时间空间复杂度定义 度量时间复杂度的方法 程序运行时的内存与地址 编程预备 数据结构介绍 数据结构发展…

面试总结之消息中间件

RabbitMQ的消息如何实现路由 RabbitMQ是一个基于AMQP协议实现的分布式消息中间件&#xff0c;AMQP具体的工作机制是生产者将消息发送到RabbitMQ Broker上的Exchange交换机上&#xff0c;Exchange交换机将收到的消息根据路由规则发给绑定的队列&#xff08;Queue&#xff09;&am…

Centos7 安装和配置 Redis 5 教程

在Centos上安装Redis 5&#xff0c;如果是 Centos8&#xff0c;那么 yum 仓库中默认的 redis 版本就是 5&#xff0c;直接 yum install 即可。但如果是 Centos7&#xff0c;yum 仓库中默认的 redis 版本是 3 系列&#xff0c;比较老&#xff1a; 通过 yum list | grep redis 命…

并查集(畅通工程)

并查集就是不相交的集合 有两个常见操作&#xff1a; 1.合并 2.查询某元素属于什么集合 法一&#xff1a; 代码如下&#xff1a; find 目的找到元素的老大 &#xff08;链表遍历逐层向上找&#xff09; merge 合并集合&#xff08;实质改变集合老大&#xff0c;链表性质&…

降级熔断:如何屏蔽非核心系统故障的影响?

目录 前言 一、熔断是什么&#xff1f; 二、服务降级 三、雪崩是如何发生的 四、hystrix使用 五、降级机制要如何做 总结 前言 在“双十一”的巨大流量中&#xff0c;商品促销过程中出现了几次短暂的服务不可用&#xff0c;这给部分用户造成了不好的使用体验。事后&…

大数据架构设计理论与实践

大数据架构设计理论与实践 大数据处理系统概述 传统数据处理系统存在的问题 大数据处理系统面临的挑战 大数据处理系统的属性/特征 典型的大数据架构 Lambda架构 Lambda定义 优缺点 应用场景 Lambda的体系结构( Batch Layer (批处理层)、Speed Layer (加速层)、Serving Lay…