kafka消息队列最常用的两种模式,以及应用场景

目录

一、发布-订阅模式

二、点对点模式

三、应用场景


 

一、发布-订阅模式

发布-订阅模式是最常见的消息传递模式,其中消息发布者将消息发送到一个或多个主题(Topic),而订阅者可以选择订阅一个或多个主题来接收消息。每个订阅者都可以独立地消费消息,而发布者和订阅者之间没有直接的联系。

在Kafka中,使用KafkaProducer类进行消息发布,KafkaConsumer类进行消息订阅。以下是一个简单的Java代码示例:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;public class PubSubExample {private static final String TOPIC = "my_topic";private static final String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) {// Kafka ProducerProperties producerProps = new Properties();producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);// Publish messagesfor (int i = 0; i < 10; i++) {String message = "Message " + i;ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.err.println("Error publishing message: " + exception.getMessage());} else {System.out.println("Message published successfully: " + metadata.offset());}}});}producer.close();// Kafka ConsumerProperties consumerProps = new Properties();consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_consumer_group");consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);consumer.subscribe(Collections.singletonList(TOPIC));// Consume messageswhile (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.println("Received message: " + record.value());// Process the message}}}
}

 

二、点对点模式

点对点模式中,消息发送者将消息发送到一个指定的队列(Queue),而消息接收者从相同的队列中接收消息。每个消息只能被一个接收者消费。

在Kafka中,点对点模式可以通过创建单个消费者组来实现。以下是一个简单的Java代码示例:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;public class PointToPointExample {private static final String QUEUE = "my_queue";private static final String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) {// Kafka ProducerProperties producerProps = new Properties();producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);// Publish messagesfor (int i = 0; i < 10; i++) {String message = "Message " + i;ProducerRecord<String, String> record = new ProducerRecord<>(QUEUE, message);producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.err.println("Error publishing message: " + exception.getMessage());} else {System.out.println("Message published successfully: " + metadata.offset());}}});}producer.close();// Kafka ConsumerProperties consumerProps = new Properties();consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_consumer_group");consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);consumer.subscribe(Collections.singletonList(QUEUE));// Consume messageswhile (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.println("Received message: " + record.value());// Process the messageconsumer.commitAsync();}}}
}

以上代码示例演示了如何使用Kafka的Java客户端库进行发布和订阅消息以及点对点消息传递。请注意,代码中的BOOTSTRAP_SERVERS需要根据你的实际环境进行配置。

 

三、应用场景

Kafka消息队列具有高吞吐量、低延迟、可扩展性等特点,因此广泛应用于以下场景:

  1. 日志收集和数据管道:Kafka可以用作集中式日志收集系统,可以将不同服务、应用程序、服务器生成的日志集中到一个中心化的消息队列中,再通过消费者进行处理、分析和存储。同时,Kafka还可以作为数据管道,将不同数据源的数据通过消息队列进行传输和处理。

  2. 实时流处理:Kafka与流处理框架(如Apache Flink、Apache Spark)结合使用,可以实现实时的数据流处理。Kafka可以作为输入源和输出源,将数据流传输给流处理框架进行实时分析、计算和处理。

  3. 微服务架构:Kafka可以用作微服务之间的异步通信机制,不同的微服务各自独立地生产和消费消息,实现解耦和扩展性。同时,Kafka还可以用于实现事件驱动架构,不同的微服务通过订阅事件的方式进行通信和协作。

  4. 网络爬虫和数据采集:Kafka可以用于构建高可靠的网络爬虫系统和数据采集系统。爬虫可以将抓取的数据写入Kafka队列,然后其他系统可以消费这些数据进行进一步的处理和分析。

  5. 消息系统和通信中间件:Kafka提供了可靠的消息传递机制,可以作为消息系统和通信中间件,用于构建分布式系统、实现异步通信和跨系统的数据传输。

总之,Kafka消息队列的应用场景非常广泛,适用于大数据处理、实时数据流处理、异步通信等各种场景。它具有高性能、可靠性和可扩展性的特点,可以帮助解决数据流处理和消息传递的各种问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/27997.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入理解网络栈

网络路径 发送端 应用层 1、socket 各种网络应用程序基本上都是通过 Linux Socket 编程接口来和内核空间的网络协议栈通信的 socket 是网络编程的入口&#xff0c;它提供了大量的系统调用&#xff0c;构成了网络程序的主体 udp UDP 是面向无连接的协议&#xff0c;不需要与…

