diff --git a/client/clientimpl_test.go b/client/clientimpl_test.go index dea6bd6c..86271a06 100644 --- a/client/clientimpl_test.go +++ b/client/clientimpl_test.go @@ -141,6 +141,7 @@ func startClient(t *testing.T, settings types.StartSettings, client OpAMPClient) func createNoServerSettings() types.StartSettings { return types.StartSettings{ OpAMPServerURL: "ws://" + testhelpers.GetAvailableLocalAddress(), + Metrics: types.NewClientMetrics(64), } } diff --git a/client/httpclient.go b/client/httpclient.go index 92259d59..0664ae51 100644 --- a/client/httpclient.go +++ b/client/httpclient.go @@ -37,6 +37,8 @@ func NewHTTP(logger types.Logger) *httpClient { // Start implements OpAMPClient.Start. func (c *httpClient) Start(ctx context.Context, settings types.StartSettings) error { + c.sender.SetMetrics(settings.Metrics) + if err := c.common.PrepareStart(ctx, settings); err != nil { return err } diff --git a/client/httpclient_test.go b/client/httpclient_test.go index fc670411..278c58e7 100644 --- a/client/httpclient_test.go +++ b/client/httpclient_test.go @@ -180,6 +180,7 @@ func TestHTTPClientStartWithHeartbeatInterval(t *testing.T) { settings := types.StartSettings{ OpAMPServerURL: "http://" + srv.Endpoint, HeartbeatInterval: &heartbeat, + Metrics: types.NewClientMetrics(64), } if tt.enableHeartbeat { settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat diff --git a/client/internal/httpsender.go b/client/internal/httpsender.go index a97e1311..53e1daa2 100644 --- a/client/internal/httpsender.go +++ b/client/internal/httpsender.go @@ -30,6 +30,8 @@ const encodingTypeGZip = "gzip" type requestWrapper struct { *http.Request + dataLen int + msg *protobufs.AgentToServer bodyReader func() io.ReadCloser } @@ -62,7 +64,17 @@ type HTTPSender struct { getHeader func() http.Header // Processor to handle received messages. - receiveProcessor receivedProcessor + receiveProcessor rcvProcessor + + metrics *types.ClientMetrics +} + +// SetMetrics is used to set the sender's metrics. This is useful because the +// metrics object is not available until the StartSettings are available. +func (s *HTTPSender) SetMetrics(metrics *types.ClientMetrics) { + if metrics != nil { + s.metrics = metrics + } } // NewHTTPSender creates a new Sender that uses HTTP to send messages @@ -73,6 +85,7 @@ func NewHTTPSender(logger types.Logger) *HTTPSender { logger: logger, client: http.DefaultClient, pollingIntervalMs: defaultPollingIntervalMs, + metrics: types.NewClientMetrics(1), } // initialize the headers with no additional headers h.SetRequestHeader(nil, nil) @@ -167,6 +180,15 @@ func (h *HTTPSender) makeOneRequestRoundtrip(ctx context.Context) { h.receiveResponse(ctx, resp) } +func httpTxMessageInfo(req *requestWrapper) types.TxMessageInfo { + return types.TxMessageInfo{ + InstanceUID: req.msg.InstanceUid, + Capabilities: req.msg.Capabilities, + SequenceNum: req.msg.SequenceNum, + Attrs: httpMessageAttrs(txMessageAttrs(req.msg)), + } +} + func (h *HTTPSender) sendRequestWithRetries(ctx context.Context) (*http.Response, error) { req, err := h.prepareRequest(ctx) if err != nil { @@ -197,11 +219,18 @@ func (h *HTTPSender) sendRequestWithRetries(ctx context.Context) (*http.Response case <-timer.C: { req.rewind(ctx) + startSend := time.Now() resp, err := h.client.Do(req.Request) + latency := time.Since(startSend) + h.metrics.TxBytes.Add(int64(req.dataLen)) + h.metrics.TxMessages.Add(1) if err == nil { switch resp.StatusCode { case http.StatusOK: // We consider it connected if we receive 200 status from the Server. + messageInfo := httpTxMessageInfo(req) + messageInfo.TxLatency = latency + h.metrics.TxMessageInfo.Insert(messageInfo) h.callbacks.OnConnect(ctx) return resp, nil @@ -210,6 +239,7 @@ func (h *HTTPSender) sendRequestWithRetries(ctx context.Context) (*http.Response err = fmt.Errorf("server response code=%d", resp.StatusCode) default: + h.metrics.TxErrors.Add(1) return nil, fmt.Errorf("invalid response from server: %d", resp.StatusCode) } } else if errors.Is(err, context.Canceled) { @@ -256,7 +286,7 @@ func (h *HTTPSender) prepareRequest(ctx context.Context) (*requestWrapper, error if err != nil { return nil, err } - req := requestWrapper{Request: r} + req := requestWrapper{Request: r, dataLen: len(data), msg: msgToSend} if h.compressionEnabled { var buf bytes.Buffer @@ -269,8 +299,11 @@ func (h *HTTPSender) prepareRequest(ctx context.Context) (*requestWrapper, error h.logger.Errorf(ctx, "Failed to close the writer: %v", err) return nil, err } - req.bodyReader = bodyReader(buf.Bytes()) + bufBytes := buf.Bytes() + req.dataLen = len(bufBytes) + req.bodyReader = bodyReader(bufBytes) } else { + req.dataLen = len(data) req.bodyReader = bodyReader(data) } if err != nil { @@ -281,14 +314,24 @@ func (h *HTTPSender) prepareRequest(ctx context.Context) (*requestWrapper, error return &req, nil } +func httpMessageAttrs(attrs types.MessageAttrs) types.MessageAttrs { + attrs.Set(types.HTTPTransportAttr) + return attrs +} + func (h *HTTPSender) receiveResponse(ctx context.Context, resp *http.Response) { + startReceive := time.Now() msgBytes, err := io.ReadAll(resp.Body) + h.metrics.RxBytes.Add(int64(len(msgBytes))) if err != nil { + h.metrics.RxErrors.Add(1) _ = resp.Body.Close() h.logger.Errorf(ctx, "cannot read response body: %v", err) return } _ = resp.Body.Close() + latency := time.Since(startReceive) + h.metrics.RxMessages.Add(1) var response protobufs.ServerToAgent if err := proto.Unmarshal(msgBytes, &response); err != nil { @@ -296,6 +339,13 @@ func (h *HTTPSender) receiveResponse(ctx context.Context, resp *http.Response) { return } + h.metrics.RxMessageInfo.Insert(types.RxMessageInfo{ + InstanceUID: response.InstanceUid, + Capabilities: response.Capabilities, + Attrs: httpMessageAttrs(rxMessageAttrs(&response)), + RxLatency: latency, + }) + h.receiveProcessor.ProcessReceivedMessage(ctx, &response) } diff --git a/client/internal/httpsender_test.go b/client/internal/httpsender_test.go index 78ff2f87..f520fa7d 100644 --- a/client/internal/httpsender_test.go +++ b/client/internal/httpsender_test.go @@ -12,7 +12,9 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" "github.com/open-telemetry/opamp-go/client/types" sharedinternal "github.com/open-telemetry/opamp-go/internal" @@ -218,7 +220,7 @@ func TestRequestInstanceUidFlagReset(t *testing.T) { }) // Then the RequestInstanceUid flag stays intact. - assert.Equal(t, sender.receiveProcessor.clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid) + assert.Equal(t, sender.receiveProcessor.(*receivedProcessor).clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid) // If we process a message that contains a non-nil AgentIdentification that contains a NewInstanceUid. sender.receiveProcessor.ProcessReceivedMessage(ctx, @@ -227,7 +229,7 @@ func TestRequestInstanceUidFlagReset(t *testing.T) { }) // Then the flag is reset so we don't request a new instance uid yet again. - assert.Equal(t, sender.receiveProcessor.clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_Unspecified) + assert.Equal(t, sender.receiveProcessor.(*receivedProcessor).clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_Unspecified) cancel() } @@ -354,3 +356,92 @@ func TestPackageUpdatesWithError(t *testing.T) { cancel() } + +type fakeReciveProcessor struct { +} + +func (fakeReciveProcessor) ProcessReceivedMessage(context.Context, *protobufs.ServerToAgent) {} + +func TestHTTPSendMessageMetrics(t *testing.T) { + sendMsg := &protobufs.AgentToServer{ + InstanceUid: []byte("abcd"), + Capabilities: 2, + SequenceNum: 1, + } + + recvMsg := &protobufs.ServerToAgent{ + InstanceUid: []byte("dcba"), + Capabilities: 4, + } + + sender := NewHTTPSender(TestLogger{T: t}) + metrics := types.NewClientMetrics(64) + sender.SetMetrics(metrics) + sender.receiveProcessor = fakeReciveProcessor{} + + srv := StartMockServer(t) + defer srv.Close() + + sender.url = "http://" + srv.Endpoint + + srv.OnRequest = func(w http.ResponseWriter, r *http.Request) { + b, _ := proto.Marshal(recvMsg) + _, _ = w.Write(b) + } + + sender.NextMessage().Update(func(msg *protobufs.AgentToServer) { + *msg = *sendMsg + }) + sender.callbacks = types.Callbacks{ + OnConnect: func(ctx context.Context) { + }, + OnConnectFailed: func(ctx context.Context, _ error) { + }, + } + resp, err := sender.sendRequestWithRetries(context.Background()) + if err != nil { + t.Fatal(err) + } + if status := resp.StatusCode; status < 200 || status >= 300 { + t.Fatalf("unexpected http status: %d", status) + } + if got, want := metrics.TxMessages.Read(), int64(1); got != want { + t.Errorf("wrong number of messages: got %d, want %d", got, want) + } + if metrics.TxBytes.Read() == 0 { + t.Error("TxBytes not recorded") + } + txm := metrics.TxMessageInfo.Drain()[0] + if got, want := txm.InstanceUID, []byte("abcd"); !cmp.Equal(want, got) { + t.Errorf("instance uid does not match: %s", cmp.Diff(want, got)) + } + if got, want := txm.Capabilities, uint64(2); got != want { + t.Errorf("incorrect capabilities: got %d, want %d", got, want) + } + if got, want := txm.SequenceNum, uint64(1); got != want { + t.Errorf("incorrect sequence num: got %d, want %d", got, want) + } + if txm.TxLatency == 0 { + t.Error("latency was not measured") + } + + sender.receiveResponse(context.Background(), resp) + + if got, want := metrics.RxMessages.Read(), int64(1); got != want { + t.Errorf("wrong number of messages: got %d, want %d", got, want) + } + if metrics.RxBytes.Read() == 0 { + t.Error("rx bytes not recorded") + } + + rxm := metrics.RxMessageInfo.Drain()[0] + if got, want := rxm.InstanceUID, []byte("dcba"); !cmp.Equal(want, got) { + t.Errorf("instance uid does not match: %s", cmp.Diff(want, got)) + } + if got, want := rxm.Capabilities, uint64(4); got != want { + t.Errorf("incorrect capabilities: got %d, want %d", got, want) + } + if rxm.RxLatency == 0 { + t.Error("latency was not measured") + } +} diff --git a/client/internal/metrics.go b/client/internal/metrics.go new file mode 100644 index 00000000..176349e0 --- /dev/null +++ b/client/internal/metrics.go @@ -0,0 +1,9 @@ +package internal + +type Counter interface { + Add(int64) +} + +type RingBuffer[T any] interface { + Insert(T) +} diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index aee05a81..5bb1f068 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -10,6 +10,10 @@ import ( "github.com/open-telemetry/opamp-go/protobufs" ) +type rcvProcessor interface { + ProcessReceivedMessage(context.Context, *protobufs.ServerToAgent) +} + // receivedProcessor handles the processing of messages received from the Server. type receivedProcessor struct { logger types.Logger @@ -41,8 +45,8 @@ func newReceivedProcessor( packagesStateProvider types.PackagesStateProvider, capabilities protobufs.AgentCapabilities, packageSyncMutex *sync.Mutex, -) receivedProcessor { - return receivedProcessor{ +) *receivedProcessor { + return &receivedProcessor{ logger: logger, callbacks: callbacks, sender: sender, diff --git a/client/internal/wsreceiver.go b/client/internal/wsreceiver.go index bb9c90bc..74d8c864 100644 --- a/client/internal/wsreceiver.go +++ b/client/internal/wsreceiver.go @@ -13,14 +13,16 @@ import ( // wsReceiver implements the WebSocket client's receiving portion of OpAMP protocol. type wsReceiver struct { - conn *websocket.Conn + conn internal.WebsocketConn logger types.Logger sender *WSSender callbacks types.Callbacks - processor receivedProcessor + processor rcvProcessor // Indicates that the receiver has fully stopped. stopped chan struct{} + + metrics *types.ClientMetrics } // NewWSReceiver creates a new Receiver that uses WebSocket to receive @@ -28,12 +30,13 @@ type wsReceiver struct { func NewWSReceiver( logger types.Logger, callbacks types.Callbacks, - conn *websocket.Conn, + conn internal.WebsocketConn, sender *WSSender, clientSyncedState *ClientSyncedState, packagesStateProvider types.PackagesStateProvider, capabilities protobufs.AgentCapabilities, packageSyncMutex *sync.Mutex, + metrics *types.ClientMetrics, ) *wsReceiver { w := &wsReceiver{ conn: conn, @@ -42,6 +45,7 @@ func NewWSReceiver( callbacks: callbacks, processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities, packageSyncMutex), stopped: make(chan struct{}), + metrics: metrics, } return w @@ -97,14 +101,31 @@ func (r *wsReceiver) ReceiverLoop(ctx context.Context) { } } +func rxMessageAttrs(msg *protobufs.ServerToAgent) types.MessageAttrs { + attrs := types.MessageAttrs(types.RxMessageAttr | types.ServerToAgentMessageAttr) + if msg.ErrorResponse != nil { + attrs.Set(types.ErrorMessageAttr) + } + return attrs +} + func (r *wsReceiver) receiveMessage(msg *protobufs.ServerToAgent) error { _, bytes, err := r.conn.ReadMessage() + r.metrics.RxBytes.Add(int64(len(bytes))) if err != nil { + r.metrics.RxErrors.Add(1) return err } + r.metrics.RxMessages.Add(1) err = internal.DecodeWSMessage(bytes, msg) if err != nil { + r.metrics.RxErrors.Add(1) return fmt.Errorf("cannot decode received message: %w", err) } - return err + r.metrics.RxMessageInfo.Insert(types.RxMessageInfo{ + InstanceUID: msg.InstanceUid, + Capabilities: msg.Capabilities, + Attrs: wsMessageAttrs(rxMessageAttrs(msg)), + }) + return nil } diff --git a/client/internal/wsreceiver_test.go b/client/internal/wsreceiver_test.go index c8ade427..052d19bd 100644 --- a/client/internal/wsreceiver_test.go +++ b/client/internal/wsreceiver_test.go @@ -3,11 +3,14 @@ package internal import ( "context" "fmt" + "io" + "net" "sync" "sync/atomic" "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -89,7 +92,8 @@ func TestServerToAgentCommand(t *testing.T) { } sender := WSSender{} capabilities := protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand - receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, &sender, &clientSyncedState, nil, capabilities, new(sync.Mutex)) + metrics := types.NewClientMetrics(64) + receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, &sender, &clientSyncedState, nil, capabilities, new(sync.Mutex), metrics) receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{ Command: test.command, }) @@ -143,7 +147,9 @@ func TestServerToAgentCommandExclusive(t *testing.T) { }, } clientSyncedState := ClientSyncedState{} - receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, nil, &clientSyncedState, nil, test.capabilities, new(sync.Mutex)) + mux := new(sync.Mutex) + metrics := types.NewClientMetrics(64) + receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, nil, &clientSyncedState, nil, test.capabilities, mux, metrics) receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{ Command: &protobufs.ServerToAgentCommand{ Type: protobufs.CommandType_CommandType_Restart, @@ -152,6 +158,7 @@ func TestServerToAgentCommandExclusive(t *testing.T) { }) assert.Equal(t, test.calledCommand, calledCommand, test.calledCommandMsg) assert.Equal(t, test.calledOnMessageConfig, calledOnMessageConfig, test.calledOnMessageConfigMsg) + } } @@ -206,7 +213,8 @@ func TestReceiverLoopStop(t *testing.T) { } sender := WSSender{} capabilities := protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand - receiver := NewWSReceiver(TestLogger{t}, callbacks, conn, &sender, &clientSyncedState, nil, capabilities, new(sync.Mutex)) + metrics := types.NewClientMetrics(64) + receiver := NewWSReceiver(TestLogger{t}, callbacks, conn, &sender, &clientSyncedState, nil, capabilities, new(sync.Mutex), metrics) ctx, cancel := context.WithCancel(context.Background()) go func() { @@ -248,7 +256,8 @@ func TestWSPackageUpdatesInParallel(t *testing.T) { clientSyncedState := &ClientSyncedState{} capabilities := protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages sender := NewSender(&internal.NopLogger{}) - receiver := NewWSReceiver(&internal.NopLogger{}, callbacks, nil, sender, clientSyncedState, localPackageState, capabilities, &mux) + metrics := types.NewClientMetrics(64) + receiver := NewWSReceiver(&internal.NopLogger{}, callbacks, nil, sender, clientSyncedState, localPackageState, capabilities, &mux, metrics) receiver.processor.ProcessReceivedMessage(ctx, &protobufs.ServerToAgent{ @@ -300,3 +309,102 @@ func TestWSPackageUpdatesInParallel(t *testing.T) { cancel() } + +type wsMsg struct { + Message []byte + Error error +} + +type fakeWSConn struct { + messages chan wsMsg +} + +func (f fakeWSConn) ReadMessage() (int, []byte, error) { + msg := <-f.messages + return websocket.BinaryMessage, msg.Message, msg.Error +} + +type fakewsWriteCloser struct { + messages chan wsMsg +} + +func (f fakewsWriteCloser) Write(msg []byte) (int, error) { + f.messages <- wsMsg{Message: msg} + return len(msg), nil +} + +func (f fakewsWriteCloser) Close() error { + return nil +} + +func (f fakeWSConn) NextWriter(msgtype int) (io.WriteCloser, error) { + return fakewsWriteCloser{messages: f.messages}, nil +} + +func (f fakeWSConn) WriteControl(int, []byte, time.Time) error { + return nil +} + +func (f fakeWSConn) Close() error { + return nil +} + +func (f fakeWSConn) UnderlyingConn() net.Conn { + // shouldn't generally be used by us, may need to become a net.Pipe + return nil +} + +func newFakeWSConn() fakeWSConn { + return fakeWSConn{ + messages: make(chan wsMsg, 128), + } +} + +func TestReceiveMessageMetrics(t *testing.T) { + msg := &protobufs.ServerToAgent{ + InstanceUid: []byte("abcd"), + Capabilities: 2, + } + + serialMsg, err := proto.Marshal(msg) + if err != nil { + t.Fatal(err) + } + + callbacks := types.Callbacks{} + callbacks.SetDefaults() + clientSyncedState := ClientSyncedState{ + remoteConfigStatus: &protobufs.RemoteConfigStatus{}, + } + sender := WSSender{} + capabilities := protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand + metrics := types.NewClientMetrics(64) + conn := newFakeWSConn() + conn.messages <- wsMsg{Message: serialMsg} + receiver := NewWSReceiver(TestLogger{t}, callbacks, conn, &sender, &clientSyncedState, nil, capabilities, new(sync.Mutex), metrics) + if err := receiver.receiveMessage(new(protobufs.ServerToAgent)); err != nil { + t.Fatal(err) + } + if got, want := metrics.RxMessages.Read(), int64(1); got != want { + t.Errorf("bad RxMessages count: got %d, want %d", got, want) + } + if metrics.RxBytes.Read() == 0 { + t.Error("RxBytes not counted") + } + if metrics.RxErrors.Read() != 0 { + t.Error("error counted when there is none") + } + metadata := metrics.RxMessageInfo.Drain() + if got, want := len(metadata), 1; got != want { + t.Errorf("unexpected metadata buffer length: got %d, want %d", got, want) + } + got := metadata[0] + want := types.RxMessageInfo{ + InstanceUID: []byte("abcd"), + Capabilities: 2, + Attrs: types.MessageAttrs(types.RxMessageAttr | types.ServerToAgentMessageAttr | types.WSTransportAttr), + } + if !cmp.Equal(got, want) { + t.Errorf("metadata not equal: %s", cmp.Diff(want, got)) + } +} diff --git a/client/internal/wssender.go b/client/internal/wssender.go index 16b5014a..73521378 100644 --- a/client/internal/wssender.go +++ b/client/internal/wssender.go @@ -22,7 +22,7 @@ const ( // WSSender implements the WebSocket client's sending portion of OpAMP protocol. type WSSender struct { SenderCommon - conn *websocket.Conn + conn internal.WebsocketConn logger types.Logger // Indicates that the sender has fully stopped. @@ -32,6 +32,8 @@ type WSSender struct { heartbeatIntervalUpdated chan struct{} heartbeatIntervalMs atomic.Int64 heartbeatTimer *time.Timer + + metrics *types.ClientMetrics } // NewSender creates a new Sender that uses WebSocket to send @@ -42,15 +44,24 @@ func NewSender(logger types.Logger) *WSSender { heartbeatIntervalUpdated: make(chan struct{}, 1), heartbeatTimer: time.NewTimer(0), SenderCommon: NewSenderCommon(), + metrics: types.NewClientMetrics(1), } s.heartbeatIntervalMs.Store(defaultHeartbeatIntervalMs) return s } +// SetMetrics is used to set the sender's metrics. This is useful because the +// metrics object is not available until the StartSettings are available. +func (s *WSSender) SetMetrics(metrics *types.ClientMetrics) { + if metrics != nil { + s.metrics = metrics + } +} + // Start the sender and send the first message that was set via NextMessage().Update() // earlier. To stop the WSSender cancel the ctx. -func (s *WSSender) Start(ctx context.Context, conn *websocket.Conn) error { +func (s *WSSender) Start(ctx context.Context, conn internal.WebsocketConn) error { s.conn = conn err := s.sendNextMessage(ctx) @@ -151,11 +162,33 @@ func (s *WSSender) sendNextMessage(ctx context.Context) error { return nil } +func txMessageAttrs(msg *protobufs.AgentToServer) types.MessageAttrs { + return types.MessageAttrs(types.TxMessageAttr | types.AgentToServerMessageAttr) +} + +func wsMessageAttrs(attrs types.MessageAttrs) types.MessageAttrs { + attrs.Set(types.WSTransportAttr) + return attrs +} + func (s *WSSender) sendMessage(ctx context.Context, msg *protobufs.AgentToServer) error { - if err := internal.WriteWSMessage(s.conn, msg); err != nil { + startSend := time.Now() + n, err := internal.WriteWSMessage(s.conn, msg) + latency := time.Since(startSend) + s.metrics.TxBytes.Add(int64(n)) + if err != nil { + s.metrics.TxErrors.Add(1) s.logger.Errorf(ctx, "Cannot write WS message: %v", err) // TODO: check if it is a connection error then propagate error back to Client and reconnect. return err } + s.metrics.TxMessages.Add(1) + s.metrics.TxMessageInfo.Insert(types.TxMessageInfo{ + InstanceUID: msg.InstanceUid, + Capabilities: msg.Capabilities, + SequenceNum: msg.SequenceNum, + Attrs: wsMessageAttrs(txMessageAttrs(msg)), + TxLatency: latency, + }) return nil } diff --git a/client/internal/wssender_test.go b/client/internal/wssender_test.go index c3c58d62..c08136c8 100644 --- a/client/internal/wssender_test.go +++ b/client/internal/wssender_test.go @@ -1,10 +1,14 @@ package internal import ( + "context" "testing" "time" + "github.com/google/go-cmp/cmp" + "github.com/open-telemetry/opamp-go/client/types" sharedinternal "github.com/open-telemetry/opamp-go/internal" + "github.com/open-telemetry/opamp-go/protobufs" "github.com/stretchr/testify/assert" ) @@ -26,3 +30,53 @@ func TestWSSenderSetHeartbeatInterval(t *testing.T) { assert.NoError(t, sender.SetHeartbeatInterval(time.Duration(expected)*time.Millisecond)) assert.Equal(t, expected, sender.heartbeatIntervalMs.Load()) } + +func TestWSSendMessageMetrics(t *testing.T) { + msg := &protobufs.AgentToServer{ + InstanceUid: []byte("abcd"), + Capabilities: 2, + SequenceNum: 1, + } + + sender := NewSender(TestLogger{T: t}) + metrics := types.NewClientMetrics(64) + sender.SetMetrics(metrics) + wsConn := newFakeWSConn() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if err := sender.Start(ctx, wsConn); err != nil { + t.Fatal(err) + } + // this would be better to do within the interface of the sender, but by + // calling this method we bypass its internal queue which simplifies things + // somewhat + if err := sender.sendMessage(ctx, msg); err != nil { + t.Fatal(err) + } + if got, want := metrics.TxMessages.Read(), int64(1); got != want { + t.Errorf("bad TxMessages count: got %d, want %d", got, want) + } + if metrics.TxBytes.Read() == 0 { + t.Error("TxBytes not counted") + } + if metrics.TxErrors.Read() != 0 { + t.Errorf("TxErrors counted when there were none") + } + messageInfo := metrics.TxMessageInfo.Drain() + if len(messageInfo) != 1 { + t.Fatal("wrong messageInfo len") + } + mi := messageInfo[0] + if got, want := mi.InstanceUID, []byte("abcd"); !cmp.Equal(got, want) { + t.Errorf("bad InstanceUID: got %v, want %v", got, want) + } + if got, want := mi.Capabilities, uint64(2); got != want { + t.Errorf("bad Capabilities: got %d, want %d", got, want) + } + if got, want := mi.SequenceNum, uint64(1); got != want { + t.Errorf("bad SequenceNum: got %d, want %d", got, want) + } + if mi.TxLatency == 0 { + t.Error("latency not tracked") + } +} diff --git a/client/types/metrics.go b/client/types/metrics.go new file mode 100644 index 00000000..8a2294e5 --- /dev/null +++ b/client/types/metrics.go @@ -0,0 +1,161 @@ +package types + +import ( + "sync/atomic" + "time" +) + +// NewClientMetrics creates new ClientMetrics with a given buffer size for +// collecting TxMessageInfo and RxMessageInfo. The buffer should be at least +// size 1. +func NewClientMetrics(bufSize int) *ClientMetrics { + return &ClientMetrics{ + TxMessageInfo: NewRingBuffer[TxMessageInfo](bufSize), + RxMessageInfo: NewRingBuffer[RxMessageInfo](bufSize), + } +} + +// ClientMetrics contains metrics about an OpAMP client and its interactions +// with an OpAMP server. +type ClientMetrics struct { + // RxBytes is the total number of bytes received by the client. + RxBytes Counter + + // RxMessages is the total number of proto messages received by the client. + RxMessages Counter + + // RxErrors is the total number of errors encountered while receiving + // messages. Typically, this would be due to interrupted connections or + // the client receiving a message that could not be decoded. + RxErrors Counter + + // RxMessageInfo contains details about the last several messages to arrive. + RxMessageInfo *RingBuffer[RxMessageInfo] + + // TxBytes is the total number of bytes sent during the session. + TxBytes Counter + + // TxMessages is the total number of proto messages sent during the session. + TxMessages Counter + + // TxErrors is the total number of errors encountered while sending + // messages to the server. Typically, this would be due to network errors. + TxErrors Counter + + // TxMessageInfo contains details about the last several messages that + // were sent. + TxMessageInfo *RingBuffer[TxMessageInfo] +} + +// Counter is an atomic counter with labels. The labels should not be modified +// after creation. +type Counter struct { + count int64 + Labels []string +} + +// Add adds the amount to the counter. Negative amounts are OK. +// Add is goroutine-safe. +func (c *Counter) Add(amount int64) { + atomic.AddInt64(&c.count, amount) +} + +// Read reads the current value of the counter. Read is goroutine-safe. +func (c *Counter) Read() int64 { + return atomic.LoadInt64(&c.count) +} + +// TxMessageInfo contains message metadata as well as the transmission latency. +type TxMessageInfo struct { + // InstanceUID is the UUID of the agent or server. It should be 16 bytes long. + InstanceUID []byte + + // SequenceNum is the sequence number of the message. + SequenceNum uint64 + + // Capabilities is ServerCapabilities or AgentCapabilities depending on the origin. + Capabilities uint64 + + // Attrs contain message metadata as bitfields. + Attrs MessageAttrs + + // TxLatency is the time it took for the message to be sent. + TxLatency time.Duration +} + +// RxMessageInfo contains message metadata. +type RxMessageInfo struct { + // InstanceUID is the UUID of the agent or server. It should be 16 bytes long. + InstanceUID []byte + + // SequenceNum is the sequence number of the message. + SequenceNum uint64 + + // Capabilities is ServerCapabilities or AgentCapabilities depending on the origin. + Capabilities uint64 + + // Attrs contain message metadata as bitfields. + Attrs MessageAttrs + + // RxLatency is the time it took for the message to be received. RxLatency + // is not tracked for the websocket transport, since the protocol is + // asynchronous. + RxLatency time.Duration +} + +// MessageAttrs are bitsets of MessageAttrs. +type MessageAttrs uint64 + +// MessageAttr is an individual message attribute. +type MessageAttr uint64 + +const ( + // TxMessageAttr is set when the message was transmitted to us. + TxMessageAttr MessageAttr = 1 + + // RxMessageAttr is set when the message was sent by us. + RxMessageAttr MessageAttr = 1 << 1 + + // AgentToServerMessageAttr is sent when the message is an AgentToServer message. + AgentToServerMessageAttr MessageAttr = 1 << 2 + + // ServerToAgentMessageAttr is sent when the message is a ServerToAgent message. + ServerToAgentMessageAttr MessageAttr = 1 << 3 + + // ErrorMessageAttr is set when the message is an error response. + ErrorMessageAttr MessageAttr = 1 << 4 + + // HTTPTransportAttr is set when the message was sent on an HTTP transport. + HTTPTransportAttr MessageAttr = 1 << 5 + + // WSTransportAttr is set when the message was sent on a WS transport. + WSTransportAttr MessageAttr = 1 << 6 +) + +// Set adds attr to the bitset. +func (m *MessageAttrs) Set(attr MessageAttr) { + *m = *m | MessageAttrs(attr) +} + +// Unset removes attr from the bitset. +func (m *MessageAttrs) Unset(attr MessageAttr) { + *m = *m & MessageAttrs(^attr) +} + +// Slice returns all of the individual attributes as a slice. +func (m *MessageAttrs) Slice() []MessageAttr { + result := []MessageAttr{} + selector := MessageAttr(1) + for i := 0; i < 64; i++ { + if m.Contains(selector) { + result = append(result, selector) + } + selector = selector << 1 + } + return result +} + +// Contains checks to see if the attr is contained in the bitset. +func (m *MessageAttrs) Contains(attr MessageAttr) bool { + return (*m & MessageAttrs(attr)) > 0 +} diff --git a/client/types/metrics_test.go b/client/types/metrics_test.go new file mode 100644 index 00000000..f6018976 --- /dev/null +++ b/client/types/metrics_test.go @@ -0,0 +1,67 @@ +package types + +import ( + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestMessageAttrs(t *testing.T) { + tests := []struct { + Name string + Initial MessageAttrs + AddAttrs []MessageAttr + RemoveAttrs []MessageAttr + Exp MessageAttrs + ExpSlice []MessageAttr + }{ + { + Name: "basic", + AddAttrs: []MessageAttr{ + TxMessageAttr, + AgentToServerMessageAttr, + }, + Exp: MessageAttrs(TxMessageAttr | AgentToServerMessageAttr), + ExpSlice: []MessageAttr{TxMessageAttr, AgentToServerMessageAttr}, + }, + { + Name: "add to existing", + Initial: MessageAttrs(RxMessageAttr | ServerToAgentMessageAttr), + AddAttrs: []MessageAttr{ErrorMessageAttr}, + Exp: MessageAttrs(RxMessageAttr | ServerToAgentMessageAttr | ErrorMessageAttr), + ExpSlice: []MessageAttr{RxMessageAttr, ServerToAgentMessageAttr, ErrorMessageAttr}, + }, + { + Name: "remove from existing", + Initial: MessageAttrs(RxMessageAttr | ServerToAgentMessageAttr), + RemoveAttrs: []MessageAttr{RxMessageAttr}, + Exp: MessageAttrs(ServerToAgentMessageAttr), + ExpSlice: []MessageAttr{ServerToAgentMessageAttr}, + }, + { + Name: "add and remove", + Initial: MessageAttrs(RxMessageAttr | ServerToAgentMessageAttr), + RemoveAttrs: []MessageAttr{RxMessageAttr, ServerToAgentMessageAttr}, + AddAttrs: []MessageAttr{TxMessageAttr, AgentToServerMessageAttr}, + Exp: MessageAttrs(TxMessageAttr | AgentToServerMessageAttr), + ExpSlice: []MessageAttr{TxMessageAttr, AgentToServerMessageAttr}, + }, + } + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + initial := test.Initial + for _, attr := range test.AddAttrs { + initial.Set(attr) + } + for _, attr := range test.RemoveAttrs { + initial.Unset(attr) + } + if got, want := initial, test.Exp; got != want { + t.Errorf("bad MessageAttrs: got %v, want %v", got, want) + } + if got, want := test.Exp.Slice(), test.ExpSlice; !cmp.Equal(got, want) { + t.Errorf("bad []MessageAttr: %s", cmp.Diff(want, got)) + } + }) + } +} diff --git a/client/types/ringbuffer.go b/client/types/ringbuffer.go new file mode 100644 index 00000000..08214064 --- /dev/null +++ b/client/types/ringbuffer.go @@ -0,0 +1,62 @@ +package types + +import "sync" + +// RingBuffer is a simple goroutine-safe ring buffer. +type RingBuffer[T any] struct { + buffer []T + capacity int + offset int + mu sync.Mutex +} + +// NewRingBuffer creates a new RingBuffer with a set capacity. +func NewRingBuffer[T any](capacity int) *RingBuffer[T] { + return &RingBuffer[T]{ + capacity: capacity, + } +} + +// Insert puts a new item into the RingBuffer. +func (r *RingBuffer[T]) Insert(item T) { + r.mu.Lock() + defer r.mu.Unlock() + if len(r.buffer) < r.capacity { + r.buffer = append(r.buffer, item) + } else { + r.buffer[r.offset] = item + r.offset = (r.offset + 1) % r.capacity + } +} + +func (r *RingBuffer[T]) read(into []T) int { + if len(into) > len(r.buffer) { + into = into[:len(r.buffer)] + } + for i := range into { + into[i] = r.buffer[(r.offset+i)%len(r.buffer)] + } + return len(into) +} + +// Read reads the items of the RingBuffer in the order they were inserted in. +func (r *RingBuffer[T]) Read(into []T) int { + r.mu.Lock() + defer r.mu.Unlock() + return r.read(into) +} + +// Drain reads all of the items in the RingBuffer and resets the RingBuffer's +// internal buffer and offset. +func (r *RingBuffer[T]) Drain() []T { + result := make([]T, r.capacity) + + r.mu.Lock() + defer r.mu.Unlock() + + read := r.read(result) + r.buffer = r.buffer[:0] + r.offset = 0 + + return result[:read] +} diff --git a/client/types/ringbuffer_test.go b/client/types/ringbuffer_test.go new file mode 100644 index 00000000..4caaf817 --- /dev/null +++ b/client/types/ringbuffer_test.go @@ -0,0 +1,120 @@ +package types + +import ( + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestRingBufferReadWrite(t *testing.T) { + tests := []struct { + Name string + Cap int + ReadCap int + Insert []int + Exp []int + }{ + { + Name: "less than capacity", + Cap: 10, + ReadCap: 5, + Insert: []int{1, 2, 3, 4, 5}, + Exp: []int{1, 2, 3, 4, 5}, + }, + { + Name: "more than capacity", + Cap: 5, + ReadCap: 5, + Insert: []int{1, 2, 3, 4, 5, 6, 7, 8}, + Exp: []int{4, 5, 6, 7, 8}, + }, + { + Name: "equal to capacity", + Cap: 5, + ReadCap: 5, + Insert: []int{1, 2, 3, 4, 5}, + Exp: []int{1, 2, 3, 4, 5}, + }, + { + Name: "read capacity greater than capacity", + Cap: 5, + ReadCap: 10, + Insert: []int{1, 2, 3, 4, 5}, + Exp: []int{1, 2, 3, 4, 5, 0, 0, 0, 0, 0}, + }, + { + Name: "read capacity smaller than capacity", + Cap: 5, + ReadCap: 3, + Insert: []int{1, 2, 3, 4, 5}, + Exp: []int{1, 2, 3}, + }, + } + + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + buf := NewRingBuffer[int](test.Cap) + for _, item := range test.Insert { + buf.Insert(item) + } + got := make([]int, test.ReadCap) + read := buf.Read(got) + if !cmp.Equal(got, test.Exp) { + t.Errorf("bad read: %v", cmp.Diff(test.Exp, got)) + } + expReadLen := test.Cap + if expReadLen > test.ReadCap { + expReadLen = test.ReadCap + } + if got, want := read, expReadLen; got != want { + t.Error(expReadLen) + t.Errorf("bad read length: got %d, want %d", got, want) + } + }) + } +} + +func TestRingBufferDrain(t *testing.T) { + tests := []struct { + Name string + Cap int + ReadCap int + Insert []int + Exp []int + }{ + { + Name: "less than capacity", + Cap: 10, + ReadCap: 5, + Insert: []int{1, 2, 3, 4, 5}, + Exp: []int{1, 2, 3, 4, 5}, + }, + { + Name: "more than capacity", + Cap: 5, + ReadCap: 5, + Insert: []int{1, 2, 3, 4, 5, 6, 7, 8}, + Exp: []int{4, 5, 6, 7, 8}, + }, + { + Name: "equal to capacity", + Cap: 5, + ReadCap: 5, + Insert: []int{1, 2, 3, 4, 5}, + Exp: []int{1, 2, 3, 4, 5}, + }, + } + + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + buf := NewRingBuffer[int](test.Cap) + for _, item := range test.Insert { + buf.Insert(item) + } + got := buf.Drain() + if !cmp.Equal(got, test.Exp) { + t.Errorf("bad drain: got %v, want %v", got, test.Exp) + } + }) + } +} diff --git a/client/types/startsettings.go b/client/types/startsettings.go index 6184d575..4a76af51 100644 --- a/client/types/startsettings.go +++ b/client/types/startsettings.go @@ -64,4 +64,8 @@ type StartSettings struct { // // If the ReportsHeartbeat capability is disabled, this option has no effect. HeartbeatInterval *time.Duration + + // Metrics are opamp-specific internal metrics such as latency and + // bandwidth measurements. + Metrics *ClientMetrics } diff --git a/client/wsclient.go b/client/wsclient.go index f19d8ab4..c93ce8bd 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -19,7 +19,8 @@ import ( ) const ( - defaultShutdownTimeout = 5 * time.Second + defaultShutdownTimeout = 5 * time.Second + defaultRingBufferCapacity = 1024 ) // wsClient is an OpAMP Client implementation for WebSocket transport. @@ -54,6 +55,9 @@ type wsClient struct { // connection. responseChain should only be referred to by the goroutine that // runs tryConnectOnce and its synchronous callees. responseChain []*http.Response + + // metrics contain information about the client's interactions with the server + metrics *types.ClientMetrics } // NewWebSocket creates a new OpAMP Client that uses WebSocket transport. @@ -67,11 +71,19 @@ func NewWebSocket(logger types.Logger) *wsClient { common: internal.NewClientCommon(logger, sender), sender: sender, connShutdownTimeout: defaultShutdownTimeout, + metrics: types.NewClientMetrics(1), } return w } func (c *wsClient) Start(ctx context.Context, settings types.StartSettings) error { + // initialize client metrics object. if nil is supplied, a default object + // will be used. + c.sender.SetMetrics(settings.Metrics) + if settings.Metrics != nil { + c.metrics = settings.Metrics + } + if err := c.common.PrepareStart(ctx, settings); err != nil { return err } @@ -359,6 +371,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { c.common.PackagesStateProvider, c.common.Capabilities, &c.common.PackageSyncMutex, + c.metrics, ) // When the wsclient is closed, the context passed to runOneCycle will be canceled. diff --git a/go.mod b/go.mod index 2cab574a..23f700bf 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.22 require ( github.com/cenkalti/backoff/v4 v4.3.0 + github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/stretchr/testify v1.10.0 @@ -12,7 +13,6 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/google/go-cmp v0.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.5.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/internal/opamp-spec b/internal/opamp-spec index 240253d4..cb1cf6db 160000 --- a/internal/opamp-spec +++ b/internal/opamp-spec @@ -1 +1 @@ -Subproject commit 240253d4479dbb1180a757701348ee3d5734b7c6 +Subproject commit cb1cf6dbf223440727d6ad654226bf03cecf4d4a diff --git a/internal/wsmessage.go b/internal/wsmessage.go index 8a1a0cc8..d66405ba 100644 --- a/internal/wsmessage.go +++ b/internal/wsmessage.go @@ -3,6 +3,9 @@ package internal import ( "encoding/binary" "errors" + "io" + "net" + "time" "github.com/gorilla/websocket" "google.golang.org/protobuf/proto" @@ -36,15 +39,27 @@ func DecodeWSMessage(bytes []byte, msg proto.Message) error { return nil } -func WriteWSMessage(conn *websocket.Conn, msg proto.Message) error { +// WebsocketConn is an abstraction of a websocket.Conn. It is useful for testing +// purposes. +type WebsocketConn interface { + ReadMessage() (int, []byte, error) + NextWriter(int) (io.WriteCloser, error) + WriteControl(int, []byte, time.Time) error + Close() error + UnderlyingConn() net.Conn +} + +// WriteWSMessage writes a proto.Message to a websocket.Conn. It returns the +// number of bytes written, and any error that occurs. +func WriteWSMessage(conn WebsocketConn, msg proto.Message) (int, error) { data, err := proto.Marshal(msg) if err != nil { - return err + return 0, err } writer, err := conn.NextWriter(websocket.BinaryMessage) if err != nil { - return err + return 0, err } // Encode header as a varint. @@ -53,18 +68,18 @@ func WriteWSMessage(conn *websocket.Conn, msg proto.Message) error { hdrBuf = hdrBuf[:n] // Write the header bytes. - _, err = writer.Write(hdrBuf) + hdrBytes, err := writer.Write(hdrBuf) if err != nil { writer.Close() - return err + return hdrBytes, err } // Write the encoded data. - _, err = writer.Write(data) + dataBytes, err := writer.Write(data) if err != nil { writer.Close() - return err + return dataBytes + hdrBytes, err } - return writer.Close() + return hdrBytes + dataBytes, writer.Close() } diff --git a/server/httpconnection.go b/server/httpconnection.go index 928c62bf..ee8fd9c2 100644 --- a/server/httpconnection.go +++ b/server/httpconnection.go @@ -29,10 +29,10 @@ func (c httpConnection) Connection() net.Conn { var _ types.Connection = (*httpConnection)(nil) -func (c httpConnection) Send(_ context.Context, _ *protobufs.ServerToAgent) error { +func (c httpConnection) Send(_ context.Context, _ *protobufs.ServerToAgent) (int, error) { // Send() should not be called for plain HTTP connection. Instead, the response will // be sent after the onMessage callback returns. - return ErrInvalidHTTPConnection + return 0, ErrInvalidHTTPConnection } func (c httpConnection) Disconnect() error { diff --git a/server/serverimpl.go b/server/serverimpl.go index dcdd044c..3ede50ab 100644 --- a/server/serverimpl.go +++ b/server/serverimpl.go @@ -260,7 +260,7 @@ func (s *server) handleWSConnection(reqCtx context.Context, wsConn *websocket.Co } sentCustomCapabilities = true } - err = agentConn.Send(msgContext, response) + _, err = agentConn.Send(msgContext, response) if err != nil { s.logger.Errorf(msgContext, "Cannot send message to WebSocket: %v", err) } diff --git a/server/serverimpl_test.go b/server/serverimpl_test.go index ca7a5c33..89d4786c 100644 --- a/server/serverimpl_test.go +++ b/server/serverimpl_test.go @@ -1016,7 +1016,7 @@ func BenchmarkSendToClient(b *testing.B) { cancel() for _, conn := range serverConnections { - err := conn.Send(context.Background(), &protobufs.ServerToAgent{}) + _, err := conn.Send(context.Background(), &protobufs.ServerToAgent{}) if err != nil { b.Error(err) diff --git a/server/types/connection.go b/server/types/connection.go index 2b2bd814..c72942da 100644 --- a/server/types/connection.go +++ b/server/types/connection.go @@ -18,7 +18,8 @@ type Connection interface { // connections. // Blocks until the message is sent. // Should return as soon as possible if the ctx is cancelled. - Send(ctx context.Context, message *protobufs.ServerToAgent) error + // Returns the number of bytes written and any error encountered. + Send(ctx context.Context, message *protobufs.ServerToAgent) (int, error) // Disconnect closes the network connection. // Any blocked Read or Write operations will be unblocked and return errors. diff --git a/server/wsconnection.go b/server/wsconnection.go index b2b99219..06ea62aa 100644 --- a/server/wsconnection.go +++ b/server/wsconnection.go @@ -5,8 +5,6 @@ import ( "net" "sync" - "github.com/gorilla/websocket" - "github.com/open-telemetry/opamp-go/internal" "github.com/open-telemetry/opamp-go/protobufs" "github.com/open-telemetry/opamp-go/server/types" @@ -18,7 +16,7 @@ type wsConnection struct { // so ensure that we only have a single operation in progress at a time. // For more: https://pkg.go.dev/github.com/gorilla/websocket#hdr-Concurrency connMutex *sync.Mutex - wsConn *websocket.Conn + wsConn internal.WebsocketConn } var _ types.Connection = (*wsConnection)(nil) @@ -27,7 +25,7 @@ func (c wsConnection) Connection() net.Conn { return c.wsConn.UnderlyingConn() } -func (c wsConnection) Send(_ context.Context, message *protobufs.ServerToAgent) error { +func (c wsConnection) Send(_ context.Context, message *protobufs.ServerToAgent) (int, error) { c.connMutex.Lock() defer c.connMutex.Unlock()