Skip to content

Commit

Permalink
fix: exit wrong (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
JTrancender authored Aug 27, 2021
1 parent 7b6c72b commit 9c837e7
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 6 deletions.
14 changes: 9 additions & 5 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,6 @@ func (nc *NSQConsumer) Run(c *consumer.ConsumerEntity) error {

_ = nc.pipeline.Close()

for _, consumer := range nc.topics {
close(consumer.done)
}

nc.wg.Wait()
return nil
}
Expand Down Expand Up @@ -166,6 +162,13 @@ func (nc *NSQConsumer) start() {
func (nc *NSQConsumer) Stop() {
logp.L().Info("Stopping nsq consumer")

// need close consumer first and then close others
for _, consumer := range nc.topics {
// close(consumer.done)
consumer.Close()
close(consumer.done)
}

// Stop nsq consumer
close(nc.done)
}
Expand Down Expand Up @@ -200,14 +203,15 @@ func newNsqConsumer(opts *Options, topic string, cfg *nsq.Config, etcdConfig *et
func (c Consumer) HandleMessage(m *nsq.Message) error {
m.DisableAutoResponse()
c.msgChan <- m

return nil
}

func (c *Consumer) router() {
for {
select {
case <-c.done:
c.Close()
// c.Close()
return
case m := <-c.msgChan:
c.queue <- &Message{
Expand Down
7 changes: 7 additions & 0 deletions libconsumer/cmd/instance/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@ func (c *Consumer) InitWithSettings(settings Settings) error {
}

func (c *Consumer) launch(settings Settings, ct consumer.Creator) error {
// todo: pprof
// 优先启动pprof
// pprofHandler := http.NewServeMux()
// pprofHandler.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
// server := &http.Server{Addr: ":7070", Handler: pprofHandler}
// go server.ListenAndServe()

defer func() {
_ = logp.Sync()
}()
Expand Down
2 changes: 1 addition & 1 deletion libconsumer/publisher/pipeline/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (w *clientWorker) run() {
for {
select {
case <-w.done:
w.logger.Info("clientWorker#run accep done signal")
w.logger.Info("clientWorker#run accept done signal")
return
case m := <-w.msgChan:
if err := w.client.Publish(context.TODO(), m); err != nil {
Expand Down

0 comments on commit 9c837e7

Please sign in to comment.