Skip to content
This repository has been archived by the owner on Aug 19, 2022. It is now read-only.

Support wildcard subscriptions + getter for known types #34

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 33 additions & 7 deletions basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
"github.com/libp2p/go-libp2p-core/event"
)

var wildCardTypeElem = reflect.TypeOf(event.WildcardSubscriptionType).Elem()

///////////////////////
// BUS

// basicBus is a type-based event delivery system
type basicBus struct {
lk sync.Mutex
lk sync.RWMutex
nodes map[reflect.Type]*node
}

Expand All @@ -26,13 +28,24 @@ type emitter struct {
typ reflect.Type
closed int32
dropper func(reflect.Type)
b *basicBus
}

func (e *emitter) Emit(evt interface{}) error {
if atomic.LoadInt32(&e.closed) != 0 {
return fmt.Errorf("emitter is closed")
}
e.n.emit(evt)

// emit on the wildcard subscription
e.b.lk.RLock()
n, ok := e.b.nodes[wildCardTypeElem]
e.b.lk.RUnlock()
raulk marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
return nil
}
n.emit(evt)

return nil
}

Expand Down Expand Up @@ -216,11 +229,24 @@ func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e eve
b.withNode(typ, func(n *node) {
atomic.AddInt32(&n.nEmitters, 1)
n.keepLast = n.keepLast || settings.makeStateful
e = &emitter{n: n, typ: typ, dropper: b.tryDropNode}
e = &emitter{n: n, typ: typ, dropper: b.tryDropNode, b: b}
}, nil)
return
}

// GetAllEventTypes returns all the event types that this bus has emitters
// or subscribers for.
func (b *basicBus) GetAllEventTypes() []reflect.Type {
b.lk.RLock()
defer b.lk.RUnlock()

types := make([]reflect.Type, 0, len(b.nodes))
for t, _ := range b.nodes {
types = append(types, t)
}
return types
}

///////////////////////
// NODE

Expand All @@ -245,19 +271,19 @@ func newNode(typ reflect.Type) *node {
}
}

