相信大家都知道發布 / 訂閱模式,開發者可以透過第三方開源工具像是 Redis, NSQ 或 Nats 等來實現訂閱機制,本篇則是會教大家如何用 Go 語言寫出一個單機版本的 Pub/Sub 模式,在單一系統內非常輕量級,且不需要靠第三方服務就可以輕易實現。底下會直接用單一訂閱 Topic 機制來撰寫 Publisher 及 Subscriber。
影片教學
其他線上課程請參考如下
Subscriber 訂閱訊息
首先第一步需要建立一個 Hub 用來接受多個 Subscriber,而這個 Hub structure 結構如下
1
2
3
4
5
6
7
8
9
10
| type hub struct {
sync.Mutex
subs map[*subscriber]struct{}
}
func newHub() *hub {
return &hub{
subs: map[*subscriber]struct{}{},
}
}
|
透過 newHub
用 map
方式初始化 subcribers,用 map 原因就是之後要實作 unsubscribe 會比較方便。接著建立 subscriber 結構
1
2
3
4
5
6
7
8
9
10
11
| type message struct {
data []byte
}
type subscriber struct {
sync.Mutex
name string
handler chan *message
quit chan struct{}
}
|
其中 name 代表此 subscriber 名稱,接著新增 run
函式,當成功 subscribe 後可以接受訊息
1
2
3
4
5
6
7
8
9
10
11
12
| func (s *subscriber) run(ctx context.Context) {
for {
select {
case msg := <-s.handler:
log.Println(s.name, string(msg.data))
case <-s.quit:
return
case <-ctx.Done():
return
}
}
}
|
透過 for 跟 select 來接受 channel 訊息。底下是初始化單一 Subscriber
1
2
3
4
5
6
7
| func newSubscriber(name string) *subscriber {
return &subscriber{
name: name,
handler: make(chan *message, 100),
quit: make(chan struct{}),
}
}
|
這邊需要注意,每個 subscriber 透過 buffer channel 來接受 Hub 傳送過來的訊息。請大家根據系統情境來決定是否調整 buffer 大小。初始化完成 subscriber,就需要將其丟進 Hub 內進行 subscribe。
1
2
3
4
5
6
7
8
9
| func (h *hub) subscribe(ctx context.Context, s *subscriber) error {
h.Lock()
h.subs[s] = struct{}{}
h.Unlock()
go s.run(ctx)
return nil
}
|
透過 map 將 subscriber 儲存起來,並透過 goroutine 方式將其丟到背景接受訊息。
Publisher 發送訊息
接下來透過 Publisher 將訊息收進來並丟到全部 Subscriber。上一個步驟已經看到 subscriber 實現了 run
函式接受 publisher 訊息。底下看看如何實現 publish 訊息機制
1
2
3
4
5
6
7
8
9
| func (h *hub) publish(ctx context.Context, msg *message) error {
h.Lock()
for s := range h.subs {
s.publish(ctx, msg)
}
h.Unlock()
return nil
}
|
透過 for 迴圈將所有 subscriber 讀出來並街訊息傳入即可。接著看一下如何實現 subscriber 的 publish 方法
1
2
3
4
5
6
7
8
| func (s *subscriber) publish(ctx context.Context, msg *message) {
select {
case <-ctx.Done():
return
case s.handler <- msg:
default:
}
}
|
這邊透過 select
方式來確保整個 main 不會被 block 住,假設訊息處理過慢,又不透過 select + default,則系統會被 block 住,故這邊大家要多注意。
Unsubscribe 取消訂閱
能訂閱就要能夠取消,也就是該如何從 map
正確移除 subscriber。在 hub 內實現 unsubscribe 功能
1
2
3
4
5
6
7
| func (h *hub) unsubscribe(ctx context.Context, s *subscriber) error {
h.Lock()
delete(h.subs, s)
h.Unlock()
close(s.quit)
return nil
}
|
除了透過 unsubscribe
之外,大家可以看到我們也支援了 context 方式來取消訂閱,故如果開發者執行 cancel(),理論上也要可以取消訂閱,這邊我們可以修改 subscribe
函式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| func (h *hub) subscribe(ctx context.Context, s *subscriber) error {
h.Lock()
h.subs[s] = struct{}{}
h.Unlock()
go func() {
select {
case <-s.quit:
case <-ctx.Done():
h.Lock()
delete(h.subs, s)
h.Unlock()
}
}()
go s.run(ctx)
return nil
}
|
請大家注意其中的 go func()
監聽了 ctx.Done()
,在程式任何地方執行了 cancel() 就可以刪除 subscriber 了,而在 subscriber 結構內有一個 quit
通道,用來讓手動 unsubscribe 後,可以關閉該 channel,讓原本的 goroutine 可以正常結束,不會造成系統 goroutine 持續變高。
實際範例
完成上述步驟後,打開 main.go 開始撰寫主程式。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| package main
import (
"context"
"time"
)
func main() {
ctx := context.Background()
h := newHub()
sub01 := newSubscriber("sub01")
sub02 := newSubscriber("sub02")
sub03 := newSubscriber("sub03")
h.subscribe(ctx, sub01)
h.subscribe(ctx, sub02)
h.subscribe(ctx, sub03)
_ = h.publish(ctx, &message{data: []byte("test01")})
_ = h.publish(ctx, &message{data: []byte("test02")})
_ = h.publish(ctx, &message{data: []byte("test03")})
time.Sleep(1 * time.Second)
h.unsubscribe(ctx, sub03)
_ = h.publish(ctx, &message{data: []byte("test04")})
_ = h.publish(ctx, &message{data: []byte("test05")})
time.Sleep(1 * time.Second)
}
|
驗證看看出來的訊息是不是有按照我們的模式跑出結果,另外為了驗證全部的 goroutine 都可以正常關閉,用 go.uber.org/goleak
來撰寫測試驗證。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
func TestSubscriber(t *testing.T) {
ctx := context.Background()
h := newHub()
sub01 := newSubscriber("sub01")
sub02 := newSubscriber("sub02")
sub03 := newSubscriber("sub03")
h.subscribe(ctx, sub01)
h.subscribe(ctx, sub02)
h.subscribe(ctx, sub03)
assert.Equal(t, 3, h.subscribers())
h.unsubscribe(ctx, sub01)
h.unsubscribe(ctx, sub02)
h.unsubscribe(ctx, sub03)
assert.Equal(t, 0, h.subscribers())
}
|
測試使用 context 來 cancel subscriber
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| func TestCancelSubscriber(t *testing.T) {
ctx := context.Background()
h := newHub()
sub01 := newSubscriber("sub01")
sub02 := newSubscriber("sub02")
sub03 := newSubscriber("sub03")
h.subscribe(ctx, sub01)
h.subscribe(ctx, sub02)
ctx03, cancel := context.WithCancel(ctx)
h.subscribe(ctx03, sub03)
assert.Equal(t, 3, h.subscribers())
// cancel subscriber 03
cancel()
time.Sleep(100 * time.Millisecond)
assert.Equal(t, 2, h.subscribers())
h.unsubscribe(ctx, sub01)
h.unsubscribe(ctx, sub02)
assert.Equal(t, 0, h.subscribers())
}
|
心得
大家可以發現在 Go 語言,透過簡單的 Buffer Channel 就可以實現 Pub/Sub 模式,可以根據 User 需要的情境,來決定是否導入第三方 Pub/Sub 工具,這種輕量級的寫法,相當方便。最後附上所有程式碼,希望對大家有幫助。
See also