Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
dennis-tra committed Oct 17, 2023
1 parent 7d2f3f4 commit 7b270b5
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 40 deletions.
95 changes: 58 additions & 37 deletions internal/coord/brdcst/mtm.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,31 @@ type ManyToMany[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct {

// keyReports tracks for each key this [ManyToMany] state machine should
// broadcast the number of successes and failures.
// TODO: perhaps this is better tracked outside of this state machine?
keyReports map[string]*report

// unprocessedNodes is a map from a node's ID to its [nodeState]. The
// [nodeState] contains information about all the keys that should be
// unprocessedNodes is a map from a node's ID to its [NodeState]. The
// [NodeState] contains information about all the keys that should be
// stored with that node, as well as, a map of all inflight requests and
// all keys that have already been tried to store with that node.
unprocessedNodes map[string]*nodeState[K, N]
unprocessedNodes map[string]*NodeState[K, N]

// inflightWithCapacity holds information about nodes that we are currently
// contacting but still have capacity to receive more requests from us. The
// term capacity refers to the number of concurrent streams we can open to
// a single node based on [ConfigManyToMany.StreamConcurrency].
inflightWithCapacity map[string]*nodeState[K, N]
inflightWithCapacity map[string]*NodeState[K, N]

// inflightWithCapacity holds information about nodes that we are currently
// contacting with no capacity to receive more concurrent streams. The
// term capacity refers to the number of concurrent streams we can open
// to a single node based on [ConfigManyToMany.StreamConcurrency].
inflightAtCapacity map[string]*nodeState[K, N]
inflightAtCapacity map[string]*NodeState[K, N]

// processedNodes is a map from a node's ID to its [nodeState]. All nodes
// processedNodes is a map from a node's ID to its [NodeState]. All nodes
// in this map have been fully processed. This means that all keys we wanted
// to store with a node have been attempted to be stored with it.
processedNodes map[string]*nodeState[K, N]
processedNodes map[string]*NodeState[K, N]

// msgFunc takes a key and returns the corresponding message that we will
// need to send to the remote node to store said key.
Expand All @@ -61,7 +62,7 @@ type brdcstManyMapVal[K kad.Key[K], N kad.NodeID[K]] struct {
node N
}

type nodeState[K kad.Key[K], N kad.NodeID[K]] struct {
type NodeState[K kad.Key[K], N kad.NodeID[K]] struct {
node N
todo []K
inflight map[string]K
Expand All @@ -81,14 +82,20 @@ func NewManyToMany[K kad.Key[K], N kad.NodeID[K], M coordt.Message](qid coordt.Q
t.Add(s.Key(), s)
}

// TODO: the below is quite expensive for many keys. It's probably worth doing this outside of the event loop

// find out which seed nodes are responsible to hold the provider/put
// record for which target key.
keyReports := make(map[string]*report, len(cfg.Targets))
mappings := map[string]map[string]*brdcstManyMapVal[K, N]{} // map from node -> map of target keys -> target key
for _, target := range cfg.Targets {
entries := trie.Closest(t, target, 20) // TODO: make configurable
targetMapKey := key.HexString(target)
keyReports[targetMapKey] = &report{}

if len(entries) > 0 {
keyReports[targetMapKey] = &report{}
}

for _, entry := range entries {
node := entry.Data
nodeMapKey := node.String()
Expand All @@ -100,13 +107,13 @@ func NewManyToMany[K kad.Key[K], N kad.NodeID[K], M coordt.Message](qid coordt.Q
}
}

unprocessedNodes := make(map[string]*nodeState[K, N], len(mappings))
unprocessedNodes := make(map[string]*NodeState[K, N], len(mappings))
for node, mapVals := range mappings {
if len(mapVals) == 0 {
continue
}

unprocessedNodes[node] = &nodeState[K, N]{
unprocessedNodes[node] = &NodeState[K, N]{
todo: make([]K, 0, len(mapVals)),
done: make([]K, 0, len(mapVals)),
inflight: map[string]K{},
Expand All @@ -122,9 +129,9 @@ func NewManyToMany[K kad.Key[K], N kad.NodeID[K], M coordt.Message](qid coordt.Q
cfg: cfg,
keyReports: keyReports,
unprocessedNodes: unprocessedNodes,
inflightWithCapacity: map[string]*nodeState[K, N]{},
inflightAtCapacity: map[string]*nodeState[K, N]{},
processedNodes: map[string]*nodeState[K, N]{},
inflightWithCapacity: map[string]*NodeState[K, N]{},
inflightAtCapacity: map[string]*NodeState[K, N]{},
processedNodes: map[string]*NodeState[K, N]{},
msgFunc: msgFunc,
}
}
Expand All @@ -140,32 +147,18 @@ func (mtm *ManyToMany[K, N, M]) Advance(ctx context.Context, ev BroadcastEvent)
switch ev := ev.(type) {
case *EventBroadcastStop:
case *EventBroadcastStoreRecordSuccess[K, N, M]:
mapKey := ev.NodeID.String()
if nstate, found := mtm.inflightAtCapacity[mapKey]; found {
delete(mtm.inflightAtCapacity, mapKey)
delete(nstate.inflight, key.HexString(ev.Target))
nstate.done = append(nstate.done, ev.Target)

if len(nstate.todo) == 0 {
if len(nstate.inflight) == 0 {
mtm.processedNodes[mapKey] = nstate
} else {
mtm.inflightAtCapacity[mapKey] = nstate
}
} else if len(nstate.inflight) != 0 {
mtm.inflightWithCapacity[mapKey] = nstate
}
} else if nstate, found := mtm.inflightWithCapacity[mapKey]; found {
delete(mtm.inflightWithCapacity, mapKey)
delete(nstate.inflight, key.HexString(ev.Target))
nstate.done = append(nstate.done, ev.Target)
mtm.handleStoreRecordResult(ev.NodeID, ev.Target)

if len(nstate.todo) != 0 {
mtm.inflightWithCapacity[mapKey] = nstate
}
}
targetMapKey := key.HexString(ev.Target)
mtm.keyReports[targetMapKey].successes += 1
mtm.keyReports[targetMapKey].lastSuccess = time.Now()

case *EventBroadcastStoreRecordFailure[K, N, M]:
mtm.handleStoreRecordResult(ev.NodeID, ev.Target)

targetMapKey := key.HexString(ev.Target)
mtm.keyReports[targetMapKey].failures += 1

case *EventBroadcastPoll:
// ignore, nothing to do
default:
Expand Down Expand Up @@ -236,3 +229,31 @@ func (mtm *ManyToMany[K, N, M]) Advance(ctx context.Context, ev BroadcastEvent)
}{},
}
}

func (mtm *ManyToMany[K, N, M]) handleStoreRecordResult(node N, target K) {
nodeMapKey := node.String()
targetMapKey := key.HexString(target)
if nstate, found := mtm.inflightAtCapacity[nodeMapKey]; found {
delete(mtm.inflightAtCapacity, nodeMapKey)
delete(nstate.inflight, targetMapKey)
nstate.done = append(nstate.done, target)

if len(nstate.todo) == 0 {
if len(nstate.inflight) == 0 {
mtm.processedNodes[nodeMapKey] = nstate
} else {
mtm.inflightAtCapacity[nodeMapKey] = nstate
}
} else if len(nstate.inflight) != 0 {
mtm.inflightWithCapacity[nodeMapKey] = nstate
}
} else if nstate, found := mtm.inflightWithCapacity[nodeMapKey]; found {
delete(mtm.inflightWithCapacity, nodeMapKey)
delete(nstate.inflight, targetMapKey)
nstate.done = append(nstate.done, target)

if len(nstate.todo) != 0 {
mtm.inflightWithCapacity[nodeMapKey] = nstate
}
}
}
9 changes: 9 additions & 0 deletions internal/coord/brdcst/mtm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/big"
"testing"

"github.com/plprobelab/go-libdht/kad/key"
"github.com/stretchr/testify/require"

"github.com/plprobelab/zikade/internal/coord/coordt"
Expand Down Expand Up @@ -46,6 +47,8 @@ func TestNewManyToMany(t *testing.T) {
require.Len(t, sm.inflightAtCapacity, 0)
require.NotNil(t, sm.processedNodes)
require.Len(t, sm.processedNodes, 0)
require.Len(t, sm.keyReports, 0)
require.NotNil(t, sm.keyReports)
require.NotNil(t, sm.msgFunc)

state := sm.Advance(ctx, &EventBroadcastPoll{})
Expand All @@ -68,6 +71,7 @@ func TestNewManyToMany(t *testing.T) {
require.Len(t, sm.inflightWithCapacity, 0)
require.Len(t, sm.inflightAtCapacity, 0)
require.Len(t, sm.processedNodes, 0)
require.Len(t, sm.keyReports, 0)
require.NotNil(t, sm.msgFunc)

state := sm.Advance(ctx, &EventBroadcastPoll{})
Expand Down Expand Up @@ -97,6 +101,7 @@ func TestNewManyToMany(t *testing.T) {
require.Len(t, sm.inflightWithCapacity, 0)
require.Len(t, sm.inflightAtCapacity, 0)
require.Len(t, sm.processedNodes, 0)
require.Len(t, sm.keyReports, 20)
require.NotNil(t, sm.msgFunc)
})

Expand Down Expand Up @@ -158,6 +163,10 @@ func TestManyToMany_Advance_single_target_single_seed(t *testing.T) {
require.True(t, ok, "type is %T", state)
require.Equal(t, fstate.QueryID, qid)
require.Equal(t, seed[0], fstate.Contacted[0])

require.Equal(t, 1, sm.keyReports[key.HexString(targets[0])].successes)
require.Equal(t, 0, sm.keyReports[key.HexString(targets[0])].failures)
require.False(t, sm.keyReports[key.HexString(targets[0])].lastSuccess.IsZero())
}

func TestManyToMany_Advance_multi_target_single_seed(t *testing.T) {
Expand Down
16 changes: 13 additions & 3 deletions internal/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"reflect"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -413,11 +414,20 @@ func (c *Coordinator) BroadcastStatic(ctx context.Context, msg *pb.Message, seed
return c.broadcast(ctx, msgFunc, seed, brdcst.DefaultConfigOneToMany(msg.Target()))
}

func (c *Coordinator) BroadcastMany(ctx context.Context, keys []kadt.Key, seed []kadt.PeerID, msgFn func(k kadt.Key) *pb.Message) error {
func (c *Coordinator) BroadcastMany(ctx context.Context, keys []kadt.Key, msgFn func(k kadt.Key) *pb.Message) error {
// verify that we have keys to push into the network
if len(keys) == 0 {
return fmt.Errorf("no keys to broadcast")
}

// grab the entire routing table contents
seed := c.rt.NearestNodes(keys[0], math.MaxInt)

// start broadcasting
return c.broadcast(ctx, msgFn, seed, brdcst.DefaultConfigManyToMany(keys))
}

func (c *Coordinator) broadcast(ctx context.Context, msgFunc func(k kadt.Key) *pb.Message, seeds []kadt.PeerID, cfg brdcst.Config) error {
func (c *Coordinator) broadcast(ctx context.Context, msgFunc func(k kadt.Key) *pb.Message, seed []kadt.PeerID, cfg brdcst.Config) error {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.broadcast")
defer span.End()

Expand All @@ -430,7 +440,7 @@ func (c *Coordinator) broadcast(ctx context.Context, msgFunc func(k kadt.Key) *p
cmd := &EventStartBroadcast{
QueryID: queryID,
MsgFunc: msgFunc,
Seed: seeds,
Seed: seed,
Notify: waiter,
Config: cfg,
}
Expand Down

0 comments on commit 7b270b5

Please sign in to comment.