Skip to content

Commit

Permalink
Merge pull request #1105 from LiZhenCheng9527/add-metric
Browse files Browse the repository at this point in the history
add workload metrics
  • Loading branch information
kmesh-bot authored Jan 21, 2025
2 parents 35b7b22 + 510bcb1 commit 4bfc7b7
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 16 deletions.
10 changes: 0 additions & 10 deletions bpf/kmesh/probes/tcp_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,7 @@ struct tcp_probe_info {
__u32 protocol;
__u32 srtt_us; /* smoothed round trip time << 3 in usecs */
__u32 rtt_min;
__u32 mss_cache; /* Cached effective mss, not including SACKS */
__u32 total_retrans; /* Total retransmits for entire connection */
__u32 segs_in; /* RFC4898 tcpEStatsPerfSegsIn
* total number of segments in.
*/
__u32 segs_out; /* RFC4898 tcpEStatsPerfSegsOut
* The total number of segments sent.
*/
__u32 lost_out; /* Lost packets */
};

Expand Down Expand Up @@ -108,10 +101,7 @@ static inline void get_tcp_probe_info(struct bpf_tcp_sock *tcp_sock, struct tcp_
info->received_bytes = tcp_sock->bytes_received;
info->srtt_us = tcp_sock->srtt_us;
info->rtt_min = tcp_sock->rtt_min;
info->mss_cache = tcp_sock->mss_cache;
info->total_retrans = tcp_sock->total_retrans;
info->segs_in = tcp_sock->segs_in;
info->segs_out = tcp_sock->segs_out;
info->lost_out = tcp_sock->lost_out;
return;
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/telemetry/accesslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func buildAccesslog(data requestMetric, accesslog logInfo) string {
timeInfo := fmt.Sprintf("%v", uptime)
sourceInfo := fmt.Sprintf("src.addr=%s, src.workload=%s, src.namespace=%s", accesslog.sourceAddress, accesslog.sourceWorkload, accesslog.sourceNamespace)
destinationInfo := fmt.Sprintf("dst.addr=%s, dst.service=%s, dst.workload=%s, dst.namespace=%s", accesslog.destinationAddress, accesslog.destinationService, accesslog.destinationWorkload, accesslog.destinationNamespace)
connectionInfo := fmt.Sprintf("direction=%s, sent_bytes=%d, received_bytes=%d, duration=%vms", accesslog.direction, data.sentBytes, data.receivedBytes, (float64(data.duration) / 1000000.0))
connectionInfo := fmt.Sprintf("direction=%s, sent_bytes=%d, received_bytes=%d, srtt=%dus, min_rtt=%dus, duration=%vms", accesslog.direction, data.sentBytes, data.receivedBytes, data.srtt, data.minRtt, (float64(data.duration) / 1000000.0))

logResult := fmt.Sprintf("%s %s, %s, %s", timeInfo, sourceInfo, destinationInfo, connectionInfo)
return logResult
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/telemetry/accesslog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func Test_buildAccesslog(t *testing.T) {
destinationNamespace: "kmesh-system",
},
},
want: "2024-08-14 10:11:27.005837715 +0000 UTC src.addr=10.244.0.10:47667, src.workload=sleep-7656cf8794-9v2gv, src.namespace=kmesh-system, dst.addr=10.244.0.7:8080, dst.service=httpbin.ambient-demo.svc.cluster.local, dst.workload=httpbin-86b8ffc5ff-bhvxx, dst.namespace=kmesh-system, direction=INBOUND, sent_bytes=60, received_bytes=172, duration=2.236ms",
want: "2024-08-14 10:11:27.005837715 +0000 UTC src.addr=10.244.0.10:47667, src.workload=sleep-7656cf8794-9v2gv, src.namespace=kmesh-system, dst.addr=10.244.0.7:8080, dst.service=httpbin.ambient-demo.svc.cluster.local, dst.workload=httpbin-86b8ffc5ff-bhvxx, dst.namespace=kmesh-system, direction=INBOUND, sent_bytes=60, received_bytes=172, srtt=0us, min_rtt=0us, duration=2.236ms",
},
}
osStartTime = time.Date(2024, 7, 4, 20, 14, 0, 0, time.UTC)
Expand Down
25 changes: 22 additions & 3 deletions pkg/controller/telemetry/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const (

connection_success = uint32(1)

MSG_LEN = 96

metricFlushInterval = 5 * time.Second

DEFAULT_UNKNOWN = "-"
Expand All @@ -66,6 +68,8 @@ type workloadMetricInfo struct {
WorkloadConnSentBytes float64
WorkloadConnReceivedBytes float64
WorkloadConnFailed float64
WorkloadConnTotalRetrans float64
WorkloadConnPacketLost float64
}

type serviceMetricInfo struct {
Expand All @@ -88,10 +92,7 @@ type statistics struct {
Protocol uint32
SRttTime uint32
RttMin uint32
MssCache uint32
Retransmits uint32
SegmentsIn uint32
SegmentsOut uint32
LostPackets uint32
}

Expand Down Expand Up @@ -135,6 +136,10 @@ type requestMetric struct {
success uint32
duration uint64
closeTime uint64
srtt uint32
minRtt uint32
totalRetrans uint32
PacketLost uint32
}

type workloadMetricLabels struct {
Expand Down Expand Up @@ -408,6 +413,10 @@ func buildV4Metric(buf *bytes.Buffer) (requestMetric, error) {
data.success = connectData.ConnectSuccess
data.duration = connectData.Duration
data.closeTime = connectData.CloseTime
data.srtt = connectData.statistics.SRttTime
data.minRtt = connectData.statistics.RttMin
data.totalRetrans = connectData.statistics.Retransmits
data.PacketLost = connectData.statistics.LostPackets

return data, nil
}
Expand Down Expand Up @@ -441,6 +450,10 @@ func buildV6Metric(buf *bytes.Buffer) (requestMetric, error) {
data.success = connectData.ConnectSuccess
data.duration = connectData.Duration
data.closeTime = connectData.CloseTime
data.srtt = connectData.statistics.SRttTime
data.minRtt = connectData.statistics.RttMin
data.totalRetrans = connectData.statistics.Retransmits
data.PacketLost = connectData.statistics.LostPackets

return data, nil
}
Expand Down Expand Up @@ -607,6 +620,8 @@ func (m *MetricController) updateWorkloadMetricCache(data requestMetric, labels
}
v.WorkloadConnReceivedBytes = v.WorkloadConnReceivedBytes + float64(data.receivedBytes)
v.WorkloadConnSentBytes = v.WorkloadConnSentBytes + float64(data.sentBytes)
v.WorkloadConnTotalRetrans = v.WorkloadConnTotalRetrans + float64(data.totalRetrans)
v.WorkloadConnPacketLost = v.WorkloadConnPacketLost + float64(data.PacketLost)
} else {
newWorkloadMetricInfo := workloadMetricInfo{}
if data.state == TCP_ESTABLISHED {
Expand All @@ -620,6 +635,8 @@ func (m *MetricController) updateWorkloadMetricCache(data requestMetric, labels
}
newWorkloadMetricInfo.WorkloadConnReceivedBytes = float64(data.receivedBytes)
newWorkloadMetricInfo.WorkloadConnSentBytes = float64(data.sentBytes)
newWorkloadMetricInfo.WorkloadConnTotalRetrans = float64(data.totalRetrans)
newWorkloadMetricInfo.WorkloadConnPacketLost = float64(data.PacketLost)
m.workloadMetricCache[labels] = &newWorkloadMetricInfo
}
}
Expand Down Expand Up @@ -670,6 +687,8 @@ func (m *MetricController) updatePrometheusMetric() {
tcpSentBytesInWorkload.With(workloadLabels).Add(v.WorkloadConnSentBytes)
tcpReceivedBytesInWorkload.With(workloadLabels).Add(v.WorkloadConnReceivedBytes)
tcpConnectionFailedInWorkload.With(workloadLabels).Add(v.WorkloadConnFailed)
tcpConnectionTotalRetransInWorkload.With(workloadLabels).Add(v.WorkloadConnTotalRetrans)
tcpConnectionPacketLostInWorkload.With(workloadLabels).Add(v.WorkloadConnPacketLost)
}

for k, v := range serviceInfoCache {
Expand Down
18 changes: 17 additions & 1 deletion pkg/controller/telemetry/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,18 @@ var (
Help: "The total number of TCP connections failed to a workload.",
}, workloadLabels)

tcpConnectionTotalRetransInWorkload = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "kmesh_tcp_retrans_total",
Help: "Total number of retransmissions of the workload over the TCP connection.",
}, workloadLabels)

