自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

用 Go 實現(xiàn)一個支持任務(wù)下發(fā)的后臺服務(wù):Gin + Machinery v2 完整實戰(zhàn)

開發(fā) 后端
本文將帶你一步一步使用Go語言,結(jié)合Gin框架與Machinery v2實現(xiàn)一個完整的 “任務(wù)下發(fā)服務(wù)”,并支持通過REST API發(fā)起任務(wù)。

在日常開發(fā)當(dāng)中,我們經(jīng)常希望通過消息通信機制來異步執(zhí)行任務(wù),例如發(fā)郵件、生成報表、風(fēng)控計算等。這種場景中,使用“任務(wù)隊列”框架來解耦主業(yè)務(wù)流程是一種最佳實踐。

本文將帶你一步一步使用Go語言,結(jié)合Gin框架與Machinery v2實現(xiàn)一個完整的 “任務(wù)下發(fā)服務(wù)”,并支持通過REST API發(fā)起任務(wù)。廢話不多說,開始今天的內(nèi)容吧,let's Go!??!

核心目標(biāo)

項目開始前我們先設(shè)定一個小目標(biāo),具體項如下所示:

  • 使用 Gin 提供一個 HTTP 接口,用于接收任務(wù)參數(shù)
  • 使用 Machinery v2 執(zhí)行后臺任務(wù)(通過 Redis 通信)
  • 使用消息隊列解耦 API 層與實際執(zhí)行邏輯

整體流程

用戶請求 -> Gin Server -> Redis Machinery -> Worker

用戶通過HTTP請求提交任務(wù)參數(shù),Gin服務(wù)將任務(wù)發(fā)送到Machinery任務(wù)隊列中,后續(xù)由Worker異步消費任務(wù)并執(zhí)行。

準(zhǔn)備工作

(1) 安裝 Redis

本地環(huán)境可以直接通過 Docker 運行:

docker run -d --name redis -p 6379:6379 redis

(2) 初始化 Go 項目

go mod init machinery-gin
go get github.com/gin-gonic/gin
go get github.com/RichardKnop/machinery/v2

(3) 創(chuàng)建對應(yīng)目錄和代碼文件

?  machinery-gin tree .
.
├── cmd
│   ├── api
│   │   └── main.go
│   └── worker
│       └── main.go
├── config
│   └── config.go
├── controller
│   └── task_controller.go
├── go.mod
├── go.sum
├── router
│   └── router.go
├── scheduler
│   └── manager.go
├── service
│   └── task_service.go
└── tasks
    ├── handler.go
    └── registry.go


10 directories, 11 files

核心模塊代碼實現(xiàn)

(1) config/config.go

package config


import "github.com/RichardKnop/machinery/v2/config"


func GetMachineryConfig() *config.Config {
	return &config.Config{
		Broker:        "redis://localhost:6379",
		DefaultQueue:  "machinery_tasks",
		ResultBackend: "redis://localhost:6379",
	}
}

(2) tasks/handler.go

package tasks


import (
	"fmt"
	"time"
)


func PrintMessage(msg string) error {
	fmt.Printf("?? Task Received: %s at %s\n", msg, time.Now().Format(time.RFC3339))
	return nil
}

(3) tasks/registry.go

package tasks


import "github.com/RichardKnop/machinery/v2"


func RegisterTasks(server *machinery.Server) error {
	return server.RegisterTasks(map[string]interface{}{
		"print_message": PrintMessage,
	})
}

(4) scheduler/manager.go

package scheduler


import (
	"sync"
	"time"


	"github.com/RichardKnop/machinery/v2"
	"github.com/RichardKnop/machinery/v2/tasks"
)


type ScheduledTask struct {
	Name     string
	Interval time.Duration
	Msg      string
	Paused   bool
	StopChan chan struct{}
}


var (
	tasksMap = make(map[string]*ScheduledTask)
	mu       sync.Mutex
)


