Skip to content

Commit

Permalink
Merge branch 'main' into v2-issue-7-accelerated-dht
Browse files Browse the repository at this point in the history
  • Loading branch information
dennis-tra committed Oct 17, 2023
2 parents 7b270b5 + 80fe66d commit c978c93
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 144 deletions.
26 changes: 24 additions & 2 deletions internal/coord/behaviour_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package coord

import (
"context"
"testing"
)

type RecordingSM[E any, S any] struct {
State S
Received E
Received []E
}

func NewRecordingSM[E any, S any](response S) *RecordingSM[E, S] {
Expand All @@ -16,6 +17,27 @@ func NewRecordingSM[E any, S any](response S) *RecordingSM[E, S] {
}

func (r *RecordingSM[E, S]) Advance(ctx context.Context, e E) S {
r.Received = e
r.Received = append(r.Received, e)
return r.State
}

func (r *RecordingSM[E, S]) first() E {
if len(r.Received) == 0 {
var zero E
return zero
}
return r.Received[0]
}

func DrainBehaviour[I BehaviourEvent, O BehaviourEvent](t *testing.T, ctx context.Context, b Behaviour[I, O]) {
for {
select {
case <-b.Ready():
b.Perform(ctx)
case <-ctx.Done():
t.Fatal("context cancelled while draining behaviour")
default:
return
}
}
}
6 changes: 3 additions & 3 deletions internal/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type CoordinatorConfig struct {
Routing RoutingConfig

// Query is the configuration used for the [PooledQueryBehaviour] which manages the execution of user queries.
Query PooledQueryConfig
Query QueryConfig
}

// Validate checks the configuration options and returns an error if any have invalid values.
Expand Down Expand Up @@ -141,7 +141,7 @@ func DefaultCoordinatorConfig() *CoordinatorConfig {
TracerProvider: otel.GetTracerProvider(),
}

cfg.Query = *DefaultPooledQueryConfig()
cfg.Query = *DefaultQueryConfig()
cfg.Query.Clock = cfg.Clock

Check failure on line 145 in internal/coord/coordinator.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go next)

cfg.Query.Clock undefined (type QueryConfig has no field or method Clock)

Check failure on line 145 in internal/coord/coordinator.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go next)

cfg.Query.Clock undefined (type QueryConfig has no field or method Clock)

Check failure on line 145 in internal/coord/coordinator.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go this)

cfg.Query.Clock undefined (type QueryConfig has no field or method Clock)

Check failure on line 145 in internal/coord/coordinator.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go this)

cfg.Query.Clock undefined (type QueryConfig has no field or method Clock)

Check failure on line 145 in internal/coord/coordinator.go

View workflow job for this annotation

GitHub Actions / go-check / All

cfg.Query.Clock undefined (type QueryConfig has no field or method Clock)

Check failure on line 145 in internal/coord/coordinator.go

View workflow job for this annotation

GitHub Actions / go-test / windows (go next)

cfg.Query.Clock undefined (type QueryConfig has no field or method Clock)
cfg.Query.Logger = cfg.Logger.With("behaviour", "pooledquery")
cfg.Query.Tracer = cfg.TracerProvider.Tracer(tele.TracerName)
Expand All @@ -168,7 +168,7 @@ func NewCoordinator(self kadt.PeerID, rtr coordt.Router[kadt.Key, kadt.PeerID, *
return nil, fmt.Errorf("init telemetry: %w", err)
}

queryBehaviour, err := NewPooledQueryBehaviour(self, &cfg.Query)
queryBehaviour, err := NewQueryBehaviour(self, &cfg.Query)
if err != nil {
return nil, fmt.Errorf("query behaviour: %w", err)
}
Expand Down
42 changes: 21 additions & 21 deletions internal/coord/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/plprobelab/zikade/tele"
)

type PooledQueryConfig struct {
type QueryConfig struct {

Check failure on line 21 in internal/coord/query.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go next)

QueryConfig redeclared in this block

Check failure on line 21 in internal/coord/query.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go next)

QueryConfig redeclared in this block

Check failure on line 21 in internal/coord/query.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go this)

QueryConfig redeclared in this block

Check failure on line 21 in internal/coord/query.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go this)

QueryConfig redeclared in this block

Check failure on line 21 in internal/coord/query.go

View workflow job for this annotation

GitHub Actions / go-check / All

QueryConfig redeclared in this block

Check failure on line 21 in internal/coord/query.go

View workflow job for this annotation

GitHub Actions / go-test / windows (go next)

QueryConfig redeclared in this block
// Clock is a clock that may replaced by a mock when testing
Clock clock.Clock

Expand All @@ -42,7 +42,7 @@ type PooledQueryConfig struct {
}

