Kafka应用Demo: 抽取消费者公共处理代码并利用redis实现多消费者实例负载分担

问题描述

在项目中使用消息中间件,主要为实现两个目的:

  1. 任务排队:当请求过多时,消费端无法同时处理,需要排队等待。这一点kafka采用的是"拉取消息"的模式,自然支持。
  2. 负载分担: 这里的负载负担不是指Kafka本身的横向扩容,而是指在任务量过大时可以通过增加消费者实例,提升效率。

下面重点分析一下,增加消费者实例以提升系统处理效率的问题。

简要分析

 Kafka消费者订阅消息可以通过主题订阅,也可以指定分区订阅。如果是通过主题订阅,消费者将获取该主题下所有分区的消息。如果是指定分区订阅,消费者只能获取该分区下的消息。

  1. 按主题订阅的模式。同一分组的消费者实例中只会有一个实例收到消息,其它实例处于空转状态,会浪费资源,无法提升效率。当处理任务的实例挂了后,服务再平衡,另外一个消费者实例才会接手继续处理任务。
  2. 按分区定义的模式。消费者只能接收到所订阅分区的消息。如果有多个消费者实例订阅同一个分区,它们将收到相同的消息,这相当于广播, 如果不做特殊处理会出现消息多次消费的情况(这种订阅方式,官方文档要求的是配置不同的分组ID,否则会导致提交冲突)。

 按分区订阅的方式会增加生产者发送消息的处理复杂度,发送消息时需要知道哪些消息放在哪个分区能被正确消费。后面如果增加了分区和主题,生产者和消费者都需要做较大的改动。

 所以,初步考虑还是走按主题订阅消息这条路。生产者发送消失时只需要指定topic, 消费者订阅也简单。在这种模式下,有两点需要处理:

  1. 多个消费者实例为实现负载分担,需要配置不同的组ID,这样可以收到相同的消息(相当于广播)。
  2. 为避免同一个消息在多个消费者实例中重复处理,需要做一些互斥。这个可以考虑用redis来做。

生产者代码样例

