From 48801a1f6ed17d14e7b9d40af1f77e9fe9834239 Mon Sep 17 00:00:00 2001 From: QxBytes <39818795+QxBytes@users.noreply.github.com> Date: Mon, 29 Apr 2024 12:37:31 -0700 Subject: [PATCH] fix: close connection on any return from azure-vnet-telemetry (#2711) * move error handling code to defer * invert conditional * address linter issues * add close connection on telemetry unmarshal error unit test * Revert "add close connection on telemetry unmarshal error unit test" This reverts commit e8447f9d0d2370b5c742697ffa25d1532eedbac2. --- telemetry/telemetrybuffer.go | 89 +++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 41 deletions(-) diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index 90c9300f53..8aaaa90113 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -116,55 +116,62 @@ func (tb *TelemetryBuffer) StartServer() error { tb.connections = append(tb.connections, conn) tb.mutex.Unlock() go func() { + defer func() { + var index int + var value net.Conn + var found bool + + tb.mutex.Lock() + defer tb.mutex.Unlock() + + for index, value = range tb.connections { + if value == conn { + conn.Close() + found = true + break + } + } + + if found { + tb.connections = remove(tb.connections, index) + } + }() + for { reportStr, err := read(conn) - if err == nil { - var tmp map[string]interface{} - err = json.Unmarshal(reportStr, &tmp) + if err != nil { + return + } + var tmp map[string]interface{} + err = json.Unmarshal(reportStr, &tmp) + if err != nil { + if tb.logger != nil { + tb.logger.Error("StartServer: unmarshal error", zap.Error(err)) + } else { + log.Logf("StartServer: unmarshal error:%v", err) + } + return + } + if _, ok := tmp["CniSucceeded"]; ok { + var cniReport CNIReport + err = json.Unmarshal([]byte(reportStr), &cniReport) if err != nil { - if tb.logger != nil { - tb.logger.Error("StartServer: unmarshal error", zap.Error(err)) - } else { - log.Logf("StartServer: unmarshal error:%v", err) - } return } - if _, ok := tmp["CniSucceeded"]; ok { - var cniReport CNIReport - json.Unmarshal([]byte(reportStr), &cniReport) - tb.data <- cniReport - } else if _, ok := tmp["Metric"]; ok { - var aiMetric AIMetric - json.Unmarshal([]byte(reportStr), &aiMetric) - tb.data <- aiMetric - } else { - if tb.logger != nil { - tb.logger.Info("StartServer: default", zap.Any("case", tmp)) - } else { - log.Logf("StartServer: default case:%+v...", tmp) - } + tb.data <- cniReport + } else if _, ok := tmp["Metric"]; ok { + var aiMetric AIMetric + err = json.Unmarshal([]byte(reportStr), &aiMetric) + if err != nil { + return } + tb.data <- aiMetric } else { - var index int - var value net.Conn - var found bool - - tb.mutex.Lock() - defer tb.mutex.Unlock() - - for index, value = range tb.connections { - if value == conn { - conn.Close() - found = true - break - } - } - - if found { - tb.connections = remove(tb.connections, index) + if tb.logger != nil { + tb.logger.Info("StartServer: default", zap.Any("case", tmp)) + } else { + log.Logf("StartServer: default case:%+v...", tmp) } - - return } } }()