用RabbitMQ和golang实现一个异步任务系统,你会不会?

news/2024/10/24 3:16:36/文章来源:https://www.cnblogs.com/cheyunhua/p/18491949

在使用 RabbitMQ 和 Go 语言实现一个异步任务系统时,你可以将任务分配给生产者,将任务的处理交给消费者,这样消费者可以异步处理这些任务。

RabbitMQ 是一个强大的消息队列系统,它允许多个生产者和多个消费者进行异步通信,这使得它成为构建异步任务系统的理想选择。

系统架构概述

  • 生产者 (Producer):生产者是任务的发送方。它会将任务发送到 RabbitMQ 中的一个队列。

  • 消费者 (Consumer):消费者从 RabbitMQ 队列中获取任务,并处理这些任务。

  • RabbitMQ:它充当消息中间件,负责存储和分发消息(任务)。

步骤概述:

  1. 安装 RabbitMQ。

  2. 使用 Go 的 amqp 库与 RabbitMQ 交互。

  3. 实现生产者,将任务发送到队列中。

  4. 实现消费者,从队列中获取任务并处理。

 

》》》GoLand永久账号 免费领取《《《

 

图片

 

1. 安装 RabbitMQ

RabbitMQ 可以通过 Docker 快速安装:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

RabbitMQ 将运行在 localhost:5672 上,管理控制台运行在 localhost:15672。默认登录用户名和密码为 guest/guest

2. 安装 Go 的 amqp 库

首先,安装用于与 RabbitMQ 进行交互的 Go 包 github.com/streadway/amqp

go get github.com/streadway/amqp

3. 生产者实现(Producer)

生产者将任务发送到 RabbitMQ 的队列中。

package main

import (
"fmt"
"log"
"github.com/streadway/amqp"
)

// 连接 RabbitMQ
func connectRabbitMQ() (*amqp.Connection, error) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
}
return conn, nil
}

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

