Skip to content
This repository has been archived by the owner on Aug 19, 2022. It is now read-only.

Commit

Permalink
Support wildcard subscriptions + getter for known types (optimized) (#40
Browse files Browse the repository at this point in the history
)

Co-authored-by: Raúl Kripalani <raul@protocol.ai>
  • Loading branch information
raulk authored May 20, 2020
1 parent 9c15729 commit 6b6139f
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 72 deletions.
110 changes: 100 additions & 10 deletions basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ import (

// basicBus is a type-based event delivery system
type basicBus struct {
lk sync.Mutex
nodes map[reflect.Type]*node
lk sync.RWMutex
nodes map[reflect.Type]*node
wildcard *wildcardNode
}

var _ event.Bus = (*basicBus)(nil)

type emitter struct {
n *node
w *wildcardNode
typ reflect.Type
closed int32
dropper func(reflect.Type)
Expand All @@ -33,6 +35,8 @@ func (e *emitter) Emit(evt interface{}) error {
return fmt.Errorf("emitter is closed")
}
e.n.emit(evt)
e.w.emit(evt)

return nil
}

Expand All @@ -48,7 +52,8 @@ func (e *emitter) Close() error {

func NewBus() event.Bus {
return &basicBus{
nodes: map[reflect.Type]*node{},
nodes: map[reflect.Type]*node{},
wildcard: new(wildcardNode),
}
}

Expand Down Expand Up @@ -96,6 +101,20 @@ func (b *basicBus) tryDropNode(typ reflect.Type) {
b.lk.Unlock()
}

type wildcardSub struct {
ch chan interface{}
w *wildcardNode
}

func (w *wildcardSub) Out() <-chan interface{} {
return w.ch
}

func (w *wildcardSub) Close() error {
w.w.removeSink(w.ch)
return nil
}

type sub struct {
ch chan interface{}
nodes []*node
Expand Down Expand Up @@ -143,18 +162,35 @@ var _ event.Subscription = (*sub)(nil)
// publishers to get blocked. CancelFunc is guaranteed to return after last send
// to the channel
func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt) (_ event.Subscription, err error) {
settings := subSettings(subSettingsDefault)
settings := subSettingsDefault
for _, opt := range opts {
if err := opt(&settings); err != nil {
return nil, err
}
}

if evtTypes == event.WildcardSubscription {
out := &wildcardSub{
ch: make(chan interface{}, settings.buffer),
w: b.wildcard,
}
b.wildcard.addSink(out.ch)
return out, nil
}

types, ok := evtTypes.([]interface{})
if !ok {
types = []interface{}{evtTypes}
}

if len(types) > 1 {
for _, t := range types {
if t == event.WildcardSubscription {
return nil, fmt.Errorf("wildcard subscriptions must be started separately")
}
}
}

out := &sub{
ch: make(chan interface{}, settings.buffer),
nodes: make([]*node, len(types)),
Expand Down Expand Up @@ -199,8 +235,11 @@ func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt
//
// emit(EventT{})
func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e event.Emitter, err error) {
var settings emitterSettings
if evtType == event.WildcardSubscription {
return nil, fmt.Errorf("illegal emitter for wildcard subscription")
}

var settings emitterSettings
for _, opt := range opts {
if err := opt(&settings); err != nil {
return nil, err
Expand All @@ -216,14 +255,65 @@ func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e eve
b.withNode(typ, func(n *node) {
atomic.AddInt32(&n.nEmitters, 1)
n.keepLast = n.keepLast || settings.makeStateful
e = &emitter{n: n, typ: typ, dropper: b.tryDropNode}
e = &emitter{n: n, typ: typ, dropper: b.tryDropNode, w: b.wildcard}
}, nil)
return
}

// GetAllEventTypes returns all the event types that this bus has emitters
// or subscribers for.
func (b *basicBus) GetAllEventTypes() []reflect.Type {
b.lk.RLock()
defer b.lk.RUnlock()

types := make([]reflect.Type, 0, len(b.nodes))
for t, _ := range b.nodes {
types = append(types, t)
}
return types
}

///////////////////////
// NODE

type wildcardNode struct {
sync.RWMutex
nSinks int32
sinks []chan interface{}
}

func (n *wildcardNode) addSink(ch chan interface{}) {
atomic.AddInt32(&n.nSinks, 1) // ok to do outside the lock
n.Lock()
n.sinks = append(n.sinks, ch)
n.Unlock()
}

func (n *wildcardNode) removeSink(ch chan interface{}) {
atomic.AddInt32(&n.nSinks, -1) // ok to do outside the lock
n.Lock()
for i := 0; i < len(n.sinks); i++ {
if n.sinks[i] == ch {
n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], nil
n.sinks = n.sinks[:len(n.sinks)-1]
break
}
}
n.Unlock()
}

func (n *wildcardNode) emit(evt interface{}) {
if atomic.LoadInt32(&n.nSinks) == 0 {
return
}

n.RLock()
for _, ch := range n.sinks {
ch <- evt
}
n.RUnlock()
}

type node struct {
// Note: make sure to NEVER lock basicBus.lk when this lock is held
lk sync.Mutex
Expand All @@ -245,19 +335,19 @@ func newNode(typ reflect.Type) *node {
}
}

func (n *node) emit(event interface{}) {
typ := reflect.TypeOf(event)
func (n *node) emit(evt interface{}) {
typ := reflect.TypeOf(evt)
if typ != n.typ {
panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", n.typ, typ))
}

n.lk.Lock()
if n.keepLast {
n.last = event
n.last = evt
}

for _, ch := range n.sinks {
ch <- event
ch <- evt
}
n.lk.Unlock()
}
131 changes: 131 additions & 0 deletions basic_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package eventbus

import (
"context"
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-testing/race"

"github.com/stretchr/testify/require"
)

type EventA struct{}
Expand Down Expand Up @@ -89,6 +94,30 @@ func TestSub(t *testing.T) {
}
}

func TestGetAllEventTypes(t *testing.T) {
bus := NewBus()
require.Empty(t, bus.GetAllEventTypes())

// the wildcard subscription should be returned.
_, err := bus.Subscribe(event.WildcardSubscription)
require.NoError(t, err)

_, err = bus.Subscribe(new(EventB))
require.NoError(t, err)

evts := bus.GetAllEventTypes()
require.Len(t, evts, 1)
require.Equal(t, reflect.TypeOf((*EventB)(nil)).Elem(), evts[0])

_, err = bus.Emitter(new(EventA))
require.NoError(t, err)

evts = bus.GetAllEventTypes()
require.Len(t, evts, 2)
require.Contains(t, evts, reflect.TypeOf((*EventB)(nil)).Elem())
require.Contains(t, evts, reflect.TypeOf((*EventA)(nil)).Elem())
}

func TestEmitNoSubNoBlock(t *testing.T) {
bus := NewBus()

Expand Down Expand Up @@ -206,6 +235,108 @@ func TestSubMany(t *testing.T) {
}
}

func TestWildcardSubscription(t *testing.T) {
bus := NewBus()
sub, err := bus.Subscribe(event.WildcardSubscription)
require.NoError(t, err)
defer sub.Close()

em1, err := bus.Emitter(new(EventA))
require.NoError(t, err)
defer em1.Close()

em2, err := bus.Emitter(new(EventB))
require.NoError(t, err)
defer em2.Close()

require.NoError(t, em1.Emit(EventA{}))
require.NoError(t, em2.Emit(EventB(1)))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var evts []interface{}

LOOP:
for {
select {
case evt := <-sub.Out():
if evta, ok := evt.(EventA); ok {
evts = append(evts, evta)
}

if evtb, ok := evt.(EventB); ok {
evts = append(evts, evtb)
}

if len(evts) == 2 {
break LOOP
}

case <-ctx.Done():
t.Fatalf("did not receive events")
}
}
}

func TestManyWildcardSubscriptions(t *testing.T) {
bus := NewBus()
var subs []event.Subscription
for i := 0; i < 10; i++ {
sub, err := bus.Subscribe(event.WildcardSubscription)
require.NoError(t, err)
subs = append(subs, sub)
}

em1, err := bus.Emitter(new(EventA))
require.NoError(t, err)
defer em1.Close()

em2, err := bus.Emitter(new(EventB))
require.NoError(t, err)
defer em2.Close()

require.NoError(t, em1.Emit(EventA{}))
require.NoError(t, em2.Emit(EventB(1)))

// all 10 subscriptions received all 2 events.
for _, s := range subs {
require.Len(t, s.Out(), 2)
}

// close the first five subscriptions.
for _, s := range subs[:5] {
require.NoError(t, s.Close())
}

// emit another 2 events.
require.NoError(t, em1.Emit(EventA{}))
require.NoError(t, em2.Emit(EventB(1)))

// the first five still have 2 events, while the other five have 4 events.
for _, s := range subs[:5] {
require.Len(t, s.Out(), 2)
}

for _, s := range subs[5:] {
require.Len(t, s.Out(), 4)
}

// close them all, the first five will be closed twice (asserts idempotency).
for _, s := range subs {
require.NoError(t, s.Close())
}
}

func TestWildcardValidations(t *testing.T) {
bus := NewBus()

_, err := bus.Subscribe([]interface{}{event.WildcardSubscription, new(EventA), new(EventB)})
require.Error(t, err)

_, err = bus.Emitter(event.WildcardSubscription)
require.Error(t, err)
}

func TestSubType(t *testing.T) {
bus := NewBus()
sub, err := bus.Subscribe([]interface{}{new(EventA), new(EventB)})
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/libp2p/go-eventbus
go 1.12

require (
github.com/libp2p/go-libp2p-core v0.5.6
github.com/libp2p/go-libp2p-core v0.5.7-0.20200520143746-39497bae12c5
github.com/libp2p/go-libp2p-testing v0.1.1
github.com/stretchr/testify v1.4.0
)
Loading

0 comments on commit 6b6139f

Please sign in to comment.