go对rabbitmq基本操作

一、安装rabbitmq

  • 1、直接使用docker拉取镜像

    docker pull rabbitmq:3.8
    
  • 2、启动容器

    docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=123456 \-v mq-plugins:/plugins \--name rabbit01 \--hostname rabbit01 --restart=always \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8
    
  • 3、关于端口的介绍

    • 15672的给浏览器控制台使用的
    • 5672是给程序调用的
  • 4、进入到rabbit01容器中

    docker exec -it rabbit01 /bin/bash
    
  • 5、开启可视化界面操作

    rabbitmq-plugins enable rabbitmq_management
    
  • 6、客户端直接访问xx:15672

  • 7、或者直接用别人搞好的镜像

    docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=123456 \-v mq-plugins:/plugins \--name rabbit02 \--hostname rabbit02 --restart=always \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8-management
    

二、go语言对rabbitmq基本操作

  • 1、安装依赖包

    go get github.com/streadway/amqp
    
  • 2、基本的连接操作

    package mainimport ("fmt""github.com/streadway/amqp"
    )func main() {// 连接rabbitmq// conn,_ := amqp.Dial("amqp://用户名:密码@IP:端口号/虚拟机空间名称")   // 端口号:5672conn, _ := amqp.Dial("amqp://admin:123456@localhost:5672//") // 端口号:5672defer conn.Close()// 打开通道ch, err := conn.Channel()fmt.Println(err)defer ch.Close()
    }
    
  • 3、由于部分每个地方都要使用,封装成一个方法

    package utilsimport ("fmt""github.com/streadway/amqp"
    )func RabbitmqUtils() *amqp.Channel {// 连接rabbitmqconn, _ := amqp.Dial("amqp://admin:123456@localhost:5672//") // 端口号:5672//defer conn.Close()// 打开通道ch, err := conn.Channel()fmt.Println(err)//defer ch.Close()return ch
    }
    
  • 4、创建一个队列,然后到可视化界面查看是否自动创建

    func main() {// 创建一个队列// durable, autoDelete, exclusive, noWait boolqueue, err := utils.RabbitmqUtils().QueueDeclare("simple_queue", false, false, false, false, nil)fmt.Println(queue.Name, err)
    }
    

    在这里插入图片描述

  • 5、关于创建队列几个参数的介绍

    • 第一个参数是队列名称
    • 第二个参数是队列是否持久化
    • 第三个参数是是否自动删除
    • 第四个参数是队列是否可以被其他队列访问
    • 第五个参数是设置为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度

三、简单模式

  • 1、根据官网图来看,简单模式是不需要交换机的

    在这里插入图片描述

  • 2、定义生产者,向队列中发送消息(注意要先创建队列)

    func main() {/**第一个参数是交换机名称第二个参数是队列名称第三个参数是 如果生产者生产的任务没有正常进入队列中,设置为true会返还给生产者,设置为false会直接丢弃第四个参数是 路由的时候第五个参数是消息体*/err := utils.RabbitmqUtils().Publish("", "simple_queue", false, false, amqp.Publishing{Body: []byte("hello word"),})fmt.Println(err)
    }
    
  • 3、查看可是界面是否存在一条消息

  • 4、创建消费者,来获取消息内容

    /**
    第一个参数是队列名称
    第二个参数自己给当前消费者命名
    第三个参数是否自动应答
    第三个参数队列是否可以被其他队列访问
    第四个参数
    第五个参数设置为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度
    */
    msgChan, err := utils.RabbitmqUtils().Consume("simple_queue", "", false, false, false, false, nil)
    fmt.Println(err)
    for msg := range msgChan {fmt.Println(string(msg.Body))
    }
    

四、工作模式

  • 1、工作模式是指一个生产者多个消费者,在简单模式上扩展成多个消费者,每个消费者只能交替来消费消息

  • 2、定义2个消费者来消费消息

    func main() {msgChan, err := utils.RabbitmqUtils().Consume("work_queue", "", true, false, false, true, nil)fmt.Println(err)for msg := range msgChan {fmt.Println("消费者1:", string(msg.Body))}
    }
    
  • 3、生产多条消息

    func main() {for i := 0; i < 10; i++ {_ = utils.RabbitmqUtils().Publish("", "work_queue", false, false, amqp.Publishing{Body: []byte(fmt.Sprintf("hello word %d", i)),})}
    }
    
  • 4、消费结果

    在这里插入图片描述

