func(cConsumer)startConsumer(ctxcontext.Context){for{select{casejob:=<-c.inputChan:ifctx.Err()!=nil{close(c.jobsChan)return}c.jobsChan<-jobcase<-ctx.Done():close(c.jobsChan)return}}}func(c*Consumer)worker(ctxcontext.Context,numint){log.Println("start the worker",num)for{select{casejob:=<-c.jobsChan:ifctx.Err()!=nil{log.Println("get next job",job,"and close the worker",num)return}c.process(num,job)case<-ctx.Done():log.Println("close the worker",num)return}}}
這邊要注意的是,當我們按下 ctrl+c 終止 worker 時,理論上會直接進到 case <-ctx.Done(),但實際上當多個 case 同時就緒時,select 會隨機挑選其中一個,所以有時候會繼續讀取 channel 的下一個值。這時候就需要在讀取 channel 後判斷 context 是否已經取消。在 main 最後通常會放一個 channel 來判斷是否需要中斷 main 函式。
funcmain(){finished:=make(chanbool)ctx:=withContextFunc(context.Background(),func(){log.Println("cancel from ctrl+c event")close(finished)})<-finished}
上述完成後,按下 ctrl + c 就會直接執行 close(finished),整個主程式隨即停止。但這不是我們預期的結果:我們希望等到全部的 worker 把正在處理的 Job 完成後,才停止主程式。
Graceful shutdown with worker
要用什麼方式才可以等到 worker 處理完畢後才結束 main 函式呢?這時候需要用到 sync.WaitGroup 了:每個 worker 結束時呼叫 wg.Done(),main 這邊則透過 wg.Wait() 等到全部 worker 都結束後才關閉 channel。
func(c*Consumer)worker(ctxcontext.Context,numint,wg*sync.WaitGroup){deferwg.Done()log.Println("start the worker",num)for{select{casejob:=<-c.jobsChan:ifctx.Err()!=nil{log.Println("get next job",job,"and close the worker",num)return}c.process(num,job)case<-ctx.Done():log.Println("close the worker",num)return}}}
constpoolSize=2funcmain(){finished:=make(chanbool)wg:=&sync.WaitGroup{}wg.Add(poolSize)// create the consumerconsumer:=Consumer{inputChan:make(chanint,10),jobsChan:make(chanint,poolSize),}ctx:=withContextFunc(context.Background(),func(){log.Println("cancel from ctrl+c event")wg.Wait()close(finished)})fori:=0;i<poolSize;i++{goconsumer.worker(ctx,i,wg)}<-finishedlog.Println("Game over")}
packagemainimport("context""log""math/rand""os""os/signal""sync""syscall""time")// Consumer structtypeConsumerstruct{inputChanchanintjobsChanchanint}funcgetRandomTime()int{rand.Seed(time.Now().UnixNano())returnrand.Intn(10)}funcwithContextFunc(ctxcontext.Context,ffunc())context.Context{ctx,cancel:=context.WithCancel(ctx)gofunc(){c:=make(chanos.Signal)signal.Notify(c,syscall.SIGINT,syscall.SIGTERM)defersignal.Stop(c)select{case<-ctx.Done():case<-c:cancel()f()}}()returnctx}func(c*Consumer)queue(inputint)bool{select{casec.inputChan<-input:log.Println("already send input value:",input)returntruedefault:returnfalse}}func(cConsumer)startConsumer(ctxcontext.Context){for{select{casejob:=<-c.inputChan:ifctx.Err()!=nil{close(c.jobsChan)return}c.jobsChan<-jobcase<-ctx.Done():close(c.jobsChan)return}}}func(c*Consumer)process(num,jobint){n:=getRandomTime()log.Printf("Sleeping %d seconds...\n",n)time.Sleep(time.Duration(n)*time.Second)log.Println("worker:",num," job value:",job)}func(c*Consumer)worker(ctxcontext.Context,numint,wg*sync.WaitGroup){deferwg.Done()log.Println("start the worker",num)for{select{casejob:=<-c.jobsChan:ifctx.Err()!=nil{log.Println("get next job",job,"and close the worker",num)return}c.process(num,job)case<-ctx.Done():log.Println("close the worker",num)return}}}constpoolSize=2funcmain(){finished:=make(chanbool)wg:=&sync.WaitGroup{}wg.Add(poolSize)// create the consumerconsumer:=Consumer{inputChan:make(chanint,10),jobsChan:make(chanint,poolSize),}ctx:=withContextFunc(context.Background(),func(){log.Println("cancel from ctrl+c event")wg.Wait()close(finished)})fori:=0;i<poolSize;i++{goconsumer.worker(ctx,i,wg)}goconsumer.startConsumer(ctx)gofunc(){consumer.queue(1)consumer.queue(2)consumer.queue(3)consumer.queue(4)consumer.queue(5)}()<-finishedlog.Println("Game over")}