自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Gin+robfig/cron/v3 實(shí)現(xiàn)任務(wù)調(diào)度系統(tǒng):定時(shí)、周期、立即執(zhí)行、重試與恢復(fù)全支持!

開發(fā) 后端
本文將手把手教你如何使用Go語言的Gin 框架+robfig/cron/v3實(shí)現(xiàn)一個(gè)自定義任務(wù)調(diào)度系統(tǒng)。

最近在開發(fā)自動(dòng)化平臺(tái)、漏洞工單系統(tǒng)、監(jiān)控平臺(tái)等后臺(tái)服務(wù)時(shí),我們經(jīng)常需要任務(wù)調(diào)度系統(tǒng),來定時(shí)執(zhí)行某些邏輯,比如周期性數(shù)據(jù)同步、定時(shí)告警、定時(shí)下發(fā)任務(wù)等。

本文將手把手教你如何使用Go語言的Gin 框架+robfig/cron/v3實(shí)現(xiàn)一個(gè)自定義任務(wù)調(diào)度系統(tǒng),支持以下核心功能:

  • 任務(wù)定時(shí)執(zhí)行
  • 周期執(zhí)行(Cron 表達(dá)式)
  • 立即執(zhí)行任務(wù)
  • 暫停/恢復(fù)任務(wù)
  • 重試任務(wù)

技術(shù)棧

  • Gin:HTTP 接口服務(wù)框架
  • robfig/cron v3:任務(wù)調(diào)度庫(kù),支持 Cron 表達(dá)式解析
  • 原生 Goroutine 和上下文管理:控制任務(wù)生命周期

項(xiàng)目結(jié)構(gòu)

做一個(gè)簡(jiǎn)單的demo項(xiàng)目,所以項(xiàng)目目錄結(jié)構(gòu)比較簡(jiǎn)單,具體如下所示:

?  robfig_cron_gin tree .
.
├── go.mod
├── go.sum
├── handler
│   └── task_handler.go
├── main.go
├── router
│   └── router.go
└── scheduler
    ├── scheduler.go
    └── task.go


4 directories, 7 files

初始化項(xiàng)目,并添加依賴,具體如下所示:

go get github.com/robfig/cron/v3
go get github.com/gin-gonic/gin

原理簡(jiǎn)析

(1) robfig/cron工作機(jī)制

cron/v3是Go最成熟的任務(wù)調(diào)度庫(kù)之一,支持標(biāo)準(zhǔn)Cron表達(dá)式(包括秒級(jí)),本質(zhì)上維護(hù)了一個(gè)調(diào)度器,通過 AddFunc() 添加任務(wù)后,每到設(shè)定的時(shí)間點(diǎn)自動(dòng)執(zhí)行。

(2) 自定義調(diào)度器

我們封裝了一個(gè) Scheduler:

  • 內(nèi)部持有 *cron.Cron
  • 每個(gè)任務(wù)用 map[string]EntryID 管理,支持增刪查
  • 支持RunNow() 手動(dòng)觸發(fā)
  • 支持暫停(remove)和恢復(fù)(重新 add)

代碼實(shí)現(xiàn)

(1) 定義任務(wù)結(jié)構(gòu)體scheduler/task.go

package scheduler


import "context"


type Task struct {
	ID       string
	Name     string
	Schedule string
	JobFunc  func(ctx context.Context)
	Retry    int
}

(2) 調(diào)度器核心邏輯scheduler/scheduler.go

package scheduler


import (
	"context"
	"sync"


	"github.com/robfig/cron/v3"
)


type Scheduler struct {
	c      *cron.Cron
	mu     sync.Mutex
	tasks  map[string]cron.EntryID
	funcs  map[string]*Task
	status map[string]string
}


func NewScheduler() *Scheduler {
	return &Scheduler{
		c:      cron.New(cron.WithSeconds()),
		tasks:  make(map[string]cron.EntryID),
		funcs:  make(map[string]*Task),
		status: make(map[string]string),
	}
}


func (s *Scheduler) Start() {
	s.c.Start()
}


func (s *Scheduler) Stop() {
	s.c.Stop()
}


func (s *Scheduler) AddTask(t *Task) error {
	s.mu.Lock()
	defer s.mu.Unlock()


	id, err := s.c.AddFunc(t.Schedule, func() {
		ctx := context.Background()
		t.JobFunc(ctx)
	})
	if err != nil {
		return err
	}


	s.tasks[t.ID] = id
	s.funcs[t.ID] = t
	s.status[t.ID] = "running"
	return nil
}


func (s *Scheduler) RunNow(taskID string) {
	s.mu.Lock()
	defer s.mu.Unlock()


	if t, ok := s.funcs[taskID]; ok {
		go t.JobFunc(context.Background())
	}
}


func (s *Scheduler) PauseTask(taskID string) {
	s.mu.Lock()
	defer s.mu.Unlock()


	if id, ok := s.tasks[taskID]; ok {
		s.c.Remove(id)
		s.status[taskID] = "paused"
	}
}


func (s *Scheduler) ResumeTask(taskID string) error {
	if t, ok := s.funcs[taskID]; ok {
		return s.AddTask(t)
	}
	return nil
}


func (s *Scheduler) RetryTask(taskID string) {
	s.RunNow(taskID) // 簡(jiǎn)單實(shí)現(xiàn),等同于立即執(zhí)行
}


