使用rabbitmq进行支付之后的消息通知

订单服务完成支付后将支付结果发给每一个与订单服务对接的微服务,订单服务将消息发给交换机,由交换机广播消息,每个订阅消息的微服务都可以接收到支付结果.

微服务收到支付结果根据订单的类型去更新自己的业务数据。

相关技术方案

使用消息队列进行异步通知需要保证消息的可靠性,即生产端将消息成功通知到消费端。

消息从生产端发送到消费端经历了如下过程:

1、消息发送到交换机

2、消息由交换机发送到队列

3、消息者收到消息进行处理

保证消息的可靠性需要保证以上过程的可靠性,本项目使用RabbitMQ可以通过如下方面保证消息的可靠性。

1、生产者确认机制

发送消息前使用数据库事务将消息保证到数据库表中

成功发送到交换机将消息从数据库中删除

2、mq持久化

mq收到消息进行持久化,当mq重启即使消息没有消费完也不会丢失。

需要配置交换机持久化、队列持久化、发送消息时设置持久化。

3、消费者确认机制

消费者消费成功自动发送ack,否则重试消费。

订单服务通过消息队列将支付结果发给学习中心服务,消息队列采用发布订阅模式。

1、订单服务创建支付结果通知交换机。

2、学习中心服务绑定队列到交换机。

首先需要在学习中心服务和订单服务工程配置连接消息队列。

1、首先在订单服务添加消息队列依赖

XML

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、在nacos配置rabbitmq-dev.yaml为通用配置文件

YAML
spring:
  rabbitmq:
    host: 192.168.101.65
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated #correlated 异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
    publisher-returns: false #开启publish-return功能,同样是基于callback机制,需要定义ReturnCallback
    template:
      mandatory: false #定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
    listener:
      simple:
        acknowledge-mode: none #出现异常时返回unack,消息回滚到mq;没有异常,返回ack ,manual:手动控制,none:丢弃消息,不回滚到mq
        retry:
          enabled: true #开启消费者失败重试
          initial-interval: 1000ms #初识的失败等待时长为1秒
          multiplier: 1 #失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 #最大重试次数
          stateless: true #true无状态;false有状态。如果业务中包含事务,这里改为false

3、在订单服务接口工程引入rabbitmq-dev.yaml配置文件

YAML
shared-configs:
  - data-id: rabbitmq-${spring.profiles.active}.yaml
    group: xuecheng-plus-common
    refresh: true

4、在订单服务service工程编写MQ配置类,配置交换机

Java
package com.xuecheng.orders.config;

import com.alibaba.fastjson.JSON;
import com.xuecheng.messagesdk.model.po.MqMessage;
import com.xuecheng.messagesdk.service.MqMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Mr.M
 * @version 1.0
 * @description TODO
 * @date 2023/2/23 16:59
 */
@Slf4j
@Configuration
public class PayNotifyConfig implements ApplicationContextAware {

    //交换机
    public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout";
    //支付结果通知消息类型
    public static final String MESSAGE_TYPE = "payresult_notify";
    //支付通知队列
    public static final String PAYNOTIFY_QUEUE = "paynotify_queue";

    //声明交换机,且持久化
    @Bean(PAYNOTIFY_EXCHANGE_FANOUT)
    public FanoutExchange paynotify_exchange_fanout() {
        // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
        return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false);
    }
    //支付通知队列,且持久化
    @Bean(PAYNOTIFY_QUEUE)
    public Queue course_publish_queue() {
        return QueueBuilder.durable(PAYNOTIFY_QUEUE).build();
    }

    //交换机和支付通知队列绑定
    @Bean
    public Binding binding_course_publish_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //消息处理service
        MqMessageService mqMessageService = applicationContext.getBean(MqMessageService.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 投递失败,记录日志
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                    replyCode, replyText, exchange, routingKey, message.toString());
            MqMessage mqMessage = JSON.parseObject(message.toString(), MqMessage.class);
            //将消息再添加到消息表
            mqMessageService.addMessage(mqMessage.getMessageType(),mqMessage.getBusinessKey1(),mqMessage.getBusinessKey2(),mqMessage.getBusinessKey3());

        });
    }
}

重启订单服务,登录rabbitmq,查看交换机自动创建成功

查看队列自动成功

4.3.2 发送支付结果

在OrderService中定义接口

Java
/**
 * 发送通知结果
 * @param message
 */
public void notifyPayResult(MqMessage message);

编写接口实现方法:

