Java Kafka 消息队列入门

简介

Apache Kafka 是一个分布式流处理平台,常用于处理实时数据流。它的核心是一个高吞吐量的分布式消息队列系统,适用于各类数据管道和数据流处理。Apache Kafka 由四个核心 API 组成:Producer API、Consumer API、Streams API 和 Connect API。本文将重点讲解 Java 中如何使用 Apache Kafka 实现消息队列功能,并分享一些常见实践及最佳实践,帮助读者更好地理解与运用。

目录

  1. Java Apache Kafka 基础概念
  2. 安装与环境配置
  3. Java 中的基本使用方法
    • 生产者实例
    • 消费者实例
  4. 常见实践
  5. 最佳实践
  6. 小结
  7. 参考资料

Java Apache Kafka 基础概念

Apache Kafka 原本由 LinkedIn 开发并在 2011 年开源,之后被捐赠给 Apache 基金会。它的设计目标是处理日志数据、流数据处理,以及作为消息中间件。Kafka 提供以下关键功能:

  • 发布与订阅记录流,类似消息系统。
  • 以容错方式存储记录流。
  • 实时处理记录流。

Kafka 以 Topic 作为消息的类别,每个 Topic 可以有多个生产者和消费者。数据通过 Producer 发送到 Kafka Broker,Consumer 从 Broker 中读取数据。

安装与环境配置

安装 Kafka

  1. 下载 Kafka 二进制文件并解压。

    wget https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz
    tar -xzf kafka_2.13-3.1.0.tgz
    cd kafka_2.13-3.1.0
    
  2. 启动 Kafka 的 ZooKeeper 服务和 Kafka 服务。

    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties
    

配置 Java 项目环境

在 Java 项目中引入 Kafka 客户端库。以 Maven 项目为例,pom.xml 文件中加入依赖:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.1.0</version>
</dependency>

Java 中的基本使用方法

生产者实例

Kafka 生产者客户端负责将记录发布到 Kafka 集群中的主题。生产者通过主题来组织记录。