func (s *Scheduler) Status(taskID string) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.status[taskID]
}

(3) 接口實(shí)現(xiàn)邏輯handler/task_handler.go

package handler


import (
	"context"
	"net/http"
	"robfig_cron_gin/scheduler"


	"github.com/gin-gonic/gin"
)


var sched *scheduler.Scheduler


func Init(s *scheduler.Scheduler) {
	sched = s
}


func AddTask(c *gin.Context) {
	var req struct {
		ID       string `json:"id"`
		Name     string `json:"name"`
		Schedule string `json:"schedule"`
	}
	if err := c.BindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}
	task := &scheduler.Task{
		ID:       req.ID,
		Name:     req.Name,
		Schedule: req.Schedule,
		JobFunc: func(ctx context.Context) {
			// 模擬任務(wù)執(zhí)行邏輯
			println("Task executed:", req.ID)
		},
	}
	err := sched.AddTask(task)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{"message": "task added"})
}


func RunNow(c *gin.Context) {
	id := c.Param("id")
	sched.RunNow(id)
	c.JSON(http.StatusOK, gin.H{"message": "task executed now"})
}


func PauseTask(c *gin.Context) {
	id := c.Param("id")
	sched.PauseTask(id)
	c.JSON(http.StatusOK, gin.H{"message": "task paused"})
}


func ResumeTask(c *gin.Context) {
	id := c.Param("id")
	err := sched.ResumeTask(id)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{"message": "task resumed"})
}

(4) Gin 路由配置router/router.go

package router


import (
	"robfig_cron_gin/handler"


	"github.com/gin-gonic/gin"
)


func SetupRouter() *gin.Engine {
	r := gin.Default()


	r.POST("/task", handler.AddTask)
	r.POST("/task/:id/run", handler.RunNow)
	r.POST("/task/:id/pause", handler.PauseTask)
	r.POST("/task/:id/resume", handler.ResumeTask)


	return r
}

(5) 服務(wù)入口main.go

package main


import (
	"robfig_cron_gin/handler"
	"robfig_cron_gin/router"
	"robfig_cron_gin/scheduler"
)


func main() {
	s := scheduler.NewScheduler()
	s.Start()
	defer s.Stop()


	handler.Init(s)


	r := router.SetupRouter()
	r.Run(":9311")
}

運(yùn)行并測(cè)試項(xiàng)目

運(yùn)行程序命令如下所示:

go run .

測(cè)試命令如下所示:

?  ~ curl -X POST http://localhost:9311/task \
  -H 'Content-Type: application/json' \
  -d '{"id":"task1", "name":"MyTask", "schedule":"*/10 * * * * *"}'
{"message":"task added"}%
?  ~ curl -X POST http://localhost:9311/task/task1/pause
{"message":"task paused"}%
?  ~ curl -X POST http://localhost:9311/task/task1/run


{"message":"task executed now"}%
?  ~ curl -X POST http://localhost:9311/task/task1/pause
{"message":"task paused"}%
?  ~ curl -X POST http://localhost:9311/task/task1/resume
{"message":"task resumed"}%

效果如下圖所示:

后續(xù)可擴(kuò)展方向

  • 支持任務(wù)持久化到數(shù)據(jù)庫(kù)(MySQL/Postgres)
  • 支持失敗重試策略(使用backoff、retry)
  • 支持任務(wù)執(zhí)行日志持久化 + WebSocket實(shí)時(shí)推送
  • 支持多實(shí)例集群調(diào)度,分布式鎖控制任務(wù)唯一執(zhí)行

總結(jié)

通過Gin + robfig/cron,我們實(shí)現(xiàn)了一個(gè)輕量、功能靈活的任務(wù)調(diào)度系統(tǒng)。結(jié)構(gòu)清晰、接口豐富,非常適合內(nèi)嵌進(jìn)后端服務(wù)系統(tǒng)中,如定時(shí)同步平臺(tái)、自動(dòng)化任務(wù)管理系統(tǒng)、CI/CD執(zhí)行器等。

責(zé)任編輯:趙寧寧 來源: 馬嘍編程筆記
相關(guān)推薦

2023-05-08 16:38:46

任務(wù)調(diào)度分布式任務(wù)調(diào)度

2024-05-13 09:49:30

.NETQuartz庫(kù)Cron表達(dá)式

2021-05-13 12:00:51

cron調(diào)度任務(wù)系統(tǒng)運(yùn)維

2010-01-07 13:24:22

Linux定時(shí)執(zhí)行工具

2023-12-26 07:44:00

Spring定時(shí)調(diào)度

2023-11-16 09:30:27

系統(tǒng)任務(wù)

2010-01-07 13:38:41

Linux定時(shí)任務(wù)

2025-02-03 10:00:00

DeepSeekChatGPT人工智能

2010-07-30 13:17:33

NFS V3

2025-01-06 08:53:37

2022-08-15 15:43:29

Linuxcron

2011-05-20 07:52:54

RADVISIONSCOPIA Mobi

2024-12-30 20:32:36

2023-03-01 09:39:40

調(diào)度系統(tǒng)

2009-05-25 09:43:59

ApusicOperaMasks金蝶中間件

2025-05-08 08:00:00

FastAPI開發(fā)異步定時(shí)

2009-08-13 09:07:36

Java多線程

2023-10-06 12:15:02

2014-05-16 11:21:37

OceanStorV3華為
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)