用 Go 語言實現固定大小的 Ring Buffer 資料結構

logo

Ring Buffer Queue 是固定大小記憶體的 FIFO (first in, first out) 資料結構,用來處理不同 Process 之前的資料交換。工作原理就是在一段連續固定大小區間的記憶體用 (head/tail) 兩個指針來決定現在要將資料放在哪個位置。本篇將帶大家用 Go 語言快速實現 Ring Buffer 資料結構。

使用時機

由於是固定大小的 Queue,在嵌入式系統領域非常好用,尤其時記憶體非常小的時候,使用上格外小心,因此開發者通常會宣告固定大小來儲存數據,而非使用動態分配,避免系統記憶體被用完。

必須要知道此資料結構也是有限制的,固定大小的 Queue,最終會遇到資料滿的時候該如何處理,有兩種方式,第一種就是將最新的資料拋棄,另一種就是將最新的資料繼續寫入舊的記憶體區段。

實作原理

Wiki 上面看到底下這張動畫圖

circular buffer

從上圖可以看到在資料結構內需要底下成員

  • buf: 內容儲存格式
  • capacity: 固定容量大小 (length)
  • head: 指向從 Buffer 內取出資料 (start)
  • tail: 指向從 Buffer 內儲存資料 (end)

API 實作細節需要底下四個函示

  • Enqueue: 將資料放入 Queue
  • Dequeue: 從 Queue 讀取資料
  • IsEmpty: 判斷 Queue 內是否有資料
  • IsFull: 判斷 Queue 是否已經滿了

如何實作

根據上述的資料結構,可以用 Go 語言宣告如下

1
2
3
4
5
6
7
8
9
type T interface{}

type CircularBuffer struct {
  sync.Mutex
  taskQueue []T
  capacity  int
  head      int
  tail      int
}

實作判斷 IsEmpty,是否為空,只要是 headtail 值相同的話,就是空的。資料結構初始化後,head 跟 tail 就都為零。

1
2
3
func (s *CircularBuffer) IsEmpty() bool {
  return s.head == s.tail
}

實作判斷 IsFull 是否已經滿了,無法在寫入資料,當有新資料寫入時,tail 就會 +1,而 head 則是維持不動。等到 Queue 寫入到最後一個空間後,tail + 1 就會等於 head 值,而由於是 Ring 結構,所以需要除以 capacity 取餘數。這邊請注意,由於要計算是否已經滿了,故需要浪費掉一個位置來確認。

1
2
3
func (s *CircularBuffer) IsFull() bool {
  return s.head == (s.tail+1)%s.capacity
}

假設 buffer size 為 4 好了

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# 初始化
head: 0, tail: 0
# 寫入資料
head: 0, tail: 1
# 寫入資料
head: 0, tail: 2
# 寫入資料
head: 0, tail: 3
# Queue 裡面只有三個了
# IsFull() 已經為 true

接著看看如何實作 Enqueue 及 Dequeue

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (s *CircularBuffer) Enqueue(task T) error {
  if s.IsFull() {
    return errMaxCapacity
  }

  s.Lock()
  s.taskQueue[s.tail] = task
  s.tail = (s.tail + 1) % s.capacity
  s.Unlock()

  return nil
}

func (s *CircularBuffer) Dequeue() (T, error) {
  if s.IsEmpty() {
    return nil, queue.ErrNoTaskInQueue
  }

  s.Lock()
  data := s.taskQueue[s.head]
  s.head = (s.head + 1) % s.capacity
  s.Unlock()

  return data, nil
}

可以看到上述代碼實作方式相同,Enqueue 就是將 tail+1,而 Dequeue 就是將 head+1。最後補上 New 函式

1
2
3
4
5
6
7
8
func NewCircularBuffer(size int) *CircularBuffer {
  w := &CircularBuffer{
    taskQueue: make([]T, size),
    capacity:  size,
  }

  return w
}

驗證上述程式碼,底下是測試代碼

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func TestQueue(t *testing.T) {
  size := 100
  q := NewCircularBuffer(size)

  for i := 0; i < (size - 1); i++ {
    assert.NoError(t, q.Enqueue(i+1))
  }
  // can't insert new data.
  assert.Error(t, q.Enqueue(0))
  assert.Equal(t, errFull, q.Enqueue(0))

  for i := 0; i < (size - 1); i++ {
    v, err := q.Dequeue()
    assert.Equal(t, i+1, v.(int))
    assert.NoError(t, err)
  }

  _, err := q.Dequeue()
  // no task
  assert.Error(t, err)
  assert.Equal(t, errNoTask, err)
}

上面可以看到先宣告 100 size 的 Slice,但是只能使用 99 大小,第 100 個要塞入就會出現錯誤。程式碼請參考,測試程式碼請參考這邊

解決浪費單一空間

上面有提到此 Ring buffer 需要浪費一個空間來計算 Empty 或 Full。來看看怎麼解決這問題,其實很簡單,在資料結構內多新增一個 full 變數來確認 Queue 是否為滿。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type CircularBuffer struct {
  capacity  int
  head      int
  tail      int
+ full      bool
}

func (s *CircularBuffer) IsEmpty() bool {
- return s.head == s.tail
+ return s.head == s.tail && !s.full
}

func (s *CircularBuffer) IsFull() bool {
- return s.head == (s.tail+1)%s.capacity
+ return s.full
}

func (s *CircularBuffer) Enqueue(task T) error {
  s.Lock()
  s.taskQueue[s.tail] = task
  s.tail = (s.tail + 1) % s.capacity
+ s.full = s.head == s.tail
  s.Unlock()

  return nil
}

func (s *CircularBuffer) Dequeue() (T, error) {

  s.Lock()
  data := s.taskQueue[s.head]
+ s.full = false
  s.head = (s.head + 1) % s.capacity
  s.Unlock()

效能測試

測試代碼如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

func BenchmarkCircularBufferEnqueueDequeue(b *testing.B) {
  q := NewCircularBuffer(b.N)

  b.ReportAllocs()
  b.ResetTimer()
  for i := 0; i < b.N; i++ {
    _ = q.Enqueue(i)
    _, _ = q.Dequeue()
  }
}

func BenchmarkCircularBufferEnqueue(b *testing.B) {
  q := NewCircularBuffer(b.N)

  b.ReportAllocs()
  b.ResetTimer()
  for i := 0; i < b.N; i++ {
    _ = q.Enqueue(i)
  }
}

func BenchmarkCircularBufferDequeue(b *testing.B) {
  q := NewCircularBuffer(b.N)

  for i := 0; i < b.N; i++ {
    _ = q.Enqueue(i)
  }

  b.ReportAllocs()
  b.ResetTimer()
  for i := 0; i < b.N; i++ {
    _, _ = q.Dequeue()
  }
}

透過 GitHub Action 測試結果如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
goos: linux
goarch: amd64
pkg: github.com/go-training/training/example52-ring-buffer-queue
cpu: Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
BenchmarkCircularBufferEnqueueDequeue
BenchmarkCircularBufferEnqueueDequeue-2     21792045          57.14 ns/op         7 B/op         0 allocs/op
BenchmarkCircularBufferEnqueue
BenchmarkCircularBufferEnqueue-2            34254517          37.20 ns/op         7 B/op         0 allocs/op
BenchmarkCircularBufferDequeue
BenchmarkCircularBufferDequeue-2            54213112          22.14 ns/op         0 B/op         0 allocs/op

最後程式碼請參考這邊,測試代碼請參考這裡