Kafka的配置和使用

目录

1.服务器用docker安装kafka

2.springboot集成kafka实现生产者和消费者


1.服务器用docker安装kafka

        ①、安装docker(docker类似于linux的软件商店,下载所有应用都能从docker去下载)

                a、自动安装 

curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun

                b、启动docker

sudo systemctl start docker

                c、 通过运行hello-world镜像来验证是否正确安装了Docker Engine-Community。

// 拉取镜像

sudo docker pull hello-world

// 执行

hello-world sudo docker run hello-world

                 d、安装成功

         ②、zookeeper

                a、docker search zookeeper

                b、docker pull zookeeper

        ③、安装kafka

                a、docker search kafka

                b、docker pull wurstmeister/kafka

        ④、运行zookeeper

                a、docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper

        ⑤、运行kafka

                a、 docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=42.194.238.131:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://42.194.238.131:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka

                b、参数说明

参数说明:

-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己

-e KAFKA_ZOOKEEPER_CONNECT=172.21.10.10:2181/kafka 配置zookeeper管理kafka的路径172.21.10.10:2181/kafka

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.21.10.10:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口

-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间

        ⑥、检验kafka是否可以使用

docker exec -it kafka bash

cd /opt/kafka_2.13-2.8.1/

cd bin

                a、运行kafka生产者并发送消息

./kafka-console-producer.sh --broker-list localhost:9092 --topic test

                b、在开一个页面,运行kafka消费者发送消息

 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

        ⑦、结果是这个样子的

 

         ⑧、每条消息都有一个主题,消费者指定监听哪个主题的消息,如果进来消息队列的是我们指定监听的主题,就消费,否则不消费(topic这里指定的生产和消费的主题)

        ⑨、消费者宕掉了,生产者接着发,消息不会丢,消费者重启之后会重新接收到宕机之后发的所有消息

2.springboot集成kafka实现生产者和消费者

        ①、在pom中创建依赖

<dependency>

        <groupId>org.springframework.kafka</groupId>

        <artifactId>spring-kafka</artifactId>

         <version>2.7.8</version>

</dependency>

        ②、配置kafka

                a、在 application.yml 文件中添加以下配置:(注:yml中两个相同名字的会报错,比如两个spring)

spring:

         kafka:

                #自己的kafka所在的ip地址和端口号

                 bootstrap-servers: localhost:9092

                 consumer:

                 #一个group-id代表一个消费组,一个消息可以被几个消费组消费

                    group-id: my-group

                    auto-offset-reset: earliest

                producer: #序列化

                    value-serializer: org.apache.kafka.common.serialization.StringSerializer

                    key-serializer: org.apache.kafka.common.serialization.StringSerializer

        b、创建一个生产者

@Configuration
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}}

sendMessage 方法,用于发送消息到 Kafka。

@RestController
public class KafkaController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@PostMapping("/send")public void sendMessage(@RequestBody String message) {kafkaTemplate.send("my-topic", message);}}

        c、 创建一个消费者

@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}}

@KafkaListener 注解声明了一个消费者方法,用于接收从 

my-topic 主题中读取的消息

@Service
public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group-id")public void consume(String message) {System.out.println("Received message: " + message);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/53190.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VMware虚拟机开机状态动态增加内存和CPU

实验环境&#xff1a;一台虚拟机 1、右击虚拟机&#xff0c;点击“编辑设置”&#xff0c; 2、在“选项”中&#xff0c;找到“内存/cpu热插拔”这一项&#xff0c;把“为此虚拟机启动内存热添加”和“仅为此虚拟机启动CPU热添加”打钩&#xff0c;点击 “确定”。 注意&#x…

通过nvm工具快捷切换node.js版本、以及nvm的安装

使用nvm可以实现多个Node.js版本之间切换 步骤目录&#xff1a; 先卸载掉本系统中原有的node版本 去github上下载nvm安装包 安装node 常用的一些nvm命令 1、先卸载掉本系统中原有的node版本 2、去github上下载nvm安装包 https://github.com/coreybutler/nvm-windows/re…

