Skip to content

Commit

Permalink
feat: add console output (#21)
Browse files Browse the repository at this point in the history
* feat: add console output

* feat: add libbeat atomic lib

* perf: perfact console output

* perf: fix lint wrong

* perf: fix lint wrong
  • Loading branch information
JTrancender authored Aug 11, 2021
1 parent b1d9d00 commit da12608
Show file tree
Hide file tree
Showing 18 changed files with 793 additions and 42 deletions.
19 changes: 12 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@
"consumer-type":"tail"
},
"output": {
"tail": {
"desc": "nsq_to_tail"
"nsqd": {
"nsqd-tcp-addresses": ["127.0.0.1:4150"],
"topic": "dev_test_dup",
"enabled": false
},
"console": {
"enabled": true
}
},
"logging": {
"level": 0,
"level": -1,
"to_stderr": true
}
}
Expand All @@ -29,11 +34,11 @@
~~~
make clean && ./build/nsq_to_consumer --etcd-endpoints 127.0.0.1:2379 --etcd-username root --etcd-password 123456 --etcd-path /nsq_consumer/default
~~~

### Consumer list
1. tail
2. http[todo]
### Output list
1. console
3. file[todo]
3. nsqd[todo]
3. http[todo]
4. mysql[todo]
5. elasticsearch[todo]

Expand Down
14 changes: 3 additions & 11 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package consumer