// Validate checks the configuration options and returns an error if any have invalid values.
func (cfg *PooledQueryConfig) Validate() error {
func (cfg *QueryConfig) Validate() error {
if cfg.Clock == nil {

Check failure on line 46 in internal/coord/query.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go next)

cfg.Clock undefined (type *QueryConfig has no field or method Clock)

Check failure on line 46 in internal/coord/query.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go next)

cfg.Clock undefined (type *QueryConfig has no field or method Clock)

Check failure on line 46 in internal/coord/query.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go this)

cfg.Clock undefined (type *QueryConfig has no field or method Clock)

Check failure on line 46 in internal/coord/query.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go this)

cfg.Clock undefined (type *QueryConfig has no field or method Clock)

Check failure on line 46 in internal/coord/query.go

View workflow job for this annotation

GitHub Actions / go-check / All

cfg.Clock undefined (type *QueryConfig has no field or method Clock)

Check failure on line 46 in internal/coord/query.go

View workflow job for this annotation

GitHub Actions / go-test / windows (go next)

cfg.Clock undefined (type *QueryConfig has no field or method Clock)
return &errs.ConfigurationError{
Component: "PooledQueryConfig",
Expand Down Expand Up @@ -94,8 +94,8 @@ func (cfg *PooledQueryConfig) Validate() error {
return nil
}

func DefaultPooledQueryConfig() *PooledQueryConfig {
return &PooledQueryConfig{
func DefaultQueryConfig() *QueryConfig {

Check failure on line 97 in internal/coord/query.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go next)

DefaultQueryConfig redeclared in this block

Check failure on line 97 in internal/coord/query.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go next)

DefaultQueryConfig redeclared in this block

Check failure on line 97 in internal/coord/query.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go this)

DefaultQueryConfig redeclared in this block

Check failure on line 97 in internal/coord/query.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go this)

DefaultQueryConfig redeclared in this block

Check failure on line 97 in internal/coord/query.go

View workflow job for this annotation

GitHub Actions / go-check / All

DefaultQueryConfig redeclared in this block

Check failure on line 97 in internal/coord/query.go

View workflow job for this annotation

GitHub Actions / go-test / windows (go next)

DefaultQueryConfig redeclared in this block
return &QueryConfig{
Clock: clock.New(),
Logger: tele.DefaultLogger("coord"),
Tracer: tele.NoopTracer(),
Expand All @@ -107,10 +107,10 @@ func DefaultPooledQueryConfig() *PooledQueryConfig {
}
}