idea添加翻译插件并配置有道翻译

1、安装Translation插件 2、 创建有道云应用 有道智云控制台 3、设置idea 4、效果&#xff08;选中文本右键翻译&#xff0c;默认快捷键CtrlShiftY&#xff09;

Python web实战之 Django 的模板语言详解

关键词&#xff1a; Python、web开发、Django、模板语言 概要 作为 Python Web 开发的框架之一&#xff0c;Django 提供了一套完整的 MVC 模式&#xff0c;其中的模板语言为开发者提供了强大的渲染和控制前端的能力。本文介绍 Django 的模板语言。 1. Django 模板语言入门 Dj…

Wavefront .OBJ文件格式解读【3D】

OBJ&#xff08;或 .OBJ&#xff09;是一种几何定义文件格式&#xff0c;最初由 Wavefront Technologies 为其高级可视化器动画包开发。 该文件格式是开放的&#xff0c;已被其他 3D 图形应用程序供应商采用。 OBJ 文件格式是一种简单的数据格式&#xff0c;仅表示 3D 几何体&…

FTP使用教程

FTP使用教程 目录 一&#xff0e;FTP简介二&#xff0e;FTP搭建三&#xff0e;FTP使用 一&#xff0e;FTP简介 FTP中文为文件传输协议&#xff0c;简称为文传协议。它也是一个应用程序&#xff0c;不同的操作系统有不同的FTP应用程序&#xff0c;这些应用程序都遵守同一种协议以…

Node.js爬虫只会Cheerio?来试试Puppeteer!

简介 上篇文章我们学习了如何通过 Cheerio 来爬取静态页面&#xff0c;但是我们没有办法处理动态渲染页面的数据 关于 Cheerio 的学习请查看 都 2023 年了还不会 Node.js 爬虫&#xff1f;快学起来&#xff01; 今天我们学习如何使用 Puppeteer 来轻松地完成我们解决不了的爬虫…

python算法指南程序员经典,python算法教程pdf百度云

大家好&#xff0c;小编来为大家解答以下问题&#xff0c;你也能看懂的python算法书 pdf&#xff0c;python算法教程这本书怎么样&#xff0c;现在让我们一起来看看吧&#xff01; 给大家带来的一篇关于算法相关的电子书资源&#xff0c;介绍了关于算法、详解、算法基础方面的内…

苹果电脑第三方应用程序卸载工具CleanMyMacX4.14

CleanMyMacX&#xff0c;这是一款可以帮助你快速识别并卸载电脑上的多个应用程序的第三方工具&#xff0c;省去了逐个卸载的繁琐步骤。 我们首先要到下载CleanMyMacX&#xff0c; CleanMyMac X全新版下载如下: https://wm.makeding.com/iclk/?zoneid49983 然后&#xff0c…

java如何将tif文件拆分为多个jpg文件,附jar包下载地址

1.在build.gradle中添加依赖&#xff1a; implementation group: javax.media, name: jai_codec, version: 1.1.3implementation group: com.sun, name: jai_core, version: 1.1.3implementation group: javax.media, name: jai_imageio, version: 1.1implementation xx.com.su…

TypeScript 类型断言

TypeScript 类型断言 简单来说类型断言就是 使用as关键词 强行指定获取到的结果类型 应用场景 // 类型断言: 强行指定获取到的结果类型// 应用场景// 页面上有一个 id 为 link 的 a 标签// 我们知道它是 a 标签// 但是 TS 不知道 // document.getElementById 的返回值是 HTMLE…

IDEA中maven项目失效,pom.xml文件橙色/橘色

IDEA中maven项目失效&#xff0c;pom.xml文件橙色/橘色 IDEA中Maven项目失效 IDEA中创建的maven项目中的文件夹都变成普通格式&#xff0c;pom.xml变成橙色 右键点击橙色的pom.xml文件&#xff0c;选择add as maven project maven项目开始重新导入相应依赖&#xff0c;恢复…