import com.alibaba.fastjson.JSON;
import com.elon.base.constant.KafkaTopicConst;
import com.elon.base.model.BIReportTask;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class KafkaProducerService {private KafkaProducer<String, String> producer = null;public KafkaProducerService() {Properties props = new Properties();props.put("bootstrap.servers", "192.168.5.128:9092");props.put("acks", "0");props.put("group.id", "1111");props.put("retries", "2");props.put("partitioner.class", NeoPartitioner.class);//设置key和value序列化方式props.put("key.serializer", StringSerializer.class);props.put("value.serializer", StringSerializer.class);//生产者实例producer = new KafkaProducer<>(props);}/*** 外部调用的发消息接口*/public void sendMessage() {for (int i = 0; i < 20; ++i) {BIReportTask task = new BIReportTask();task.setPath("d:/temp");task.getParamMap().put("value", String.valueOf(i));ProducerRecord<String, String> record = new ProducerRecord(KafkaTopicConst.ELON_TOPIC, JSON.toJSONString(task));producer.send(record);}}

 生产者发送消息处理比较简单,创建ProducerRecord只需要指定Topic和Value。如果对消息处理有顺序要求,可以指定一个消息键。

消费者公共处理代码

 下面的公共处理逻辑代码可以写到一个公共的common库中,编译成jar包。谁需要订阅处理kafka消息,引入依赖即可。

1. 定义一个公共的Task模型

import lombok.Getter;
import lombok.Setter;import java.util.Date;/*** 异步任务模型基类. 定义模型公共属性** @author neo* @since 2024-05-14*/
@Getter
@Setter
public class TaskBase {// 任务唯一ID标识, 用UUIDprivate String taskId = "";// 任务编码(任务类别)private String taskCode = "";// 创建人private String createUser = "";// 创建时private Date createTime = null;// 修改人private String updateUser = "";// 修改时间private Date updateTime = null;public TaskBase() {}public TaskBase(String taskId, String taskCode) {this.taskId = taskId;this.taskCode = taskCode;}
}

 这里面的taskId是任务唯一的UUID标识。 taskCode是任务的类别,可以定义为常量,用于分区不同的任务。

2. 定义消息处理抽象类

import lombok.Getter;/*** 任务处理类。派生出各子类实现具体的处理逻辑** @author neo* @since 2024-05-14*/
public abstract class TaskHandler {// 任务编码@Getterprivate final String taskCode;/*** 任务处理接口** @param taskJson 任务对象JSON串*/public abstract void handle(String taskJson);protected TaskHandler(String taskCode) {this.taskCode = taskCode;}
}

 所有消费者都需要从该类继承,并显示handle接口,实际的业务处理逻辑在子类的handle方法中去实现。

3. 消费者服务处理类

3.1核心逻辑代码

import com.alibaba.fastjson.JSON;
import com.elon.base.model.TaskBase;
import com.elon.base.util.StringUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;/*** Kafka消费者服务类。订阅接收消息, 再转给具体的业务处理类处理** @author neo* @since 2024-5-14*/
@Component
public class KafkaConsumerService {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);@Value("${neo.application_name:}")private String applicationName;// Kafka分区器连接@Value("${neo.kafka.bootstrap.servers:}")private String kafkaServer;// Kafka组ID@Value("${neo.kafka.group.id:}")private String kafkaGroupId;// 最大一次拉取的消息数量@Value("${neo.kafka.max.poll.records:1}")private int maxPollRecords;@Value("${neo.kafka.topics}")private List<String> topics;@Value("${neo.redis.ip:}")private String redisIp;@Value("${neo.redis.port:}")private int redisPort;// 消费者private KafkaConsumer consumer = null;// 任务处理器. Map<任务编码, 任务处理器>private Map<String, TaskHandler> handlerMap = new HashMap<>();/*** 注册任务处理器** @param handler 任务处理器*/public void registerHandler(TaskHandler handler) {handlerMap.put(handler.getTaskCode(), handler);}/*** 初始化消费者实例. 订阅主题消息*/public void initKafkaConsumer() {LOGGER.info("Subscribe message. kafkaServer:{}|kafkaGroupId:{}|maxPollRecords:{}|topics:{}",kafkaServer, kafkaGroupId, maxPollRecords, topics);Properties properties = new Properties();properties.put("bootstrap.servers", kafkaServer);  // 指定 Brokerproperties.put("group.id", kafkaGroupId);              // 指定消费组群 IDproperties.put("max.poll.records", maxPollRecords);properties.put("enable.auto.commit", "false");properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象properties.put("value.deserializer", StringDeserializer.class);  // 将 value 的字节数组转成 Java 对象consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(topics);  // 订阅主题new Thread(this::handleMessage).start();}/*** 从Kafka获取消息,传给相应的处理器处理.*/public void handleMessage() {Jedis jedisClient = getJedisClient();while (true) {synchronized (this) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));LOGGER.info("Fetch record num:{}", records.count());for (ConsumerRecord<String, String> record : records) {try {handleSingleMessage(jedisClient, record);} catch (Exception e) {LOGGER.error("Handle message fail. Topic:{}|Partition:{}|Offset:{}|Key:{}|Message:{}",record.topic(), record.partition(), record.offset(), record.key(), record.value());}}// 提交任务,更新offsetconsumer.commitSync();}}}/*** 处理单个消息.** @param jedisClient redis客户端接口* @param record kafka消息记录* @author neo*/private void handleSingleMessage(Jedis jedisClient, ConsumerRecord<String, String> record) {TaskBase taskBase = JSON.parseObject(record.value(), TaskBase.class) ;if (!handlerMap.containsKey(taskBase.getTaskCode())) {return;}// 判断同一个任务是否已经有其它实例在处理String taskKey = "Task_" + taskBase.getTaskId();String handleAppName = jedisClient.getSet(taskKey, applicationName);// 设置过期时间只是为了方便自动清除redis中的数据。在实际项目中,任务数据是非常重要的,往往需要持久化到数据库jedisClient.expire(taskKey, 60 * 60);if (!StringUtil.isEmpty(handleAppName)) {jedisClient.set(taskKey, handleAppName, new SetParams().px(1000 * 60 * 60));LOGGER.info("Task:{} completed. Handle app name:{}", taskBase.getTaskId(), handleAppName);return;}// 将消息分发给具体的handler类处理LOGGER.info("Handle message. Topic:{}|Partition:{}|Offset:{}|Key:{}|Message:{}",record.topic(), record.partition(), record.offset(), record.key(), record.value());TaskHandler handler = handlerMap.get(taskBase.getTaskCode());handler.handle(record.value());}public Jedis getJedisClient() {Jedis jedis = new Jedis(redisIp, redisPort);return jedis;}

这里面有几个重要的方法:

  1. registerHandler: 消息处理类注册接口。消费者程序在继承实现了的该接口后,可以将子类实例注册过来。
  2. initKafkaConsumer: 这里面主要是按topic订阅消息的代码。
  3. handleMessage: 从Kafka服务器拉去消息并处理。
  4. handleSingleMessage: 处理单条消息。核心代码。

 在handleSingleMessage方法中,首先会去查询handlerMap中是否有Task Code对应的处理器。因消息是按Topic广播过来的,肯定会有当前消费者不需要处理的消息(其它消费者订阅的)。然后, 在处理任务前会先将taskId通过jedis的getSet方法存储到redis,同时检测是否有其它消费者已经处理了该任务(已处理,则不再重复处理)。

 注:这个地方因需要保证操作的原子性 用了jedis的getSet方法,实际上还是有瑕疵,第二次调用时会修改旧的数据。应该还可以通过lua脚本做一个好一些的处理(后续再处理)。

3.2 yml配置样例

在这里插入图片描述

消费者定制样例代码

 定制代码是开发过程中每个消费者根据业务需要,结合需要消费的具体消息增加的业务处理代码。下面以虚构的一个生成BI业务报表为例,简单说明一下。

1. 定义任务模型

/*** 生成BI报告的任务模型。不同的任务可以根据需要定义不同的模型** @author neo* @since 2024-05-14*/
@Getter
@Setter
public class BIReportTask extends TaskBase {// 存放报告的路径private String path;// 参数private Map<String, String> paramMap = new HashMap<>();public BIReportTask() {super(UUID.randomUUID().toString(), TaskCodeConst.KAFKA_REPORT_EXPORT_BI_REPORT);}
}

 根据业务需要传递的参数而定义,和Task Code关联。生产者和消费者约定使用同一模型。

2. 具体消息处理类

import com.alibaba.fastjson.JSON;
import com.elon.base.constant.TaskCodeConst;
import com.elon.base.model.BIReportTask;
import com.elon.base.service.kafka.KafkaConsumerService;
import com.elon.base.service.kafka.TaskHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;/*** BI报表生成处理类** @author neo* @since 2024-05-14*/
@Component
public class KafkaBIReportHandler extends TaskHandler {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaBIReportHandler.class);@Resourceprivate KafkaConsumerService kafkaConsumerService;public KafkaBIReportHandler() {super(TaskCodeConst.KAFKA_REPORT_EXPORT_BI_REPORT);}@PostConstructpublic void init() {kafkaConsumerService.registerHandler(this);}@Overridepublic void handle(String taskJson) {BIReportTask task = JSON.parseObject(taskJson, BIReportTask.class);LOGGER.info("Create BI report. taskCode:{}|taskId:{}", task.getTaskCode(), task.getTaskId());// 生成BI报表. 具体的处理逻辑略. 这里等待5秒表示业务处理耗时try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}
}

注:类中的init方法完成了注册功能。

3. 初始化消费者

/*** 消费者应用初始化.** @author neo* @since 2024-05-15*/
@Component
public class ConsumerApplicationInit implements ApplicationRunner {private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerApplicationInit.class);@Resourceprivate KafkaConsumerService kafkaConsumerService;@Overridepublic void run(ApplicationArguments args) throws Exception {kafkaConsumerService.initKafkaConsumer();LOGGER.info("Init kafka consumer success.");}
}

在SpringBoot启动完成后做初始化操作。

测试验证及说明

 修改yaml文件的组ID,springboot服务端口号,打两个包启动。生产者发送一批消息测试。可以看到两个消费者实例同时在处理这一批消息,并不重复。当前消费者会跳过另一消费者已处理过的任务。

在这里插入图片描述

 上面描述的代码及方案基本可实现以Kafka作为消息中间件,多消费者实例负载分担和可靠性要求。Demo代码仅作为个人研究用,不一定适用于实际的实际项目开发。仅作参考。

详细代码可参考github:

公共代码包:https://github.com/ylforever/elon-base

消费者定制代码:https://github.com/ylforever/neo-kafka-consumer

生产者代码:https://github.com/ylforever/neo-kafka-producer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/705555.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Benedict Evans:Ways to think about AGI思考 AGI 的方法:

​Benedict Evans本文发布于2024 年 5 月 4 日 How do we think about a fundamentally unknown and unknowable risk, when the experts agree only that they have no idea? 当专家们一致认为他们一无所知时&#xff0c;我们如何看待根本上未知和不可知的风险&#xff1f; T…

JVM学习-虚拟机栈

虚拟机栈 每个线程创建时都会创建一个虚拟机栈&#xff0c;其内部保存一个个栈帧&#xff0c;对应一次次Java方法调用&#xff0c;栈是线程私有的。 生命周期: 与线程相同 作用 主管Java程序的运行&#xff0c;它保存方法的局部变量、部分结果、并参与方法的调用和返回。 …

【管理咨询宝藏104】普华永道财务管理与内控培训

本报告首发于公号“管理咨询宝藏”&#xff0c;如需阅读完整版报告内容&#xff0c;请查阅公号“管理咨询宝藏”。 【管理咨询宝藏104】普华永道财务管理与内控培训 【格式】PDF版本 【关键词】普华永道、四大、财务管理 【核心观点】 - 职能转变后&#xff0c;财务在决策支持…

亚马逊跨境电商平台优势凸显,武汉星起航解析助力卖家把握商机

在全球电商市场的激烈竞争中&#xff0c;亚马逊凭借其独特的优势和卓越的运营能力&#xff0c;成为众多卖家首选的跨境电商平台。武汉星起航作为深耕亚马逊跨境电商领域的领军企业&#xff0c;对亚马逊平台的优势有着深刻的理解和独到的见解。本文将重点探讨亚马逊跨境电商平台…

eMMC和SD模式速率介绍

概述 在实际项目开发中我们常见的问题是有人会问&#xff0c;“当前项目eMMC、SD所使用模式是什么&#xff1f; 速率是多少&#xff1f;”。这些和eMMC、SD的协议中要求的&#xff0c;要符合协议。接下来整理几张图来介绍。 eMMC 模式介绍 一般情况下我们项目中都是会支持到H…

基于SpringBoot设计模式之创建型设计模式·工厂方法模式

文章目录 介绍开始架构图样例一定义工厂定义具体工厂&#xff08;上衣、下装&#xff09;定义产品定义具体生产产品&#xff08;上衣、下装&#xff09; 测试样例 总结优点缺点与抽象工厂不同点 介绍 在 Factory Method模式中&#xff0c;父类决定实例的生成方式&#xff0c;但…

牛客NC404 最接近的K个元素【中等 二分查找+双指针 Java/Go/PHP】

题目 题目链接&#xff1a; https://www.nowcoder.com/practice/b4d7edc45759453e9bc8ab71f0888e0f 知识点 二分查找&#xff1b;找到第一个大于等于x的数的位置idx;然后从idx开始往两边扩展Java代码 import java.util.*;public class Solution {/*** 代码中的类名、方法名、…

UnitTest / pytest 框架

文章目录 一、UnitTest框架1. TestCase使用2. TestSuite 和 TestRunner3. TestLoader4. Fixture装置5. UnitTest断言1. 登录案例 6. 参数化1. parameterized插件 7. unitTest 跳过 二、pytest 框架1. 运行方式3.读取配置文件(常用方式) 2. pytest执行用例的顺序1. 分组执行(冒烟…

ArcGIS10.X入门实战视频教程(arcgis入门到精通)

点击学习&#xff1a; ArcGIS10.X入门实战视频教程&#xff08;GIS思维&#xff09;https://edu.csdn.net/course/detail/4046?utm_sourceblog2edu 点击学习&#xff1a; ArcGIS10.X入门实战视频教程&#xff08;GIS思维&#xff09;https://edu.csdn.net/course/detail/404…

【Python从入门到进阶】54、使用Python轻松操作SQLite数据库

一、引言 1、什么是SQLite SQLite的起源可以追溯到2000年&#xff0c;由D. Richard Hipp&#xff08;理查德希普&#xff09;所创建。作为一个独立的开发者&#xff0c;Hipp在寻找一个能够在嵌入式系统中使用的轻量级数据库时&#xff0c;发现现有的解决方案要么过于庞大&…

CAD插入文字到另一图形样式变相同

CAD从一张图形复制到另外一张图形后&#xff0c;文字样式变成一样是因为两张图所用的文字样式名称一样&#xff0c;但是样式里面的使用字体样式不一样。如下图所示&#xff0c;找到工具栏中的注释 &#xff0c;点击文字样式。里面就会显示当前图形中使用的样式名称及其对应的字…

C++map容器关联式容器

Cmap 1. 关联式容器 vector、list、deque、forward_list(C11)等STL容器&#xff0c;其底层为线性序列的数据结构&#xff0c;里面存储的是元素本身&#xff0c;这样的容器被统称为序列式容器。而map、set是一种关联式容器&#xff0c;关联式容器也是用来存储数据的&#xff0…