第三章 Kafka集成 SpringBoot
SpringBoot 是 JavaEE 开发中非常常用的框架。它既可以用作 Kafka 的生产者,也可以用作 Kafka 的消费者。
在初始化 SpringBoot 项目时需要勾选 Kafka 依赖,对应的 Maven 依赖如下:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency>
3.1 SpringBoot 生产者
(1)修改 SpringBoot 核心配置文件 application.yml(下面的配置为 YAML 格式),添加生产者相关信息
# HTTP port of the embedded web server.
server:
  port: 8080

spring:
  kafka:
    # Comma-separated list of Kafka brokers used for the initial connection.
    bootstrap-servers: hadoop102:9092,hadoop103:9092
    producer:
      # Keys and values are sent as plain strings.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Keys and values are read back as plain strings.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Consumer group the listeners join.
      group-id: test
(2)创建 controller 从浏览器接收数据, 并写入指定的 topic
package cn.jxust.springbootkafka.Controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller that forwards the {@code msg} value of a request to the
 * Kafka topic {@code first}.
 *
 * @author pengjx
 */
@RestController
public class ProducerController {

    /** Topic every received message is published to. */
    private static final String TOPIC = "first";

    /** Template used to publish string messages to Kafka. */
    @Autowired
    private KafkaTemplate<String, String> stringKafkaTemplate;

    /**
     * Publishes {@code msg} to {@link #TOPIC}.
     *
     * <p>The value returned by {@code send} is ignored, so the {@code "ok"}
     * response does not confirm that the message was delivered.
     *
     * @param msg the message to publish
     * @return the literal string {@code "ok"}
     */
    @RequestMapping("/atguigu")
    public String data(String msg) {
        // NOTE(review): msg is presumably null when the request omits the
        // parameter; it is forwarded as-is - confirm whether that is intended.
        stringKafkaTemplate.send(TOPIC, msg);
        return "ok";
    }
}
(3)在浏览器中给/atguigu 接口发送数据 http://localhost:8080/atguigu?msg=hello
3.2 SpringBoot 消费者
(1)修改 SpringBoot 核心配置文件 application.yml(下面的配置为 YAML 格式),添加消费者相关信息
# HTTP port of the embedded web server.
server:
  port: 8080

spring:
  kafka:
    # Comma-separated list of Kafka brokers used for the initial connection.
    bootstrap-servers: hadoop102:9092,hadoop103:9092
    producer:
      # Keys and values are sent as plain strings.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Keys and values are read back as plain strings.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Consumer group the listeners join.
      group-id: test
(2)创建类消费 Kafka 中指定 topic 的数据
package cn.jxust.springbootkafka.Consumer;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * Listens on the Kafka topic {@code first} and prints every message it
 * receives to standard output.
 *
 * @author pengjx
 */
@Configuration
public class KafkaConsumer {

    /** Topic this listener subscribes to. */
    private static final String TOPIC = "first";

    /** Text printed in front of every received message. */
    private static final String PREFIX = "消息是:";

    /**
     * Prints the received message, prefixed with {@link #PREFIX}.
     *
     * @param msg the payload of the consumed record
     */
    @KafkaListener(topics = TOPIC)
    public void getData(String msg) {
        final String line = PREFIX + msg;
        System.out.println(line);
    }
}
3.3 工具类
KafkaProducer
package cn.jxust.springbootkafka.utils;/*** @author pengjx** */import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;@Component
@Slf4j
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String,Object> kafkaTemplate;private static final String TOPIC="first";public void send(Object object){String jsonStr = JSONUtil.toJsonStr(object);log.info("准备发送消息为:{}", jsonStr);ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC, jsonStr);future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable ex) {//发送失败的处理log.info(TOPIC + " - 生产者 发送消息失败:" + ex.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {//成功的处理log.info(TOPIC + " - 生产者 发送消息成功:" + result.toString());}});}}
KafkaConsumer
package cn.jxust.springbootkafka.utils;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Listens on the Kafka topic {@code first} and logs the value of every
 * record that carries one.
 *
 * @author pengjx
 */
@Component
@Slf4j
public class KafkaConsumer {

    /** Topic this listener subscribes to; also used in the log message. */
    private static final String TOPIC = "first";

    /**
     * Logs the value of the consumed record. Records whose value is
     * {@code null} are skipped without logging.
     *
     * @param record the record delivered by the listener container
     */
    @KafkaListener(topics = TOPIC)
    public void topicTest(ConsumerRecord<?, ?> record) {
        Optional.ofNullable(record.value())
                .ifPresent(value -> log.info("topic.group1 消费了: Topic:{},Message:{}", TOPIC, value));
    }
}