func AddScheduledTask(server *machinery.Server, name, msg string, interval time.Duration) {
	mu.Lock()
	defer mu.Unlock()


	if _, exists := tasksMap[name]; exists {
		return
	}


	t := &ScheduledTask{
		Name:     name,
		Interval: interval,
		Msg:      msg,
		StopChan: make(chan struct{}),
	}
	tasksMap[name] = t


	go func(task *ScheduledTask) {
		ticker := time.NewTicker(task.Interval)
		defer ticker.Stop()


		for {
			select {
			case <-ticker.C:
				if !task.Paused {
					signature := &tasks.Signature{
						Name: "print_message",
						Args: []tasks.Arg{
							{Type: "string", Value: task.Msg},
						},
					}
					server.SendTask(signature)
				}
			case <-task.StopChan:
				return
			}
		}
	}(t)
}


func PauseTask(name string) {
	mu.Lock()
	defer mu.Unlock()
	if task, ok := tasksMap[name]; ok {
		task.Paused = true
	}
}


func ResumeTask(name string) {
	mu.Lock()
	defer mu.Unlock()
	if task, ok := tasksMap[name]; ok {
		task.Paused = false
	}
}


func StopTask(name string) {
	mu.Lock()
	defer mu.Unlock()
	if task, ok := tasksMap[name]; ok {
		close(task.StopChan)
		delete(tasksMap, name)
	}
}

(5) service/task_service.go

package service


import (
	"time"


	"github.com/RichardKnop/machinery/v2"
	"machinery-gin/scheduler"
)


func ScheduleNewTask(server *machinery.Server, name, msg string, intervalSec int) {
	scheduler.AddScheduledTask(server, name, msg, time.Duration(intervalSec)*time.Second)
}


func PauseScheduledTask(name string) {
	scheduler.PauseTask(name)
}


func ResumeScheduledTask(name string) {
	scheduler.ResumeTask(name)
}


func StopScheduledTask(name string) {
	scheduler.StopTask(name)
}

(6) controller/task_controller.go

package controller


import (
	"net/http"


	"github.com/RichardKnop/machinery/v2"
	"github.com/gin-gonic/gin"
	"machinery-gin/service"
)


func TaskHandler(server *machinery.Server) gin.HandlerFunc {
	return func(c *gin.Context) {
		var req struct {
			Name     string `json:"name"`
			Interval int    `json:"interval"`
			Msg      string `json:"msg"`
		}


		if err := c.ShouldBindJSON(&req); err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
			return
		}


		service.ScheduleNewTask(server, req.Name, req.Msg, req.Interval)
		c.JSON(http.StatusOK, gin.H{"status": "task scheduled"})
	}
}


func PauseHandler() gin.HandlerFunc {
	return func(c *gin.Context) {
		name := c.Query("name")
		service.PauseScheduledTask(name)
		c.JSON(http.StatusOK, gin.H{"status": "paused"})
	}
}


func ResumeHandler() gin.HandlerFunc {
	return func(c *gin.Context) {
		name := c.Query("name")
		service.ResumeScheduledTask(name)
		c.JSON(http.StatusOK, gin.H{"status": "resumed"})
	}
}


func StopHandler() gin.HandlerFunc {
	return func(c *gin.Context) {
		name := c.Query("name")
		service.StopScheduledTask(name)
		c.JSON(http.StatusOK, gin.H{"status": "stopped"})
	}
}

(7) router/router.go

package routes


import (
	"github.com/RichardKnop/machinery/v2"
	"github.com/gin-gonic/gin"
	"machinery-gin/controller"
)


func SetupRouter(server *machinery.Server) *gin.Engine {
	r := gin.Default()


	r.POST("/task/start", controller.TaskHandler(server))
	r.POST("/task/pause", controller.PauseHandler())
	r.POST("/task/resume", controller.ResumeHandler())
	r.POST("/task/stop", controller.StopHandler())


	return r
}

(8) cmd/api/main.go (啟動Gin API服務(wù))

package main


