上一篇『系統設計: 如何取消正在執行的工作任務 (一)』教大家如何用 Go 語言實現 canceler
package 來紀錄及取消正在執行的任務。而本篇來實現上圖的 HTTP Server
及 Worker
程式碼,底下直接用 Gin 框架來快速實現 HTTP 兩個 Handle,分別是 Cancel Task
及 Watch Task
(如下圖標示的 1 跟 2)。
其中上圖綠色框框 1
是用來接收使用者想要取消的任務,而 2
是用來讓 worker 進行長連接,根據不同的情境可以設定不同的等待時間。大家可能會問,為什麼不讓 Server 主動通知 Worker 就可以了,先解釋這點,這邊我們可能要先假設 Worker 存在的環境是封閉的,不能任意架設服務,故需要主動向 HTTP Server 進行詢問。其中 HTTP Server 跟 Worker 中間可以透過 gRPC 或 RESTful 進行資料交換,本篇先以 RESTful 進行說明。
實現 HTTP 服務
先透過 Gin 框架簡單實現 HTTP 服務
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| package main
import (
"context"
"net/http"
"time"
"github.com/gin-gonic/gin"
)
func main() {
r := gin.Default()
r.GET("/ping", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"message": "pong",
})
})
r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")
}
|
先來看看使用者取消任務 (ID 為 1234),服務端該如何實現?上一篇教學有實現了 schedule
package
1
2
3
4
5
6
7
8
9
10
11
| package schedule
type Engine struct {
*canceler
}
func New() *Engine {
return &Engine{
canceler: newCanceler(),
}
}
|
這時候只需要透過 New() 就可以。
1
2
| // initial schedule instance
s := schedule.New()
|
接著實現 Cancel Task Handler。
1
2
3
4
5
6
7
8
9
10
| r.GET("/cancel-task/:id", func(c *gin.Context) {
taskID := c.Param("id")
if err := s.Cancel(context.Background(), taskID); err != nil {
c.String(http.StatusInternalServerError, "crash")
return
}
c.String(http.StatusOK, "ok")
})
|
上述函式執行 Cancel 會將任務 ID 直接紀錄在 struct map 內,後續可以接著更新任務狀態為『取消』,這邊就看大家怎麼存任務,最直接的方式就是更新資料庫。接著時線 Watch Task
函式
要做到即時取消任務,故 Worker 需要長時間連接上 HTTP 服務,避免過多的請求,造成不必要的負擔,畢竟任務可能需要長時間去跑,時間設定太短,就會造成 HTTP Server 的負擔。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| r.GET("/watch-task/:id", func(c *gin.Context) {
taskID := c.Param("id")
ctxDone, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ok, _ := s.Cancelled(ctxDone, taskID)
if ok {
c.String(http.StatusOK, "true")
return
}
c.String(http.StatusOK, "false")
})
|
上述程式碼可以看到,透過宣告 context timeout 來決定需要等待多長的時間,超過時間後就回覆 false
,由 HTTP 服務端決定 Worker 連接上來後可以等待的時間。
實現 Worker 服務
簡單的 Worker 範例就是開幾個 Goroutine 去持續跟 HTTP 服務連線,直到收到 Cancel 事件。
先假設有兩個任務分別是 1234
及 5678
,當這兩個任務正在執行的時候,也順便開 goroutine 在背景跟 HTTP 服務連接,等到任務結束或收到 HTTP 端的中指請求,則結束各自的 Goroutine。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
| package main
import (
"fmt"
"io"
"net/http"
"os"
"sync"
)
func cancelTask(id string) []byte {
req, err := http.NewRequest(http.MethodGet, "http://localhost:8080/watch-task/"+id, nil)
if err != nil {
fmt.Printf("client: could not create request: %s\n", err)
os.Exit(1)
}
res, err := http.DefaultClient.Do(req)
if err != nil {
fmt.Printf("client: error making http request: %s\n", err)
os.Exit(1)
}
resBody, err := io.ReadAll(res.Body)
if err != nil {
fmt.Printf("client: could not read response body: %s\n", err)
os.Exit(1)
}
return resBody
}
func main() {
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
resp := string(cancelTask("1234"))
fmt.Println("task[1234]: cancel the task:", resp)
if resp == "true" {
fmt.Println("task[1234]: get cancel event and canceld the task")
return
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for {
resp := string(cancelTask("5678"))
fmt.Println("task[5678]: cancel the task:", resp)
if resp == "true" {
fmt.Println("task[5678]: get cancel event and canceld the task")
return
}
}
}()
wg.Wait()
}
|
上面程式碼範例請不要直接套用在專案內,因為有些狀況尚未寫進去,像是一般 HTTP Client 不會用預設的 http.DefaultClient
,因為沒有設定 Timeout 會讓系統整個掛掉,另外 Goroutine 也沒有傳入 Context,會造成背景 Goroutine 都不會結束。這邊只是為了講解才這樣寫的。
整合測試
主要測試的目的就是,當第一個 worker 正在處理 1234
任務時,不知道什麼原因,突然跟 HTTP Server 失去連線 (上圖步驟二),此時如果步驟一收到使用者發送取消的請求,Worker 恢復連線後,要正確即時收到 Cancel 事件,才能完成取消任務。
心得
處理長時間的任務,可能會遇到底下問題 (如果不用 MQ 的話)
- 如何取得目前任務的狀態?
- 如何設定任務超時機制?
- 如何跨服務取消任務?
- 當 Worker 失去連線或不正常關閉,該如何讓 Task 可以重新執行?
- 當有多台 Server + 多台 Worker 時,該如何配送任務及取消任務?
團隊除了需要解決此架構之外,也把上述的機制也實現在 AWS SageMaker 上,打造 AWS MLOps 平台。本篇程式碼可以在這邊找到。