docker安装和使用kafka

1. 启动zookeeper

Kafka依赖zookeeper, 首先安装zookeeper
-p:设置映射端口(默认2181

docker run --name zookeeper \--network app-tier \-e ALLOW_ANONYMOUS_LOGIN=yes \--restart=always \-d bitnami/zookeeper:latest

2. 启动kafka

docker run --name kafka \--network app-tier \-p 9092:9092 \-e ALLOW_PLAINTEXT_LISTENER=yes \-e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092	 \--restart=always \-d bitnami/kafka:latest
命令解释
ALLOW_PLAINTEXT_LISTENER=yes任何人可以访问
KAFKA_CFG_ZOOKEEPER_CONNECTzookeeper地址
KAFKA_CFG_ADVERTISED_LISTENERS当前kafka安装的主机地址 如果是服务器部署则配服务器IP或域名否则客户端监听消息会报地址错误

2. 启动kafka-map管理工具

docker run --name kafka-map \--network app-tier \-p 9001:8080 \-v /usr/local/kafka-map/data:/usr/local/kafka-map/data \-e DEFAULT_USERNAME=admin \-e DEFAULT_PASSWORD=admin \--restart=always \-d dushixiang/kafka-map:latest

启动成功后, 访问客户端: http://localhost:9001
账户: admin
密码: admin

在这里插入图片描述

3. springboot集成kafka

pom.xml配置

    <dependencies><!--kafka依赖--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies>        

配置application.yml

#------------------------------------spring----------------------------------
spring:#------------------------------------消息队列kafka配置----------------------------------kafka:#  kafka server的地址,如果有多个,使用逗号分割bootstrap-servers: localhost:9092producer:# 发生错误后,消息重发的次数。retries: 1#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。32MB的批处理缓冲区buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1properties:# 自定义拦截器interceptor.classes: com.wms.message.kafka.interceptor.CustomProducerInterceptor#自定义分区器partitioner.classes: com.wms.message.kafka.interceptor.CustomPartitionerconsumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:# 自定义消费者拦截器interceptor.classes: com.wms.message.kafka.interceptor.CustomConsumerInterceptor# 默认消费者组group-id: code-safe-group# 设置最大轮询间隔时间(毫秒),默认值为 300000(5分钟)# 如果两次 poll() 之间的时间超过此配置值,可能导致 rebalance, 消费者会被剔除 此处设置10分钟max-poll-interval-ms: 600000# 批量一次最大拉取数据量max-poll-records: 1000batch:# 批消费并发量,小于或等于Topic的分区数concurrency: 3listener:# 在侦听器容器中运行的线程数。concurrency: 5#listner负责ack,每调用一次,就立即commitack-mode: manual_immediatemissing-topics-fatal: falsetopics:# 自定义主题名称twsm: webSocket_send_message_devgroup-id: group-idtopic-name:- topic1

测试发送消息到kafka

/*** Kafka测试** @version 1.0* @author: web* @date: 2024/1/18 15:07*/
@Slf4j
@RestController
@RequestMapping("/message/kafkaTest")
public class KafkaTestController extends BaseController
{@Autowiredprivate KafkaUtils kafkaUtils;/*** 生产者_推送消息到kafka** @param msg* @author: web* @return: AjaxResult* @date: 2024/1/18 15:16*/@PostMapping("/send")public AjaxResult send(@RequestBody Map<String, Object> msg){try{String userId = msg.get("userId").toString();Object content = msg.get("content");Message message = kafkaUtils.setMessage(userId, content);kafkaUtils.send(KafkaUtils.TOPIC_TEST, message);}catch (Exception e){log.error("生产者_推送消息到kafka发生异常");}return success();}/*** 消费者1** @param record* @param ack* @param topic* @author: web* @return: void* @date: 2024/1/18 15:07*/@KafkaListener(topics = KafkaUtils.TOPIC_TEST)public void topicTest1(ConsumerRecord<?, ?> record, Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic){Optional message = Optional.ofNullable(record.value());if (message.isPresent()){Object msg = message.get();log.info("topic.group1 消费了: Topic:" + topic + ",Message:" + msg);ack.acknowledge();}}/*** 消费者2** @param record* @param ack* @param topic* @author: web* @return: void* @date: 2024/1/18 15:07*///    @KafkaListener(topics = KafkaUtils.TOPIC_TEST, groupId = KafkaUtils.TOPIC_GROUP2)//    public void topicTest2(ConsumerRecord<?, ?> record, Acknowledgment ack,//                           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)//    {////        Optional message = Optional.ofNullable(record.value());//        if (message.isPresent())//        {//            Object msg = message.get();//            log.info("topic.group2 消费了: Topic:" + topic + ",Message:" + msg);//            ack.acknowledge();//        }//    }}

KafkaUtils类

/*** 生产者** @version: 1.0* @author: web* @date: 2024/1/18 10:37*/
@Component
@Slf4j
public class KafkaUtils
{@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;/*** 自定义topic*/public static final String TOPIC_TEST = "topic.code-safe";/*** 自定义消费组*/public static final String TOPIC_GROUP1 = "topic.group1";public static final String TOPIC_GROUP2 = "topic.group2";// 业务相关topic/*** 主题: webSocket发送消息到客户端*/public static String TOPIC_WEBSOCKET_SEND_MESSAGE;@Autowiredprivate String[] kafkaTopicName;/*** 获取配置文件中的盐值,并设置到静态变量中** @param topic 主题*/@Value("${spring.kafka.topics.twsm}")private void setTwsmTopic(String topic){TOPIC_WEBSOCKET_SEND_MESSAGE = topic;}/*** 发送消息** @param topic   主题* @param message 消息内容* @author: web* @return: void* @date: 2024/1/18 10:42*/public void send(String topic, Object message){if (StringUtils.isEmpty(topic) || StringUtils.isNull(message)){throw new ServiceException("生产者发送消息到kafka_主题或消息内容不能为空!");}String obj2String = JsonUtils.toJsonString(message);//        log.info("准备发送消息为:{}", obj2String);//发送消息ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, obj2String);// 监听回调future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>(){@Overridepublic void onFailure(Throwable throwable){//发送失败的处理log.error(topic + " - 生产者 发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> stringObjectSendResult){//成功的处理
//                log.info(topic + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());}});}/*** 设置websocket发送的消息体** @param userId 用户ID* @param msg    消息内容* @author: web* @return: Message 消息对象* @date: 2024/1/19 11:36*/public Message setMessage(String userId, Object msg){Message message = new Message();message.setSendUserId(userId);message.setSendTime(DateUtils.getTime());message.setSendContent(String.valueOf(msg));return message;}
}

Message类

@Data
public class Message implements Serializable
{private static final long serialVersionUID = -118L;/*** 发送人ID*/private String sendUserId;/*** 发送人*///    private String sendUserName;/*** 发送时间*/private String sendTime;/*** 发送内容*/private String sendContent;
}

监听消息

/*** 消息接收监听器【分布式系统】** @version: 1.0* @author: web* @date: 2024/1/19 13:44*/
@Component
@Slf4j
public class MessageListener
{/*** 根据用户id发送消息到客户端** @param record* @param ack* @param topic* @author: web* @return: void* @date: 2024/1/20 22:05*/@KafkaListener(topics = "#{'${spring.kafka.topics.twsm}'}", groupId = "#{topicGroupId}")public void sendMessageByUserId(ConsumerRecord<String, String> record, Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic){Optional<String> optional = Optional.ofNullable(record.value());if (optional.isPresent()){Message message = JsonUtils.parseObject(optional.get(), Message.class);if (StringUtils.isNull(message)){log.error("消费者收到kafka消息的内容为空!");return;}
//            log.info("消费者收到kafka消息");String sendUserId = message.getSendUserId();String sendContent = message.getSendContent();// 确认收到消息ack.acknowledge();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/527292.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C#上位机与欧姆龙PLC的通信13----【又爆肝】上位机应用开发(云端版)

1、概念背景 随着物联网技术的快速发展&#xff0c;工业互联网应运而生。工业互联网云平台作为连接智能制造和智慧工厂的重要技术手段&#xff0c;为制造业提供了更高效、更安全、更便捷的生产模式。工业互联网是指将互联网和物联网技术应用于工业生产和制造过程中&#xff0c;…

软考73-上午题-【面向对象技术2-UML】-UML中的图4

一、构件图&#xff08;组件图&#xff09; 1-1、构件图的定义 展现了&#xff0c;一组构件之间的组织和依赖。 构件图专注于系统的静态实现图。 构件图与类图相关&#xff0c;通常把构件映射为一个、多个类、接口、协作。 【回顾】&#xff1a; 类图展示了一组对象、接口、…

Vue中的组件:构建现代Web应用的基石

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

07 数据结构之图

# Makefile CCgcc CFLAGS -g -Wall SRCStest.c graph.c link_queue.c OBJS$(SRCS:.c.o) #variable replace APPtestall:$(OBJS) #指定一个目标&#xff0c; 不然默认目标不会检查依赖文件的时间戳$(CC) $(SRCS) -o $(APP) .PH…

怎么写品牌方流量打造抖音运营规划方案

【干货资料持续更新&#xff0c;以防走丢】 怎么写品牌方流量打造抖音运营规划方案 部分资料预览 资料部分是网络整理&#xff0c;仅供学习参考。 抖音运营资料合集&#xff08;完整资料包含以下内容&#xff09; 目录 Step 1: 人货沟通策略 人群定位与细分 1. 从品牌及产品…

修改MonkeyDev默认配置适配Xcode15

上一篇文章介绍了升级Xcode15后&#xff0c;适配MonkeyDev的一些操作&#xff0c;具体操作可以查看&#xff1a;Xcode 15 适配 MonkeyDev。 但是每次新建项目都要去修改那些配置&#xff0c;浪费时间和精力&#xff0c;这篇文章主要介绍如何修改MonkeyDev的默认配置&#xff0…

AI写真,太火了

昨天晚上&#xff0c;AI大佬吴东子直播讲解了AI写真项目&#xff0c;说21点破局星球会准时放出预约链接&#xff0c;结果21点星球直接崩溃了&#xff0c;只能说这个项目太火爆了 经过星球授权&#xff0c;这里把整个项目的SOP截取一部分给到大家&#xff0c;完整的SOP太长了&am…

gradle下载太慢者超时!国内镜像可以直接下载

# 解决Gradle下载过慢问题的有效方式&#xff1a;使用国内镜像站点 在开发过程中&#xff0c;我们经常会遇到Gradle下载速度缓慢或超时的问题。作为一个强大而流行的构建工具&#xff0c;Gradle是许多项目中必不可少的一部分。然而&#xff0c;由于官方下载地址可能受网络限制…

【工具】Git的介绍与安装

目录 前言 1W&#xff1a;什么是Git&#xff1f; 2W&#xff1a;为什么使用Git&#xff1f; 3W&#xff1a;如何使用Git&#xff1f; Git的安装步骤 测试 3.1 桌面空白部分鼠标右击 3.2 选择 Open Git Bash here 3.3 输入 git -v 命令查看版本 Git区域分布 Git的工作…

javase day02笔记

第二天课堂笔记 源文件的组成部分★★ 源文件外部结构 class 类名{}main方法 public static void main(String [] args){}main方法可有可无 没有main的情况&#xff0c;编译成功&#xff0c;运行失败&#xff0c;没有程序入口 多个main情况&#xff0c;编译报错&#xff0c;…

【Docker】容器的概念

容器技术&#xff1a;容器技术是基于虚拟化技术的&#xff0c;它使应用程序从一个计算机环境快速可靠地转移到另一个计算机环境中&#xff0c;可以说是一个新型地虚拟化技术。 一、docker容器 Docker:是一个开源地容器引擎Docker 是一种轻量级的容器化技术&#xff0c;其主要原…

ROS2学习(七) Foxy版本ros2替换中间件。

在ros2使用的过程中&#xff0c;一开始选用的foxy版本&#xff0c;后来发现&#xff0c;foxy版本的ros2有很多问题。一个是foxy版本已经停止维护了。另一个问题是这个版本有很多bug, 后续的版本在功能实现上做了很大的改动&#xff0c;甚至说进行了重写。修复的一些问题&#x…