func (n *node) emit(event interface{}) {
typ := reflect.TypeOf(event)
if typ != n.typ {
func (n *node) emit(evt interface{}) {
typ := reflect.TypeOf(evt)
if typ != n.typ && n.typ != wildCardTypeElem {
panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", n.typ, typ))
}

n.lk.Lock()
if n.keepLast {
n.last = event
n.last = evt
}

for _, ch := range n.sinks {
ch <- event
ch <- evt
}
n.lk.Unlock()
}
69 changes: 69 additions & 0 deletions basic_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package eventbus

import (
"context"
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-testing/race"

"github.com/stretchr/testify/require"
)

type EventA struct{}
Expand Down Expand Up @@ -89,6 +94,26 @@ func TestSub(t *testing.T) {
}
}

func TestGetAllEventTypes(t *testing.T) {
bus := NewBus()
require.Empty(t, bus.GetAllEventTypes())

_, err := bus.Subscribe(new(EventB))
require.NoError(t, err)

evts := bus.GetAllEventTypes()
require.Len(t, evts, 1)
require.Equal(t, reflect.TypeOf((*EventB)(nil)).Elem(), evts[0])

_, err = bus.Emitter(new(EventA))
require.NoError(t, err)

evts = bus.GetAllEventTypes()
require.Len(t, evts, 2)
require.Contains(t, evts, reflect.TypeOf((*EventB)(nil)).Elem())
require.Contains(t, evts, reflect.TypeOf((*EventA)(nil)).Elem())
}

func TestEmitNoSubNoBlock(t *testing.T) {
bus := NewBus()

Expand Down Expand Up @@ -206,6 +231,50 @@ func TestSubMany(t *testing.T) {
}
}

func TestSubscribeAll(t *testing.T) {
bus := NewBus()
sub, err := bus.Subscribe(event.WildcardSubscriptionType)
require.NoError(t, err)
defer sub.Close()

em1, err := bus.Emitter(new(EventA))
require.NoError(t, err)
defer em1.Close()

em2, err := bus.Emitter(new(EventB))
require.NoError(t, err)
defer em2.Close()

require.NoError(t, em1.Emit(EventA{}))
require.NoError(t, em2.Emit(EventB(1)))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var evts []interface{}

LOOP:
for {
select {
case evt := <-sub.Out():
if evta, ok := evt.(EventA); ok {
evts = append(evts, evta)
}

if evtb, ok := evt.(EventB); ok {
evts = append(evts, evtb)
}

if len(evts) == 2 {
break LOOP
}

case <-ctx.Done():
t.Fatalf("did not recieve events")
}
}

}

func TestSubType(t *testing.T) {
bus := NewBus()
sub, err := bus.Subscribe([]interface{}{new(EventA), new(EventB)})
Expand Down
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ module github.com/libp2p/go-eventbus
go 1.12

require (
github.com/libp2p/go-libp2p-core v0.5.0
github.com/ipfs/go-cid v0.0.5 // indirect
github.com/libp2p/go-buffer-pool v0.0.2 // indirect
github.com/libp2p/go-libp2p-core v0.5.2-0.20200417060929-6957bf8a421d
github.com/libp2p/go-libp2p-testing v0.1.1
github.com/multiformats/go-multiaddr v0.2.1 // indirect
github.com/stretchr/testify v1.4.0
go.opencensus.io v0.22.3 // indirect
)
16 changes: 16 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,16 @@ github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUP
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.3 h1:UIAh32wymBpStoe83YCzwVQQ5Oy/H0FdxvUS6DJDzms=
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.4 h1:UlfXKrZx1DjZoBhQHmNHLC1fK1dUJDN20Y28A7s+gJ8=
github.com/ipfs/go-cid v0.0.4/go.mod h1:4LLaPOQwmk5z9LBgQnpkivrx8BJjUyGwTXCd5Xfj6+M=
github.com/ipfs/go-cid v0.0.5 h1:o0Ix8e/ql7Zb5UVUJEUfjsWCIY8t48++9lR8qi6oiJU=
github.com/ipfs/go-cid v0.0.5/go.mod h1:plgt+Y5MnOey4vO4UlUazGqdbEXuFYitED67FexhXog=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10=
github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
Expand All @@ -75,8 +79,16 @@ github.com/libp2p/go-libp2p-core v0.2.5 h1:iP1PIiIrlRrGbE1fYq2918yBc5NlCH3pFuIPS
github.com/libp2p/go-libp2p-core v0.2.5/go.mod h1:6+5zJmKhsf7yHn1RbmYDu08qDUpIUxGdqHuEZckmZOA=
github.com/libp2p/go-libp2p-core v0.3.0 h1:F7PqduvrztDtFsAa/bcheQ3azmNo+Nq7m8hQY5GiUW8=
github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw=
github.com/libp2p/go-libp2p-core v0.3.2-0.20200415071619-8fb76f42df50 h1:A/euch2haQOL/a9Z4OA6Sv8hAsamD3IaTX/Wje/wbqI=
github.com/libp2p/go-libp2p-core v0.3.2-0.20200415071619-8fb76f42df50/go.mod h1:thvWy0hvaSBhnVBaW37BvzgVV68OUhgJJLAa6almrII=
github.com/libp2p/go-libp2p-core v0.3.2-0.20200415154218-9126769533e2 h1:3j584ZSRFrrS0ZPfDKqiPtjnLo8enn4WH8xhdRkYa+o=
github.com/libp2p/go-libp2p-core v0.3.2-0.20200415154218-9126769533e2/go.mod h1:thvWy0hvaSBhnVBaW37BvzgVV68OUhgJJLAa6almrII=
github.com/libp2p/go-libp2p-core v0.3.2-0.20200416170247-0a7a9856715c h1:IYaFZg3eDAGtXS32hN8qZ/QFbal9+ASqZuNOnyRkV3g=
github.com/libp2p/go-libp2p-core v0.3.2-0.20200416170247-0a7a9856715c/go.mod h1:thvWy0hvaSBhnVBaW37BvzgVV68OUhgJJLAa6almrII=
github.com/libp2p/go-libp2p-core v0.5.0 h1:FBQ1fpq2Fo/ClyjojVJ5AKXlKhvNc/B6U0O+7AN1ffE=
github.com/libp2p/go-libp2p-core v0.5.0/go.mod h1:49XGI+kc38oGVwqSBhDEwytaAxgZasHhFfQKibzTls0=
github.com/libp2p/go-libp2p-core v0.5.2-0.20200417060929-6957bf8a421d h1:ig8zIKt/shtRBOZqaEsJ/GX+LEpxqJ3CfIk0dKciZXI=
github.com/libp2p/go-libp2p-core v0.5.2-0.20200417060929-6957bf8a421d/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y=
github.com/libp2p/go-libp2p-testing v0.0.4 h1:Qev57UR47GcLPXWjrunv5aLIQGO4n9mhI/8/EIrEEFc=
github.com/libp2p/go-libp2p-testing v0.0.4/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/libp2p/go-libp2p-testing v0.1.1 h1:U03z3HnGI7Ni8Xx6ONVZvUFOAzWYmolWf5W5jAOPNmU=
Expand Down Expand Up @@ -138,6 +150,7 @@ github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXS
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/smola/gocompat v0.2.0 h1:6b1oIMlUXIpz//VKEDzPVBK8KG7beVwmHIUEBIs/Pns=
Expand All @@ -152,6 +165,7 @@ github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
Expand All @@ -164,6 +178,7 @@ golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8 h1:1wopBVtVdWnn03fZelqdXTqk7U7zPQCb+T4rbU9ZEoU=
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 h1:IcSOAf4PyMp3U3XbIEj1/xJ2BjNN2jWv7JoyOsMxXUU=
golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down Expand Up @@ -218,5 +233,6 @@ gopkg.in/src-d/go-cli.v0 v0.0.0-20181105080154-d492247bbc0d/go.mod h1:z+K8VcOYVY
gopkg.in/src-d/go-log.v1 v1.0.1/go.mod h1:GN34hKP0g305ysm2/hctJ0Y8nWP3zxXXJ8GFabTyABE=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=