Skip to content

Commit

Permalink
Remove remaining TODOs
Browse files Browse the repository at this point in the history
  • Loading branch information
caseydavenport committed Jan 16, 2025
1 parent e13e461 commit 9200322
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 8 deletions.
37 changes: 35 additions & 2 deletions goldmane/pkg/aggregator/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package aggregator
import (
"github.com/projectcalico/calico/goldmane/pkg/internal/types"
"github.com/projectcalico/calico/goldmane/proto"
"github.com/projectcalico/calico/libcalico-go/lib/set"
)

// flowMatches returns true if the flow matches the request.
Expand All @@ -42,7 +43,39 @@ func mergeFlowInto(a, b *types.Flow) {
a.NumConnectionsCompleted += b.NumConnectionsCompleted
a.NumConnectionsLive += b.NumConnectionsLive

// TODO: Update Start/End times.
// Update Start/End times, to indicate the full duration across all of the
// component flows that have been merged into this aggregated one.
if a.StartTime > b.StartTime {
a.StartTime = b.StartTime
}
if a.EndTime < b.EndTime {
a.EndTime = b.EndTime
}

// To merge labels, we include the intersection of the labels from both flows.
// This means the resulting aggregated flow will have all the labels common to
// its component flows.
a.SourceLabels = intersection(a.SourceLabels, b.SourceLabels)
a.DestLabels = intersection(a.DestLabels, b.DestLabels)
}

// TODO: Merge labels.
// intersection returns the intersection of two slices of strings. i.e., all the values that
// exist in both input slices.
func intersection(a, b []string) []string {
labelsA := set.New[string]()
labelsB := set.New[string]()
intersection := set.New[string]()
for _, v := range a {
labelsA.Add(v)
}
for _, v := range b {
labelsB.Add(v)
}
labelsA.Iter(func(l string) error {
if labelsB.Contains(l) {
intersection.Add(l)
}
return nil
})
return intersection.Slice()
}
48 changes: 42 additions & 6 deletions goldmane/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,15 @@ func (c *flowTimedCache) Add(f *proto.Flow) {
}
}

func (c *flowTimedCache) Iter(f func(f *proto.Flow) error) {
func (c *flowTimedCache) Iter(f func(f *proto.Flow) error) error {
c.Lock()
defer c.Unlock()
for _, v := range c.flows {
if err := f(v.Flow); err != nil {
return
return err
}
}
return nil
}

func (c *flowTimedCache) gcRoutine() {
Expand Down Expand Up @@ -128,6 +129,9 @@ func (c *FlowClient) Run(ctx context.Context) {
// Create a new client to push flows to the server.
cli := proto.NewFlowCollectorClient(c.grpcClient)

// Create a backoff helper.
b := newBackoff(1*time.Second, 10*time.Second)

for {
// Check if the parent context has been canceled.
if err := ctx.Err(); err != nil {
Expand All @@ -140,15 +144,17 @@ func (c *FlowClient) Run(ctx context.Context) {
rc, err := cli.Connect(ctx)
if err != nil {
logrus.WithError(err).Warn("Failed to connect to flow server")
time.Sleep(5 * time.Second)
b.Wait()
continue
}

logrus.Info("Connected to flow server")
b.Reset()

// On a new connection, send all of the flows that we have cached. We're assuming
// this indicates a restart of the flow server. The flow server will handle deuplication
// if we happen to send the same flow twice.
c.cache.Iter(func(f *proto.Flow) error {
err = c.cache.Iter(func(f *proto.Flow) error {
// Send.
if err := rc.Send(&proto.FlowUpdate{Flow: f}); err != nil {
logrus.WithError(err).Warn("Failed to send flow")
Expand All @@ -161,6 +167,10 @@ func (c *FlowClient) Run(ctx context.Context) {
}
return nil
})
if err != nil {
b.Wait()
continue
}

// Send new Flows as they are received.
for flog := range c.inChan {
Expand All @@ -183,12 +193,38 @@ func (c *FlowClient) Run(ctx context.Context) {
if err := rc.CloseSend(); err != nil {
logrus.WithError(err).Warn("Failed to close connection")
}
b.Wait()
}
}

// TODO: Exponential backoff.
time.Sleep(1 * time.Second)
// backoff is a small helper to implement exponential backoff.
func newBackoff(base, maxBackoff time.Duration) *backoff {
return &backoff{
base: base,
interval: base,
maxBackoff: maxBackoff,
}
}

type backoff struct {
base time.Duration
interval time.Duration
maxBackoff time.Duration
}

func (b *backoff) Wait() {
logrus.WithField("duration", b.interval).Info("Waiting before next connection attempt")
time.Sleep(b.interval)
b.interval *= 2
if b.interval > b.maxBackoff {
b.interval = b.maxBackoff
}
}

func (b *backoff) Reset() {
b.interval = b.base
}

func (c *FlowClient) Push(f *proto.Flow) {
// Make a copy of the flow to decouple the caller from the client.
cp := f
Expand Down

0 comments on commit 9200322

Please sign in to comment.