五、发布订阅模式

  • 1、发布订阅模式同样是一个生产者生产消息,多个消费者来消费,与上面的工作模式的区别是:工作模式是一个消费者消费后,另外一个消费者就消费不到了,发布订阅模式是不管有几个消费者都可以消费到消息

  • 2、使用goapi来创建交换机和队列

    func main() {// 1.创建2个队列queue1, _ := utils.RabbitmqUtils().QueueDeclare("first_queue", true, false, false, true, nil)queue2, _ := utils.RabbitmqUtils().QueueDeclare("second_queue", true, false, false, true, nil)// 2.创建一个交换机_ = utils.RabbitmqUtils().ExchangeDeclare("first_exchange", amqp.ExchangeDirect, true, false, false, false, nil)// 3.队列和交换机绑定在一起_ = utils.RabbitmqUtils().QueueBind(queue1.Name, "", "first_exchange", true, nil)_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "", "first_exchange", true, nil)
    }
    
  • 3、消费者只需要绑定队列来消费消息就可以

    func main() {msgChan, err := utils.RabbitmqUtils().Consume("first_queue", "", true, false, false, true, nil)fmt.Println(err)for msg := range msgChan {fmt.Println("消费者1:", string(msg.Body))}
    }
    
  • 4、生产者只需要把消息发送到交换机里面就可以,交换机会根据绑定的队列来推送消息

    func main() {_ = utils.RabbitmqUtils().Publish("first_exchange", "", false, false, amqp.Publishing{Body: []byte("hello word"),})
    }
    
  • 5、可以查看控制台两个消费者都接收到消息

六、路由模式

  • 1、路由模式和上面的发布订阅模式有点类似,只是在上面的基础上添加的路由key

  • 2、使用go-api创建交换机和队列,并且对其绑定

    func main() {// 1.创建2个队列queue1, _ := utils.RabbitmqUtils().QueueDeclare("first_queue_key", true, false, false, true, nil)queue2, _ := utils.RabbitmqUtils().QueueDeclare("second_queue_key", true, false, false, true, nil)// 2.创建一个交换机err := utils.RabbitmqUtils().ExchangeDeclare("second_exchange", amqp.ExchangeDirect, true, false, false, false, nil)if err != nil {fmt.Println(err)}// 3.队列和交换机绑定在一起_ = utils.RabbitmqUtils().QueueBind(queue1.Name, "info", "second_exchange", true, nil)_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "info", "second_exchange", true, nil)_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "error", "second_exchange", true, nil)
    }
    
  • 3、定义消费者

    func main() {msgChan, err := utils.RabbitmqUtils().Consume("first_queue_key", "", true, false, false, true, nil)fmt.Println(err)for msg := range msgChan {fmt.Println("消费者1:", string(msg.Body))}
    }
    
  • 4、定义生产者

    func main() {// 消费者会根据绑定的路由key来获取消息_ = utils.RabbitmqUtils().Publish("second_exchange", "error", false, false, amqp.Publishing{Body: []byte("hello word"),})
    }
    

七、主题模式

  • 1、主题模式和上面路由模式差不多,只是多了一个模糊匹配
    • *表示只匹配一个单词
    • #表示匹配多个单词

