RabbitMQ快速上手(延迟队列)

安装

官网

参考文章:
​
https://blog.csdn.net/miaoye520/article/details/123207661
​
https://blog.csdn.net/lvoelife/article/details/126658695

安装Erlang,并添加环境变量ERLANG_HOME,命令行运行erl

安装rabbitmq,rabbitmq-server-3.12.0.exe

注意Erlang要选择对应的版本

安装RabbitMQ-Plugins插件,rabbitmq-plugins enable rabbitmq_management

访问 http://localhost:15672/

账号密码 guest,guest

使用

中文文档

参考文章:
https://blog.csdn.net/weixin_45698935/article/details/123481137
https://www.liwenzhou.com/posts/Go/rabbitmq-1/

Go实践:

go get github.com/streadway/amqp 

基本使用:

生产者

package main
​
import ("github.com/streadway/amqp""log"
)
​
type App struct {Name stringNum int
}
​
type Root struct {Apps []*App
}
​
func main() {// 1.尝试连接RabbitMQ,建立连接// 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf(err.Error())}defer conn.Close()
​// 2.创建一个通道, 大多数API都是该通道操作的ch, err := conn.Channel()defer ch.Close()
​// 3.声明消息要发送的队列q, err := ch.QueueDeclare("hello", false, false, false, false, nil)if err != nil {log.Fatalf(err.Error())}
​body := "hello world12"err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain",Body: []byte(body),})if err != nil {log.Fatalf(err.Error())}
​return
}

消费者

package main
​
import ("github.com/streadway/amqp""log"
)
​
func main() {// 1.尝试连接RabbitMQ,建立连接// 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf(err.Error())}defer conn.Close()
​// 2.创建一个通道, 大多数API都是该通道操作的ch, err := conn.Channel()defer ch.Close()
​// 3.声明消息要发送的队列q, err := ch.QueueDeclare("hello", false, false, false, false, nil)if err != nil {log.Fatalf(err.Error())}
​// 获取接收消息的Delivery通道msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args)if err != nil {log.Println(err.Error(), "Failed to register a consumer")}
​forever := make(chan bool)
​go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()
​log.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever
}

延迟队列

不应该是队列,而应该是堆。将先过期的消息排在前面

需要安装插件:有插件的支持 Community Plugins — RabbitMQ rabbitmq_delayed_message_exchange

首先要引入一个概念:死信队列,当我们的发送的消息被接收端nck或reject,消息在队列的存活时间超过设定的 TTL,消息数量超过最大队列长度,这样的消息会被认为是死信(“dead letter”)通过配置的死信交换机这样的死信可以被投递到对应的死信队列中

发送者:

发送者的实现就很简单了,就和普通的发送实现几乎一致,因为反正就是投递到对应的队列中就可以了,只需要将发送消息的部分,在消息的 header 中加入 x-delay 字段表示当前消息的 TTL 就可以了,也就是设定延迟时间,注意单位为毫秒

package main
​
import ("log""os""strings"
​"github.com/streadway/amqp"
)
​
func main() {failOnError := func (err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}}
​conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()
​ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()
​body := bodyFrom(os.Args)// 将消息发送到延时队列上err = ch.Publish("",                 // exchange 这里为空则不选择 exchange"test_delay",       // routing keyfalse,              // mandatoryfalse,              // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),Expiration: "10000",    // 设置五秒的过期时间})failOnError(err, "Failed to publish a message")
​log.Printf(" [x] Sent %s", body)
}
​
func bodyFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "hello3"} else {s = strings.Join(args[1:], " ")}return s
}

接收者

