Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库

Asynq[1]是一个Go实现的分布式任务队列和异步处理库,基于redis,类似Ruby的sidekiq[2]和Python的celery[3]。Go生态类似的还有machinery[4]和goworker

alt

同时提供一个WebUI asynqmon[5],可以源码形式安装或使用Docker image, 还可以和Prometheus集成

docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon,如果使用的是主机上的redis,还需加上 --redis-addr=host.docker.internal:6379,否则会报错[6]

docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon --redis-addr=host.docker.internal:6379


➜  asynq-demo git:(main) ✗ tree
.
├── client.go
├── const.go
├── go.mod
├── go.sum
└── server.go

0 directories, 5 files

其中const.go:

package main

const (
 redisAddr   = "127.0.0.1:6379"
 redisPasswd = ""
)

const (
 TypeExampleTask    = "shuang:asynq-task:example"
)

client.go:


package main

import (
 "encoding/json"
 "fmt"
 "log"
 "time"

 "github.com/hibiken/asynq"
)

type ExampleTaskPayload struct {
 UserID string
 Msg    string
 // 业务需要的其他字段
}

func NewExampleTask(userID string, msg string) (*asynq.Task, error) {
 payload, err := json.Marshal(ExampleTaskPayload{UserID: userID, Msg: msg})
 if err != nil {
  return nil, err
 }
 return asynq.NewTask(TypeExampleTask, payload), nil
}

var client *asynq.Client

func main() {

 client = asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr, Password: redisPasswd, DB: 0})
 defer client.Close()

 //go startExampleTask()
 startExampleTask()

 //startGithubUpdate() // 定时触发
}