Linux的权限管理精细总结

&#xff08;该图由AI绘制 关注我 学习AI画图&#xff09; 目录 一、权限概述 1、权限的基本概念 2、为什么要设置权限 3、Linux用户身份类别 4、user文件拥有者 5、group文件所属组内用户 6、other其他用户 7、特殊用户root 二、普通权限管理 1、ls -l命令查看文件…

项目名称:无源在线词典项目

一&#xff0c;概述 基于C语言的网络电子词典项目&#xff0c;使用到了tcp协议的并发服务器设计、网络编程、文件I/O、数据库等多方面的知识。可以满足多用户同时登陆&#xff0c;用户登陆后可以查询单词及历史记录&#xff0c;具有查找快速&#xff0c;保密性好等优点。 开…

详细介绍MATLAB中的图论算法

MATLAB是一种功能强大的编程语言和环境,提供了许多用于图论算法的工具和函数。图论是研究图及其属性和关系的数学分支,广泛应用于计算机科学、网络分析、社交网络分析等领域。在MATLAB中,我们可以使用图论算法来解决各种问题,如最短路径问题、最小生成树问题、最大流问题等…

HDFS与MapResource笔记

客户端向NN请求上传文件 NN回应可以上传 请求上传块,返回DN 所以后面就比较慢 找最近的服务器进行 64K发到1节点,1节点立刻发给2节点,同时1节点自动开始落盘,这里,3个节点是同时落盘的. 因为缓存是在内存中,而持久化是将数据存到磁盘上. 副本节点选择: 1.安全:放不同机架 2.速…

Apache Knox Gateway

简介&#xff1a; Knox是一个提供认证和访问集群中hadoop服务的单个端点服务。目标是为用户和操作者简化hadoop安全。knox运行为一个服务或者集群服务&#xff0c;并提供集中访问一个或者多个hadoop集群。通常网关的目标如下&#xff1a; 1、为hadoop rest api 提供外层的安全…

深入篇【C++】谈vector中的深浅拷贝与迭代器失效问题

深入篇【C】谈vector中的深浅拷贝与迭代器失效问题 Ⅰ.深浅拷贝问题1.内置类型深拷贝2.自定义类型深拷贝 Ⅱ.迭代器失效问题1.内部迭代器失效2.外部迭代器失效 Ⅰ.深浅拷贝问题 1.内置类型深拷贝 浅拷贝是什么意思&#xff1f;就是单纯的值拷贝。 浅拷贝的坏处&#xff1a; ①…

力扣 860. 柠檬水找零

题目来源&#xff1a;https://leetcode.cn/problems/lemonade-change/description/ C题解&#xff1a;由于收到的钱币只有5&#xff0c;10&#xff0c;20三种&#xff0c;对于5元直接收&#xff0c;对于10元找零1张5元&#xff0c;对于20元找零15元&#xff0c;可以找零105或者…

echarts 横向柱状图 刻度标签

echarts 横向柱状图 刻度标签 怎么调试都不左对齐 将width去掉固定宽度 echarts会自适应

自来水收费系统适合应用于哪些场景?

自来水收费系统是一种用于自来水公司或供水管理部门的软件系统&#xff0c;旨在帮助自动化自来水的收费和管理过程。该系统可以帮助自来水公司更好地管理水资源&#xff0c;提高供水质量和效率&#xff0c;同时也可以为用户提供更加便捷和安全的用水服务。下面将从多个方面来介…

mysal数据库的日志恢复

目录 一 物理冷备份 二 mysqldump 备份与恢复&#xff08;温备份&#xff09; 三 mgsql中的增量备份需要借助mysql日志的二进制来恢复 小结 一 物理冷备份 systemctl stop mysqld yum -y install xz 压缩备份 tar Jcvf /opt/mysql_all_$(date %F).tar.xz /usr/local/mysql/…

Acwing.908 最大不相交区间数量(贪心)

题目 给定N个闭区间[ai,bi]&#xff0c;请你在数轴上选择若干区间&#xff0c;使得选中的区间之间互不相交&#xff08;包括端点)。输出可选取区间的最大数量。 输入格式 第一行包含整数N&#xff0c;表示区间数。 接下来N行&#xff0c;每行包含两个整数ai , bi&#xff0c…