// Worker for NSQtypeWorkerstruct{q*nsq.Consumerp*nsq.ProducerstartOncesync.OncestopOncesync.Oncestopchanstruct{}maxInFlightintaddrstringtopicstringchannelstringrunFuncfunc(context.Context,queue.QueuedMessage)errorloggerqueue.LoggerstopFlagint32startFlagint32busyWorkersuint64}
// Run start the workerfunc(w*Worker)Run()error{wg:=&sync.WaitGroup{}panicChan:=make(chaninterface{},1)w.q.AddHandler(nsq.HandlerFunc(func(msg*nsq.Message)error{wg.Add(1)deferfunc(){wg.Done()ifp:=recover();p!=nil{panicChan<-p}}()iflen(msg.Body)==0{// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.// In this case, a message with an empty body is simply ignored/discarded.returnnil}vardataqueue.Job_=json.Unmarshal(msg.Body,&data)returnw.handle(data)}))// wait close signalselect{case<-w.stop:caseerr:=<-panicChan:w.logger.Error(err)}// wait job completedwg.Wait()returnnil}
// Register the NSQ message handler. Each invocation is tracked in wg, and a
// panic inside the handler is recovered and reported through panicChan.
w.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
	wg.Add(1)
	defer func() {
		wg.Done()
		if p := recover(); p != nil {
			// panicChan has a small buffer and may no longer be drained;
			// never block the handler goroutine on it. If the buffer is
			// full, log the panic instead of waiting forever.
			select {
			case panicChan <- p:
			default:
				w.logger.Error(p)
			}
		}
	}()

	// re-queue the job if worker has been shutdown.
	if atomic.LoadInt32(&w.stopFlag) == 1 {
		// The message was already answered with a requeue, so nil is
		// returned instead of an error.
		msg.Requeue(-1)
		return nil
	}

	if len(msg.Body) == 0 {
		// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
		// In this case, a message with an empty body is simply ignored/discarded.
		return nil
	}

	var data queue.Job
	if err := json.Unmarshal(msg.Body, &data); err != nil {
		// A payload that cannot be decoded will not decode on a retry
		// either, so report it and discard the message (nil => FIN)
		// rather than running the job with zero-value data.
		w.logger.Error(err)
		return nil
	}

	return w.handle(data)
}))