Java
@Override
public void notifyPayResult(MqMessage message) {

    //1、消息体,转json
    String msg = JSON.toJSONString(message);
    //设置消息持久化
    Message msgObj = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .build();
    // 2.全局唯一的消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(message.getId().toString());
    // 3.添加callback
    correlationData.getFuture().addCallback(
            result -> {
                if(result.isAck()){
                    // 3.1.ack,消息成功
                    log.debug("通知支付结果消息发送成功, ID:{}", correlationData.getId());
                    //删除消息表中的记录
                    mqMessageService.completed(message.getId());
                }else{
                    // 3.2.nack,消息失败
                    log.error("通知支付结果消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
                }
            },
            ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );
    // 发送消息
    rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT, "", msgObj,correlationData);

}

订单服务收到第三方平台的支付结果时,在saveAliPayStatus方法中添加代码,向数据库消息表添加消息并进行发送消息,如下所示:

Java
@Transactional
@Override
public void saveAliPayStatus(PayStatusDto payStatusDto) {
        .......
        //保存消息记录,参数1:支付结果通知类型,2: 业务id,3:业务类型
        MqMessage mqMessage = mqMessageService.addMessage("payresult_notify", orders.getOutBusinessId(), orders.getOrderType(), null);
        //通知消息
        notifyPayResult(mqMessage);
    }
}

配置交换机和队列

在order-service工程配置

消息发送方法

Java
/**
 * 发送通知结果
 * @param message
 */
public void notifyPayResult(MqMessage message);
 

4.4 接收支付结果

4.4.1 学习中心服务集成MQ

1、在学习中心服务添加消息队列依赖

XML
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、在学习中心服务接口工程引入rabbitmq-dev.yaml配置文件

YAML
shared-configs:
  - data-id: rabbitmq-${spring.profiles.active}.yaml
    group: xuecheng-plus-common
    refresh: true

3、添加配置类

Java
package com.xuecheng.learning.config;

import com.alibaba.fastjson.JSON;
import com.xuecheng.messagesdk.model.po.MqMessage;
import com.xuecheng.messagesdk.service.MqMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Mr.M
 * @version 1.0
 * @description TODO
 * @date 2023/2/23 16:59
 */
@Slf4j
@Configuration
public class PayNotifyConfig {

    //交换机
    public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout";
    //支付结果通知消息类型
    public static final String MESSAGE_TYPE = "payresult_notify";
    //支付通知队列
    public static final String PAYNOTIFY_QUEUE = "paynotify_queue";

    //声明交换机,且持久化
    @Bean(PAYNOTIFY_EXCHANGE_FANOUT)
    public FanoutExchange paynotify_exchange_fanout() {
        // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
        return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false);
    }
    //支付通知队列,且持久化
    @Bean(PAYNOTIFY_QUEUE)
    public Queue course_publish_queue() {
        return QueueBuilder.durable(PAYNOTIFY_QUEUE).build();
    }

    //交换机和支付通知队列绑定
    @Bean
    public Binding binding_course_publish_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

}

4.4.2 接收支付结果

监听MQ,接收支付结果,定义ReceivePayNotifyService类如下:

Java
package com.xuecheng.learning.service.impl;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.xuecheng.base.exception.XueChengPlusException;
import com.xuecheng.learning.config.PayNotifyConfig;
import com.xuecheng.learning.service.MyCourseTablesService;
import com.xuecheng.messagesdk.model.po.MqMessage;
import com.xuecheng.messagesdk.service.MqMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @author Mr.M
 * @version 1.0
 * @description 接收支付结果
 * @date 2023/2/23 19:04
 */
@Slf4j
@Service
public class ReceivePayNotifyService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    MqMessageService mqMessageService;

    @Autowired
    MyCourseTablesService myCourseTablesService;


    //监听消息队列接收支付结果通知
    @RabbitListener(queues = PayNotifyConfig.PAYNOTIFY_QUEUE)
    public void receive(Message message, Channel channel) {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        //获取消息
        MqMessage mqMessage = JSON.parseObject(message.getBody(), MqMessage.class);
        log.debug("学习中心服务接收支付结果:{}", mqMessage);

        //消息类型
        String messageType = mqMessage.getMessageType();
        //订单类型,60201表示购买课程
        String businessKey2 = mqMessage.getBusinessKey2();
        //这里只处理支付结果通知
        if (PayNotifyConfig.MESSAGE_TYPE.equals(messageType) && "60201".equals(businessKey2)) {
            //选课记录id
            String choosecourseId = mqMessage.getBusinessKey1();
            //添加选课
            boolean b = myCourseTablesService.saveChooseCourseStauts(choosecourseId);
            if(!b){
                //添加选课失败,抛出异常,消息重回队列
                XueChengPlusException.cast("收到支付结果,添加选课失败");
            }
        }


    }


}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/103761.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数学建模B多波束测线问题B