func main() {
conn, err := connectRabbitMQ()
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

// 创建一个信道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

// 声明队列
q, err := ch.QueueDeclare(
"task_queue", // 队列名称
true, // 持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否阻塞
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")

// 要发送的消息
body := "This is a task"
err = ch.Publish(
"", // 交换机
q.Name, // 队列名称
false, // 如果没有路由到队列,是否返回消息
false, // 如果消费者没有接收,是否返回消息
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 持久化消息
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}

这个生产者会将消息 This is a task 发送到名为 task_queue 的 RabbitMQ 队列中。

 

4. 消费者实现(Consumer)

消费者将从队列中异步获取消息,并处理这些消息。

package main

import (
"bytes"
"log"
"time"
"github.com/streadway/amqp"
)

// 错误处理函数
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

// 模拟耗时任务
func worker(body string) {
log.Printf("Processing task: %s", body)
// 模拟耗时任务
time.Sleep(2 * time.Second)
log.Printf("Task finished: %s", body)
}

func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

// 打开信道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

// 声明队列
q, err := ch.QueueDeclare(
"task_queue", // 队列名称
true, // 持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否阻塞
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")

// 消费者从队列中获取消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标识
false, // 自动应答
false, // 是否排他
false, // 非本地
false, // 阻塞
nil, // 额外参数
)
failOnError(err, "Failed to register a consumer")

// 启动协程处理消息
forever := make(chan bool)
go func() {
for d := range msgs {
// 模拟任务处理
log.Printf("Received a message: %s", d.Body)
worker(string(d.Body))

// 手动应答消息已处理
d.Ack(false)
}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}

5. 运行步骤

  1. 启动 RabbitMQ Docker 容器(如果还没有启动)。

  2. 运行消费者(Consumer)程序:

    go run consumer.go
  3. 运行生产者(Producer)程序:

    go run producer.go

消费者会自动获取生产者发布的任务,并处理这些任务。在这个例子中,消费者每次处理一条消息后会等待 2 秒钟,模拟任务处理的过程。

6. 扩展

这个基础结构可以进一步扩展来支持更多功能:

  • 任务重试机制:在消费者无法处理消息时,可以重新放回队列或者进行错误处理。

  • 多个消费者:可以启动多个消费者,处理并行任务。

  • 任务优先级:可以基于消息属性设置不同的优先级。

  • 延迟任务:可以使用 RabbitMQ 插件实现延迟队列,调度未来的任务。

通过 RabbitMQ 和 Go 的结合,你可以轻松构建一个高效的异步任务系统,并根据需求进行扩展。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/819052.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CMDB实践指南:项目规划与实施策略解析

随着现代企业IT系统的日益复杂,如何有效管理这些庞大的IT资产和资源,成为每个企业必须面对的重要问题。CMDB应运而生,帮助企业集中管理IT资源,维护系统的稳定性,并支持故障排查与决策制定。本文将深入探讨如何从零开始规划和实施一个成功的CMDB项目。 一、CMDB能做什么CMD…

从0到1实现项目Docker编排部署

在深入讨论 Docker 编排之前,首先让我们了解一下 Docker 技术本身。Docker 是一个开源平台,旨在帮助开发者自动化应用程序的部署、扩展和管理。自 2013 年推出以来,Docker 迅速发展成为现代软件开发和运维领域不可或缺的重要工具。 Docker 采用容器技术,将应用程序及其所有…

用 (Excel) VBA 读取 OneNote!

本文记录,用 VBA 读取 OneNote 的方法,这块似乎一直是空白,研究了好久才找到解决方案!小白贡献,语失莫怪!问题背景: 我在 OneNote 里有上百篇笔记,可 OneNote 自己,却无法导出全部的标题。于是我千方百计,想要读取 OneNote 的文件,来获取标题和日志信息。尝试了各种…

强化学习的数学原理-02贝尔曼公式

目录Motivating examplesstate valueBellman equationAction valuesummary Motivating examples 一个核心概念:state value 一个基本的工具:Bellman equation 为什么return是重要的?return可以用来评估policy下面计算3个例子计算return的方法:第一种方法:(by definition|通过…

纷享销客CRM“标签管理”助力企业精准营销业绩增长

在数字化驱动的商业世界中,数据管理无疑是企业铸就成功的核心要素。它能够帮助企业精准洞察市场趋势,深度了解客户需求,进而优化业务流程,并在激烈的市场竞争中崭露头角。然而,在实际的业务场景中,还尚存这些问题: • 面对海量数据,企业如何避免信息过载,实现有效分类…

PbootCMS出现database disk image is malformed的解决办法

database disk image is malformed 错误通常是由于 SQLite 数据库文件损坏引起的。这种问题可能发生在写入数据库时突然中断操作,比如服务器突然重启或网络中断等情况。以下是一些解决方法,包括删除栏目模型重建和修复 SQLite 数据库。 解决方法 1. 删除栏目模型,重建备份数…

PbootCMS填写授权码的地方不见了怎么办

当你在填写授权码时,同时填写了“授权码手机”这一栏,系统会认为你填写了“万能授权码”,从而隐藏了授权码输入框。如果你没有万能授权码,就不应该填写“授权码手机”这一栏。 解决步骤 1. 清除授权码和授权码手机登录后台:打开浏览器,输入你的PbootCMS后台地址,登录后台…

二、Linux 包管理器与软件安装

前言在上一章我们简单了解了Linux的基本概念,基本思想,以及一些简单的文件操作,相信你现在已经可以打开linux并进行一些基本操作。当然,这些操作对于我们操作一个电脑来说过于简陋了,这和在windows下拿鼠标到处点点点一样,什么都干不了。本章来继续扩展linux的相关操作,…

PbootCMS系统管理员点击文章评论的状态按钮提示权限不足

1. 开启后台菜单登录后台:打开浏览器,输入你的PbootCMS后台地址,登录后台管理系统。进入系统设置:在后台管理界面,进入“系统设置” -> “菜单管理”。开启后台菜单:如果你还没有开启后台菜单,可以参考这篇教程:如何开启PbootCMS后台菜单。2. 修改会员中心的文章评论…

PbootCMS备案号怎么加网站链接

站备案成功后需要在网站底部添加备案号,并且备案号需要链接到备案查询页面。以下是具体的操作步骤: 操作步骤 1. 后台填写备案号登录后台:打开浏览器,输入你的PbootCMS后台地址,登录后台管理系统。填写备案号:在后台管理界面,进入“系统设置” -> “站点设置”。 找到…

PbootCMS缓存如何清理runtime文件夹下经常满怎么办?清理缓存的方法

方式一:通过后台清理缓存登录后台:打开浏览器,输入你的PbootCMS后台地址,登录后台管理系统。清理缓存:登录后,在右上角找到“清理缓存”按钮,点击即可自动清理所有缓存文件。方式二:通过FTP或服务器直接删除runtime文件夹下的所有文件连接FTP服务器:使用FTP客户端(如…

PbootCMS中常见的错误提示及其解决方案

PbootCMS 是一个功能强大的内容管理系统,但在使用过程中可能会遇到各种错误提示。以下是一些常见的 PbootCMS 错误提示及其可能的解决方案: 常见错误提示及解决方案 1. 数据库连接错误 错误提示: 数据库连接失败解决方案:检查数据库配置文件(通常位于 /config/database.ph…