项目目录结构
pom.xml
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Kafka Starter --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
kafka-provider
-
application.yml
server:port: 10001 spring:kafka:bootstrap-servers: localhost:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: test-consumer-group
-
service
@Service public class ProviderServiceImpl {private static final String TOPIC_NAME = "dragon-topic";@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;public String sendMsg(String info) {kafkaTemplate.send(TOPIC_NAME,info);return "发送成功";} }
@RestController @RequestMapping("/provider") @RequiredArgsConstructor public class ProviderController {private final ProviderServiceImpl providerService;@GetMappingpublic String providerApi() {return providerService.sendMsg("莫等闲,白了少年头,空悲切。");} }
启动类
@SpringBootApplication
public class ConsumerApp {public static void main(String[] args) {SpringApplication.run(ConsumerApp.class, args);}
}
kafka-consumer
-
kafka-listener
@Component public class KafkaListener {private static final Logger log = LoggerFactory.getLogger(KafkaListener.class);private static final String TOPIC_NAME = "dragon-topic";@org.springframework.kafka.annotation.KafkaListener(topics = TOPIC_NAME)public void receive(String msg) {log.info("接收到消息:{}",msg);} }
-
application.yml
server:port: 10002 spring:kafka:bootstrap-servers: localhost:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: test-consumer-group
启动类
@SpringBootApplication
public class ConsumerApp {public static void main(String[] args) {SpringApplication.run(ConsumerApp.class, args);}
}