tcpConnectionPacketLostInWorkload = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "kmesh_tcp_packet_loss_total",
Help: "Tracks the total number of TCP packets lost between source and destination.",
}, workloadLabels)

tcpConnectionOpenedInService = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "kmesh_tcp_connections_opened_total",
Help: "The total number of TCP connections opened to a service",
Expand Down Expand Up @@ -243,7 +255,7 @@ func runPrometheusClient(registry *prometheus.Registry) {
// Hold the lock so that matching the same request pattern (e.g. /status/metric) does not cause a panic in unit tests.
mu.Lock()
defer mu.Unlock()
registry.MustRegister(tcpConnectionOpenedInWorkload, tcpConnectionClosedInWorkload, tcpReceivedBytesInWorkload, tcpSentBytesInWorkload)
registry.MustRegister(tcpConnectionOpenedInWorkload, tcpConnectionClosedInWorkload, tcpReceivedBytesInWorkload, tcpSentBytesInWorkload, tcpConnectionTotalRetransInWorkload, tcpConnectionPacketLostInWorkload)
registry.MustRegister(tcpConnectionOpenedInService, tcpConnectionClosedInService, tcpReceivedBytesInService, tcpSentBytesInService)
registry.MustRegister(bpfProgOpDuration, bpfProgOpCount)
registry.MustRegister(mapEntryCount, mapCountInNode)
Expand Down Expand Up @@ -272,12 +284,16 @@ func deleteWorkloadMetricInPrometheus(workload *workloadapi.Workload) {
_ = tcpConnectionOpenedInWorkload.DeletePartialMatch(prometheus.Labels{"destination_pod_name": workload.Name, "destination_pod_namespace": workload.Namespace})
_ = tcpReceivedBytesInWorkload.DeletePartialMatch(prometheus.Labels{"destination_pod_name": workload.Name, "destination_pod_namespace": workload.Namespace})
_ = tcpSentBytesInWorkload.DeletePartialMatch(prometheus.Labels{"destination_pod_name": workload.Name, "destination_pod_namespace": workload.Namespace})
_ = tcpConnectionTotalRetransInWorkload.DeletePartialMatch(prometheus.Labels{"destination_pod_name": workload.Name, "destination_pod_namespace": workload.Namespace})
_ = tcpConnectionPacketLostInWorkload.DeletePartialMatch(prometheus.Labels{"destination_pod_name": workload.Name, "destination_pod_namespace": workload.Namespace})
// delete source workload metric labels
_ = tcpConnectionClosedInWorkload.DeletePartialMatch(prometheus.Labels{"source_workload": workload.Name, "source_workload_namespace": workload.Namespace})
_ = tcpConnectionFailedInWorkload.DeletePartialMatch(prometheus.Labels{"source_workload": workload.Name, "source_workload_namespace": workload.Namespace})
_ = tcpConnectionOpenedInWorkload.DeletePartialMatch(prometheus.Labels{"source_workload": workload.Name, "source_workload_namespace": workload.Namespace})
_ = tcpReceivedBytesInWorkload.DeletePartialMatch(prometheus.Labels{"source_workload": workload.Name, "source_workload_namespace": workload.Namespace})
_ = tcpSentBytesInWorkload.DeletePartialMatch(prometheus.Labels{"source_workload": workload.Name, "source_workload_namespace": workload.Namespace})
_ = tcpConnectionTotalRetransInWorkload.DeletePartialMatch(prometheus.Labels{"source_workload": workload.Name, "source_workload_namespace": workload.Namespace})
_ = tcpConnectionPacketLostInWorkload.DeletePartialMatch(prometheus.Labels{"source_workload": workload.Name, "source_workload_namespace": workload.Namespace})
}

func DeleteServiceMetric(serviceName string) {
Expand Down

0 comments on commit 4bfc7b7

Please sign in to comment.