RocketMQ事务消息实现分布式事务

文章目录

    • 简介
    • 实现原理
    • 实现逻辑

简介

RocketMQ事务消息
RocketMQ在4.3.0版中支持分布式事务消息,这里RocketMQ的事务消息是采用2PC(两段式协议) +补偿机制(消息回查)的分布式事务功能。提供消息发送与业务落库的一致性。
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布式事务功能,通过事务消息能达到分布式事务的最终一致。

RocketMQ的事务消息实现方式主要包括以下几个步骤:
1 生产者发送half消息到Broker。half消息在消费者看来是不可见的,这样可以避免消费者消费到事务未提交的数据,类似于数据库的隔离级别读已提交级别,避免脏读。
2.生产者创建订单,根据创建订单成功与否,向Broker发送commit或rollback。
3.生产者还可以提供Broker回调接口,当Broker发现一段时间half消息没有收到任何操作命令,会主动调用此接口来查询订单是否创建成功。
4.一旦half消息commit了,消费者库存系统就会来消费,如果消费成功,则消息销毁,分布式事务成功结束。
5.如果消费失败,则根据重试策略进行重试,最后还失败则进入死信队列,等待进一步处理。

在这里插入图片描述
在这里插入图片描述

实现原理

RocketMQ的事务消息实现原理基于两阶段提交协议(Two-Phase Commit Protocol),具体流程如下:
1.发送准备消息:当一个事务消息需要发送时,生产者会发送一个准备消息,该消息包含了实际的业务数据和事务的标识符。
2.执行本地事务:接收到准备消息后,生产者会执行一个本地事务,如果本地事务执行成功,则返回COMMIT状态,否则返回ROLLBACK状态。
3.提交或回滚本地事务:当生产者返回COMMIT状态时,代表本地事务已经执行成功,此时,RocketMQ会将消息标记为可提交状态,并发送一个commit消息给消费者;当生产者返回ROLLBACK状态时,代表本地事务执行失败,此时,RocketMQ会将消息标记为可回滚状态,并发送一个rollback消息给消费者。
4. 消费者处理消息:消费者在接收到commit或rollback消息后,会根据消息的状态来执行相应的操作。
5.提交或回滚事务:如果所有的消费者都接收到了commit消息,则代表该事务消息已经提交,此时生产者会提交本地事务,否则,如果有任何一个消费者接收到了rollback消息,则代表该事务消息已经回滚,生产者会回滚本地事务。
事务消息的实现原理基于两阶段提交协议,这是一种经典的分布式事务协议,可以保证数据的一致性和可靠性。RocketMQ通过实现TransactionListener接口来支持事务消息,同时也提供了许多配置选项和工具类来方便用户进行使用和扩展。

实现逻辑

RocketMQ的事务消息通过TransactionListener接口来实现。下面是事务消息的基本实现步骤:
1.实现事务监听器:首先,你需要实现RocketMQ提供的TransactionListener接口,该接口包括两个方法:executeLocalTransaction和checkLocalTransaction。
○ executeLocalTransaction方法用于执行本地事务,当发送事务消息时,RocketMQ会调用此方法来执行本地事务。在该方法内部,你需要执行实际的业务逻辑,并根据执行结果返回事务状态,可以是提交、回滚或是未知状态。
○ checkLocalTransaction方法用于检查本地事务状态,当RocketMQ没有收到事务消息的确认或者取消时,会调用此方法来检查本地事务的状态,然后对消息进行处理。
2.发送事务消息:在发送事务消息时,你需要指定事务监听器,并在executeLocalTransaction方法中执行实际的业务逻辑,然后根据业务逻辑的执行结果返回事务状态。
3.事务状态检查:RocketMQ会定期调用checkLocalTransaction方法来检查本地事务的状态,然后对消息进行处理,例如提交或者回滚。
4.事务消息处理:在消息消费端,你需要根据消息的实际状态来执行相应的处理逻辑。
下面是一个简单的示例代码,演示了如何使用RocketMQ的事务消息:

public class TransactionProducer {public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("transaction_producer_group");producer.setNamesrvAddr("localhost:9876");// 设置事务监听器TransactionListener transactionListener = new MyTransactionListener();producer.setTransactionListener(transactionListener);// 启动生产者producer.start();try {// 创建事务消息Message msg = new Message("TopicTest", "TagA", "KEY1", "Hello, RocketMQ".getBytes());// 发送事务消息SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}
}class MyTransactionListener implements TransactionListener {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务,例如数据库操作等// 返回本地事务的状态,可以是COMMIT、ROLLBACK或UNKNOW}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 检查本地事务状态,返回本地事务的状态,例如COMMIT、ROLLBACK或UNKNOW}
}

上述代码展示了如何创建一个事务消息生产者,并实现一个简单的事务监听器。在实际应用中,你需要根据业务需求和具体场景来实现更复杂的事务逻辑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/295764.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

windows下使用vccode+cmake编译cuda程序