八、简单对其封装

  • 1、封装代码

    package utilsimport ("errors""fmt""github.com/streadway/amqp""log"
    )// MQURL url的格式 amqp://账号:密码@rabbitmq服务器地址:端口号/vhost
    const MQURL = "amqp://admin:123456@localhost:5672//"type RabbitMQ struct {conn    *amqp.Connectionchannel *amqp.ChannelMQUrl   string
    }// NewRabbitMQ 创建RabbitMQ的结构体实例
    func NewRabbitMQ() *RabbitMQ {rabbitMQ := &RabbitMQ{MQUrl: MQURL,}var err error// 创建rabbitMQ连接rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MQUrl)if err != nil {rabbitMQ.failOnErr(err, "创建连接错误")}rabbitMQ.channel, err = rabbitMQ.conn.Channel()if err != nil {rabbitMQ.failOnErr(err, "获取channel失败")}return rabbitMQ
    }// Binding 创建交换机和队列并且绑定在一起
    func (r *RabbitMQ) Binding(queueName, exchange, key, routerKey string) {// 1.创建1个队列queue1, err := r.channel.QueueDeclare(queueName, true, false, false, true, nil)if err != nil {r.failOnErr(err, "创建队列失败")}if exchange != "" && key == "" {r.failOnErr(errors.New("错误"), "请传递交换机链接类型")}if exchange != "" {// 2.创建一个交换机err1 := r.channel.ExchangeDeclare(exchange, key, true, false, false, false, nil)if err1 != nil {r.failOnErr(err, "创建交换机失败")}// 3.队列和交换机绑定在一起if err := r.channel.QueueBind(queue1.Name, routerKey, exchange, true, nil); err != nil {fmt.Println("1111")r.failOnErr(err, "交换机和队列绑定失败")}}fmt.Println("创建成功")
    }// failOnErr 定义内部错误处理
    func (r *RabbitMQ) failOnErr(err error, message string) {if err != nil {log.Fatalf("%s:%s", message, err)panic(fmt.Sprintf("%s:%s", message, err))}
    }func (r *RabbitMQ) Close() {defer func(Conn *amqp.Connection) {err := Conn.Close()if err != nil {r.failOnErr(err, "关闭链接失败")}}(r.conn)defer func(Channel *amqp.Channel) {err := Channel.Close()if err != nil {r.failOnErr(err, "关闭通道失败")}}(r.channel)
    }func (r *RabbitMQ) Qos(prefetchCount, prefetchSize int, global bool) {err := r.channel.Qos(prefetchCount, prefetchSize, global)if err != nil {r.failOnErr(err, "限流失败")}
    }// Publish 发布者
    func (r *RabbitMQ) Publish(exchange, routerKey, message string) {// 2.发送数据到队列中if err := r.channel.Publish(exchange,routerKey,false, // 如果为true的时候会根据exchange的类型和routKey规则,如果无法找到符合条件的队列那么会把发送的消息发挥给发送者false, // 如果为true的时候当exchane发送消息到队列后发现队列上没有绑定消费者则会把消息发还给发送者amqp.Publishing{Body: []byte(message),},); err != nil {r.failOnErr(err, "发送消息失败")}fmt.Println("恭喜你,消息发送成功")
    }// Consumer 消费者
    func (r *RabbitMQ) Consumer(queueName string, callback func(message []byte)) {// 2.接收消息message, err := r.channel.Consume(queueName,"",    // 区分多个消费者true,  // 是否自动应答false, // 是否具有排他性false, // 如果为true的时候,表示不能将同一个connection中发送的消息传递给connection中的消费者false, // 队列消费是否阻塞nil,)if err != nil {r.failOnErr(err, "接收消息失败")}fmt.Println("消费者等待消费...")forever := make(chan bool)// 使用协程处理消息go func() {for d := range message {log.Printf("接收到的消息:%s", d.Body)callback(d.Body)}}()<-forever
    }
    
  • 2、简单模式的使用

    func main() {mq := utils.NewRabbitMQ()mq.Consumer("simple_queue1", func(message []byte) {fmt.Println(string(message))})defer mq.Close()
    }func main() {mq := utils.NewRabbitMQ()mq.Binding("simple_queue1", "", "", "")defer mq.Close()mq.Publish("", "simple_queue1", "你好水痕")
    }
    
  • 3、工作模式的使用

    func main() {mq := utils.NewRabbitMQ()mq.Consumer("work_queue1", func(message []byte) {fmt.Println("消费者2", string(message))})defer mq.Close()
    }func main() {mq := utils.NewRabbitMQ()defer mq.Close()for i := 0; i < 10; i++ {mq.Publish("", "work_queue1", fmt.Sprintf("你好水痕%d", i))}
    }
    
  • 4、交换机带路由的时候

    func main() {mq := utils.NewRabbitMQ()mq.Binding("first_queue1", "first_exchange1", amqp.ExchangeDirect, "info")mq.Binding("first_queue1", "first_exchange1", amqp.ExchangeDirect, "error")mq.Binding("first_queue2", "first_exchange1", amqp.ExchangeDirect, "info")defer mq.Close()
    }func main() {mq := utils.NewRabbitMQ()mq.Consumer("first_queue2", func(message []byte) {fmt.Println("消费者2", string(message))})defer mq.Close()
    }func main() {mq := utils.NewRabbitMQ()defer mq.Close()mq.Publish("first_exchange1", "error", "你好水痕")
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/221619.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

安全风险综合监测预警平台建设指南(2023 版)》正式发布,汉威科技方案领跑行业

11月24日&#xff0c;国务院安委会办公室印发《城市安全风险综合监测预警平台建设指南&#xff08;2023版&#xff09;》&#xff08;以下简称“指南”&#xff09;&#xff0c;引发行业密切关注。 据悉&#xff0c;“指南”在总结前期18 个试点城市&#xff08;区&#xff09;…

工作流能实现自动化吗?应该用什么工具?

研究显示&#xff0c;CRM系统工作流自动化软件不仅能简化冗余的工作且不需要监控和指导就能提高员工的工作效率。企业需要工作流自动化软件吗&#xff1f;答案是肯定的&#xff0c;工作流自动化的好处有哪些&#xff1f; 为什么企业需要工作流自动化软件 每家企业都希望降本增…

Walrus 0.4发布:单一配置、多态运行,体验下一代应用交付模型

今天&#xff0c;我们高兴地宣布云原生统一应用平台 Walrus 0.4 正式发布&#xff0c;这是一个里程碑式的版本更新。新版本采用了全新的应用模型——仅需进行单一配置&#xff0c;即可在多种模态的基础设施及环境中运行包括应用服务及周边依赖资源在内的完整应用系统。“You bu…

【算法萌新闯力扣】:合并两个有序链表

力扣题目&#xff1a;合并两个有序链表 开篇 今天是备战蓝桥杯的第24天及算法村开营第2天。根据算法村的讲义&#xff0c;来刷链表的相关题目。今天要分享的是合并两个有序链表。 题目链接: 21.合并两个有序链表 题目描述 代码思路 通过创建一个新链表&#xff0c;然后遍历…

OpenGL 自学总结

前言&#xff1a; 本人是工作后才接触到的OpenGL&#xff0c;大学找工作的时候其实比较着急&#xff0c;就想着尽快有个着落。工作后才发现自己的兴趣点。同时也能感觉到自己当前的工作有一点温水煮青蛙的意思&#xff0c;很担心自己往后能力跟不上年龄的增长。因此想在工作之余…

小型内衣洗衣机什么牌子好?口碑最好的小型洗衣机

很多人会觉得内衣洗衣机是智商税&#xff0c;洗个内衣只需要两分钟的事情&#xff0c;需要花个几百块钱去入手一个洗衣机吗&#xff1f;然而清洗贴身衣物的并不是一件简单的事情&#xff0c;如果只是简单的搓洗&#xff0c;内裤上看不见的细菌也无法消除&#xff0c;而且对来生…

【第二节:微信小程序 app.json配置】微信小程序入门,以思维导图的方式展开2

以思维导图的方式呈现出来&#xff0c;是不是会更加直观一些呢 如果看不清楚&#xff0c;私信给单发 &#xff1a; 第二节&#xff1a;微信小程序 app.json配置&#xff1a; 包括&#xff1a; window pages tabBar networkTimeout debug 如下图所示&#xff1a; 2、ap…

CISA建议企业需要多重身份验证

多重身份验证(MFA)让您的企业更加安全&#xff0c;免受在线威胁的侵害。 中小型多因素身份验证提供额外的安全性 贵公司的知识产权、员工个人信息、客户信息等是犯罪活动的主要目标。仅使用密码并不总是能有效地保护组织的数据。事实上&#xff0c;弱密码或被盗密码是在线犯罪分…

CRM的智能招投标对企业有什么意义?

如今CRM系统的生态系统越来越壮大&#xff0c;这些工具的集成极大地丰富了CRM系统的应用场景&#xff0c;例如CRM系统集成企业微信等社交媒体为获客提供便利&#xff1b;再比如CRM集成ChatGPT提高邮件内容质量&#xff0c;对于经常接触招投标项目的业务人员来说&#xff0c;在C…

Spring框架体系及Spring IOC思想

目录 Spring简介Spring体系结构SpringIOC控制反转思想自定义对象容器Spring实现IOCSpring容器类型容器接口容器实现类对象的创建方式使用构造方法使用工厂类的方法使用工厂类的静态方法对象的创建策略对象的销毁时机生命周期方法获取Bean对象的方式通过id/name获取通过类型获取…

UE4/UE5 c++绘制编辑器场景直方图(源码包含场景中的像素获取、菜单添加ToolBar)

UE4/UE5 c场景直方图 UE4/UE5 C绘制编辑器场景直方图绘制原理&#xff1a;元素绘制坐标轴绘制 源码处理 UE4/UE5 C绘制编辑器场景直方图 注&#xff1a;源码包含场景中的像素获取、菜单添加ToolBar 实现效果&#xff1a; 这个是用于美术统计场景中像素元素分布&#xff0c;类…

相机内存卡照片删除怎么恢复?没有备份可这样操作

在使用相机时&#xff0c;不小心删除了重要的照片可能是每位摄影爱好者的噩梦。然而&#xff0c;通过一些恢复方法&#xff0c;我们有机会挽救被删除的照片。本文将详细介绍相机内存卡照片删除恢复的方法。 图片来源于网络&#xff0c;如有侵权请告知 如果您误删了相机内存卡中…