import (
	server "github.com/RichardKnop/machinery/v2"
	redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
	redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"machinery-gin/config"
	"machinery-gin/router"
	"machinery-gin/tasks"
)


func main() {
	cfg := config.GetMachineryConfig()


	broker := redisbroker.New(cfg, "localhost:6379", "", "", 0)
	backend := redisbackend.New(cfg, "localhost:6379", "", "", 0)
	lock := eagerlock.New()
	machineryServer := server.NewServer(cfg, broker, backend, lock)


	_ = tasks.RegisterTasks(machineryServer)
	r := routes.SetupRouter(machineryServer)
	r.Run(":9311")
}

(9) cmd/worker/main.go (啟動Worker消費者)

package main


import (
	server "github.com/RichardKnop/machinery/v2"
	redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
	redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"machinery-gin/config"
	"machinery-gin/tasks"
)


func main() {
	cfg := config.GetMachineryConfig()
	broker := redisbroker.New(cfg, "localhost:6379", "", "", 0)
	backend := redisbackend.New(cfg, "localhost:6379", "", "", 0)
	lock := eagerlock.New()
	machineryServer := server.NewServer(cfg, broker, backend, lock)
	_ = tasks.RegisterTasks(machineryServer)


	worker := machineryServer.NewWorker("worker_name", 10)
	_ = worker.Launch()
}

測試程序

啟動 API 服務(wù)和 Worker:

go run cmd/api/main.go
go run cmd/worker/main.go

測試命令如下所示:

?  ~ curl -X POST http://localhost:9311/task/start -H 'Content-Type: application/json' -d '{
  "name": "hello-task",
  "interval": 5,
  "msg": "Hello from Machinery"
}'
{"status":"task scheduled"}%                                                       
 ?  ~ curl -X POST http://localhost:9311/task/pause\?name\=hello-task
{"status":"paused"}%
?  ~ curl -X POST http://localhost:9311/task/resume\?name\=hello-task
{"status":"resumed"}%                                                                 
?  ~ curl -X POST http://localhost:9311/task/stop\?name\=hello-task
{"status":"stopped"}%

測試結(jié)果如下所示:

總結(jié)

我們已經(jīng)實現(xiàn)了任務(wù)的“下發(fā)與執(zhí)行”,“暫停/恢復(fù)”后續(xù)可以進一步擴展:

  • 支持任務(wù)列表,任務(wù)詳情
  • 支持“周期定時任務(wù)(調(diào)度器)”
  • 支持任務(wù)執(zhí)行狀態(tài)查詢/UI管理面板
責(zé)任編輯:趙寧寧 來源: 馬嘍編程筆記
相關(guān)推薦

2022-05-22 13:55:30

Go 語言

2024-01-08 08:36:29

HTTPGo代理服務(wù)器

2024-01-02 13:58:04

GoREST API語言

2024-05-10 08:47:22

標(biāo)準(zhǔn)庫v2Go

2024-03-15 15:20:10

并發(fā)服務(wù)IP

2025-03-06 08:54:24

泛型類型MapGo1

2010-08-06 14:07:21

RIP V2

2010-08-05 17:00:04

RIP V2協(xié)議

2014-04-14 15:54:00

print()Web服務(wù)器

2022-03-06 19:57:50

狀態(tài)機easyfsm項目

2023-05-10 08:05:41

GoWeb應(yīng)用

2021-09-27 09:55:06

Chrome瀏覽器Manifest V2

2012-04-24 18:10:56

華為E5

2020-07-03 10:21:48

Go框架Docker

2017-05-08 15:00:20

H5代碼服務(wù)器

2021-08-23 15:14:09

Linuxat命令任務(wù)

2023-02-26 01:37:57

goORM代碼

2023-03-01 09:39:40

調(diào)度系統(tǒng)

2009-08-14 17:04:19

Windows后臺服務(wù)

2021-06-05 10:16:55

Linkerd 服務(wù)網(wǎng)格Kubernetes
點贊
收藏

51CTO技術(shù)棧公眾號