安装
官网
参考文章: https://blog.csdn.net/miaoye520/article/details/123207661 https://blog.csdn.net/lvoelife/article/details/126658695
安装Erlang,并添加环境变量ERLANG_HOME,命令行运行erl
安装rabbitmq,rabbitmq-server-3.12.0.exe
注意Erlang要选择对应的版本
安装RabbitMQ-Plugins插件,rabbitmq-plugins enable rabbitmq_management
访问 http://localhost:15672/
账号密码 guest,guest
使用
中文文档
参考文章: https://blog.csdn.net/weixin_45698935/article/details/123481137 https://www.liwenzhou.com/posts/Go/rabbitmq-1/
Go实践:
go get github.com/streadway/amqp
基本使用:
生产者
package main
import ("github.com/streadway/amqp""log"
)
type App struct {Name stringNum int
}
type Root struct {Apps []*App
}
func main() {// 1.尝试连接RabbitMQ,建立连接// 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf(err.Error())}defer conn.Close()
// 2.创建一个通道, 大多数API都是该通道操作的ch, err := conn.Channel()defer ch.Close()
// 3.声明消息要发送的队列q, err := ch.QueueDeclare("hello", false, false, false, false, nil)if err != nil {log.Fatalf(err.Error())}
body := "hello world12"err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain",Body: []byte(body),})if err != nil {log.Fatalf(err.Error())}
return
}
消费者
package main
import ("github.com/streadway/amqp""log"
)
func main() {// 1.尝试连接RabbitMQ,建立连接// 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf(err.Error())}defer conn.Close()
// 2.创建一个通道, 大多数API都是该通道操作的ch, err := conn.Channel()defer ch.Close()
// 3.声明消息要发送的队列q, err := ch.QueueDeclare("hello", false, false, false, false, nil)if err != nil {log.Fatalf(err.Error())}
// 获取接收消息的Delivery通道msgs, err := ch.Consume(q.Name, // queue"", // consumertrue, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)if err != nil {log.Println(err.Error(), "Failed to register a consumer")}
forever := make(chan bool)
go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever
}
延迟队列
不应该是队列,而应该是堆。将先过期的消息排在前面
需要安装插件:有插件的支持 Community Plugins — RabbitMQ rabbitmq_delayed_message_exchange
首先要引入一个概念:死信队列,当我们的发送的消息被接收端nck或reject,消息在队列的存活时间超过设定的 TTL,消息数量超过最大队列长度,这样的消息会被认为是死信(“dead letter”)通过配置的死信交换机这样的死信可以被投递到对应的死信队列中
发送者:
发送者的实现就很简单了,就和普通的发送实现几乎一致,因为反正就是投递到对应的队列中就可以了,只需要将发送消息的部分,在消息的 header 中加入 x-delay
字段表示当前消息的 TTL 就可以了,也就是设定延迟时间,注意单位为毫秒
package main
import ("log""os""strings"
"github.com/streadway/amqp"
)
func main() {failOnError := func (err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}}
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()
ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()
body := bodyFrom(os.Args)// 将消息发送到延时队列上err = ch.Publish("", // exchange 这里为空则不选择 exchange"test_delay", // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),Expiration: "10000", // 设置五秒的过期时间})failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "hello3"} else {s = strings.Join(args[1:], " ")}return s
}
接收者
package main
import ("log"
"github.com/streadway/amqp"
)
func main() {
failOnError := func (err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}}
// 建立链接conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()
ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()
// 声明一个主要使用的 exchangeerr = ch.ExchangeDeclare("logs", // name"fanout", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare an exchange")
// 声明一个常规的队列, 其实这个也没必要声明,因为 exchange 会默认绑定一个队列q, err := ch.QueueDeclare("test_logs", // namefalse, // durablefalse, // delete when unusedtrue, // exclusivefalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare a queue")
/*** 注意,这里是重点!!!!!* 声明一个延时队列, ß我们的延时消息就是要发送到这里*/_, errDelay := ch.QueueDeclare("test_delay", // namefalse, // durablefalse, // delete when unusedtrue, // exclusivefalse, // no-waitamqp.Table{// 当消息过期时把消息发送到 logs 这个 exchange"x-dead-letter-exchange":"logs",}, // arguments)failOnError(errDelay, "Failed to declare a delay_queue")
err = ch.QueueBind(q.Name, // queue name, 这里指的是 test_logs"", // routing key"logs", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")
// 这里监听的是 test_logsmsgs, err := ch.Consume(q.Name, // queue name, 这里指的是 test_logs"", // consumertrue, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}
参考:golang 使用 rabbitmq 延迟队列-腾讯云开发者社区-腾讯云