func startExampleTask() {

 fmt.Println("开始执行一次性的任务")
 // 立刻执行
 task1, err := NewExampleTask("10001""mashangzhixing!")
 if err != nil {
  log.Fatalf("could not create task: %v", err)
 }

 info, err := client.Enqueue(task1)
 if err != nil {
  log.Fatalf("could not enqueue task: %v", err)
 }
 log.Printf("task1 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)

 // 10秒后执行(定时执行)
 task2, err := NewExampleTask("10002""10s houzhixing")
 if err != nil {
  log.Fatalf("could not create task: %v", err)
 }

 info, err = client.Enqueue(task2, asynq.ProcessIn(10*time.Second))
 if err != nil {
  log.Fatalf("could not enqueue task: %v", err)
 }
 log.Printf("task2 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)

 // 30s后执行(定时执行)
 task3, err := NewExampleTask("10003""30s houzhixing")
 if err != nil {
  log.Fatalf("could not create task: %v", err)
 }

 theTime := time.Now().Add(30 * time.Second)
 info, err = client.Enqueue(task3, asynq.ProcessAt(theTime))
 if err != nil {
  log.Fatalf("could not enqueue task: %v", err)
 }
 log.Printf("task3 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)
}

server.go:

package main

import (
 "context"
 "encoding/json"
 "fmt"
 "time"

 "github.com/davecgh/go-spew/spew"
 "github.com/hibiken/asynq"
)

var AsynqServer *asynq.Server // 异步任务server

func initTaskServer() error {
 // 初始化异步任务服务端
 AsynqServer = asynq.NewServer(
  asynq.RedisClientOpt{
   Addr:     redisAddr,
   Password: redisPasswd, //与client对应
   DB:       0,
  },
  asynq.Config{
   // Specify how many concurrent workers to use
   Concurrency: 100,
   // Optionally specify multiple queues with different priority.
   Queues: map[string]int{
    "critical"6,
    "default":  3,
    "low":      1,
   },
   // See the godoc for other configuration options
  },
 )
 return nil
}

func main() {
 initTaskServer()
 mux := asynq.NewServeMux()

 mux.HandleFunc(TypeExampleTask, HandleExampleTask)
 // ...register other handlers...

 if err := AsynqServer.Run(mux); err != nil {
  fmt.Printf("could not run asynq server: %v", err)
 }
}

func HandleExampleTask(ctx context.Context, t *asynq.Task) error {

 res := make(map[string]string)

 spew.Dump("t.Payload() is:", t.Payload())
 err := json.Unmarshal(t.Payload(), &res)
 if err != nil {
  fmt.Printf("rum session, can not parse payload: %s,  err: %v", t.Payload(), err)
  return nil
 }
 //-----------具体处理逻辑------------
 spew.Println("拿到的入参为:", res, "接下来将进行具体处理")
 fmt.Println()
 // 模拟具体的处理
 time.Sleep(5 * time.Second)
 fmt.Println("--------------处理了5s,处理完成-----------------")

 return nil

}

执行redis-server


清除redis中所有的key:


执行docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon --redis-addr=host.docker.internal:6379

alt

执行 go run client.go const.go (生产者,产生消息放入队列)

alt

此时能看到redis中多个几个key

alt

同时管理后台能看到队列的信息

alt

执行 go run server.go const.go (消费者,消费队列中的消息)

alt

可以看到都被处理了

alt

此时redis中的key:

alt

此处的业务处理为模拟,实际可能是某个被触发后不需要马上执行的操作




实际试一下。通过一个定时器(24h执行一次),触发代码每天向github push当天的代码等内容。收到触发后无需马上执行(可能当时其他请求量高,机器资源紧张),可以先放入队列,延迟30min后实际去执行。

完整Demo[7] push github的功能没有完全实现


另外可以配置队列的优先级,asynq队列如何配置队列优先级[8]

 // 初始化异步任务服务端
 AsynqServer = asynq.NewServer(
  asynq.RedisClientOpt{
   Addr:     redisAddr,
   Password: redisPasswd, //与client对应
   DB:       0,
  },
  asynq.Config{
   // Specify how many concurrent workers to use
   Concurrency: 100,
   // Optionally specify multiple queues with different priority.
   Queues: map[string]int{
    "critical"6,//关键队列中的任务将被处理 60% 的时间
    "default":  3,//默认队列中的任务将被处理 30% 的时间
    "low":      1,//低队列中的任务将被处理 10% 的时间
   },
   // See the godoc for other configuration options
  },
 )

go asynq 异步任务 (延迟触发) 简单案例及奇怪的错误[9]

参考资料

[1]

Asynq: https://github.com/hibiken/asynq

[2]

sidekiq: https://github.com/sidekiq/sidekiq

[3]

celery: https://github.com/celery/celery

[4]

machinery: https://blog.csdn.net/weixin_42681866/article/details/123334654

[5]

asynqmon: https://github.com/hibiken/asynqmon

[6]

报错: https://github.com/hibiken/asynqmon/issues/214

[7]

完整Demo: https://github.com/cuishuang/asynq-demo

[8]

asynq队列如何配置队列优先级: https://blog.csdn.net/itopit/article/details/126123626

[9]

go asynq 异步任务 (延迟触发) 简单案例及奇怪的错误: https://my.oschina.net/randolphcyg/blog/5539676

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/58434.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

最新成果展示:GaN基Micro-LED热学模型数据库的开发及应用

由于GaN基Micro-LED表面积-体积比增加,其在热学方面的性质有别于大尺寸的LED,如缺陷复合导致的热效应将在发光区域中产生诸多“热”点,导致发光波长不均匀,这将影响后期显示系统的成像稳定性。针对上述问题,天津赛米卡…

Android平台一对一音视频通话方案对比:WebRTC VS RTMP VS RTSP

一对一音视频通话使用场景 一对一音视频通话都需要稳定、清晰和流畅,以确保良好的用户体验,常用的使用场景如下: 社交应用:社交应用是一种常见的使用场景,用户可以通过音视频通话进行面对面的交流;在线教…

Linux基础开发工具之Linux自动项目构建工具的使用

目录 前言 1.make/makefile 1.1 依赖关系/依赖方法 2.原理 3.项目清理 4.make的执行问题 5.相关简单的符号介绍 总结 前言 之前给大家介绍了我们在开发过程中所需要使用到的编辑器vim,以及编译器gcc/g的使用,但是我相信大家在使用过程会发现我们在…

centos7实现负载均衡

目录 一、基于 CentOS 7 构建 LVS-DR 集群。 1.1 配置lvs负载均衡服务 1.1.1 下载ipvsadm 1.1.2 增加vip 1.1.3 配置ipvsadm 1.2 配置rs1 1.2.1 编写测试页面 1.2.2 手工在RS端绑定VIP、添加路由 1.2.3 抑制arp响应 1.3 配置rs2 1.4 测试 二、配置nginx负载…

机器学习深度学习—语言模型和数据集

👨‍🎓作者简介:一位即将上大四,正专攻机器学习的保研er 🌌上期文章:机器学习&&深度学习——文本预处理 📚订阅专栏:机器学习&&深度学习 希望文章对你们有所帮助 语…

ASP.NET Core学习路线图

说明 1. 先决条件 - [C#](https://www.pluralsight.com/paths/csharp) - [Entity Framework](https://www.pluralsight.com/search?qentity%20framework%20core) - [ASP.NET Core](https://www.pluralsight.com/search?qasp.net%20core) - SQL基础知识 2. 通用开发技能 -…

Android 数据库之GreenDAO

GreenDAO 是一款开源的面向 Android 的轻便、快捷的 ORM 框架,将 Java 对象映射到 SQLite 数据库中,我们操作数据库的时候,不再需要编写复杂的 SQL语句, 在性能方面,greenDAO 针对 Android 进行了高度优化,…

MySQL的查询方法

单表查询 素材: 表名:worker-- 表中字段均为中文,比如 部门号 工资 职工号 参加工作 要求: 1、显示所有职工的基本信息。 2、查询所有职工所属部门的部门号,不显示重复的部门号。 3、求出所有职工的人数。 4、…

java动态生成excel并且需要合并单元格

java动态生成excel并且需要合并单元格 先上图看一下预期效果 集成poi <dependency><groupId>cn.afterturn</groupId><artifactId>easypoi-base</artifactId><version>4.0.0</version> </dependency> <dependency><…

docker部署jenkins且jenkins中使用docker去部署项目

docker部署jenkins且jenkins中使用docker去部署项目 1、确定版本 2.346.1是最后一个支持jdk8的 2、编写docker-compose.yml并执行 在这个目录中新增data文件夹&#xff0c;注意data是用来跟docker中的文件进行映射的 docker-compose.yml version: "3.1" service…

leetcode 746. 使用最小花费爬楼梯

2023.8.8 昨天爽玩一天&#xff0c;在家就是舒服。 今天继续刷动态规划题。 动态规划题最重要的就是搞清楚dp[i] 的定义&#xff0c;本题dp[i] 的含义是&#xff1a;到达第i层&#xff0c;所需的最小花费。 那么由于起始台阶可以是0或者1&#xff0c;那么dp[0]和dp[1]都初始化…

QT笔记——QT自定义事件

我们有时候想发送自定义事件 1&#xff1a;创建自定义事件&#xff0c;首先我们需要知道它的条件 1&#xff1a;自定义事件需要继承QEvent 2&#xff1a;事件的类型需要在 QEvent::User 和 QEvent::MaxUser 范围之间&#xff0c;在QEvent::User之前 是预留给系统的事件 3&#…