问题描述
在项目中使用消息中间件,主要为实现两个目的:
- 任务排队:当请求过多时,消费端无法同时处理,需要排队等待。这一点kafka采用的是"拉取消息"的模式,自然支持。
- 负载分担: 这里的负载负担不是指Kafka本身的横向扩容,而是指在任务量过大时可以通过增加消费者实例,提升效率。
下面重点分析一下,增加消费者实例以提升系统处理效率的问题。
简要分析
Kafka消费者订阅消息可以通过主题订阅,也可以指定分区订阅。如果是通过主题订阅,消费者将获取该主题下所有分区的消息。如果是指定分区订阅,消费者只能获取该分区下的消息。
- 按主题订阅的模式。同一分组的消费者实例中只会有一个实例收到消息,其它实例处于空转状态,会浪费资源,无法提升效率。当处理任务的实例挂了后,服务再平衡,另外一个消费者实例才会接手继续处理任务。
- 按分区定义的模式。消费者只能接收到所订阅分区的消息。如果有多个消费者实例订阅同一个分区,它们将收到相同的消息,这相当于广播, 如果不做特殊处理会出现消息多次消费的情况(这种订阅方式,官方文档要求的是配置不同的分组ID,否则会导致提交冲突)。
按分区订阅的方式会增加生产者发送消息的处理复杂度,发送消息时需要知道哪些消息放在哪个分区能被正确消费。后面如果增加了分区和主题,生产者和消费者都需要做较大的改动。
所以,初步考虑还是走按主题订阅消息这条路。生产者发送消失时只需要指定topic, 消费者订阅也简单。在这种模式下,有两点需要处理:
- 多个消费者实例为实现负载分担,需要配置不同的组ID,这样可以收到相同的消息(相当于广播)。
- 为避免同一个消息在多个消费者实例中重复处理,需要做一些互斥。这个可以考虑用redis来做。
生产者代码样例
import com.alibaba.fastjson.JSON;
import com.elon.base.constant.KafkaTopicConst;
import com.elon.base.model.BIReportTask;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class KafkaProducerService {private KafkaProducer<String, String> producer = null;public KafkaProducerService() {Properties props = new Properties();props.put("bootstrap.servers", "192.168.5.128:9092");props.put("acks", "0");props.put("group.id", "1111");props.put("retries", "2");props.put("partitioner.class", NeoPartitioner.class);//设置key和value序列化方式props.put("key.serializer", StringSerializer.class);props.put("value.serializer", StringSerializer.class);//生产者实例producer = new KafkaProducer<>(props);}/*** 外部调用的发消息接口*/public void sendMessage() {for (int i = 0; i < 20; ++i) {BIReportTask task = new BIReportTask();task.setPath("d:/temp");task.getParamMap().put("value", String.valueOf(i));ProducerRecord<String, String> record = new ProducerRecord(KafkaTopicConst.ELON_TOPIC, JSON.toJSONString(task));producer.send(record);}}
生产者发送消息处理比较简单,创建ProducerRecord只需要指定Topic和Value。如果对消息处理有顺序要求,可以指定一个消息键。
消费者公共处理代码
下面的公共处理逻辑代码可以写到一个公共的common库中,编译成jar包。谁需要订阅处理kafka消息,引入依赖即可。
1. 定义一个公共的Task模型
import lombok.Getter;
import lombok.Setter;import java.util.Date;/*** 异步任务模型基类. 定义模型公共属性** @author neo* @since 2024-05-14*/
@Getter
@Setter
public class TaskBase {// 任务唯一ID标识, 用UUIDprivate String taskId = "";// 任务编码(任务类别)private String taskCode = "";// 创建人private String createUser = "";// 创建时private Date createTime = null;// 修改人private String updateUser = "";// 修改时间private Date updateTime = null;public TaskBase() {}public TaskBase(String taskId, String taskCode) {this.taskId = taskId;this.taskCode = taskCode;}
}
这里面的taskId是任务唯一的UUID标识。 taskCode是任务的类别,可以定义为常量,用于分区不同的任务。
2. 定义消息处理抽象类
import lombok.Getter;/*** 任务处理类。派生出各子类实现具体的处理逻辑** @author neo* @since 2024-05-14*/
public abstract class TaskHandler {// 任务编码@Getterprivate final String taskCode;/*** 任务处理接口** @param taskJson 任务对象JSON串*/public abstract void handle(String taskJson);protected TaskHandler(String taskCode) {this.taskCode = taskCode;}
}
所有消费者都需要从该类继承,并显示handle接口,实际的业务处理逻辑在子类的handle方法中去实现。
3. 消费者服务处理类
3.1核心逻辑代码
import com.alibaba.fastjson.JSON;
import com.elon.base.model.TaskBase;
import com.elon.base.util.StringUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;/*** Kafka消费者服务类。订阅接收消息, 再转给具体的业务处理类处理** @author neo* @since 2024-5-14*/
@Component
public class KafkaConsumerService {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);@Value("${neo.application_name:}")private String applicationName;// Kafka分区器连接@Value("${neo.kafka.bootstrap.servers:}")private String kafkaServer;// Kafka组ID@Value("${neo.kafka.group.id:}")private String kafkaGroupId;// 最大一次拉取的消息数量@Value("${neo.kafka.max.poll.records:1}")private int maxPollRecords;@Value("${neo.kafka.topics}")private List<String> topics;@Value("${neo.redis.ip:}")private String redisIp;@Value("${neo.redis.port:}")private int redisPort;// 消费者private KafkaConsumer consumer = null;// 任务处理器. Map<任务编码, 任务处理器>private Map<String, TaskHandler> handlerMap = new HashMap<>();/*** 注册任务处理器** @param handler 任务处理器*/public void registerHandler(TaskHandler handler) {handlerMap.put(handler.getTaskCode(), handler);}/*** 初始化消费者实例. 订阅主题消息*/public void initKafkaConsumer() {LOGGER.info("Subscribe message. kafkaServer:{}|kafkaGroupId:{}|maxPollRecords:{}|topics:{}",kafkaServer, kafkaGroupId, maxPollRecords, topics);Properties properties = new Properties();properties.put("bootstrap.servers", kafkaServer); // 指定 Brokerproperties.put("group.id", kafkaGroupId); // 指定消费组群 IDproperties.put("max.poll.records", maxPollRecords);properties.put("enable.auto.commit", "false");properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象properties.put("value.deserializer", StringDeserializer.class); // 将 value 的字节数组转成 Java 对象consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(topics); // 订阅主题new Thread(this::handleMessage).start();}/*** 从Kafka获取消息,传给相应的处理器处理.*/public void handleMessage() {Jedis jedisClient = getJedisClient();while (true) {synchronized (this) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));LOGGER.info("Fetch record num:{}", records.count());for (ConsumerRecord<String, String> record : records) {try {handleSingleMessage(jedisClient, record);} catch (Exception e) {LOGGER.error("Handle message fail. Topic:{}|Partition:{}|Offset:{}|Key:{}|Message:{}",record.topic(), record.partition(), record.offset(), record.key(), record.value());}}// 提交任务,更新offsetconsumer.commitSync();}}}/*** 处理单个消息.** @param jedisClient redis客户端接口* @param record kafka消息记录* @author neo*/private void handleSingleMessage(Jedis jedisClient, ConsumerRecord<String, String> record) {TaskBase taskBase = JSON.parseObject(record.value(), TaskBase.class) ;if (!handlerMap.containsKey(taskBase.getTaskCode())) {return;}// 判断同一个任务是否已经有其它实例在处理String taskKey = "Task_" + taskBase.getTaskId();String handleAppName = jedisClient.getSet(taskKey, applicationName);// 设置过期时间只是为了方便自动清除redis中的数据。在实际项目中,任务数据是非常重要的,往往需要持久化到数据库jedisClient.expire(taskKey, 60 * 60);if (!StringUtil.isEmpty(handleAppName)) {jedisClient.set(taskKey, handleAppName, new SetParams().px(1000 * 60 * 60));LOGGER.info("Task:{} completed. Handle app name:{}", taskBase.getTaskId(), handleAppName);return;}// 将消息分发给具体的handler类处理LOGGER.info("Handle message. Topic:{}|Partition:{}|Offset:{}|Key:{}|Message:{}",record.topic(), record.partition(), record.offset(), record.key(), record.value());TaskHandler handler = handlerMap.get(taskBase.getTaskCode());handler.handle(record.value());}public Jedis getJedisClient() {Jedis jedis = new Jedis(redisIp, redisPort);return jedis;}
这里面有几个重要的方法:
- registerHandler: 消息处理类注册接口。消费者程序在继承实现了的该接口后,可以将子类实例注册过来。
- initKafkaConsumer: 这里面主要是按topic订阅消息的代码。
- handleMessage: 从Kafka服务器拉去消息并处理。
- handleSingleMessage: 处理单条消息。核心代码。
在handleSingleMessage方法中,首先会去查询handlerMap中是否有Task Code对应的处理器。因消息是按Topic广播过来的,肯定会有当前消费者不需要处理的消息(其它消费者订阅的)。然后, 在处理任务前会先将taskId通过jedis的getSet方法存储到redis,同时检测是否有其它消费者已经处理了该任务(已处理,则不再重复处理)。
注:这个地方因需要保证操作的原子性 用了jedis的getSet方法,实际上还是有瑕疵,第二次调用时会修改旧的数据。应该还可以通过lua脚本做一个好一些的处理(后续再处理)。
3.2 yml配置样例
消费者定制样例代码
定制代码是开发过程中每个消费者根据业务需要,结合需要消费的具体消息增加的业务处理代码。下面以虚构的一个生成BI业务报表为例,简单说明一下。
1. 定义任务模型
/*** 生成BI报告的任务模型。不同的任务可以根据需要定义不同的模型** @author neo* @since 2024-05-14*/
@Getter
@Setter
public class BIReportTask extends TaskBase {// 存放报告的路径private String path;// 参数private Map<String, String> paramMap = new HashMap<>();public BIReportTask() {super(UUID.randomUUID().toString(), TaskCodeConst.KAFKA_REPORT_EXPORT_BI_REPORT);}
}
根据业务需要传递的参数而定义,和Task Code关联。生产者和消费者约定使用同一模型。
2. 具体消息处理类
import com.alibaba.fastjson.JSON;
import com.elon.base.constant.TaskCodeConst;
import com.elon.base.model.BIReportTask;
import com.elon.base.service.kafka.KafkaConsumerService;
import com.elon.base.service.kafka.TaskHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;/*** BI报表生成处理类** @author neo* @since 2024-05-14*/
@Component
public class KafkaBIReportHandler extends TaskHandler {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaBIReportHandler.class);@Resourceprivate KafkaConsumerService kafkaConsumerService;public KafkaBIReportHandler() {super(TaskCodeConst.KAFKA_REPORT_EXPORT_BI_REPORT);}@PostConstructpublic void init() {kafkaConsumerService.registerHandler(this);}@Overridepublic void handle(String taskJson) {BIReportTask task = JSON.parseObject(taskJson, BIReportTask.class);LOGGER.info("Create BI report. taskCode:{}|taskId:{}", task.getTaskCode(), task.getTaskId());// 生成BI报表. 具体的处理逻辑略. 这里等待5秒表示业务处理耗时try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}
}
注:类中的init方法完成了注册功能。
3. 初始化消费者
/*** 消费者应用初始化.** @author neo* @since 2024-05-15*/
@Component
public class ConsumerApplicationInit implements ApplicationRunner {private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerApplicationInit.class);@Resourceprivate KafkaConsumerService kafkaConsumerService;@Overridepublic void run(ApplicationArguments args) throws Exception {kafkaConsumerService.initKafkaConsumer();LOGGER.info("Init kafka consumer success.");}
}
在SpringBoot启动完成后做初始化操作。
测试验证及说明
修改yaml文件的组ID,springboot服务端口号,打两个包启动。生产者发送一批消息测试。可以看到两个消费者实例同时在处理这一批消息,并不重复。当前消费者会跳过另一消费者已处理过的任务。
上面描述的代码及方案基本可实现以Kafka作为消息中间件,多消费者实例负载分担和可靠性要求。Demo代码仅作为个人研究用,不一定适用于实际的实际项目开发。仅作参考。
详细代码可参考github:
公共代码包:https://github.com/ylforever/elon-base
消费者定制代码:https://github.com/ylforever/neo-kafka-consumer
生产者代码:https://github.com/ylforever/neo-kafka-producer