Skip to content

Commit

Permalink
feat: using etcd to manage nsq config (#5)
Browse files Browse the repository at this point in the history
* feat: using etcd to manage nsq config

* readme add more configure info
  • Loading branch information
JieTrancender authored May 18, 2022
1 parent c5c6703 commit 509ec03
Show file tree
Hide file tree
Showing 13 changed files with 1,360 additions and 15 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ coverage.txt
logs/*.log
conf/dev.yaml
conf/production.yaml

.vscode
23 changes: 22 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# kbm-iam
# nsq-took-kit

This is a nsq tool kit project, can consume nsq messages and post to others.

Expand All @@ -19,10 +19,31 @@ Go to the project directory

Install dependencies

1. etcd
2. nsq
3. elasticsearch
4. go dependencies
```bash
make tidy
```

Configure Etcd
```base
{
"lookupd-http-addresses":[
"http://127.0.0.1:4161"
],
"topics":[
"dev_test"
],
"channel": "nsq_tool_kit",
"dial-timeout": 6,
"read-timeout": 60,
"write-timeout": 6,
"max-in-flight": 200
}
```

Start the server

```bash
Expand Down
12 changes: 12 additions & 0 deletions conf/nsq-consumer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,15 @@ nsq:
read-timeout: 60 #second
write-timeout: 6 # second
max-in-flight: 200

etcd:
endpoints:
- 127.0.0.1:2379
timeout: 6
request-time: 6
lease-expire: 60
Username: root
Password: 123456
UseTLS: false
namespace: /nsq_tool_kit
path: dev_test
22 changes: 18 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,33 @@ go 1.17
require (
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869
github.com/marmotedu/component-base v1.6.2
github.com/marmotedu/errors v1.0.2
github.com/marmotedu/iam v1.6.2
github.com/nsqio/go-nsq v1.1.0
github.com/olivere/elastic/v7 v7.0.32
github.com/spf13/pflag v1.0.5
go.etcd.io/etcd/api/v3 v3.5.4
go.etcd.io/etcd/client/v3 v3.5.4
google.golang.org/grpc v1.46.2
)

require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/gosuri/uitable v0.0.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.11 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/marmotedu/errors v1.0.2 // indirect
github.com/marmotedu/log v0.0.1 // indirect
github.com/mattn/go-colorable v0.1.9 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
Expand All @@ -34,18 +43,23 @@ require (
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rivo/uniseg v0.1.0 // indirect
github.com/rogpeppe/go-internal v1.8.0 // indirect
github.com/spf13/afero v1.6.0 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/cobra v1.2.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.9.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.4 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.19.1 // indirect
golang.org/x/sys v0.0.0-20211020064051-0ec99a608a1b // indirect
golang.org/x/text v0.3.6 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/genproto v0.0.0-20210828152312-66f60bf46e71 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/ini.v1 v1.63.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/klog v1.0.0 // indirect
Expand Down
891 changes: 886 additions & 5 deletions go.sum

Large diffs are not rendered by default.

59 changes: 54 additions & 5 deletions internal/nsqconsumer/manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package nsqconsumer

import (
"context"
"encoding/json"
"fmt"
"runtime"
"time"
Expand All @@ -13,6 +15,9 @@ import (
"github.com/JieTrancender/nsq-tool-kit/internal/nsqconsumer/config"
"github.com/JieTrancender/nsq-tool-kit/internal/nsqconsumer/message"
es "github.com/JieTrancender/nsq-tool-kit/internal/nsqconsumer/outputs/elasticsearch"
"github.com/JieTrancender/nsq-tool-kit/internal/nsqconsumer/store"
"github.com/JieTrancender/nsq-tool-kit/internal/nsqconsumer/store/etcd"
genericoptions "github.com/JieTrancender/nsq-tool-kit/internal/pkg/options"
)

type manager struct {
Expand All @@ -27,6 +32,8 @@ type manager struct {
topics map[string]*Consumer

msgChan chan *message.Message

storeIns store.Factory
}

func createConsumerManager(cfg *config.Config) (*manager, error) {
Expand All @@ -35,10 +42,6 @@ func createConsumerManager(cfg *config.Config) (*manager, error) {

nsqConfig := nsq.NewConfig()
nsqConfig.UserAgent = fmt.Sprintf("nsq-tool-kit/%s go-nsq/%s", "0.0.1", nsq.VERSION)
nsqConfig.DialTimeout = time.Duration(cfg.Nsq.DialTimeout) * time.Second
nsqConfig.ReadTimeout = time.Duration(cfg.Nsq.ReadTimeout) * time.Second
nsqConfig.WriteTimeout = time.Duration(cfg.Nsq.WriteTimeout) * time.Second
nsqConfig.MaxInFlight = cfg.Nsq.MaxInFlight
return &manager{
gs: gs,
cfg: cfg,
Expand All @@ -65,10 +68,52 @@ func (m *manager) initialize() error {
}
m.esClient = client

storeIns, err := etcd.GetEtcdFactoryOr(m.cfg.Etcd, nil)
if err != nil {
return err
}
store.SetClient(storeIns)

o, err := storeIns.Nsqs().Get(context.Background(), m.cfg.Etcd.Path)
if err != nil {
return err
}
m.cfg.Nsq = o

err = storeIns.Watch(context.Background(), "/nsq", m.updateNsqConfig)
if err != nil {
return err
}
m.storeIns = storeIns

return nil
}

func (m *manager) launch() error {
func (m *manager) updateNsqConfig(ctx context.Context, key, oldvalue, value []byte) {
log.Infof("manager update nsq conifg", string(key))
log.Infof("%s %s", string(key), m.storeIns.Nsqs().GetKey(m.cfg.Etcd.Path))
if string(key) == m.storeIns.Nsqs().GetKey(m.cfg.Etcd.Path) {
var o genericoptions.NsqOptions
if err := json.Unmarshal(value, &o); err != nil {
log.Errorf("failed to unmarshal to nsq options struct, data: %v", string(value))
return
}
m.cfg.Nsq = &o
m.updateTopics()
}
}

func (m *manager) updateTopics() {
// close cur consumers
for _, consumer := range m.topics {
consumer.Stop()
}

m.nsqConfig.DialTimeout = time.Duration(m.cfg.Nsq.DialTimeout) * time.Second
m.nsqConfig.ReadTimeout = time.Duration(m.cfg.Nsq.ReadTimeout) * time.Second
m.nsqConfig.WriteTimeout = time.Duration(m.cfg.Nsq.WriteTimeout) * time.Second
m.nsqConfig.MaxInFlight = m.cfg.Nsq.MaxInFlight

for _, topic := range m.cfg.Nsq.Topics {
log.Infof("launch topic %s", topic)
nsqConsumer, err := nsq.NewConsumer(topic, m.cfg.Nsq.Channel, m.nsqConfig)
Expand All @@ -90,6 +135,10 @@ func (m *manager) launch() error {
}
m.topics[topic] = consumer
}
}

func (m *manager) launch() error {
m.updateTopics()

stopCh := make(chan struct{})
if err := m.gs.Start(); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions internal/nsqconsumer/nsqconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ func (c *Consumer) HandleMessage(m *nsq.Message) error {
func (c *Consumer) Stop() {
c.consumer.Stop()
<-c.consumer.StopChan

close(c.done)
}

func (c *Consumer) Run(msgChan chan<- *message.Message) {
log.Infof("%s consumer running", c.topic)
for {
select {
case <-c.done:
log.Infof("Consumer %s done", c.topic)
return
case m := <-c.msgChan:
data := make(map[string]interface{})
Expand Down
3 changes: 3 additions & 0 deletions internal/nsqconsumer/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Options struct {
Log *log.Options `json:"log" mapstructure:"log"`
Elasticsearch *genericoptions.ElasticsearchOptions `json:"elasticsearch" mapstructure:"elasticsearch"`
Nsq *genericoptions.NsqOptions `json:"nsq" mapstructure:"nsq"`
Etcd *genericoptions.EtcdOptions `json:"etcd" mapstructure:"etcd"`
}

// NewOptions creates a new Options object with default parameters.
Expand All @@ -20,6 +21,7 @@ func NewOptions() *Options {
Log: log.NewOptions(),
Elasticsearch: genericoptions.NewElasticsearchOptions(),
Nsq: genericoptions.NewNsqOptionsOptions(),
Etcd: genericoptions.NewEtcdOptions(),
}

return &o
Expand All @@ -30,5 +32,6 @@ func (o *Options) Flags() (fss cliflag.NamedFlagSets) {
o.Log.AddFlags(fss.FlagSet("logs"))
o.Elasticsearch.AddFlags(fss.FlagSet("elasticsearch"))
o.Nsq.AddFlags(fss.FlagSet("nsq"))
o.Etcd.AddFlags(fss.FlagSet("etcd"))
return fss
}
Loading

0 comments on commit 509ec03

Please sign in to comment.