以下是一个简单的生产者示例:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;public class SimpleProducer {public static void main(String[] args) {// 配置参数Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", Integer.toString(i), "Message " + i);producer.send(record, (RecordMetadata metadata, Exception exception) -> {if (exception != null) {exception.printStackTrace();} else {System.out.println("Sent message with offset " + metadata.offset());}});}producer.close();}
}

消费者实例

Kafka 消费者客户端从 Kafka 代理读取记录。它订阅一个或多个主题,解析有序记录。

以下是一个简单的消费者示例:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class SimpleConsumer {public static void main(String[] args) {// 配置参数Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 创建消费者Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("my-topic"));// 读取消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));records.forEach(record -> {System.out.printf("Consumed message: key = %s, value = %s, offset = %d%n", record.key(), record.value(), record.offset());});}// consumer.close(); 需在适当时机调用,用于释放资源。}
}

常见实践

  1. Topic 设计: 根据业务需求合理设计 Topic,避免过多或过少的分区。

  2. 序列化/反序列化: 使用 Kafka 提供的序列化接口来实现自定义数据类型的序列化和反序列化。

  3. High Throughput 设置: 在生产者配置中,设置 batch.sizelinger.ms 能有效提升吞吐量。

  4. Consumer Group: Consumer 属于某个 Consumer Group,若一个 Group 中的 Consumer 订阅了同一 Topic,则每条记录只会被组内的一个 Consumer 处理。

  5. Offset 管理: 选择自动提交 offset 或手动提交 offset。自动提交需设置合适的提交间隔,手动提交需在处理完消息后进行,以实现可靠的消息消费。

最佳实践

  • 可靠性: 确保生产者启用了 acks=all 来确保消息的可靠性。
  • 幂等性与事务: 使用 Kafka 的幂等性生产者和事务来更安全地处理数据流。
  • 监控与报警: 部署并使用 Kafka 的管理工具监控其性能和健康状态。

小结

Java 中实现 Apache Kafka 消息队列既能够支持简单的消息传递,也能够在处理实时流数据中提供扩展性和可靠性。通过本文的概念讲解、代码示例以及最佳实践分享,希望能够帮助开发者更好地理解并运用 Kafka 提供的功能。同时,通过合理设计和配置好 Kafka 的各类参数,开发者可以构建出高性能、低延迟的数据处理系统。

参考资料

  1. Apache Kafka 文档
  2. Kafka Java Client API
  3. Kafka 源码仓库
  4. Kafka Monitoring and Operations
  5. 《Kafka: The Definitive Guide》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/877897.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Easysearch 集群通过 API 进行用户密码重置

在日常使用 Easysearch 中,难免会遇到集群密码需要重置的情况(如密码遗失、安全审计要求等)。 通过查看 Easysearch 用户接口文档,创建用户使用如下接口: PUT _security/user/<username> {"password": "adminpass","roles": ["m…

Elasticvue:一款轻量级的Elasticsearch可视化管理工具

Elasticvue是一款免费开源的Elasticsearch GUI工具,你可以使用它来管理ES里的数据, Elasticvue具有多种安装形式,我们这里采用最简单的Docker安装方式,其他版本如Winodws、MacOS、Linux和浏览器插件。 Elasticvue相比Kibana的优势主要体现在以下几个方面: 1、 轻量级与易用…

【攻防3.0 】信任攻击

Author: sm0nk@深蓝攻防实验室 上周在一个技术沙龙我分享了一个攻防相关议题——进击的白巨人,在此归档。一、进攻场景思考 无论是端侧产品还是流量侧产品、亦或是原生安全还是外挂式,主模式还是鉴黑和鉴白; 随着防守方强化的安全建设,安全产品越来越强,进攻的难度在增加;…

[2025.2.1 MySQL学习] MVCC

MVCC 基本概念当前读(直接读取数据页最新版本):读取的是记录的最新版本,读取时还要保证其他并发事务不能修改当前记录,会对读取的记录进行加锁。对于一些日常操作,如:select...lock in share mode、select ... for update、update、isnert、delete都是一种当前读快照读:…

母婴app

您好!这是一个非常全面的母婴健康管理APP构想。让我帮您从技术角度分析并提供一个基础的项目结构建议。 技术架构建议 1. 前端技术栈:- iOS: Swift/SwiftUI - Android: Kotlin - 跨平台选项: Flutter/React Native2. 后端技术栈:- 主服务框架: Spring Boot - 数据库: - MySQL …

毕设学习第六天SSM框架之Spring5

虽然目前spring已经出现了6但是现如今大多数应用的还是spring5,因此毕设学习选择Spring5而非6 spring简介Spring 是一个开源的 Java 企业级应用开发框架,旨在简化企业级 Java 应用的开发过程。它通过控制反转(IOC)和面向切面编程(AOP)等核心技术,帮助开发人员构建松耦合…

心态急躁,什么事都做不成

春节这几天,心态有些急躁。也许是突如其来的放松让大脑不适应,最近做事(尤其是打游戏)不顺。 比如体现在炉石酒馆,农,围棋这三者上。这三个是2/1号我从外面回来之后进行的三项娱乐活动。 首先先打了几把炉石,一把速七,两把速八,再加上之前的两把速七速八,让我直接从8…

【风控】风控测试的质效提升之路

# 货拉拉 随着货拉拉业务的迅猛发展,平台每时每刻都面临着黑产的攻击和挑战。为了保障业务安全和稳健地发展,风控作为抗击黑产的前线,负责各项业务的风险识别和阻断工作。同时,各类业务的接入以及风控策略的高强度迭代,也给风控的质量保障和交付效率带来了挑战。如何在保障…

Windows环境变量列表变成老式的横行封号分割PATH路径不方便

前言全局说明win11上环境变量的增、删、改有了专用的列表框,每行一个的环境变量,观察也非常方便。 但有的时候设置完变量,再次打开PATH环境变量,就变成以前win7那样的所有环境变量都在一行,用封号分割了,非常不方便。一、说明 1.1 环境: Windows 11 家庭版 23H2 22631.37…

Spring MVC 初始化

继承关系 DispatcherServlet > FrameworkServlet > HttpServletBean > HttpServlet > GenericServlet > Servlet初始化流程DispatcherServlet 是一个 Servlet,所有的 Servlet 初始化都会执行 init 方法(JAVA EE 的知识,别忘了)HttpServletBean 复写了 init(…

kmp匹配

kmp匹配 代码: #include<bits/stdc++.h> using namespace std; const int N=1e5+6; const int M=1e6+6; char s[M];//长串 char p[N];//模式串 int ne[N];//next指针 ,后退的指针 int main(){int n,m;cin>>n>>p+1>>m>>s+1;//计算ne //ne[1]…

Tokenizer

一、思维导图二、subword(子词)粒度 在很多情况下,既不希望将文本切分成单独的词(太大),也不想将其切分成单个字符(太小),而是希望得到介于词和字符之间的子词单元。这就引入了 subword(子词)粒度的分词方法。本文重点介绍这一部分。 2.1 WordPiece 在BERT时代,Wor…