Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: consume nsq messages and post to elasticsearch #1

Merged
merged 10 commits into from
May 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@
build

coverage.txt

logs/*.log
conf/dev.yaml
conf/production.yaml
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
.PHONY: all
all: build

ROOT_PACKAGE=github.com/JieTrancender/kbm-iam
ROOT_PACKAGE=github.com/JieTrancender/nsq-tool-kit
VERSION_PACKAGE=github.com/marmotedu/component-base/pkg/version

include scripts/make-rules/common.mk
Expand Down
4 changes: 2 additions & 2 deletions cmd/iam-apiserver/apiserver.go → cmd/nsq-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"runtime"
"time"

"github.com/JieTrancender/kbm-iam/internal/apiserver"
"github.com/JieTrancender/nsq-tool-kit/internal/nsqconsumer"
)

func main() {
Expand All @@ -15,5 +15,5 @@ func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
}

apiserver.NewApp("iam-apiserver").Run()
nsqconsumer.NewApp("nsq-consumer").Run()
}
27 changes: 27 additions & 0 deletions conf/nsq-consumer.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
log:
name: nsq-consumer # Logger的名字
development: true # 是否是开发模式。如果是开发模式,会对DPanicLevel进行堆栈跟踪。
level: debug # 日志级别,优先级从低到高依次为:debug, info, warn, error, dpanic, panic, fatal。
format: console # 支持的日志输出格式,目前支持console和json两种。console其实就是text格式。
enable-color: true # 是否开启颜色输出,true:是,false:否
disable-caller: false # 是否开启 caller,如果开启会在日志中显示调用日志所在的文件、函数和行号
disable-stacktrace: false # 是否再panic及以上级别禁止打印堆栈信息
output-paths: logs/nsq-consumer.log,stdout # 多个输出,逗号分开。stdout:标准输出,
error-output-paths: logs/nsq-consumer.error.log # zap内部(非业务)错误日志输出路径,多个输出,逗号分开

elasticsearch:
addrs:
- http://127.0.0.1:9200
username: root
password: 123456

nsq:
lookupd-http-addresses:
- http://127.0.0.1:4161
topics:
- dev_test
channel: nsq_tool_kit
dial-timeout: 6 #second
read-timeout: 60 #second
write-timeout: 6 # second
max-in-flight: 200
56 changes: 48 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,12 +1,52 @@
module github.com/JieTrancender/kbm-iam
module github.com/JieTrancender/nsq-tool-kit

go 1.16
go 1.17

require (
github.com/fatih/color v1.13.0
github.com/marmotedu/component-base v1.0.1
github.com/marmotedu/errors v1.0.2
github.com/spf13/cobra v1.2.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.9.0
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869
github.com/marmotedu/component-base v1.6.2
github.com/marmotedu/iam v1.6.2
github.com/nsqio/go-nsq v1.1.0
github.com/olivere/elastic/v7 v7.0.32
)

require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/gosuri/uitable v0.0.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.11 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/marmotedu/errors v1.0.2 // indirect
github.com/marmotedu/log v0.0.1 // indirect
github.com/mattn/go-colorable v0.1.9 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mattn/go-runewidth v0.0.10 // indirect
github.com/mitchellh/mapstructure v1.4.2 // indirect
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rivo/uniseg v0.1.0 // indirect
github.com/spf13/afero v1.6.0 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/cobra v1.2.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.9.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.19.1 // indirect
golang.org/x/sys v0.0.0-20211020064051-0ec99a608a1b // indirect
golang.org/x/text v0.3.6 // indirect
gopkg.in/ini.v1 v1.63.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/klog v1.0.0 // indirect
)
70 changes: 33 additions & 37 deletions go.sum

Large diffs are not rendered by default.

29 changes: 0 additions & 29 deletions internal/apiserver/app.go

This file was deleted.

21 changes: 0 additions & 21 deletions internal/apiserver/options/options.go

This file was deleted.

9 changes: 0 additions & 9 deletions internal/apiserver/run.go

This file was deleted.

38 changes: 38 additions & 0 deletions internal/nsqconsumer/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package nsqconsumer

import (
"github.com/marmotedu/iam/pkg/app"
"github.com/marmotedu/iam/pkg/log"

"github.com/JieTrancender/nsq-tool-kit/internal/nsqconsumer/config"
"github.com/JieTrancender/nsq-tool-kit/internal/nsqconsumer/options"
)

const commandDesc = `Consumers nsq messages and pubs to outputs.`

// NewApp creates a App object with default parameters.
func NewApp(basename string) *app.App {
opts := options.NewOptions()
application := app.NewApp("nsq consumer",
basename,
app.WithOptions(opts),
app.WithDescription(commandDesc),
app.WithDefaultValidArgs(),
app.WithRunFunc(run(opts)),
)

return application
}

func run(opts *options.Options) app.RunFunc {
return func(basename string) error {
log.Init(opts.Log)
defer log.Flush()
cfg, err := config.CreateConfigFromOptions(opts)
if err != nil {
return err
}

return Run(cfg)
}
}
13 changes: 13 additions & 0 deletions internal/nsqconsumer/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package config

import (
"github.com/JieTrancender/nsq-tool-kit/internal/nsqconsumer/options"
)

type Config struct {
*options.Options
}

func CreateConfigFromOptions(opts *options.Options) (*Config, error) {
return &Config{opts}, nil
}
138 changes: 138 additions & 0 deletions internal/nsqconsumer/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package nsqconsumer

import (
"fmt"
"runtime"
"time"

"github.com/marmotedu/iam/pkg/log"
"github.com/marmotedu/iam/pkg/shutdown"
"github.com/marmotedu/iam/pkg/shutdown/shutdownmanagers/posixsignal"
"github.com/nsqio/go-nsq"

"github.com/JieTrancender/nsq-tool-kit/internal/nsqconsumer/config"
"github.com/JieTrancender/nsq-tool-kit/internal/nsqconsumer/message"
es "github.com/JieTrancender/nsq-tool-kit/internal/nsqconsumer/outputs/elasticsearch"
)

type manager struct {
gs *shutdown.GracefulShutdown
cfg *config.Config

esConfig *es.Config
esClient *es.Client

nsqConfig *nsq.Config

topics map[string]*Consumer

msgChan chan *message.Message
}

func createConsumerManager(cfg *config.Config) (*manager, error) {
gs := shutdown.New()
gs.AddShutdownManager(posixsignal.NewPosixSignalManager())

nsqConfig := nsq.NewConfig()
nsqConfig.UserAgent = fmt.Sprintf("nsq-tool-kit/%s go-nsq/%s", "0.0.1", nsq.VERSION)
nsqConfig.DialTimeout = time.Duration(cfg.Nsq.DialTimeout) * time.Second
nsqConfig.ReadTimeout = time.Duration(cfg.Nsq.ReadTimeout) * time.Second
nsqConfig.WriteTimeout = time.Duration(cfg.Nsq.WriteTimeout) * time.Second
nsqConfig.MaxInFlight = cfg.Nsq.MaxInFlight
return &manager{
gs: gs,
cfg: cfg,
esConfig: &es.Config{
Addrs: cfg.Elasticsearch.Addrs,
Username: cfg.Elasticsearch.Username,
Password: cfg.Elasticsearch.Password,
},
nsqConfig: nsqConfig,
topics: make(map[string]*Consumer),
}, nil
}

func (m *manager) initialize() error {
client, err := es.NewClient(m.esConfig)
if err != nil {
log.Errorf("New elasticsearch client fail: %v", err)
return err
}

err = client.Connect()
if err != nil {
return err
}
m.esClient = client

return nil
}

func (m *manager) launch() error {
for _, topic := range m.cfg.Nsq.Topics {
log.Infof("launch topic %s", topic)
nsqConsumer, err := nsq.NewConsumer(topic, m.cfg.Nsq.Channel, m.nsqConfig)
if err != nil {
log.Errorf("nsq.NewConsumer fail: %v", err)
continue
}
consumer := &Consumer{
topic: topic,
consumer: nsqConsumer,
done: make(chan struct{}),
msgChan: make(chan *nsq.Message),
}
nsqConsumer.AddConcurrentHandlers(consumer, runtime.NumCPU())
err = nsqConsumer.ConnectToNSQLookupds(m.cfg.Nsq.LookupdHttpAddresses)
if err != nil {
log.Errorf("ConnectToNSQLookupd fail: %v", err)
continue
}
m.topics[topic] = consumer
}

stopCh := make(chan struct{})
if err := m.gs.Start(); err != nil {
log.Fatalf("start shutdown manager failed: %s", err.Error())
}

msgChan := make(chan *message.Message)
m.msgChan = msgChan
for _, consumer := range m.topics {
go func(consumer *Consumer, msgChan chan<- *message.Message) {
consumer.Run(msgChan)
}(consumer, msgChan)
}

go m.esClient.Run(msgChan)

m.gs.AddShutdownCallback(shutdown.ShutdownFunc(func(string) error {
m.Stop()
return nil
}))

<-stopCh
return nil
}

func (m *manager) Run() error {
err := m.initialize()
if err != nil {
return err
}

return m.launch()
}

func (m *manager) Stop() {
log.Info("manager Stopping")
for _, consumer := range m.topics {
consumer.Stop()
}

close(m.msgChan)

// 最后关闭elasticsearch
m.esClient.Close()
log.Info("manager stopped")
}
Loading