1、在vscode中安装Nsight Visual Studio Code Edition 在vscode中安装插件能够对cuda的代码进行语法检查 2、编写cuda程序 #include <iostream>__global__ void mykernelfunc(){}; int main() {mykernelfunc<<<1,1>>>();std::cout << "hel…

IDEA常用快捷键一

一、文本编辑 1、Ctrl X &#xff1a;剪切 剪切选中的文本&#xff0c;若是没有选中&#xff0c;则剪切当前行。 2、CtrlC&#xff1a;复制 复制选中文本&#xff0c;若未选中则复制当前行。 3、CtrlV&#xff1a;粘贴 4、Ctrl Shift V: 从历史中选择粘贴 从历史剪…

视频监控管理平台/智能监测/检测系统EasyCVR智能地铁监控方案,助力地铁高效运营

近日&#xff0c;关于全国44座城市开通地铁&#xff0c;却只有5座城市赚钱的新闻冲上热搜。地铁作为城市交通的重要枢纽&#xff0c;是人们出行必不可少的一种方式&#xff0c;但随着此篇新闻的爆出&#xff0c;大家也逐渐了解到城市运营的不易&#xff0c;那么&#xff0c;如何…

PMP项目管理 - 成本管理

系列文章目录 系统架构设计 PMP项目管理 - 整合管理 PMP项目管理 - 范围管理 PMP项目管理 - 质量管理 PMP项目管理 - 采购管理 PMP项目管理 - 资源管理 PMP项目管理 - 风险管理 PMP项目管理 - 沟通管理 现在的一切都是为将来的梦想编织翅膀&#xff0c;让梦想在现实中展翅高飞…

Python开发GUI常用库PyQt6和PySide6介绍之二:设计师(Designer)

Python开发GUI常用库PyQt6和PySide6介绍之二&#xff1a;设计师&#xff08;Designer&#xff09; PySide6和PyQt6都有自己的设计师&#xff08;Designer&#xff09;&#xff0c;用于可视化地设计和布局GUI应用程序的界面。这些设计师提供了丰富的工具和功能&#xff0c;使开…

【图像分类】【深度学习】【轻量级网络】【Pytorch版本】ShuffleNet_V1模型算法详解

【图像分类】【深度学习】【轻量级网络】【Pytorch版本】ShuffleNet_V1模型算法详解 文章目录 【图像分类】【深度学习】【轻量级网络】【Pytorch版本】ShuffleNet_V1模型算法详解前言ShuffleNet_V1讲解group convolution(分组卷积)Channel Shuffle(通道混洗)ShuffleNet Uint(S…

简述用C++实现SIP协议栈

SIP&#xff08;Session Initiation Protocol&#xff0c;会话初始协议&#xff09;是一个基于文本的应用层协议&#xff0c;用于创建、修改和终止多媒体会话&#xff08;如语音、视频、聊天、游戏等&#xff09;中的通信。SIP协议栈是实现SIP协议的一组软件模块&#xff0c;它…

【Vulnhub 靶场】【Corrosion: 1】【简单】【20210731】

1、环境介绍 靶场介绍&#xff1a;https://www.vulnhub.com/entry/corrosion-1,730/ 靶场下载&#xff1a;https://download.vulnhub.com/corrosion/Corrosion.ova 靶场难度&#xff1a;简单 发布日期&#xff1a;2021年07月31日 文件大小&#xff1a;7.8 GB 靶场作者&#xf…

Java--抽象工厂设计模式

抽象工厂设计模式 抽象工厂模式&#xff08;Abstract Factory Pattern&#xff09;是围绕一个超级工厂创建其他工厂。该超级工厂又称为其他工厂的工厂。这种类型的设计模式属于创建型模式&#xff0c;它提供了一种创建对象的最佳方式。 在抽象工厂模式中&#xff0c;接口是负责…

化繁为简,Python快速入门,从基础到实践的学习。

文章目录 前言一、安装与运行命令行运行 python 文件 二、变量和简单数据类型2.1 变量命名规则2.2 字符串2.2.1 字符串的简单运算title()upper()、lower() 2.2.2 合并&#xff08;拼接&#xff09;字符串2.2.3 使用制表符或换行符来添加空白2.2.4 删除空白2.2.5 Python 2 中的 …

【Java异常】聊聊异常可能带来的坑

一个活生生的案例 本周帮同事排查了一个问题&#xff0c;比较诡异的是他通过测试&#xff0c;并没有找到根本原因&#xff0c;只是发现有对应的错误日志。 但是其实并没有将堆栈信息打印出来。很难看出问题。添加了 e.printStackTrace(); get exception in exter: / by zero显…

UGC编辑器开发-代码实现物体旋转操作轴

1.视频效果&#xff1a; 工程百度网盘链接&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1OYkt2T3Wv_Hh0Bt7nLyR-A 提取码&#xff1a;1212 2.设计思路&#xff1a; 我们从鼠标点击的屏幕坐标打出一根射线&#xff0c;求出射线和旋转面的交点&#xff0c;交点减去原…