Skip to content

Commit

Permalink
[FlowAggregator] Use correct input channel in flowExportLoopProxy (#6990
Browse files Browse the repository at this point in the history
)

The code was invalid as the function was consuming messages straight
from the collector instead of from the preprocessor, hence bypasing the
preprocessor. This was not caught by e2e tests because the preprocessor
output channel is small (capacity of 16), hence it was filling up very
quickly and only a few uninteresting (i.e., irrelevant to the test
cases) records were lost.

Signed-off-by: Antonin Bas <antonin.bas@broadcom.com>
  • Loading branch information
antoninbas authored Feb 12, 2025
1 parent 0eb0709 commit 3bf8441
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func (fa *flowAggregator) proxyRecord(record ipfixentities.Record, obsDomainID u
func (fa *flowAggregator) flowExportLoopProxy(stopCh <-chan struct{}) {
logTicker := time.NewTicker(fa.logTickerDuration)
defer logTicker.Stop()
msgCh := fa.collectingProcess.GetMsgChan()
msgCh := fa.preprocessorOutCh

proxyRecords := func(msg *ipfixentities.Message) {
set := msg.GetSet()
Expand Down
3 changes: 1 addition & 2 deletions test/e2e/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,7 @@ func setupFlowAggregator(tb testing.TB, testData *TestData, o flowVisibilityTest
return err
}
tb.Logf("ClickHouse Service created with ClusterIP: %v", chSvcIP)
tb.Logf("Applying flow aggregator YAML with ipfix collector: %s and clickHouse enabled",
ipfixCollectorAddr)
tb.Logf("Deploying FlowAggregator with ipfix collector: %s and options: %+v", ipfixCollectorAddr, o)

if err := testData.deployFlowAggregator(ipfixCollectorAddr, o); err != nil {
return err
Expand Down

0 comments on commit 3bf8441

Please sign in to comment.