数学建模多波束测线问题 1.问题重述&#xff1a; 单波束测深是一种利用声波在水中传播的技术来测量水深的方法。它通过测量从船上发送声波到声波返回所用的时间来计算水深。然而&#xff0c;由于它是在单一点上连续测量的&#xff0c;因此数据在航迹上非常密集&#xff0c;但…

图解 LeetCode 算法汇总——链表

本文首发公众号&#xff1a;小码A梦 一般数据主要存储的形式主要有两种&#xff0c;一种是数组&#xff0c;一种是链表。数组是用来存储固定大小的同类型元素&#xff0c;存储在内存中是一片连续的空间。而链表就不同于数组。链表中的元素不是存储在内存中可以是不连续的空间。…

WPS或EXCEL表格单元格下拉快捷选择项修改及设置方法

WPS或新版本EXCEL的设置下拉选项的方法是.点击一个单元格,菜单上选择数据,下拉列表即可设置,双击文字可编辑 EXCEL 旧的版本不同,可能有不同方法 方法一, 1.在空白区域里面&#xff0c;准备好需要填入下拉菜单里面的内容。 2.选中一个需要添加下拉菜单的单元格&#xff0c;然后…

MicroStation中将二维对象投射到三维实体

在三维建模中&#xff0c;偶而会遇到需要将一个2D对象沿Z轴或指定方向投射到一个3D实体&#xff0c;在3D实体表面生成这个2D对象的投影对象。 需要使用的操作命令为&#xff1a; Stencil 2D Elements on 3D Geometry&#xff0c;位于可视化工作流、实用工具栏内。 操作时先后选…

ChatGPT:深度学习和机器学习的知识桥梁

目录 ChatGPT简介 ChatGPT的特点 ChatGPT的应用领域 ChatGPT的工作原理 与ChatGPT的交互 ChatGPT的优势 ChatGPT在机器学习中的应用 ChatGPT在深度学习中的应用 总结 近年来&#xff0c;随着深度学习技术的不断发展&#xff0c;自然语言处理技术也取得了显著的进步。其…

More Effective C++学习笔记(5)

目录 条款25&#xff1a;将构造函数和非成员函数虚化条款26&#xff1a;限制某个类所能产生的对象数量条款27&#xff1a;要求&#xff08;或禁止&#xff09;对象产生于heap&#xff08;堆&#xff09;之中条款28&#xff1a;智能指针条款29&#xff1a;引用计数条款30&#x…

WIFI版本云音响设置教程阿里云平台版本

文章目录 WIFI本云音响设置教程介绍一、申请设备三元素1.登录阿里云物联网平台2.创建产品3.设置产品参数4.添加设备5.获取三元素 二、设置设备三元素1.打开MQTTConfigTools2.计算MQTT参数3.使用windows电脑的WIFI连接到设备热点4.设置参数5.配置设备连接路由器 三、阿里云物联网…

【校招VIP】测试算法考点之链表

考点介绍&#xff1a; 链表是一种逻辑简单的、实用的数据结构&#xff0c;几乎被所有程序设计语言支持。单链表的操作算法是笔试面试中较为常见的题目。 测试算法考点之链表-相关题目及解析内容可点击文章末尾链接查看&#xff01; 一、考点题目 1.一个长度为n的单向链表&am…

【创新项目探索】大数据服务omnidata-hive-connector介绍

omnidata-hive-connector介绍 omnidata-hive-connector是一种将大数据组件Hive的算子下推到存储节点上的服务&#xff0c;从而实现近数据计算&#xff0c;减少网络带宽&#xff0c;提升Hive的查询性能。目前支持Hive on Tez。omnidata-hive-connector已在openEuler社区开源。 …

哈希的应用——位图

文章目录 前言1. 面试题思考2. 位图2.1 位图的概念2.2 思路讲解及代码实现结构定义构造函数set和reset接口实现set和reset测试观察test接口实现test接口测试思考 3. 位图的应用&#xff08;海量数据处理面试题&#xff09;习题1习题2习题3 4. 总结5. 源码5.1 bitset.h5.2 Test.…

2023高教社杯数学建模国赛C题思路解析+代码+论文

如下为C君的2023高教社杯全国大学生数学建模竞赛C题思路分析代码论文 C题蔬菜类商品的自动定价与补货决策 在生鲜商超中&#xff0c;一般蔬菜类商品的保鲜期都比较短&#xff0c;且品相随销售时间的增加而变差, 大部分品种如当日未售出&#xff0c;隔日就无法再售。因此&…

MyBatis的逆向工程

文章目录 前言MyBatis的逆向工程创建逆向工程的步骤添加依赖和插件创建MyBatis的核心配置文件创建逆向工程的配置文件执行MBG插件的generate目标 QBC查询增改 总结 前言 MyBatis的逆向工程 正向工程&#xff1a;先创建Java实体类&#xff0c;由框架负责根据实体类生成数据库表…