// PooledQueryBehaviour holds the behaviour and state for managing a pool of queries.
type PooledQueryBehaviour struct {
// QueryBehaviour holds the behaviour and state for managing a pool of queries.
type QueryBehaviour struct {
// cfg is a copy of the optional configuration supplied to the behaviour.
cfg PooledQueryConfig
cfg QueryConfig

// performMu is held while Perform is executing to ensure sequential execution of work.
performMu sync.Mutex
Expand All @@ -137,11 +137,11 @@ type PooledQueryBehaviour struct {
ready chan struct{}
}

// NewPooledQueryBehaviour initialises a new PooledQueryBehaviour, setting up the query
// NewQueryBehaviour initialises a new [QueryBehaviour], setting up the query
// pool and other internal state.
func NewPooledQueryBehaviour(self kadt.PeerID, cfg *PooledQueryConfig) (*PooledQueryBehaviour, error) {
func NewQueryBehaviour(self kadt.PeerID, cfg *QueryConfig) (*QueryBehaviour, error) {
if cfg == nil {
cfg = DefaultPooledQueryConfig()
cfg = DefaultQueryConfig()
} else if err := cfg.Validate(); err != nil {
return nil, err
}
Expand All @@ -158,7 +158,7 @@ func NewPooledQueryBehaviour(self kadt.PeerID, cfg *PooledQueryConfig) (*PooledQ
return nil, fmt.Errorf("query pool: %w", err)
}

h := &PooledQueryBehaviour{
h := &QueryBehaviour{
cfg: *cfg,
pool: pool,
notifiers: make(map[coordt.QueryID]*queryNotifier[*EventQueryFinished]),
Expand All @@ -170,7 +170,7 @@ func NewPooledQueryBehaviour(self kadt.PeerID, cfg *PooledQueryConfig) (*PooledQ
// Notify receives a behaviour event and takes appropriate actions such as starting,
// stopping, or updating queries. It also queues events for later processing and
// triggers the advancement of the query pool if applicable.
func (p *PooledQueryBehaviour) Notify(ctx context.Context, ev BehaviourEvent) {
func (p *QueryBehaviour) Notify(ctx context.Context, ev BehaviourEvent) {
p.pendingInboundMu.Lock()
defer p.pendingInboundMu.Unlock()

Expand All @@ -187,14 +187,14 @@ func (p *PooledQueryBehaviour) Notify(ctx context.Context, ev BehaviourEvent) {

// Ready returns a channel that signals when the pooled query behaviour is ready to
// perform work.
func (p *PooledQueryBehaviour) Ready() <-chan struct{} {
func (p *QueryBehaviour) Ready() <-chan struct{} {
return p.ready
}

// Perform executes the next available task from the queue of pending events or advances
// the query pool. Returns an event containing the result of the work performed and a
// true value, or nil and a false value if no event was generated.
func (p *PooledQueryBehaviour) Perform(ctx context.Context) (BehaviourEvent, bool) {
func (p *QueryBehaviour) Perform(ctx context.Context) (BehaviourEvent, bool) {
p.performMu.Lock()
defer p.performMu.Unlock()

Expand Down Expand Up @@ -230,7 +230,7 @@ func (p *PooledQueryBehaviour) Perform(ctx context.Context) (BehaviourEvent, boo
return p.nextPendingOutbound()
}

func (p *PooledQueryBehaviour) nextPendingOutbound() (BehaviourEvent, bool) {
func (p *QueryBehaviour) nextPendingOutbound() (BehaviourEvent, bool) {
if len(p.pendingOutbound) == 0 {
return nil, false
}
Expand All @@ -239,7 +239,7 @@ func (p *PooledQueryBehaviour) nextPendingOutbound() (BehaviourEvent, bool) {
return ev, true
}

func (p *PooledQueryBehaviour) nextPendingInbound() (CtxEvent[BehaviourEvent], bool) {
func (p *QueryBehaviour) nextPendingInbound() (CtxEvent[BehaviourEvent], bool) {
p.pendingInboundMu.Lock()
defer p.pendingInboundMu.Unlock()
if len(p.pendingInbound) == 0 {
Expand All @@ -250,7 +250,7 @@ func (p *PooledQueryBehaviour) nextPendingInbound() (CtxEvent[BehaviourEvent], b
return pev, true
}

func (p *PooledQueryBehaviour) perfomNextInbound(ctx context.Context) (BehaviourEvent, bool) {
func (p *QueryBehaviour) perfomNextInbound(ctx context.Context) (BehaviourEvent, bool) {
ctx, span := p.cfg.Tracer.Start(ctx, "PooledQueryBehaviour.perfomNextInbound")
defer span.End()
pev, ok := p.nextPendingInbound()
Expand Down Expand Up @@ -346,7 +346,7 @@ func (p *PooledQueryBehaviour) perfomNextInbound(ctx context.Context) (Behaviour
return p.advancePool(pev.Ctx, cmd)
}

func (p *PooledQueryBehaviour) updateReadyStatus() {
func (p *QueryBehaviour) updateReadyStatus() {
if len(p.pendingOutbound) != 0 {
select {
case p.ready <- struct{}{}:
Expand All @@ -371,7 +371,7 @@ func (p *PooledQueryBehaviour) updateReadyStatus() {
// advancePool advances the query pool state machine and returns an outbound event if
// there is work to be performed. Also notifies waiters of query completion or
// progress.
func (p *PooledQueryBehaviour) advancePool(ctx context.Context, ev query.PoolEvent) (out BehaviourEvent, term bool) {
func (p *QueryBehaviour) advancePool(ctx context.Context, ev query.PoolEvent) (out BehaviourEvent, term bool) {
ctx, span := p.cfg.Tracer.Start(ctx, "PooledQueryBehaviour.advancePool", trace.WithAttributes(tele.AttrInEvent(ev)))
defer func() {
span.SetAttributes(tele.AttrOutEvent(out))
Expand Down Expand Up @@ -420,15 +420,15 @@ func (p *PooledQueryBehaviour) advancePool(ctx context.Context, ev query.PoolEve
return nil, false
}

func (p *PooledQueryBehaviour) queueAddNodeEvents(nodes []kadt.PeerID) {
func (p *QueryBehaviour) queueAddNodeEvents(nodes []kadt.PeerID) {
for _, info := range nodes {
p.pendingOutbound = append(p.pendingOutbound, &EventAddNode{
NodeID: info,
})
}
}

func (p *PooledQueryBehaviour) queueNonConnectivityEvent(nid kadt.PeerID) {
func (p *QueryBehaviour) queueNonConnectivityEvent(nid kadt.PeerID) {
p.pendingOutbound = append(p.pendingOutbound, &EventNotifyNonConnectivity{
NodeID: nid,
})
Expand Down
30 changes: 15 additions & 15 deletions internal/coord/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,34 @@ import (
"github.com/plprobelab/zikade/pb"
)

func TestPooledQueryConfigValidate(t *testing.T) {
func TestQueryConfigValidate(t *testing.T) {
t.Run("default is valid", func(t *testing.T) {
cfg := DefaultPooledQueryConfig()
cfg := DefaultQueryConfig()

require.NoError(t, cfg.Validate())
})

t.Run("clock is not nil", func(t *testing.T) {
cfg := DefaultPooledQueryConfig()
cfg := DefaultQueryConfig()

cfg.Clock = nil
require.Error(t, cfg.Validate())
})

t.Run("logger not nil", func(t *testing.T) {
cfg := DefaultPooledQueryConfig()
cfg := DefaultQueryConfig()
cfg.Logger = nil
require.Error(t, cfg.Validate())
})

t.Run("tracer not nil", func(t *testing.T) {
cfg := DefaultPooledQueryConfig()
cfg := DefaultQueryConfig()
cfg.Tracer = nil
require.Error(t, cfg.Validate())
})

t.Run("query concurrency positive", func(t *testing.T) {
cfg := DefaultPooledQueryConfig()
cfg := DefaultQueryConfig()

cfg.Concurrency = 0
require.Error(t, cfg.Validate())
Expand All @@ -52,7 +52,7 @@ func TestPooledQueryConfigValidate(t *testing.T) {
})

t.Run("query timeout positive", func(t *testing.T) {
cfg := DefaultPooledQueryConfig()
cfg := DefaultQueryConfig()

cfg.Timeout = 0
require.Error(t, cfg.Validate())
Expand All @@ -61,7 +61,7 @@ func TestPooledQueryConfigValidate(t *testing.T) {
})

t.Run("request concurrency positive", func(t *testing.T) {
cfg := DefaultPooledQueryConfig()
cfg := DefaultQueryConfig()

cfg.RequestConcurrency = 0
require.Error(t, cfg.Validate())
Expand All @@ -70,7 +70,7 @@ func TestPooledQueryConfigValidate(t *testing.T) {
})

t.Run("request timeout positive", func(t *testing.T) {
cfg := DefaultPooledQueryConfig()
cfg := DefaultQueryConfig()

cfg.RequestTimeout = 0
require.Error(t, cfg.Validate())
Expand All @@ -86,7 +86,7 @@ func TestQueryBehaviourBase(t *testing.T) {
type QueryBehaviourBaseTestSuite struct {
suite.Suite

cfg *PooledQueryConfig
cfg *QueryConfig
top *nettest.Topology
nodes []*nettest.Peer
}
Expand All @@ -99,7 +99,7 @@ func (ts *QueryBehaviourBaseTestSuite) SetupTest() {
ts.top = top
ts.nodes = nodes

ts.cfg = DefaultPooledQueryConfig()
ts.cfg = DefaultQueryConfig()
ts.cfg.Clock = clk
}

Expand All @@ -111,7 +111,7 @@ func (ts *QueryBehaviourBaseTestSuite) TestNotifiesNoProgress() {
rt := ts.nodes[0].RoutingTable
seeds := rt.NearestNodes(target, 5)

b, err := NewPooledQueryBehaviour(ts.nodes[0].NodeID, ts.cfg)
b, err := NewQueryBehaviour(ts.nodes[0].NodeID, ts.cfg)
ts.Require().NoError(err)

waiter := NewQueryWaiter(5)
Expand Down Expand Up @@ -158,7 +158,7 @@ func (ts *QueryBehaviourBaseTestSuite) TestNotifiesQueryProgressed() {
rt := ts.nodes[0].RoutingTable
seeds := rt.NearestNodes(target, 5)

b, err := NewPooledQueryBehaviour(ts.nodes[0].NodeID, ts.cfg)
b, err := NewQueryBehaviour(ts.nodes[0].NodeID, ts.cfg)
ts.Require().NoError(err)

waiter := NewQueryWaiter(5)
Expand Down Expand Up @@ -206,7 +206,7 @@ func (ts *QueryBehaviourBaseTestSuite) TestNotifiesQueryFinished() {
rt := ts.nodes[0].RoutingTable
seeds := rt.NearestNodes(target, 5)

b, err := NewPooledQueryBehaviour(ts.nodes[0].NodeID, ts.cfg)
b, err := NewQueryBehaviour(ts.nodes[0].NodeID, ts.cfg)
ts.Require().NoError(err)

waiter := NewQueryWaiter(5)
Expand Down Expand Up @@ -274,7 +274,7 @@ func (ts *QueryBehaviourBaseTestSuite) TestNotifiesQueryFinished() {
kadtest.ReadItem[CtxEvent[*EventQueryFinished]](t, ctx, waiter.Finished())
}

func TestPooledQuery_deadlock_regression(t *testing.T) {
func TestQuery_deadlock_regression(t *testing.T) {
t.Skip()
ctx := kadtest.CtxShort(t)
msg := &pb.Message{}
Expand Down
Loading

0 comments on commit c978c93

Please sign in to comment.