kafka消费者接收不到消息

背景:

   对kafka消息进行监听,生产者发了消息,但是消费端没有接到消息,监听代码

消费端,kafka配置

spring.kafka.bootstrap-servers=kafka.cestc.dmp:9591

spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="Kafka#Cestc2021";

spring.kafka.properties.security.protocol=SASL_PLAINTEXT

spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256

#=============== provider =======================

spring.kafka.producer.retries=0

# 每次批量发送消息的数量

spring.kafka.producer.batch-size=16384

spring.kafka.producer.buffer-memory=33554432

# 指定消息key和消息体的编解码方式

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer =======================

# 指定默认消费者group id

spring.kafka.consumer.group-id=dq

spring.kafka.consumer.auto-offset-reset=latest

spring.kafka.consumer.enable-auto-commit=true

spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息体的编解码方式

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

@KafkaListener(groupId = "${spring.kafka.consumer.group-id:dq}",topics = {"t_dq_rwzt_topic"})
public ReturnT<String> listenKafka2(String records, Acknowledgment ack) {

}

offset explorer发现生产者发送了消息,offset是0

问题解决:

后来查看生产者kafka配置,发现他们的enable-auto-commit是false:

spring.kafka.consumer.enable-auto-commit=false

修改kafka配置

spring.kafka.consumer.enable-auto-commit=false

# 在侦听器容器中运行的线程数

spring.kafka.listener.concurrency=5

# listner负责ack,每调用commit方法,立即向服务器提交

spring.kafka.listener.ack-mode=manual_immediate

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/498121.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker之数据卷自定义镜像

文章目录 前言一、数据卷二、自定义镜像 前言 Docker提供了一个持久化存储数据的机制&#xff0c;与容器生命周期分离&#xff0c;从而带来一系列好处&#xff1a; 总的来说Docker 数据卷提供了一种灵活、持久、可共享的存储机制&#xff0c;使得容器化应用在数据管理方面更加…

SpringBoot整合rabbitmq-直连队列,没有交换机(一)

说明&#xff1a;本文章只是springboot和rabbitmq的直连整合&#xff0c;只使用队列生产和消费消息&#xff0c;最简单整合&#xff01; 工程图&#xff1a; A.总体pom.xml <?xml version"1.0" encoding"UTF-8"?><project xmlns"http://…

ElementUI之MessgageBox使用

<template><el-button type"text" click"open">点击打开 Message Box</el-button> </template><script>export default {methods: {open() {this.$confirm(是否完成维修工单&#xff1f;, 提示, {distinguishCancelAndClose…

React中使用useActive

1.引入 import { useActivate } from "react-activation";2.React Activation 在React中使用react-activation,其实就是类似于Vue中的keep-alive&#xff0c;实现数据的缓存&#xff1b; 源码&#xff1a; import { ReactNode, ReactNodeArray, Context, Component…

Node.js中的缓存策略和缓存技巧

在Node.js中&#xff0c;缓存策略和缓存技巧是提升应用性能和用户体验的关键因素。通过有效地利用缓存&#xff0c;我们可以显著减少系统资源的消耗&#xff0c;加快数据访问速度&#xff0c;从而提升整体的网站性能。本文将针对Node.js中的缓存策略和缓存技巧展开深入探讨&…

KubeEdge 边缘计算

文章目录 1.KubeEdge2.KubeEdge 特点3.KubeEdge 组成4.KubeEdge 架构 KubeEdge # KubeEdgehttps://iothub.org.cn/docs/kubeedge/ https://iothub.org.cn/docs/kubeedge/kubeedge-summary/1.KubeEdge KubeEdge 是一个开源的系统&#xff0c;可将本机容器化应用编排和管理扩展…

【Java程序设计】【C00322】基于Springboot的高校竞赛管理系统(有论文)

基于Springboot的高校竞赛管理系统&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的高校竞赛管理系统&#xff0c;本系统有管理员、老师、专家以及用户四种角色&#xff1b; 管理员&#xff1a;首页、个人中心、管…

网络编程、UDP、TCP

计算机网络 就是将地理位置不同的具有独立功能的多台计算及外部设备&#xff0c;通过通信线路连接起来&#xff0c;在网络操作系统、网络管理软件以及网络通信协议的管理和协调下&#xff0c;实现资源共享和信息传递的计算机系统 目的 传播交流信息、数据交换、通信 如何做…

购买域名后需要怎么做才能正常使用?

随着互联网的发展&#xff0c;越来越多人选择购买域名&#xff0c;以此来保护自己的品牌。域名并不仅仅只是一个简单的字母组合或者数字组合&#xff0c;它在建站、投资以及品牌保护方面都发挥着不可忽视的重要作用。一般来说&#xff0c;用户购买域名后的两大主要用途就是域名…

一小时入门python3网络爬虫

随着互联网的发展&#xff0c;网络爬虫已经成为了一项重要的技能。Python 3 作为一种流行的编程语言&#xff0c;也是网络爬虫的常用工具之一。本文将介绍如何使用 Python 3 编写网络爬虫&#xff0c;以及如何使用 Python 3 中的一些库进行网页解析和数据提取。 确定目标 在开…

vue3 开发记录

1.引入nprogress插件&#xff0c;显示未声明文件 无法找到模块“nprogress”的声明文件。 解决方法&#xff1a; vite-env.d.ts // 解决引入模块的报错提示 declare module "nprogress";2.在 .evn 文件中创建了自定义环境变量 VITE_APP_BASE_URL 但在项目中使用时出…

redis-RedisTemplate.opsForGeo 的geo地理位置及实现附近的人的功能

redis内部使用的是 zset 数据结构存储&#xff0c;如下 import cn.huawei.VideoApplication; import cn.huawei.domain.Jingqu; import cn.huawei.service.JingquService; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired…