详解如何保证消息队列不丢失消息(以kafka为例)


✨✨祝屏幕前的小伙伴们每天都有好运相伴左右,一定要天天开心!✨✨ 
🎈🎈作者主页: 喔的嘛呀🎈🎈

目录

一、引言

二. 持久化存储

2.1持久化存储原理:

2.2使用示例:

1. 安装 Kafka:

2. 生产者代码:

3. 消费者代码:

三. 消息确认机制

3.1消息确认机制原理:

3.2使用示例:

1. 生产者代码:

2. 消费者代码:

四. 事务机制

4.1事务机制原理:

4.2使用示例:

1. 生产者代码:

2. 消费者代码:

五. 数据备份与复制

5.1数据备份与复制原理

5.2使用示例:

1. Kafka Broker配置:

2. 生产者代码

3. 消费者代码

六. 消息过期机制

总结


一、引言

消息队列(Message Queue)是一种用于在不同组件、服务或系统之间传递消息的通信方式。在分布式系统中,消息队列起到了缓冲和解耦的作用,但在使用过程中,如何保证消息不丢失是一个重要的问题。下面详细探讨一下消息队列如何保证消息不丢失的方法。Apache Kafka是一个分布式消息系统,设计和实现了一套机制来保证消息队列中的消息不丢失。以下是一些关键的配置和实践方法。

二. 持久化存储

为了防止消息在队列中丢失,消息队列系统通常会提供持久化存储的机制。这意味着一旦消息被接收,它会被存储在持久化存储中,即使系统崩溃或重启,消息仍然可以被恢复。这种机制通常使用文件系统或数据库来实现。

在Java中使用消息队列的持久化存储,我们以Apache Kafka为例进行演示。Kafka是一个分布式的、可持久化的消息队列系统,适用于大规模的数据流处理。

2.1持久化存储原理:

Kafka通过将消息写入磁盘上的日志文件(日志段)来实现持久化存储。每个消息都会被追加到日志文件的末尾,确保消息在写入后不会被修改,从而保证了消息的持久性。

2.2使用示例:

1. 安装 Kafka:

首先,确保你已经安装并启动了 Kafka。你可以从 Kafka官方网站 下载并按照官方文档进行安装和启动。

2. 生产者代码:

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息,将消息设置为持久化ProducerRecord<String, String> record = new ProducerRecord<>("example_topic", "Hello, Kafka!");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("Message sent successfully. Offset: " + metadata.offset());} else {exception.printStackTrace();}}});producer.close();}
}

3. 消费者代码:

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "example_group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("example_topic"));// 拉取消息,将消息设置为持久化while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());}}}
}

在上述代码中,通过将生产者和消费者配置中的acks属性设置为all(默认值),Kafka会等待消息被所有同步副本接收确认后再继续发送。这确保了消息在发送和接收时都会被持久化存储。

请注意,Kafka的配置和使用可能因版本而异,确保查阅相应版本的文档以获取准确的配置信息。

三. 消息确认机制

消息队列系统通常支持消息确认机制,确保消息在被消费者成功处理后才被标记为已处理。消费者在成功处理消息后发送确认给消息队列,然后消息队列才会将该消息从队列中移除。如果消费者处理失败,消息队列可以将消息重新投递给队列或者按照配置进行其他处理。

消息确认机制是确保消息在被消费者成功处理后才被标记为已处理的关键机制。在这里,我们将使用Apache Kafka作为示例进行演示,展示消息确认机制的实现。

3.1消息确认机制原理:

在Kafka中,消息确认机制主要通过Producer的acks参数和Consumer的手动确认来实现。acks参数表示生产者要求服务器确认消息的级别,而手动确认则是消费者在成功处理消息后通过调用特定的API来通知服务器。

3.2使用示例:

1. 生产者代码:

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("acks", "all");  // 设置为all表示等待所有副本确认// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息,等待确认ProducerRecord<String, String> record = new ProducerRecord<>("example_topic", "Hello, Kafka!");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("Message sent successfully. Offset: " + metadata.offset());} else {exception.printStackTrace();}}});producer.close();}
}

2. 消费者代码:

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "example_group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("example_topic"));// 拉取消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());// 手动确认消息consumer.commitSync();}}}
}

在上述代码中,生产者的acks属性设置为all,表示等待所有副本确认。而消费者在处理完消息后,通过调用consumer.commitSync()手动确认消息。这确保了消息在被成功处理后才被标记为已处理。

请注意,Kafka的确认机制可能因版本而异,确保查阅相应版本的文档以获取准确的配置信息。

四. 事务机制

一些消息队列系统支持事务机制,允许生产者发送一组消息,并且只有在这组消息都成功写入队列后才被提交。如果有任何一个消息写入失败,整个事务会被回滚,从而确保消息的一致性。

事务机制是确保消息队列中一组消息要么全部成功处理,要么全部回滚的重要机制。在这里,我们以Apache Kafka为例进行演示,展示事务机制的实现。

4.1事务机制原理:

Kafka的事务机制主要涉及Producer API的事务支持。生产者可以在一组消息的发送过程中开启事务,然后要么全部提交(所有消息发送成功),要么全部回滚(任何一个消息发送失败)。

4.2使用示例:

1. 生产者代码:

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaTransactionalProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("acks", "all");  // 设置为all表示等待所有副本确认props.put("enable.idempotence", "true");  // 开启幂等性props.put("transactional.id", "my-transactional-id");  // 设置事务ID// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 开启事务producer.initTransactions();try {producer.beginTransaction();// 发送消息ProducerRecord<String, String> record1 = new ProducerRecord<>("example_topic", "Message 1");ProducerRecord<String, String> record2 = new ProducerRecord<>("example_topic", "Message 2");producer.send(record1);producer.send(record2);// 提交事务producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// 处理异常,中止事务producer.close();} catch (KafkaException e) {// 处理其他Kafka异常,回滚事务producer.abortTransaction();}producer.close();}
}

在上述代码中,通过设置enable.idempotencetrue和配置transactional.id为唯一的事务ID,生产者开启了事务。然后,通过beginTransactioncommitTransactionabortTransaction来控制事务的提交和回滚。

请注意,生产者中使用了enable.idempotence开启幂等性,这对于确保消息不会被重复发送也是非常重要的。同时,确保事务ID是唯一的,以避免与其他事务冲突。

2. 消费者代码:

消费者的代码相对简单,与普通的消费者代码基本相同。消费者不直接参与生产者的事务,而是通过消费消息来处理相关业务逻辑。

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "example_group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("example_topic"));// 拉取消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());}}}
}

在实际应用中,消费者的业务逻辑可能会与生产者的事务有关,例如在接收到特定消息时触发某些操作。在这种情况下,需要谨慎处理事务间的协调。

五. 数据备份与复制

数据备份与复制是确保消息队列系统可靠性和容错性的关键机制之一。在这里,我们以Apache Kafka为例进行演示,展示数据备份与复制的实现。

5.1数据备份与复制原理

Kafka通过数据备份与复制来防止因节点故障或灾难性事件导致的数据丢失。每个分区的数据会被复制到多个副本,这些副本分布在不同的节点上。这样即使一个节点发生故障,仍然可以从其他节点的副本中恢复数据。

5.2使用示例:

1. Kafka Broker配置:

在Kafka的server.properties配置文件中,可以配置副本的数量和复制策略。

# server.properties# 设置每个分区的副本数量
default.replication.factor=3# 设置副本的分布策略,可以选择不同的策略
# 可选值为: "rack-aware", "broker-aware", "0-1" (default)
# 具体策略的选择根据实际需求和环境
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

2. 生产者代码

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息ProducerRecord<String, String> record = new ProducerRecord<>("example_topic", "Hello, Kafka!");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("Message sent successfully. Offset: " + metadata.offset());} else {exception.printStackTrace();}}});producer.close();}
}

3. 消费者代码

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "example_group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("example_topic"));// 拉取消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());}}}
}

在上述代码中,通过设置default.replication.factor来指定每个分区的副本数量,这里设置为3。副本的分布策略由replica.selector.class指定,这里选择了RackAwareReplicaSelector,可根据实际需求选择其他策略。

请注意,这里的代码示例主要是演示Kafka的配置和使用,实际上,Kafka会自动处理数据的备份和复制,你无需手动编写代码来执行这些操作。

六. 消息过期机制

消息过期机制是一种保证消息不会永远存在于消息队列中的重要机制。在消息队列系统中,可以设置消息的过期时间,一旦消息过期,系统会自动将其删除或标记为无效。消息过期机制有助于确保系统中的消息不会占用过多的资源并且能够及时清理不再需要的消息。

在Apache Kafka中,消息的过期机制并不是直接支持的特性,而是通过消费者在处理消息时判断消息的时间戳或其他属性来实现的。以下是一个简单的示例,展示了如何在消费者端处理消息的过期逻辑。

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerWithExpirationExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "example_group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("example_topic"));// 拉取消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 判断消息是否过期(假设消息中包含时间戳字段)long timestamp = Long.parseLong(record.value());long currentTimestamp = System.currentTimeMillis();// 设置消息过期时间为10分钟long expirationTime = 10 * 60 * 1000;if (currentTimestamp - timestamp < expirationTime) {// 处理消息System.out.printf("Received message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());} else {// 消息过期,可以进行相应的处理,例如记录日志或丢弃消息System.out.printf("Expired message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());}}}}
}

在上述代码中,假设消息中包含一个时间戳字段,消费者在处理消息时通过比较时间戳判断消息是否过期。如果消息过期,可以根据实际需求进行相应的处理,例如记录日志或丢弃消息。

请注意,这只是一个简单的示例,实际上,消息的过期机制可能需要根据具体的业务逻辑和消息队列系统的特性进行更复杂的处理。

总结

综上所述,消息队列通过持久化存储、消息确认机制、事务机制、数据备份与复制以及消息过期机制等手段,保证了消息在传递过程中不丢失。在设计分布式系统时,合理选择并配置这些机制可以有效地提高消息队列的可靠性和稳定性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/505884.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【tableau学习笔记】tableau无法连接数据源

【tableau学习笔记】tableau无法连接数据源 背景&#xff1a; 学校讲到Tableau&#xff0c;兴奋下载Kaggle Excel&#xff0c;一看后缀CSV&#xff0c;导入Tableau发现报错“tableau无法连接数据源”&#xff0c;自作聪明改为后缀XLSX&#xff0c;bug依旧。 省流&#xff1a…

.idea文件详解

.idea文件的作用&#xff1a; .idea文件夹是存储IntelliJ IDEA项目的配置信息&#xff0c;主要内容有IntelliJ IDEA项目本身的一些编译配置、文件编码信息、jar包的数据源和相关的插件配置信息。一般用git做版本控制的时候会把.idea文件夹排除&#xff0c;因为这个文件下保存的…

win11 更多网络适配器选项

win11更多网络适配器选项查找路径&#xff1a;控制面板→网络和共享中心→更改适配器设置

[技巧]Arcgis之图斑四至点批量计算

前言 上一篇介绍了arcgis之图斑四至范围计算&#xff0c;这里介绍的图斑四至点的计算及获取&#xff0c;两者之间还是有差异的。 [技巧]Arcgis之图斑四至范围计算 这里说的四至点指的是图斑最东、最西、最南、最北的四个地理位置点坐标&#xff0c;如下图&#xff1a; 四至点…

利用小蜜蜂AI智能问答ChatGPT+AI高清绘图生成图文故事案例

利用小蜜蜂AI智能问答ChatGPTAI高清绘图生成图文故事案例 这段时间利用小蜜蜂AI网站做了一些编程、绘图以及数据分析方面的案例。再过几个月&#xff0c;我的大孙子就要出生了。我要用小蜜蜂AI智能问答和AI高清绘图为大孙子生成一个1-9的数字图文故事。 小蜜蜂AI网站可以扫如…

Linux内存管理机制和虚拟内存技术

Linux内存管理机制和虚拟内存技术 1. 物理内存1.1 物理内存架构1.1.1 UMA内存架构1.1.2 NUMA内存架构 1.2 分页机制1.3 物理内存模型1.3.1 FLATMEM 平坦内存模型1.3.2 DISCONTIGMEM 非连续内存模型1.3.3 SPARSEMEM 稀疏内存模型 2. 虚拟内存2.1 分页机制2.2 多级页表2.3 内核态…

Image Fusion via Vision-Language Model【文献阅读】

阅读目录 文献阅读AbstractIntroduction3. Method3.1. Problem Overview3.2. Fusion via Vision-Language Model 4. Vision-Language Fusion Datasets5. Experiment5.1Infrared and Visible Image Fusion 6. Conclusion个人总结 文献阅读 原文下载&#xff1a;https://arxiv.or…

Qt开发 按钮类控件

Qt开发 按钮类控件 Push Button 使用 QPushButton 表示一个按钮。 QPushButton 继承自 QAbstractButton 。这个类是一个抽象类&#xff0c;是其他按钮的父类。 在 Qt Designer 中也能够看到这里的继承关系 QAbstractButton 中&#xff0c;和 QPushButton 相关性较大的属性 …

cetos7 Docker 安装 gitlab

一、gitlab 简单介绍和安装要求 官方文档&#xff1a;https://docs.gitlab.cn/jh/install/docker.html 1.1、gitlab 介绍 gitLab 是一个用于代码仓库管理系统的开源项目&#xff0c;使用git作为代码管理工具&#xff0c;并在此基础上搭建起来的Web服务平台&#xff0c;通过该平…

从0开始预训练1.4b中文大模型实践

作者&#xff1a;Lil2J知乎&#xff08;已授权&#xff09; 链接&#xff1a;https://zhuanlan.zhihu.com/p/684946331 简介 这篇文章主要记录了我个人对1.4b中文大模型的实践复现过程。我选择了QWEN作为基座模型&#xff0c;并训练了一个参数量达到1.4b的预训练模型&#xff0…

C# Socket通信从入门到精通(21)——TCP发送文件与接收文件 C#代码实现

1、前言 我们在开发上位机软件的过程中经常需要发送文件,本文就是介绍如何利用tcp客户端发送文件、tcp服务器端接收文件,也就是所谓的文件传输,而且本文介绍的方法具备以下特点: 1)可配置发送的文件夹和接收的文件夹路径: 2)可自动发送指定文件夹下的所有子目录和文件;…

备战蓝桥杯---动态规划之悬线法

Em...属于一知道就会&#xff0c;不知道的话比较难想。 我们先看题&#xff1a; 我们不妨把1抽象成一个平面上的点&#xff0c;因此可以变成这一幅图&#xff1a; 我们假设每一个点被向上牵拉了一根线&#xff1a; 显然&#xff0c;每一条悬线都有可能成为边界限制&#xff0c…