用 Go 實現(xiàn)一個支持任務(wù)下發(fā)的后臺服務(wù):Gin + Machinery v2 完整實戰(zhàn)
在日常開發(fā)當(dāng)中,我們經(jīng)常希望通過消息通信機制來異步執(zhí)行任務(wù),例如發(fā)郵件、生成報表、風(fēng)控計算等。這種場景中,使用“任務(wù)隊列”框架來解耦主業(yè)務(wù)流程是一種最佳實踐。
本文將帶你一步一步使用Go語言,結(jié)合Gin框架與Machinery v2實現(xiàn)一個完整的 “任務(wù)下發(fā)服務(wù)”,并支持通過REST API發(fā)起任務(wù)。廢話不多說,開始今天的內(nèi)容吧,let's Go!??!
核心目標(biāo)
項目開始前我們先設(shè)定一個小目標(biāo),具體項如下所示:
- 使用 Gin 提供一個 HTTP 接口,用于接收任務(wù)參數(shù)
- 使用 Machinery v2 執(zhí)行后臺任務(wù)(通過 Redis 通信)
- 使用消息隊列解耦 API 層與實際執(zhí)行邏輯
整體流程
用戶請求 -> Gin Server -> Redis Machinery -> Worker
用戶通過HTTP請求提交任務(wù)參數(shù),Gin服務(wù)將任務(wù)發(fā)送到Machinery任務(wù)隊列中,后續(xù)由Worker異步消費任務(wù)并執(zhí)行。
準(zhǔn)備工作
(1) 安裝 Redis
本地環(huán)境可以直接通過 Docker 運行:
docker run -d --name redis -p 6379:6379 redis
(2) 初始化 Go 項目
go mod init machinery-gin
go get github.com/gin-gonic/gin
go get github.com/RichardKnop/machinery/v2
(3) 創(chuàng)建對應(yīng)目錄和代碼文件
? machinery-gin tree .
.
├── cmd
│ ├── api
│ │ └── main.go
│ └── worker
│ └── main.go
├── config
│ └── config.go
├── controller
│ └── task_controller.go
├── go.mod
├── go.sum
├── router
│ └── router.go
├── scheduler
│ └── manager.go
├── service
│ └── task_service.go
└── tasks
├── handler.go
└── registry.go
10 directories, 11 files
核心模塊代碼實現(xiàn)
(1) config/config.go
package config
import "github.com/RichardKnop/machinery/v2/config"
func GetMachineryConfig() *config.Config {
return &config.Config{
Broker: "redis://localhost:6379",
DefaultQueue: "machinery_tasks",
ResultBackend: "redis://localhost:6379",
}
}
(2) tasks/handler.go
package tasks
import (
"fmt"
"time"
)
func PrintMessage(msg string) error {
fmt.Printf("?? Task Received: %s at %s\n", msg, time.Now().Format(time.RFC3339))
return nil
}
(3) tasks/registry.go
package tasks
import "github.com/RichardKnop/machinery/v2"
func RegisterTasks(server *machinery.Server) error {
return server.RegisterTasks(map[string]interface{}{
"print_message": PrintMessage,
})
}
(4) scheduler/manager.go
package scheduler
import (
"sync"
"time"
"github.com/RichardKnop/machinery/v2"
"github.com/RichardKnop/machinery/v2/tasks"
)
type ScheduledTask struct {
Name string
Interval time.Duration
Msg string
Paused bool
StopChan chan struct{}
}
var (
tasksMap = make(map[string]*ScheduledTask)
mu sync.Mutex
)
func AddScheduledTask(server *machinery.Server, name, msg string, interval time.Duration) {
mu.Lock()
defer mu.Unlock()
if _, exists := tasksMap[name]; exists {
return
}
t := &ScheduledTask{
Name: name,
Interval: interval,
Msg: msg,
StopChan: make(chan struct{}),
}
tasksMap[name] = t
go func(task *ScheduledTask) {
ticker := time.NewTicker(task.Interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if !task.Paused {
signature := &tasks.Signature{
Name: "print_message",
Args: []tasks.Arg{
{Type: "string", Value: task.Msg},
},
}
server.SendTask(signature)
}
case <-task.StopChan:
return
}
}
}(t)
}
func PauseTask(name string) {
mu.Lock()
defer mu.Unlock()
if task, ok := tasksMap[name]; ok {
task.Paused = true
}
}
func ResumeTask(name string) {
mu.Lock()
defer mu.Unlock()
if task, ok := tasksMap[name]; ok {
task.Paused = false
}
}
func StopTask(name string) {
mu.Lock()
defer mu.Unlock()
if task, ok := tasksMap[name]; ok {
close(task.StopChan)
delete(tasksMap, name)
}
}
(5) service/task_service.go
package service
import (
"time"
"github.com/RichardKnop/machinery/v2"
"machinery-gin/scheduler"
)
func ScheduleNewTask(server *machinery.Server, name, msg string, intervalSec int) {
scheduler.AddScheduledTask(server, name, msg, time.Duration(intervalSec)*time.Second)
}
func PauseScheduledTask(name string) {
scheduler.PauseTask(name)
}
func ResumeScheduledTask(name string) {
scheduler.ResumeTask(name)
}
func StopScheduledTask(name string) {
scheduler.StopTask(name)
}
(6) controller/task_controller.go
package controller
import (
"net/http"
"github.com/RichardKnop/machinery/v2"
"github.com/gin-gonic/gin"
"machinery-gin/service"
)
func TaskHandler(server *machinery.Server) gin.HandlerFunc {
return func(c *gin.Context) {
var req struct {
Name string `json:"name"`
Interval int `json:"interval"`
Msg string `json:"msg"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
service.ScheduleNewTask(server, req.Name, req.Msg, req.Interval)
c.JSON(http.StatusOK, gin.H{"status": "task scheduled"})
}
}
func PauseHandler() gin.HandlerFunc {
return func(c *gin.Context) {
name := c.Query("name")
service.PauseScheduledTask(name)
c.JSON(http.StatusOK, gin.H{"status": "paused"})
}
}
func ResumeHandler() gin.HandlerFunc {
return func(c *gin.Context) {
name := c.Query("name")
service.ResumeScheduledTask(name)
c.JSON(http.StatusOK, gin.H{"status": "resumed"})
}
}
func StopHandler() gin.HandlerFunc {
return func(c *gin.Context) {
name := c.Query("name")
service.StopScheduledTask(name)
c.JSON(http.StatusOK, gin.H{"status": "stopped"})
}
}
(7) router/router.go
package routes
import (
"github.com/RichardKnop/machinery/v2"
"github.com/gin-gonic/gin"
"machinery-gin/controller"
)
func SetupRouter(server *machinery.Server) *gin.Engine {
r := gin.Default()
r.POST("/task/start", controller.TaskHandler(server))
r.POST("/task/pause", controller.PauseHandler())
r.POST("/task/resume", controller.ResumeHandler())
r.POST("/task/stop", controller.StopHandler())
return r
}
(8) cmd/api/main.go (啟動Gin API服務(wù))
package main
import (
server "github.com/RichardKnop/machinery/v2"
redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
"machinery-gin/config"
"machinery-gin/router"
"machinery-gin/tasks"
)
func main() {
cfg := config.GetMachineryConfig()
broker := redisbroker.New(cfg, "localhost:6379", "", "", 0)
backend := redisbackend.New(cfg, "localhost:6379", "", "", 0)
lock := eagerlock.New()
machineryServer := server.NewServer(cfg, broker, backend, lock)
_ = tasks.RegisterTasks(machineryServer)
r := routes.SetupRouter(machineryServer)
r.Run(":9311")
}
(9) cmd/worker/main.go (啟動Worker消費者)
package main
import (
server "github.com/RichardKnop/machinery/v2"
redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
"machinery-gin/config"
"machinery-gin/tasks"
)
func main() {
cfg := config.GetMachineryConfig()
broker := redisbroker.New(cfg, "localhost:6379", "", "", 0)
backend := redisbackend.New(cfg, "localhost:6379", "", "", 0)
lock := eagerlock.New()
machineryServer := server.NewServer(cfg, broker, backend, lock)
_ = tasks.RegisterTasks(machineryServer)
worker := machineryServer.NewWorker("worker_name", 10)
_ = worker.Launch()
}
測試程序
啟動 API 服務(wù)和 Worker:
go run cmd/api/main.go
go run cmd/worker/main.go
測試命令如下所示:
? ~ curl -X POST http://localhost:9311/task/start -H 'Content-Type: application/json' -d '{
"name": "hello-task",
"interval": 5,
"msg": "Hello from Machinery"
}'
{"status":"task scheduled"}%
? ~ curl -X POST http://localhost:9311/task/pause\?name\=hello-task
{"status":"paused"}%
? ~ curl -X POST http://localhost:9311/task/resume\?name\=hello-task
{"status":"resumed"}%
? ~ curl -X POST http://localhost:9311/task/stop\?name\=hello-task
{"status":"stopped"}%
測試結(jié)果如下所示:
總結(jié)
我們已經(jīng)實現(xiàn)了任務(wù)的“下發(fā)與執(zhí)行”,“暫停/恢復(fù)”后續(xù)可以進一步擴展:
- 支持任務(wù)列表,任務(wù)詳情
- 支持“周期定時任務(wù)(調(diào)度器)”
- 支持任務(wù)執(zhí)行狀態(tài)查詢/UI管理面板