Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
caseydavenport committed Feb 6, 2025
1 parent 7ca831e commit 09dc816
Show file tree
Hide file tree
Showing 13 changed files with 344 additions and 268 deletions.
50 changes: 42 additions & 8 deletions goldmane/pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
// listRequest is an internal helper used to synchronously request matching flows from the aggregator.
type listRequest struct {
respCh chan *listResponse
req *proto.ListRequest
req *proto.FlowListRequest
}

type listResponse struct {
Expand All @@ -49,7 +49,7 @@ type listResponse struct {

type streamRequest struct {
respCh chan *Stream
req *proto.StreamRequest
req *proto.FlowStreamRequest
}

type LogAggregator struct {
Expand Down Expand Up @@ -201,7 +201,7 @@ func (a *LogAggregator) Run(startTime int64) {
case req := <-a.streamRequests:
stream := a.streams.register(req)
req.respCh <- stream
a.buckets.StreamFrom(req.req.StartTimeGt, stream)
a.backfill(stream, req.req)
case id := <-a.streams.closedStreams():
a.streams.close(id)
case <-a.done:
Expand Down Expand Up @@ -243,7 +243,7 @@ func (a *LogAggregator) maybeEmitFlows() {

// Stream returns a new Stream from the aggregator. It uses a channel to synchronously request the stream
// from the aggregator.
func (a *LogAggregator) Stream(req *proto.StreamRequest) (*Stream, error) {
func (a *LogAggregator) Stream(req *proto.FlowStreamRequest) (*Stream, error) {
respCh := make(chan *Stream)
defer close(respCh)
a.streamRequests <- streamRequest{respCh, req}
Expand All @@ -256,7 +256,7 @@ func (a *LogAggregator) Stream(req *proto.StreamRequest) (*Stream, error) {

// List returns a list of flows that match the given request. It uses a channel to
// synchronously request the flows from the aggregator.
func (a *LogAggregator) List(req *proto.ListRequest) ([]*proto.FlowResult, error) {
func (a *LogAggregator) List(req *proto.FlowListRequest) ([]*proto.FlowResult, error) {
respCh := make(chan *listResponse)
defer close(respCh)
a.listRequests <- listRequest{respCh, req}
Expand All @@ -281,7 +281,7 @@ func (a *LogAggregator) Hints(req *proto.FilterHintsRequest) ([]*proto.FilterHin
case proto.FilterType_FilterTypeSourceNamespace:
sortBy = proto.SortBy_SourceNamespace
}
flows, err := a.List(&proto.ListRequest{
flows, err := a.List(&proto.FlowListRequest{
SortBy: []*proto.SortOption{{SortBy: sortBy}},
Filter: req.Filter,
StartTimeGt: req.StartTimeGt,
Expand Down Expand Up @@ -338,7 +338,7 @@ func (a *LogAggregator) Hints(req *proto.FilterHintsRequest) ([]*proto.FilterHin
return hints, nil
}

func (a *LogAggregator) validateRequest(req *proto.ListRequest) error {
func (a *LogAggregator) validateRequest(req *proto.FlowListRequest) error {
if req.StartTimeGt != 0 && req.StartTimeLt != 0 && req.StartTimeGt > req.StartTimeLt {
return fmt.Errorf("invalid time range")
}
Expand All @@ -348,7 +348,41 @@ func (a *LogAggregator) validateRequest(req *proto.ListRequest) error {
return nil
}

func (a *LogAggregator) queryFlows(req *proto.ListRequest) *listResponse {
// backfill fills a new Stream instance with historical Flow data based on the request.
func (a *LogAggregator) backfill(stream *Stream, request *proto.FlowStreamRequest) {
if request.StartTimeGt == 0 {
// If no start time is provided, we don't need to backfill any data
// to this stream.
logrus.WithField("id", stream.id).Debug("No start time provided, skipping backfill")
return
}

// ListFlows matching the streaming request and send them to the stream.
resp := a.queryFlows(&proto.FlowListRequest{
StartTimeGt: request.StartTimeGt,
Filter: request.Filter,
AggregationInterval: request.AggregationInterval,
})
if resp.err != nil {
logrus.WithError(resp.err).Error("Failed to stream flows")
return
}

// ListFlows returns a list of flows started with the newest flows first. But the stream
// expects the oldest flows first. So we need to reverse the list before sending it to the stream.
for i := len(resp.flows) - 1; i >= 0; i-- {
flow := resp.flows[i]
logrus.WithField("flow", flow).Debug("Sending backfilled flow to stream")
k := types.ProtoToFlowKey(flow.Flow.Key)
stream.Receive(bucketing.NewFlowBuilder(
a.diachronics[*k],
flow.Flow.StartTime,
flow.Flow.EndTime,
))
}
}

func (a *LogAggregator) queryFlows(req *proto.FlowListRequest) *listResponse {
logrus.WithFields(logrus.Fields{"req": req}).Debug("Received flow request")

// Validate the request.
Expand Down
Loading

0 comments on commit 09dc816

Please sign in to comment.