package main
​
import ("log"
​"github.com/streadway/amqp"
)
​
func main() {
​failOnError := func (err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}}
​// 建立链接conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()
​ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()
​// 声明一个主要使用的 exchangeerr = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments)failOnError(err, "Failed to declare an exchange")
​// 声明一个常规的队列, 其实这个也没必要声明,因为 exchange 会默认绑定一个队列q, err := ch.QueueDeclare("test_logs",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments)failOnError(err, "Failed to declare a queue")
​/*** 注意,这里是重点!!!!!* 声明一个延时队列, ß我们的延时消息就是要发送到这里*/_, errDelay := ch.QueueDeclare("test_delay",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitamqp.Table{// 当消息过期时把消息发送到 logs 这个 exchange"x-dead-letter-exchange":"logs",},   // arguments)failOnError(errDelay, "Failed to declare a delay_queue")
​err = ch.QueueBind(q.Name, // queue name, 这里指的是 test_logs"",     // routing key"logs", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")
​// 这里监听的是 test_logsmsgs, err := ch.Consume(q.Name, // queue name, 这里指的是 test_logs"",     // consumertrue,   // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args)failOnError(err, "Failed to register a consumer")
​forever := make(chan bool)
​go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()
​log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}

参考:golang 使用 rabbitmq 延迟队列-腾讯云开发者社区-腾讯云

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/2053.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

时间序列预测的20个基本概念总结

1、时间序列 时间序列是一组按时间顺序排列的数据点 比如&#xff1a; 每小时的气压每年的医院急诊按分钟计算的股票价格 2、时间序列的组成部分 时间序列数据有三个主要组成部分。 趋势季节性残差或白噪声 3、趋势 在时间序列中记录的长期缓慢变化/方向。 4、季节性 …

【软件架构模式】—微内核架构

欢迎回到软件架构模式博客系列。这是本系列的第 4 章&#xff0c;我们将讨论微内核架构模式 概述&#xff1a; 内核模式也被称为插件架构模式。将附加应用程序功能作为插件添加到核心应用程序&#xff0c;以提供可扩展性以及功能分离和隔离。 这种模式由两种类型的架构组件组…

SpringSecurity实现前后端分离登录授权详解

在介绍完SpringSecurity实现前后端分离认证之后&#xff0c;然后就是SpringSecurity授权&#xff0c;在阅读本文章之前可以先了解一下作者的上一篇文章SpringSecurity认证SpringSecurity实现前后端分离登录token认证详解_山河亦问安的博客-CSDN博客。 目录 1. 授权 1.1 权限系…

ERR! code ERR_SOCKET_TIMEOUT

问题 安装npm包&#xff0c;终端报错ERR! code ERR_SOCKET_TIMEOUT ERR! code ERR_SOCKET_TIMEOUT详细问题 笔者运行以下命令重新安装依赖项&#xff1a; npm install控制台报错&#xff0c;具体报错信息如下 npm ERR! code ERR_SOCKET_TIMEOUT npm ERR! network Socket t…

360手机刷机 360手机Xposed框架安装 360手机EdXposed、LSP 360手机xposed模块

360手机刷机 360手机Xposed框架安装 360手机EdXposed、LSP 360手机xposed模块 参考&#xff1a;360手机-360刷机360刷机包twrp、root 360刷机包360手机刷机&#xff1a;360rom.github.io 【前言】 手机须Twrp或root后&#xff0c;才可使用与操作Xposed安装后&#xff0c;重启…

Spring Boot 中的 @Transactional 注解是什么,原理,如何使用

Spring Boot 中的 Transactional 注解是什么&#xff0c;原理&#xff0c;如何使用 简介 在 Spring Boot 中&#xff0c;Transactional 注解是非常重要的一个注解&#xff0c;用于实现数据库事务的管理。通过使用 Transactional 注解&#xff0c;我们可以很方便地对事务进行控…

java + opencv对比图片不同

1&#xff0c;去官网下载opencv&#xff0c;下载的时候需要注册一个 Oracle 账户&#xff0c;分分钟就能注册。然后安装。我下的是4.7的。 2&#xff0c;找到jar包放进项目里 3&#xff0c;项目结构&#xff0c;比较简单 4&#xff0c;把下载的文件放进C盘 5&#xff0c;主类代…

GitLab无法提交大文件的问题

GitLab无法提交大文件的问题 问题描述 GitLab 当提交大文件时遇到如下报错 MYOPS001MYOPS001 MINGW64 /e/work/GitLab/system-deploy (main) $ git push Enumerating objects: 91, done. Counting objects: 100% (91/91), done. Delta compression using up to 16 threads C…

西安石油大学期末考试C++真题解析

1、一、类型、返回值类型 二、参数表、函数重载 2、一、实例化 二、实例化的类型或类类是对象的蓝图&#xff0c;对象是类的实例化 3、const 4、一个 两个 5、一、公有继承 二、私有继承、保护继承 6、抽象类、实例化对象 7、函数模板、类模板 8、try、catch、throw 9、…

selenium之鼠标操作

首先导入ActionChains类&#xff0c;该类可以完成鼠标移动&#xff0c;鼠标点击事件&#xff0c;键盘输入、内容菜单交互等交互行为。 from selenium.webdriver.common.action_chains import ActionChains 操作语法&#xff1a; 第一步&#xff1a;初始化ActionChains类&…

python【爬虫】【批量下载】年报抓取

python年报爬取更新 本人测试发现&#xff0c;ju chao网的年报爬取距离我上一篇博客并没有啥变化&#xff0c;逻辑没变&#xff0c;应好多朋友的需要&#xff0c;这里补充代码 import json import osimport requestsweb_url 改成网站的域名&#xff0c;因为csdn屏蔽 def load…