Gin+robfig/cron/v3 實(shí)現(xiàn)任務(wù)調(diào)度系統(tǒng):定時(shí)、周期、立即執(zhí)行、重試與恢復(fù)全支持!
最近在開發(fā)自動(dòng)化平臺(tái)、漏洞工單系統(tǒng)、監(jiān)控平臺(tái)等后臺(tái)服務(wù)時(shí),我們經(jīng)常需要任務(wù)調(diào)度系統(tǒng),來定時(shí)執(zhí)行某些邏輯,比如周期性數(shù)據(jù)同步、定時(shí)告警、定時(shí)下發(fā)任務(wù)等。
本文將手把手教你如何使用Go語言的Gin 框架+robfig/cron/v3實(shí)現(xiàn)一個(gè)自定義任務(wù)調(diào)度系統(tǒng),支持以下核心功能:
- 任務(wù)定時(shí)執(zhí)行
- 周期執(zhí)行(Cron 表達(dá)式)
- 立即執(zhí)行任務(wù)
- 暫停/恢復(fù)任務(wù)
- 重試任務(wù)
技術(shù)棧
- Gin:HTTP 接口服務(wù)框架
- robfig/cron v3:任務(wù)調(diào)度庫(kù),支持 Cron 表達(dá)式解析
- 原生 Goroutine 和上下文管理:控制任務(wù)生命周期
項(xiàng)目結(jié)構(gòu)
做一個(gè)簡(jiǎn)單的demo項(xiàng)目,所以項(xiàng)目目錄結(jié)構(gòu)比較簡(jiǎn)單,具體如下所示:
? robfig_cron_gin tree .
.
├── go.mod
├── go.sum
├── handler
│ └── task_handler.go
├── main.go
├── router
│ └── router.go
└── scheduler
├── scheduler.go
└── task.go
4 directories, 7 files
初始化項(xiàng)目,并添加依賴,具體如下所示:
go get github.com/robfig/cron/v3
go get github.com/gin-gonic/gin
原理簡(jiǎn)析
(1) robfig/cron工作機(jī)制
cron/v3是Go最成熟的任務(wù)調(diào)度庫(kù)之一,支持標(biāo)準(zhǔn)Cron表達(dá)式(包括秒級(jí)),本質(zhì)上維護(hù)了一個(gè)調(diào)度器,通過 AddFunc() 添加任務(wù)后,每到設(shè)定的時(shí)間點(diǎn)自動(dòng)執(zhí)行。
(2) 自定義調(diào)度器
我們封裝了一個(gè) Scheduler:
- 內(nèi)部持有 *cron.Cron
- 每個(gè)任務(wù)用 map[string]EntryID 管理,支持增刪查
- 支持RunNow() 手動(dòng)觸發(fā)
- 支持暫停(remove)和恢復(fù)(重新 add)
代碼實(shí)現(xiàn)
(1) 定義任務(wù)結(jié)構(gòu)體scheduler/task.go
package scheduler
import "context"
type Task struct {
ID string
Name string
Schedule string
JobFunc func(ctx context.Context)
Retry int
}
(2) 調(diào)度器核心邏輯scheduler/scheduler.go
package scheduler
import (
"context"
"sync"
"github.com/robfig/cron/v3"
)
type Scheduler struct {
c *cron.Cron
mu sync.Mutex
tasks map[string]cron.EntryID
funcs map[string]*Task
status map[string]string
}
func NewScheduler() *Scheduler {
return &Scheduler{
c: cron.New(cron.WithSeconds()),
tasks: make(map[string]cron.EntryID),
funcs: make(map[string]*Task),
status: make(map[string]string),
}
}
func (s *Scheduler) Start() {
s.c.Start()
}
func (s *Scheduler) Stop() {
s.c.Stop()
}
func (s *Scheduler) AddTask(t *Task) error {
s.mu.Lock()
defer s.mu.Unlock()
id, err := s.c.AddFunc(t.Schedule, func() {
ctx := context.Background()
t.JobFunc(ctx)
})
if err != nil {
return err
}
s.tasks[t.ID] = id
s.funcs[t.ID] = t
s.status[t.ID] = "running"
return nil
}
func (s *Scheduler) RunNow(taskID string) {
s.mu.Lock()
defer s.mu.Unlock()
if t, ok := s.funcs[taskID]; ok {
go t.JobFunc(context.Background())
}
}
func (s *Scheduler) PauseTask(taskID string) {
s.mu.Lock()
defer s.mu.Unlock()
if id, ok := s.tasks[taskID]; ok {
s.c.Remove(id)
s.status[taskID] = "paused"
}
}
func (s *Scheduler) ResumeTask(taskID string) error {
if t, ok := s.funcs[taskID]; ok {
return s.AddTask(t)
}
return nil
}
func (s *Scheduler) RetryTask(taskID string) {
s.RunNow(taskID) // 簡(jiǎn)單實(shí)現(xiàn),等同于立即執(zhí)行
}
func (s *Scheduler) Status(taskID string) string {
s.mu.Lock()
defer s.mu.Unlock()
return s.status[taskID]
}
(3) 接口實(shí)現(xiàn)邏輯handler/task_handler.go
package handler
import (
"context"
"net/http"
"robfig_cron_gin/scheduler"
"github.com/gin-gonic/gin"
)
var sched *scheduler.Scheduler
func Init(s *scheduler.Scheduler) {
sched = s
}
func AddTask(c *gin.Context) {
var req struct {
ID string `json:"id"`
Name string `json:"name"`
Schedule string `json:"schedule"`
}
if err := c.BindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
task := &scheduler.Task{
ID: req.ID,
Name: req.Name,
Schedule: req.Schedule,
JobFunc: func(ctx context.Context) {
// 模擬任務(wù)執(zhí)行邏輯
println("Task executed:", req.ID)
},
}
err := sched.AddTask(task)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"message": "task added"})
}
func RunNow(c *gin.Context) {
id := c.Param("id")
sched.RunNow(id)
c.JSON(http.StatusOK, gin.H{"message": "task executed now"})
}
func PauseTask(c *gin.Context) {
id := c.Param("id")
sched.PauseTask(id)
c.JSON(http.StatusOK, gin.H{"message": "task paused"})
}
func ResumeTask(c *gin.Context) {
id := c.Param("id")
err := sched.ResumeTask(id)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"message": "task resumed"})
}
(4) Gin 路由配置router/router.go
package router
import (
"robfig_cron_gin/handler"
"github.com/gin-gonic/gin"
)
func SetupRouter() *gin.Engine {
r := gin.Default()
r.POST("/task", handler.AddTask)
r.POST("/task/:id/run", handler.RunNow)
r.POST("/task/:id/pause", handler.PauseTask)
r.POST("/task/:id/resume", handler.ResumeTask)
return r
}
(5) 服務(wù)入口main.go
package main
import (
"robfig_cron_gin/handler"
"robfig_cron_gin/router"
"robfig_cron_gin/scheduler"
)
func main() {
s := scheduler.NewScheduler()
s.Start()
defer s.Stop()
handler.Init(s)
r := router.SetupRouter()
r.Run(":9311")
}
運(yùn)行并測(cè)試項(xiàng)目
運(yùn)行程序命令如下所示:
go run .
測(cè)試命令如下所示:
? ~ curl -X POST http://localhost:9311/task \
-H 'Content-Type: application/json' \
-d '{"id":"task1", "name":"MyTask", "schedule":"*/10 * * * * *"}'
{"message":"task added"}%
? ~ curl -X POST http://localhost:9311/task/task1/pause
{"message":"task paused"}%
? ~ curl -X POST http://localhost:9311/task/task1/run
{"message":"task executed now"}%
? ~ curl -X POST http://localhost:9311/task/task1/pause
{"message":"task paused"}%
? ~ curl -X POST http://localhost:9311/task/task1/resume
{"message":"task resumed"}%
效果如下圖所示:
后續(xù)可擴(kuò)展方向
- 支持任務(wù)持久化到數(shù)據(jù)庫(kù)(MySQL/Postgres)
- 支持失敗重試策略(使用backoff、retry)
- 支持任務(wù)執(zhí)行日志持久化 + WebSocket實(shí)時(shí)推送
- 支持多實(shí)例集群調(diào)度,分布式鎖控制任務(wù)唯一執(zhí)行
總結(jié)
通過Gin + robfig/cron,我們實(shí)現(xiàn)了一個(gè)輕量、功能靈活的任務(wù)調(diào)度系統(tǒng)。結(jié)構(gòu)清晰、接口豐富,非常適合內(nèi)嵌進(jìn)后端服務(wù)系統(tǒng)中,如定時(shí)同步平臺(tái)、自動(dòng)化任務(wù)管理系統(tǒng)、CI/CD執(zhí)行器等。