Kafka:安装和配置

 producer:发布消息的对象,称为消息产生者 (Kafka topic producer)

topic:Kafka将消息分门别类,每一个消息称为一个主题(topic)

consumer:订阅消息并处理发布消息的对象称为消费者(consumer)

broker:已发布的消息保存在一组服务器中,称为kafka集群,集群中的每一个服务器都是一个代理(broker),消费者(consumer)可以订阅一个或者多个主题(topic),并从broker中拉取数据,从而消费这些已发布的信息。

1、Kafka对zookeeper是一个强依赖,保存Kafka相关的节点数据,所以安装kafka之前要先安装zookeeper

下载镜像

docker pull zookeeper:3.4.14

创建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14

下载镜像

docker pull wurstmeister/kafka:2.12-2.3.1

创建容器

docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

2、入门案例

①创建kafka-demo工程并引入依赖

        <!--kafka--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>

②创建ProducerQuickStart生产者类并实现

package com.heima.kafkademo.sample;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*生产者*/
public class ProducerQuickStart {public static void main(String[] args) {/*1、kafka配置信息*/Properties properties = new Properties();//kafka连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,5);//key和value的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");/*2、生产对象*/KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);//封装发送消息的对象ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");/*3、发送消息*/producer.send(record);/*4、关闭通道,负责消息发送不成功*/producer.close();}
}

③创建ConsumerQuickStart消费者类并实现

package com.heima.kafkademo.sample;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*消费者*/
public class ConsumerQuickStart {public static void main(String[] args) {/*1、kafka配置信息*/Properties properties = new Properties();//kafka连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,5);//key和value的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");/*2、消费者对象*/KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);/*3、订阅主题*/consumer.subscribe(Collections.singletonList("itheima-topic"));//当前线程处于一直监听状态while (true){//4、获取消息ConsumerRecords<String,String> consumerRecords =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : consumerRecords) {System.out.println(record.key());System.out.println(record.value());}}}
}

④运行测试

        成功接收到消息

总结

  • 生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)

  • 生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)

下一篇: springboot集成kafka收发消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/60394.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Apache2.4源码安装与配置

环境准备 openssl-devel pcre-devel expat-devel libtool gcc libxml2-devel 这些包要提前安装&#xff0c;否则httpd编译安装时候会报错 下载源码、解压缩、软连接 1、wget下载[rootnode01 ~]# wget https://downloads.apache.org/httpd/httpd-2.4.57.tar.gz --2023-07-20 …

jenkins流水线

1.拉取代码 https://gitee.com/Wjc_project/yygh-parent.git2、项目编译 mvn clean package -Dmaven.test.skiptrue ls hospital-manage/target3、构建镜像 ls hospital-manage/target docker build -t hospital-manage:latest -f hospital-manage/Dockerfile ./hospital-ma…

本质矩阵E、基本矩阵F、单应矩阵H

1. E (归一化坐标对进行计算) t ^ R 为3*3的矩阵, 因为R,t共有6个自由度&#xff0c;又因为单目尺度等价性&#xff0c;所以实际上E矩阵共有5个自由度。因此至少需要5个点对来求解。 2. 基本矩阵F:根据两帧间匹配的像素点对儿计算 3*3且自由度为7的矩阵kF也为基础矩阵&#x…

如何给Google Chrome增加proxy

1. 先打开https://github.com/KaranGauswami/socks-to-http-proxy/releases 我的电脑是Liunx系统所以下载第一个 2. 下载完之后把这个文件变成可执行文件&#xff0c;可以是用这个命令 chmod x 文件名 3. 然后执行这个命令&#xff1a; ./sthp-linux -p 8080 -s 127.0.0.1:…

使用 Visual Studio Code 调试 CMake 脚本

之前被引入到 Visual Studio 中的 CMake 调试器&#xff0c;现已在 Visual Studio Code 中可用。 也就是说&#xff0c;现在你可以通过在 VS Code 中安装 CMake 工具扩展&#xff0c;来调试你的 CMakeLists.txt 脚本了。是不是很棒? 背景知识 Visual C 开发团队和 CMake 的维…

tui.calender日历在vue中的使用1.0

官网&#xff1a;https://ui.toast.com/tui-calendar github&#xff1a;https://github.com/nhn/tui.calendar/tree/main 月、周、日视图都有&#xff0c;拖拽也比较方便&#xff0c;但是自己用起来比较费劲&#xff0c;参考文档写得不全&#xff0c;做个记录日后方便参考&…

如何使用 AT+WEBSERVER 指令实现自定义的 Webserver html 网页配网

开启 AT 固件中的 Webserver 指令和 FS 指令支持 乐鑫官网发布的默认通用 AT 固件不支持 webserver 配网功能&#xff0c; 需要用户自己搭建 esp-at 环境&#xff0c;并在 sdkconfig 中开启 webserver AT 指令 和 FS 指令的支持&#xff0c; 如下图所示&#xff1a; 测试 AT 固…

【Spring Boot】构建RESTful服务 — 构建RESTful应用接口

构建RESTful应用接口 RESTful架构是目前最流行的互联网软件架构规范&#xff0c;是Web API&#xff08;应用编程接口&#xff09;的大趋势和主流规范&#xff0c;了解了RESTful的众多优点之后&#xff0c;接下来一步一步地学习如何使用Spring Boot构建RESTful Web API。 1.Sp…

ThinkPHP6企业OA办公系统

有需要请加文章底部Q哦 可远程调试 ThinkPHP6企业OA办公系统 一 介绍 勾股OA基于ThinkPHP6开发&#xff0c;前端Layui&#xff0c;数据库mysql&#xff0c;是一款实用的企业办公系统。可多角色登录&#xff0c;集成了系统设置、人事管理、消息管理、审批管理、日常办公、客户…

解决Qt的列表加载大量数据卡顿的问题

问题概述 本人在使用QListView插入大量数据时&#xff0c;界面卡顿十分严重。数据量大概只有上千左右&#xff0c;但是每个Item的内容比较多。当数据不停地插入一段时间后&#xff0c;卡顿到鼠标的移动都有点困难。 解决思路 QListView是典型的MVC思想的产物。界面呈现出来的数…

python——案例11:数值交换

案例11&#xff1a;数值交换xinput(输入一个数值赋值给x&#xff1a;) yinput(输入一个数值赋值给y&#xff1a;)tempx #创建临时变量&#xff0c;以此变量为基础进行逐次交换 xy ytemp print(交换后的X的值是:{}.format(x)) # print(交换后的Y的值是:{}.format(y)) #

RabbitMQ学习——发布订阅/fanout模式 topic模式 rabbitmq回调确认 延迟队列(死信)设计

目录 引出点对点(simple)Work queues 一对多发布订阅/fanout模式以登陆验证码为例pom文件导包application.yml文件rabbitmq的配置生产者生成验证码&#xff0c;发送给交换机消费者消费验证码 topic模式配置类增加配置生产者发送信息进行发送控制台查看 rabbitmq回调确认配置类验…