From 0de77a3d06dd55f67d1822da77992ef5ba1d872a Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Thu, 19 Oct 2023 09:58:02 +0100 Subject: [PATCH] chore: return routing state machine events directly instead of queuing (#68) I meant to do this as part of #64 but forgot. It makes the event return slightly more efficient. --- internal/coord/routing.go | 63 ++++++++++----------------------------- 1 file changed, 15 insertions(+), 48 deletions(-) diff --git a/internal/coord/routing.go b/internal/coord/routing.go index f64412b..a62681a 100644 --- a/internal/coord/routing.go +++ b/internal/coord/routing.go @@ -535,10 +535,7 @@ func (r *RoutingBehaviour) perfomNextInbound() (BehaviourEvent, bool) { NodeID: ev.NodeID, } // attempt to advance the include - next, ok := r.advanceInclude(ctx, cmd) - if ok { - r.pendingOutbound = append(r.pendingOutbound, next) - } + return r.advanceInclude(ctx, cmd) case *EventRoutingUpdated: span.SetAttributes(attribute.String("event", "EventRoutingUpdated"), attribute.String("nodeid", ev.NodeID.String())) @@ -546,10 +543,7 @@ func (r *RoutingBehaviour) perfomNextInbound() (BehaviourEvent, bool) { NodeID: ev.NodeID, } // attempt to advance the probe state machine - next, ok := r.advanceProbe(ctx, cmd) - if ok { - r.pendingOutbound = append(r.pendingOutbound, next) - } + return r.advanceProbe(ctx, cmd) case *EventGetCloserNodesSuccess: span.SetAttributes(attribute.String("event", "EventGetCloserNodesSuccess"), attribute.String("queryid", string(ev.QueryID)), attribute.String("nodeid", ev.To.String())) @@ -566,10 +560,7 @@ func (r *RoutingBehaviour) perfomNextInbound() (BehaviourEvent, bool) { CloserNodes: ev.CloserNodes, } // attempt to advance the bootstrap - next, ok := r.advanceBootstrap(ctx, cmd) - if ok { - r.pendingOutbound = append(r.pendingOutbound, next) - } + return r.advanceBootstrap(ctx, cmd) case IncludeQueryID: var cmd routing.IncludeEvent @@ -585,10 +576,7 @@ func (r *RoutingBehaviour) perfomNextInbound() (BehaviourEvent, bool) { } } // attempt to advance the include - next, ok := r.advanceInclude(ctx, cmd) - if ok { - r.pendingOutbound = append(r.pendingOutbound, next) - } + return r.advanceInclude(ctx, cmd) case ProbeQueryID: var cmd routing.ProbeEvent @@ -604,10 +592,7 @@ func (r *RoutingBehaviour) perfomNextInbound() (BehaviourEvent, bool) { } } // attempt to advance the probe state machine - next, ok := r.advanceProbe(ctx, cmd) - if ok { - r.pendingOutbound = append(r.pendingOutbound, next) - } + return r.advanceProbe(ctx, cmd) case routing.ExploreQueryID: for _, info := range ev.CloserNodes { @@ -619,10 +604,7 @@ func (r *RoutingBehaviour) perfomNextInbound() (BehaviourEvent, bool) { NodeID: ev.To, CloserNodes: ev.CloserNodes, } - next, ok := r.advanceExplore(ctx, cmd) - if ok { - r.pendingOutbound = append(r.pendingOutbound, next) - } + return r.advanceExplore(ctx, cmd) default: panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID)) @@ -637,40 +619,31 @@ func (r *RoutingBehaviour) perfomNextInbound() (BehaviourEvent, bool) { Error: ev.Err, } // attempt to advance the bootstrap - next, ok := r.advanceBootstrap(ctx, cmd) - if ok { - r.pendingOutbound = append(r.pendingOutbound, next) - } + return r.advanceBootstrap(ctx, cmd) + case IncludeQueryID: cmd := &routing.EventIncludeConnectivityCheckFailure[kadt.Key, kadt.PeerID]{ NodeID: ev.To, Error: ev.Err, } // attempt to advance the include state machine - next, ok := r.advanceInclude(ctx, cmd) - if ok { - r.pendingOutbound = append(r.pendingOutbound, next) - } + return r.advanceInclude(ctx, cmd) + case ProbeQueryID: cmd := &routing.EventProbeConnectivityCheckFailure[kadt.Key, kadt.PeerID]{ NodeID: ev.To, Error: ev.Err, } // attempt to advance the probe state machine - next, ok := r.advanceProbe(ctx, cmd) - if ok { - r.pendingOutbound = append(r.pendingOutbound, next) - } + return r.advanceProbe(ctx, cmd) + case routing.ExploreQueryID: cmd := &routing.EventExploreFindCloserFailure[kadt.Key, kadt.PeerID]{ NodeID: ev.To, Error: ev.Err, } // attempt to advance the explore - next, ok := r.advanceExplore(ctx, cmd) - if ok { - r.pendingOutbound = append(r.pendingOutbound, next) - } + return r.advanceExplore(ctx, cmd) default: panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID)) @@ -696,10 +669,7 @@ func (r *RoutingBehaviour) perfomNextInbound() (BehaviourEvent, bool) { cmdProbe := &routing.EventProbeNotifyConnectivity[kadt.Key, kadt.PeerID]{ NodeID: ev.NodeID, } - nextProbe, ok := r.advanceProbe(ctx, cmdProbe) - if ok { - r.pendingOutbound = append(r.pendingOutbound, nextProbe) - } + return r.advanceProbe(ctx, cmdProbe) case *EventNotifyNonConnectivity: span.SetAttributes(attribute.String("event", "EventNotifyConnectivity"), attribute.String("nodeid", ev.NodeID.String())) @@ -707,10 +677,7 @@ func (r *RoutingBehaviour) perfomNextInbound() (BehaviourEvent, bool) { cmdProbe := &routing.EventProbeRemove[kadt.Key, kadt.PeerID]{ NodeID: ev.NodeID, } - nextProbe, ok := r.advanceProbe(ctx, cmdProbe) - if ok { - r.pendingOutbound = append(r.pendingOutbound, nextProbe) - } + return r.advanceProbe(ctx, cmdProbe) case *EventRoutingPoll: r.pollChildren(ctx)