diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index 90c9300f53..8aaaa90113 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -116,55 +116,62 @@ func (tb *TelemetryBuffer) StartServer() error { tb.connections = append(tb.connections, conn) tb.mutex.Unlock() go func() { + defer func() { + var index int + var value net.Conn + var found bool + + tb.mutex.Lock() + defer tb.mutex.Unlock() + + for index, value = range tb.connections { + if value == conn { + conn.Close() + found = true + break + } + } + + if found { + tb.connections = remove(tb.connections, index) + } + }() + for { reportStr, err := read(conn) - if err == nil { - var tmp map[string]interface{} - err = json.Unmarshal(reportStr, &tmp) + if err != nil { + return + } + var tmp map[string]interface{} + err = json.Unmarshal(reportStr, &tmp) + if err != nil { + if tb.logger != nil { + tb.logger.Error("StartServer: unmarshal error", zap.Error(err)) + } else { + log.Logf("StartServer: unmarshal error:%v", err) + } + return + } + if _, ok := tmp["CniSucceeded"]; ok { + var cniReport CNIReport + err = json.Unmarshal([]byte(reportStr), &cniReport) if err != nil { - if tb.logger != nil { - tb.logger.Error("StartServer: unmarshal error", zap.Error(err)) - } else { - log.Logf("StartServer: unmarshal error:%v", err) - } return } - if _, ok := tmp["CniSucceeded"]; ok { - var cniReport CNIReport - json.Unmarshal([]byte(reportStr), &cniReport) - tb.data <- cniReport - } else if _, ok := tmp["Metric"]; ok { - var aiMetric AIMetric - json.Unmarshal([]byte(reportStr), &aiMetric) - tb.data <- aiMetric - } else { - if tb.logger != nil { - tb.logger.Info("StartServer: default", zap.Any("case", tmp)) - } else { - log.Logf("StartServer: default case:%+v...", tmp) - } + tb.data <- cniReport + } else if _, ok := tmp["Metric"]; ok { + var aiMetric AIMetric + err = json.Unmarshal([]byte(reportStr), &aiMetric) + if err != nil { + return } + tb.data <- aiMetric } else { - var index int - var value net.Conn - var found bool - - tb.mutex.Lock() - defer tb.mutex.Unlock() - - for index, value = range tb.connections { - if value == conn { - conn.Close() - found = true - break - } - } - - if found { - tb.connections = remove(tb.connections, index) + if tb.logger != nil { + tb.logger.Info("StartServer: default", zap.Any("case", tmp)) + } else { + log.Logf("StartServer: default case:%+v...", tmp) } - - return } } }()