import (
"fmt"
"os"
"sync"
"time"

Expand Down Expand Up @@ -106,16 +105,7 @@ func (nc *NSQConsumer) router() {
nc.Close()
return
case m := <-nc.msgChan:
// err := nc.publisher.handleMessage(m)
err := nc.pipeline.HandleMessage(m)
if err != nil {
// retry
m.Requeue(-1)
logp.L().Errorf("NSQConsumer#router deal message fail: %v", err)
os.Exit(1)
}

m.Finish()
_ = nc.pipeline.HandleMessage(m)
}
}
}
Expand Down Expand Up @@ -192,6 +182,8 @@ func (tc *TailConsumer) Run(c *consumer.ConsumerEntity) error {
waitFinished.AddChan(tc.done)
waitFinished.Wait()

_ = tc.pipeline.Close()

for _, nsqConsumer := range tc.topics {
close(nsqConsumer.done)
}
Expand Down
5 changes: 5 additions & 0 deletions libconsumer/cmd/instance/imports_common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package instance

import (
_ "github.com/JieTrancender/nsq_to_consumer/libconsumer/publisher/includes"
)
77 changes: 77 additions & 0 deletions libconsumer/common/atomic/atomic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Package atomic provides common primitive types with atomic accessors.
package atomic

import a "sync/atomic"

// Bool provides an atomic boolean type.
type Bool struct{ u Uint32 }

// Int32 provides an atomic int32 type.
type Int32 struct{ value int32 }

// Int64 provides an atomic int64 type.
type Int64 struct{ value int64 }

// Uint32 provides an atomic uint32 type.
type Uint32 struct{ value uint32 }

// Uint64 provides an atomic uint64 type.
type Uint64 struct{ value uint64 }

func MakeBool(v bool) Bool { return Bool{MakeUint32(encBool(v))} }
func NewBool(v bool) *Bool { return &Bool{MakeUint32(encBool(v))} }
func (b *Bool) Load() bool { return b.u.Load() == 1 }
func (b *Bool) Store(v bool) { b.u.Store(encBool(v)) }
func (b *Bool) Swap(new bool) bool { return b.u.Swap(encBool(new)) == 1 }
func (b *Bool) CAS(old, new bool) bool { return b.u.CAS(encBool(old), encBool(new)) }

func MakeInt32(v int32) Int32 { return Int32{v} }
func NewInt32(v int32) *Int32 { return &Int32{v} }
func (i *Int32) Load() int32 { return a.LoadInt32(&i.value) }
func (i *Int32) Store(v int32) { a.StoreInt32(&i.value, v) }
func (i *Int32) Swap(new int32) int32 { return a.SwapInt32(&i.value, new) }
func (i *Int32) Add(delta int32) int32 { return a.AddInt32(&i.value, delta) }
func (i *Int32) Sub(delta int32) int32 { return a.AddInt32(&i.value, -delta) }
func (i *Int32) Inc() int32 { return i.Add(1) }
func (i *Int32) Dec() int32 { return i.Add(-1) }
func (i *Int32) CAS(old, new int32) bool { return a.CompareAndSwapInt32(&i.value, old, new) }

func MakeInt64(v int64) Int64 { return Int64{v} }
func NewInt64(v int64) *Int64 { return &Int64{v} }
func (i *Int64) Load() int64 { return a.LoadInt64(&i.value) }
func (i *Int64) Store(v int64) { a.StoreInt64(&i.value, v) }
func (i *Int64) Swap(new int64) int64 { return a.SwapInt64(&i.value, new) }
func (i *Int64) Add(delta int64) int64 { return a.AddInt64(&i.value, delta) }
func (i *Int64) Sub(delta int64) int64 { return a.AddInt64(&i.value, -delta) }
func (i *Int64) Inc() int64 { return i.Add(1) }
func (i *Int64) Dec() int64 { return i.Add(-1) }
func (i *Int64) CAS(old, new int64) bool { return a.CompareAndSwapInt64(&i.value, old, new) }

func MakeUint32(v uint32) Uint32 { return Uint32{v} }
func NewUint32(v uint32) *Uint32 { return &Uint32{v} }
func (u *Uint32) Load() uint32 { return a.LoadUint32(&u.value) }
func (u *Uint32) Store(v uint32) { a.StoreUint32(&u.value, v) }
func (u *Uint32) Swap(new uint32) uint32 { return a.SwapUint32(&u.value, new) }
func (u *Uint32) Add(delta uint32) uint32 { return a.AddUint32(&u.value, delta) }
func (u *Uint32) Sub(delta uint32) uint32 { return a.AddUint32(&u.value, ^uint32(delta-1)) }
func (u *Uint32) Inc() uint32 { return u.Add(1) }
func (u *Uint32) Dec() uint32 { return u.Add(^uint32(0)) }
func (u *Uint32) CAS(old, new uint32) bool { return a.CompareAndSwapUint32(&u.value, old, new) }

func MakeUint64(v uint64) Uint64 { return Uint64{v} }
func NewUint64(v uint64) *Uint64 { return &Uint64{v} }
func (u *Uint64) Load() uint64 { return a.LoadUint64(&u.value) }
func (u *Uint64) Store(v uint64) { a.StoreUint64(&u.value, v) }
func (u *Uint64) Swap(new uint64) uint64 { return a.SwapUint64(&u.value, new) }
func (u *Uint64) Add(delta uint64) uint64 { return a.AddUint64(&u.value, delta) }
func (u *Uint64) Sub(delta uint64) uint64 { return a.AddUint64(&u.value, ^uint64(delta-1)) }
func (u *Uint64) Inc() uint64 { return u.Add(1) }
func (u *Uint64) Dec() uint64 { return u.Add(^uint64(0)) }
func (u *Uint64) CAS(old, new uint64) bool { return a.CompareAndSwapUint64(&u.value, old, new) }

func encBool(b bool) uint32 {
if b {
return 1
}
return 0
}
33 changes: 33 additions & 0 deletions libconsumer/common/atomic/atomic32.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// +build 386 arm mips mipsle

package atomic

// atomic Uint/Int for 32bit systems

// Uint provides an architecture specific atomic uint.
type Uint struct{ a Uint32 }

// Int provides an architecture specific atomic uint.
type Int struct{ a Int32 }

func MakeUint(v uint) Uint { return Uint{MakeUint32(uint32(v))} }
func NewUint(v uint) *Uint { return &Uint{MakeUint32(uint32(v))} }
func (u *Uint) Load() uint { return uint(u.a.Load()) }
func (u *Uint) Store(v uint) { u.a.Store(uint32(v)) }
func (u *Uint) Swap(new uint) uint { return uint(u.a.Swap(uint32(new))) }
func (u *Uint) Add(delta uint) uint { return uint(u.a.Add(uint32(delta))) }
func (u *Uint) Sub(delta uint) uint { return uint(u.a.Add(uint32(-delta))) }
func (u *Uint) Inc() uint { return uint(u.a.Inc()) }
func (u *Uint) Dec() uint { return uint(u.a.Dec()) }
func (u *Uint) CAS(old, new uint) bool { return u.a.CAS(uint32(old), uint32(new)) }

func MakeInt(v int) Int { return Int{MakeInt32(int32(v))} }
func NewInt(v int) *Int { return &Int{MakeInt32(int32(v))} }
func (i *Int) Load() int { return int(i.a.Load()) }
func (i *Int) Store(v int) { i.a.Store(int32(v)) }
func (i *Int) Swap(new int) int { return int(i.a.Swap(int32(new))) }
func (i *Int) Add(delta int) int { return int(i.a.Add(int32(delta))) }
func (i *Int) Sub(delta int) int { return int(i.a.Add(int32(-delta))) }
func (i *Int) Inc() int { return int(i.a.Inc()) }
func (i *Int) Dec() int { return int(i.a.Dec()) }
func (i *Int) CAS(old, new int) bool { return i.a.CAS(int32(old), int32(new)) }
33 changes: 33 additions & 0 deletions libconsumer/common/atomic/atomic64.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// +build amd64 arm64 ppc64 ppc64le mips64 mips64le s390x

package atomic

// atomic Uint/Int for 64bit systems

// Uint provides an architecture specific atomic uint.
type Uint struct{ a Uint64 }

// Int provides an architecture specific atomic uint.
type Int struct{ a Int64 }

func MakeUint(v uint) Uint { return Uint{MakeUint64(uint64(v))} }
func NewUint(v uint) *Uint { return &Uint{MakeUint64(uint64(v))} }
func (u *Uint) Load() uint { return uint(u.a.Load()) }
func (u *Uint) Store(v uint) { u.a.Store(uint64(v)) }
func (u *Uint) Swap(new uint) uint { return uint(u.a.Swap(uint64(new))) }
func (u *Uint) Add(delta uint) uint { return uint(u.a.Add(uint64(delta))) }
func (u *Uint) Sub(delta uint) uint { return uint(u.a.Add(uint64(-delta))) }
func (u *Uint) Inc() uint { return uint(u.a.Inc()) }
func (u *Uint) Dec() uint { return uint(u.a.Dec()) }
func (u *Uint) CAS(old, new uint) bool { return u.a.CAS(uint64(old), uint64(new)) }

func MakeInt(v int) Int { return Int{MakeInt64(int64(v))} }
func NewInt(v int) *Int { return &Int{MakeInt64(int64(v))} }
func (i *Int) Load() int { return int(i.a.Load()) }
func (i *Int) Store(v int) { i.a.Store(int64(v)) }
func (i *Int) Swap(new int) int { return int(i.a.Swap(int64(new))) }
func (i *Int) Add(delta int) int { return int(i.a.Add(int64(delta))) }
func (i *Int) Sub(delta int) int { return int(i.a.Add(int64(-delta))) }
func (i *Int) Inc() int { return int(i.a.Inc()) }
func (i *Int) Dec() int { return int(i.a.Dec()) }
func (i *Int) CAS(old, new int) bool { return i.a.CAS(int64(old), int64(new)) }
Loading

0 comments on commit da12608

Please sign in to comment.