Skip to content

Commit

Permalink
chore: return routing state machine events directly instead of queuing (
Browse files Browse the repository at this point in the history
#68)

I meant to do this as part of #64 but forgot. It makes the event return
slightly more efficient.
  • Loading branch information
iand authored Oct 19, 2023
1 parent a092a97 commit 0de77a3
Showing 1 changed file with 15 additions and 48 deletions.
63 changes: 15 additions & 48 deletions internal/coord/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,21 +535,15 @@ func (r *RoutingBehaviour) perfomNextInbound() (BehaviourEvent, bool) {
NodeID: ev.NodeID,
}
// attempt to advance the include
next, ok := r.advanceInclude(ctx, cmd)
if ok {
r.pendingOutbound = append(r.pendingOutbound, next)
}
return r.advanceInclude(ctx, cmd)

case *EventRoutingUpdated:
span.SetAttributes(attribute.String("event", "EventRoutingUpdated"), attribute.String("nodeid", ev.NodeID.String()))
cmd := &routing.EventProbeAdd[kadt.Key, kadt.PeerID]{
NodeID: ev.NodeID,
}
// attempt to advance the probe state machine
next, ok := r.advanceProbe(ctx, cmd)
if ok {
r.pendingOutbound = append(r.pendingOutbound, next)
}
return r.advanceProbe(ctx, cmd)

case *EventGetCloserNodesSuccess:
span.SetAttributes(attribute.String("event", "EventGetCloserNodesSuccess"), attribute.String("queryid", string(ev.QueryID)), attribute.String("nodeid", ev.To.String()))
Expand All @@ -566,10 +560,7 @@ func (r *RoutingBehaviour) perfomNextInbound() (BehaviourEvent, bool) {
CloserNodes: ev.CloserNodes,
}
// attempt to advance the bootstrap
next, ok := r.advanceBootstrap(ctx, cmd)
if ok {
r.pendingOutbound = append(r.pendingOutbound, next)
}
return r.advanceBootstrap(ctx, cmd)

case IncludeQueryID:
var cmd routing.IncludeEvent
Expand All @@ -585,10 +576,7 @@ func (r *RoutingBehaviour) perfomNextInbound() (BehaviourEvent, bool) {
}
}
// attempt to advance the include
next, ok := r.advanceInclude(ctx, cmd)
if ok {
r.pendingOutbound = append(r.pendingOutbound, next)
}
return r.advanceInclude(ctx, cmd)

case ProbeQueryID:
var cmd routing.ProbeEvent
Expand All @@ -604,10 +592,7 @@ func (r *RoutingBehaviour) perfomNextInbound() (BehaviourEvent, bool) {
}
}
// attempt to advance the probe state machine
next, ok := r.advanceProbe(ctx, cmd)
if ok {
r.pendingOutbound = append(r.pendingOutbound, next)
}
return r.advanceProbe(ctx, cmd)

case routing.ExploreQueryID:
for _, info := range ev.CloserNodes {
Expand All @@ -619,10 +604,7 @@ func (r *RoutingBehaviour) perfomNextInbound() (BehaviourEvent, bool) {
NodeID: ev.To,
CloserNodes: ev.CloserNodes,
}
next, ok := r.advanceExplore(ctx, cmd)
if ok {
r.pendingOutbound = append(r.pendingOutbound, next)
}
return r.advanceExplore(ctx, cmd)

default:
panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID))
Expand All @@ -637,40 +619,31 @@ func (r *RoutingBehaviour) perfomNextInbound() (BehaviourEvent, bool) {
Error: ev.Err,
}
// attempt to advance the bootstrap
next, ok := r.advanceBootstrap(ctx, cmd)
if ok {
r.pendingOutbound = append(r.pendingOutbound, next)
}
return r.advanceBootstrap(ctx, cmd)

case IncludeQueryID:
cmd := &routing.EventIncludeConnectivityCheckFailure[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Error: ev.Err,
}
// attempt to advance the include state machine
next, ok := r.advanceInclude(ctx, cmd)
if ok {
r.pendingOutbound = append(r.pendingOutbound, next)
}
return r.advanceInclude(ctx, cmd)

case ProbeQueryID:
cmd := &routing.EventProbeConnectivityCheckFailure[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Error: ev.Err,
}
// attempt to advance the probe state machine
next, ok := r.advanceProbe(ctx, cmd)
if ok {
r.pendingOutbound = append(r.pendingOutbound, next)
}
return r.advanceProbe(ctx, cmd)

case routing.ExploreQueryID:
cmd := &routing.EventExploreFindCloserFailure[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Error: ev.Err,
}
// attempt to advance the explore
next, ok := r.advanceExplore(ctx, cmd)
if ok {
r.pendingOutbound = append(r.pendingOutbound, next)
}
return r.advanceExplore(ctx, cmd)

default:
panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID))
Expand All @@ -696,21 +669,15 @@ func (r *RoutingBehaviour) perfomNextInbound() (BehaviourEvent, bool) {
cmdProbe := &routing.EventProbeNotifyConnectivity[kadt.Key, kadt.PeerID]{
NodeID: ev.NodeID,
}
nextProbe, ok := r.advanceProbe(ctx, cmdProbe)
if ok {
r.pendingOutbound = append(r.pendingOutbound, nextProbe)
}
return r.advanceProbe(ctx, cmdProbe)
case *EventNotifyNonConnectivity:
span.SetAttributes(attribute.String("event", "EventNotifyConnectivity"), attribute.String("nodeid", ev.NodeID.String()))

// tell the probe state machine to remove the node from the routing table and probe list
cmdProbe := &routing.EventProbeRemove[kadt.Key, kadt.PeerID]{
NodeID: ev.NodeID,
}
nextProbe, ok := r.advanceProbe(ctx, cmdProbe)
if ok {
r.pendingOutbound = append(r.pendingOutbound, nextProbe)
}
return r.advanceProbe(ctx, cmdProbe)
case *EventRoutingPoll:
r.pollChildren(ctx)

Expand Down

0 comments on commit 0de77a3

Please sign in to comment.