From 54efcda44bdf9f45334ade967f9da309b0579ec1 Mon Sep 17 00:00:00 2001 From: Stephen Cathcart Date: Thu, 28 Sep 2023 09:15:33 +0100 Subject: [PATCH] Add Bolt 5.4 support and Telemetry API (#533) * Add Bolt 5.4 support and Telemetry API --- neo4j/config.go | 1 + neo4j/config/driver.go | 16 ++++++ neo4j/driver_with_context.go | 4 +- neo4j/driver_with_context_test.go | 4 +- neo4j/internal/bolt/bolt3.go | 5 ++ neo4j/internal/bolt/bolt4.go | 5 ++ neo4j/internal/bolt/bolt5.go | 76 +++++++++++++++++++-------- neo4j/internal/bolt/connect.go | 2 +- neo4j/internal/bolt/message_queue.go | 5 ++ neo4j/internal/bolt/messages.go | 1 + neo4j/internal/bolt/outgoing.go | 38 +++++++++----- neo4j/internal/db/connection.go | 3 ++ neo4j/internal/retry/state.go | 1 + neo4j/internal/telemetry/telemetry.go | 33 ++++++++++++ neo4j/internal/testutil/connfake.go | 3 ++ neo4j/session_with_context.go | 41 ++++++++++----- testkit-backend/backend.go | 4 ++ 17 files changed, 189 insertions(+), 53 deletions(-) create mode 100644 neo4j/internal/telemetry/telemetry.go diff --git a/neo4j/config.go b/neo4j/config.go index 46ff4046..298c66e4 100644 --- a/neo4j/config.go +++ b/neo4j/config.go @@ -50,6 +50,7 @@ func defaultConfig() *Config { FetchSize: FetchDefault, NotificationsMinSeverity: notifications.DefaultLevel, NotificationsDisabledCategories: notifications.NotificationDisabledCategories{}, + TelemetryDisabled: false, } } diff --git a/neo4j/config/driver.go b/neo4j/config/driver.go index fc2f91d7..07b36c40 100644 --- a/neo4j/config/driver.go +++ b/neo4j/config/driver.go @@ -146,6 +146,22 @@ type Config struct { // By default, the server's settings are used. // Disabling categories allows the server to skip analysis for those, which can speed up query execution. NotificationsDisabledCategories notifications.NotificationDisabledCategories + // By default, if the server requests it, the driver will automatically transmit anonymous usage + // statistics to the server it is connected to. + // + // By configuring TelemetryDisabled=True, the driver will refrain from transmitting any telemetry data. + // + // Each time one of the specified APIs is utilized to execute a query for the first time, the driver + // informs the server of this action without providing additional details such as arguments or client identifiers: + // + // DriverWithContext.ExecuteQuery + // SessionWithContext.Run + // SessionWithContext.BeginTransaction + // SessionWithContext.ExecuteRead + // SessionWithContext.ExecuteWrite + // + // default: true + TelemetryDisabled bool } // ServerAddressResolver is a function type that defines the resolver function used by the routing driver to diff --git a/neo4j/driver_with_context.go b/neo4j/driver_with_context.go index 0fb3c5c3..98e202eb 100644 --- a/neo4j/driver_with_context.go +++ b/neo4j/driver_with_context.go @@ -703,9 +703,9 @@ type transactionFunction func(context.Context, ManagedTransactionWork, ...func(* func (c *ExecuteQueryConfiguration) selectTxFunctionApi(session SessionWithContext) (transactionFunction, error) { switch c.Routing { case Read: - return session.pipelinedRead, nil + return session.executeQueryRead, nil case Write: - return session.pipelinedWrite, nil + return session.executeQueryWrite, nil } return nil, fmt.Errorf("unsupported routing control, expected %d (Write) or %d (Read) "+ "but got: %d", Write, Read, c.Routing) diff --git a/neo4j/driver_with_context_test.go b/neo4j/driver_with_context_test.go index da1dfdfb..eb171555 100644 --- a/neo4j/driver_with_context_test.go +++ b/neo4j/driver_with_context_test.go @@ -591,14 +591,14 @@ func (s *fakeSession) ExecuteWrite(_ context.Context, callback ManagedTransactio } return callback(&fakeManagedTransaction{result: result, err: err}) } -func (s *fakeSession) pipelinedRead(_ context.Context, callback ManagedTransactionWork, _ ...func(*TransactionConfig)) (any, error) { +func (s *fakeSession) executeQueryRead(_ context.Context, callback ManagedTransactionWork, _ ...func(*TransactionConfig)) (any, error) { return callback(&fakeManagedTransaction{ result: s.executeReadTransactionResult, err: s.executeReadErr, }) } -func (s *fakeSession) pipelinedWrite(_ context.Context, callback ManagedTransactionWork, _ ...func(*TransactionConfig)) (any, error) { +func (s *fakeSession) executeQueryWrite(_ context.Context, callback ManagedTransactionWork, _ ...func(*TransactionConfig)) (any, error) { result := s.executeWriteTransactionResult err := s.executeWriteErr if s.executeWriteErrs != nil { diff --git a/neo4j/internal/bolt/bolt3.go b/neo4j/internal/bolt/bolt3.go index caf503a8..051c71bb 100644 --- a/neo4j/internal/bolt/bolt3.go +++ b/neo4j/internal/bolt/bolt3.go @@ -27,6 +27,7 @@ import ( iauth "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/auth" idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db" "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil" + "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/telemetry" "net" "reflect" "time" @@ -869,3 +870,7 @@ func (b *bolt3) GetCurrentAuth() (auth.TokenManager, iauth.Token) { token := iauth.Token{Tokens: b.auth} return b.authManager, token } + +func (b *bolt3) Telemetry(telemetry.API, func()) { + // TELEMETRY not support by this protocol version, so we ignore it. +} diff --git a/neo4j/internal/bolt/bolt4.go b/neo4j/internal/bolt/bolt4.go index 8e086c4a..4ab29765 100644 --- a/neo4j/internal/bolt/bolt4.go +++ b/neo4j/internal/bolt/bolt4.go @@ -28,6 +28,7 @@ import ( "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/collections" idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db" "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil" + "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/telemetry" "net" "reflect" "time" @@ -987,6 +988,10 @@ func (b *bolt4) GetCurrentAuth() (auth.TokenManager, iauth.Token) { return b.authManager, token } +func (b *bolt4) Telemetry(telemetry.API, func()) { + // TELEMETRY not support by this protocol version, so we ignore it. +} + func (b *bolt4) helloResponseHandler(checkUtcPatch bool) responseHandler { return b.expectedSuccessHandler(b.onHelloSuccess(checkUtcPatch)) } diff --git a/neo4j/internal/bolt/bolt5.go b/neo4j/internal/bolt/bolt5.go index c4d409e7..ece125a8 100644 --- a/neo4j/internal/bolt/bolt5.go +++ b/neo4j/internal/bolt/bolt5.go @@ -28,6 +28,7 @@ import ( "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/boltagent" idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db" "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil" + "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/telemetry" "net" "reflect" "time" @@ -93,28 +94,29 @@ func (i *internalTx5) toMeta(logger log.Logger, logId string) map[string]any { } type bolt5 struct { - state int - txId idb.TxHandle - streams openstreams - conn net.Conn - serverName string - queue messageQueue - connId string - logId string - serverVersion string - bookmark string // Last bookmark - birthDate time.Time - log log.Logger - databaseName string - err error // Last fatal error - minor int - lastQid int64 // Last seen qid - idleDate time.Time - auth map[string]any - authManager auth.TokenManager - resetAuth bool - errorListener ConnectionErrorListener - now *func() time.Time + state int + txId idb.TxHandle + streams openstreams + conn net.Conn + serverName string + queue messageQueue + connId string + logId string + serverVersion string + bookmark string // Last bookmark + birthDate time.Time + log log.Logger + databaseName string + err error // Last fatal error + minor int + lastQid int64 // Last seen qid + idleDate time.Time + auth map[string]any + authManager auth.TokenManager + resetAuth bool + errorListener ConnectionErrorListener + now *func() time.Time + telemetryEnabled bool } func NewBolt5( @@ -959,6 +961,16 @@ func (b *bolt5) GetCurrentAuth() (auth.TokenManager, iauth.Token) { return b.authManager, token } +func (b *bolt5) Telemetry(api telemetry.API, onSuccess func()) { + if b.telemetryEnabled && b.Version().Minor >= 4 { + b.queue.appendTelemetry(api.AsInt(), b.telemetryResponseHandler(func(*success) { + if onSuccess != nil { + onSuccess() + } + })) + } +} + func (b *bolt5) appendPullN(stream *stream) { if b.state == bolt5Streaming { b.queue.appendPullN(stream.fetchSize, b.pullResponseHandler(stream)) @@ -1092,6 +1104,10 @@ func (b *bolt5) resetResponseHandler() responseHandler { } } +func (b *bolt5) telemetryResponseHandler(onSuccess func(*success)) responseHandler { + return b.expectedSuccessHandler(onSuccess) +} + func (b *bolt5) expectedSuccessHandler(onSuccess func(*success)) responseHandler { return responseHandler{ onSuccess: onSuccess, @@ -1108,6 +1124,7 @@ func (b *bolt5) onHelloSuccess(helloSuccess *success) { b.logId = connectionLogId b.queue.setLogId(connectionLogId) b.initializeReadTimeoutHint(helloSuccess.configurationHints) + b.initializeTelemetryHint(helloSuccess.configurationHints) } func (b *bolt5) onCommitSuccess(commitSuccess *success) { @@ -1155,6 +1172,21 @@ func (b *bolt5) initializeReadTimeoutHint(hints map[string]any) { b.queue.in.connReadTimeout = time.Duration(readTimeout) * time.Second } +const readTelemetryHintName = "telemetry.enabled" + +func (b *bolt5) initializeTelemetryHint(hints map[string]any) { + readTelemetryHint, ok := hints[readTelemetryHintName] + if !ok { + return + } + readTelemetry, ok := readTelemetryHint.(bool) + if !ok { + b.log.Infof(log.Bolt5, b.logId, `invalid %q value: %v, ignoring hint. Only boolean values are accepted`, readTelemetryHintName, readTelemetryHint) + return + } + b.telemetryEnabled = readTelemetry +} + func (b *bolt5) extractSummary(success *success, stream *stream) *db.Summary { summary := success.summary() summary.Agent = b.serverVersion diff --git a/neo4j/internal/bolt/connect.go b/neo4j/internal/bolt/connect.go index 17bb6397..4666a081 100644 --- a/neo4j/internal/bolt/connect.go +++ b/neo4j/internal/bolt/connect.go @@ -40,7 +40,7 @@ type protocolVersion struct { // Supported versions in priority order var versions = [4]protocolVersion{ - {major: 5, minor: 3, back: 3}, + {major: 5, minor: 4, back: 4}, {major: 4, minor: 4, back: 2}, {major: 4, minor: 1}, {major: 3, minor: 0}, diff --git a/neo4j/internal/bolt/message_queue.go b/neo4j/internal/bolt/message_queue.go index de8e4906..693029ed 100644 --- a/neo4j/internal/bolt/message_queue.go +++ b/neo4j/internal/bolt/message_queue.go @@ -130,6 +130,11 @@ func (q *messageQueue) appendGoodbye() { // no response expected here } +func (q *messageQueue) appendTelemetry(api int, handler responseHandler) { + q.out.appendTelemetry(api) + q.enqueueCallback(handler) +} + func (q *messageQueue) send(ctx context.Context) { q.out.send(ctx, q.targetConnection) } diff --git a/neo4j/internal/bolt/messages.go b/neo4j/internal/bolt/messages.go index e6157004..1f88ad39 100644 --- a/neo4j/internal/bolt/messages.go +++ b/neo4j/internal/bolt/messages.go @@ -40,4 +40,5 @@ const ( msgCommit byte = 0x12 msgRollback byte = 0x13 msgRoute byte = 0x66 // > 4.2 + msgTelemetry byte = 0x54 ) diff --git a/neo4j/internal/bolt/outgoing.go b/neo4j/internal/bolt/outgoing.go index 771467c7..c4dc12f4 100644 --- a/neo4j/internal/bolt/outgoing.go +++ b/neo4j/internal/bolt/outgoing.go @@ -61,7 +61,7 @@ func (o *outgoing) appendHello(hello map[string]any) { o.boltLogger.LogClientMessage(o.logId, "HELLO %s", loggableDictionary(hello)) } o.begin() - o.packer.StructHeader(byte(msgHello), 1) + o.packer.StructHeader(msgHello, 1) o.packMap(hello) o.end() } @@ -90,7 +90,7 @@ func (o *outgoing) appendBegin(meta map[string]any) { o.boltLogger.LogClientMessage(o.logId, "BEGIN %s", loggableDictionary(meta)) } o.begin() - o.packer.StructHeader(byte(msgBegin), 1) + o.packer.StructHeader(msgBegin, 1) o.packMap(meta) o.end() } @@ -100,7 +100,7 @@ func (o *outgoing) appendCommit() { o.boltLogger.LogClientMessage(o.logId, "COMMIT") } o.begin() - o.packer.StructHeader(byte(msgCommit), 0) + o.packer.StructHeader(msgCommit, 0) o.end() } @@ -109,7 +109,7 @@ func (o *outgoing) appendRollback() { o.boltLogger.LogClientMessage(o.logId, "ROLLBACK") } o.begin() - o.packer.StructHeader(byte(msgRollback), 0) + o.packer.StructHeader(msgRollback, 0) o.end() } @@ -118,7 +118,7 @@ func (o *outgoing) appendRun(cypher string, params, meta map[string]any) { o.boltLogger.LogClientMessage(o.logId, "RUN %q %s %s", cypher, loggableDictionary(params), loggableDictionary(meta)) } o.begin() - o.packer.StructHeader(byte(msgRun), 3) + o.packer.StructHeader(msgRun, 3) o.packer.String(cypher) o.packMap(params) o.packMap(meta) @@ -130,7 +130,7 @@ func (o *outgoing) appendPullN(n int) { o.boltLogger.LogClientMessage(o.logId, "PULL %s", loggableDictionary{"n": n}) } o.begin() - o.packer.StructHeader(byte(msgPullN), 1) + o.packer.StructHeader(msgPullN, 1) o.packer.MapHeader(1) o.packer.String("n") o.packer.Int(n) @@ -142,7 +142,7 @@ func (o *outgoing) appendPullNQid(n int, qid int64) { o.boltLogger.LogClientMessage(o.logId, "PULL %s", loggableDictionary{"n": n, "qid": qid}) } o.begin() - o.packer.StructHeader(byte(msgPullN), 1) + o.packer.StructHeader(msgPullN, 1) o.packer.MapHeader(2) o.packer.String("n") o.packer.Int(n) @@ -156,7 +156,7 @@ func (o *outgoing) appendDiscardN(n int) { o.boltLogger.LogClientMessage(o.logId, "DISCARD %s", loggableDictionary{"n": n}) } o.begin() - o.packer.StructHeader(byte(msgDiscardN), 1) + o.packer.StructHeader(msgDiscardN, 1) o.packer.MapHeader(1) o.packer.String("n") o.packer.Int(n) @@ -168,7 +168,7 @@ func (o *outgoing) appendDiscardNQid(n int, qid int64) { o.boltLogger.LogClientMessage(o.logId, "DISCARD %s", loggableDictionary{"n": n, "qid": qid}) } o.begin() - o.packer.StructHeader(byte(msgDiscardN), 1) + o.packer.StructHeader(msgDiscardN, 1) o.packer.MapHeader(2) o.packer.String("n") o.packer.Int(n) @@ -182,7 +182,7 @@ func (o *outgoing) appendPullAll() { o.boltLogger.LogClientMessage(o.logId, "PULL ALL") } o.begin() - o.packer.StructHeader(byte(msgPullAll), 0) + o.packer.StructHeader(msgPullAll, 0) o.end() } @@ -192,7 +192,7 @@ func (o *outgoing) appendRouteToV43(context map[string]string, bookmarks []strin o.boltLogger.LogClientMessage(o.logId, "ROUTE %s %s %q", loggableStringDictionary(context), loggableStringList(bookmarks), database) } o.begin() - o.packer.StructHeader(byte(msgRoute), 3) + o.packer.StructHeader(msgRoute, 3) o.packer.MapHeader(len(context)) for k, v := range context { o.packer.String(k) @@ -215,7 +215,7 @@ func (o *outgoing) appendRoute(context map[string]string, bookmarks []string, wh o.boltLogger.LogClientMessage(o.logId, "ROUTE %s %s %s", loggableStringDictionary(context), loggableStringList(bookmarks), loggableDictionary(what)) } o.begin() - o.packer.StructHeader(byte(msgRoute), 3) + o.packer.StructHeader(msgRoute, 3) o.packer.MapHeader(len(context)) for k, v := range context { o.packer.String(k) @@ -234,7 +234,7 @@ func (o *outgoing) appendReset() { o.boltLogger.LogClientMessage(o.logId, "RESET") } o.begin() - o.packer.StructHeader(byte(msgReset), 0) + o.packer.StructHeader(msgReset, 0) o.end() } @@ -243,7 +243,17 @@ func (o *outgoing) appendGoodbye() { o.boltLogger.LogClientMessage(o.logId, "GOODBYE") } o.begin() - o.packer.StructHeader(byte(msgGoodbye), 0) + o.packer.StructHeader(msgGoodbye, 0) + o.end() +} + +func (o *outgoing) appendTelemetry(api int) { + if o.boltLogger != nil { + o.boltLogger.LogClientMessage(o.logId, "TELEMETRY %d", api) + } + o.begin() + o.packer.StructHeader(msgTelemetry, 1) + o.packer.Int(api) o.end() } diff --git a/neo4j/internal/db/connection.go b/neo4j/internal/db/connection.go index 7144e520..296f8a9f 100644 --- a/neo4j/internal/db/connection.go +++ b/neo4j/internal/db/connection.go @@ -25,6 +25,7 @@ import ( "github.com/neo4j/neo4j-go-driver/v5/neo4j/auth" "github.com/neo4j/neo4j-go-driver/v5/neo4j/db" iauth "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/auth" + "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/telemetry" "github.com/neo4j/neo4j-go-driver/v5/neo4j/log" "github.com/neo4j/neo4j-go-driver/v5/neo4j/notifications" "math" @@ -165,6 +166,8 @@ type Connection interface { ResetAuth() // GetCurrentAuth returns the current authentication manager and token that this connection is authenticated with GetCurrentAuth() (auth.TokenManager, iauth.Token) + // Telemetry sends telemetry information about the API usage to the server. + Telemetry(api telemetry.API, onSuccess func()) } type RoutingTable struct { diff --git a/neo4j/internal/retry/state.go b/neo4j/internal/retry/state.go index c4b5329c..d001bcff 100644 --- a/neo4j/internal/retry/state.go +++ b/neo4j/internal/retry/state.go @@ -43,6 +43,7 @@ type State struct { Throttle Throttler MaxDeadConnections int DatabaseName string + TelemetrySent bool start time.Time cause string diff --git a/neo4j/internal/telemetry/telemetry.go b/neo4j/internal/telemetry/telemetry.go new file mode 100644 index 00000000..dc5427ff --- /dev/null +++ b/neo4j/internal/telemetry/telemetry.go @@ -0,0 +1,33 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package telemetry + +type API int + +const ( + ManagedTransaction API = 0 + UnmanagedTransaction API = 1 + AutoCommitTransaction API = 2 + ExecuteQuery API = 3 +) + +func (a API) AsInt() int { + return int(a) +} diff --git a/neo4j/internal/testutil/connfake.go b/neo4j/internal/testutil/connfake.go index bc785ade..5b6c6c80 100644 --- a/neo4j/internal/testutil/connfake.go +++ b/neo4j/internal/testutil/connfake.go @@ -24,6 +24,7 @@ import ( "github.com/neo4j/neo4j-go-driver/v5/neo4j/auth" iauth "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/auth" idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db" + "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/telemetry" "github.com/neo4j/neo4j-go-driver/v5/neo4j/log" "time" @@ -217,3 +218,5 @@ func (c *ConnFake) ResetAuth() { func (c *ConnFake) GetCurrentAuth() (auth.TokenManager, iauth.Token) { return nil, iauth.Token{} } + +func (c *ConnFake) Telemetry(telemetry.API, func()) {} diff --git a/neo4j/session_with_context.go b/neo4j/session_with_context.go index 42900b1e..a7784407 100644 --- a/neo4j/session_with_context.go +++ b/neo4j/session_with_context.go @@ -26,6 +26,7 @@ import ( idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db" "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil" "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/pool" + "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/telemetry" "github.com/neo4j/neo4j-go-driver/v5/neo4j/notifications" "math" "time" @@ -71,8 +72,8 @@ type SessionWithContext interface { // Close closes any open resources and marks this session as unusable // Contexts terminating too early negatively affect connection pooling and degrade the driver performance. Close(ctx context.Context) error - pipelinedRead(ctx context.Context, work ManagedTransactionWork, configurers ...func(*TransactionConfig)) (any, error) - pipelinedWrite(ctx context.Context, work ManagedTransactionWork, configurers ...func(*TransactionConfig)) (any, error) + executeQueryRead(ctx context.Context, work ManagedTransactionWork, configurers ...func(*TransactionConfig)) (any, error) + executeQueryWrite(ctx context.Context, work ManagedTransactionWork, configurers ...func(*TransactionConfig)) (any, error) legacy() Session getServerInfo(ctx context.Context) (ServerInfo, error) verifyAuthentication(ctx context.Context) error @@ -311,6 +312,10 @@ func (s *sessionWithContext) BeginTransaction(ctx context.Context, configurers . return nil, errorutil.WrapError(err) } + if !s.driverConfig.TelemetryDisabled { + conn.Telemetry(telemetry.UnmanagedTransaction, nil) + } + beginBookmarks, err := s.getBookmarks(ctx) if err != nil { s.pool.Return(ctx, conn) @@ -357,25 +362,25 @@ func (s *sessionWithContext) BeginTransaction(ctx context.Context, configurers . func (s *sessionWithContext) ExecuteRead(ctx context.Context, work ManagedTransactionWork, configurers ...func(*TransactionConfig)) (any, error) { - return s.runRetriable(ctx, idb.ReadMode, work, true, configurers...) + return s.runRetriable(ctx, idb.ReadMode, work, true, telemetry.ManagedTransaction, configurers...) } func (s *sessionWithContext) ExecuteWrite(ctx context.Context, work ManagedTransactionWork, configurers ...func(*TransactionConfig)) (any, error) { - return s.runRetriable(ctx, idb.WriteMode, work, true, configurers...) + return s.runRetriable(ctx, idb.WriteMode, work, true, telemetry.ManagedTransaction, configurers...) } -func (s *sessionWithContext) pipelinedRead(ctx context.Context, +func (s *sessionWithContext) executeQueryRead(ctx context.Context, work ManagedTransactionWork, configurers ...func(*TransactionConfig)) (any, error) { - return s.runRetriable(ctx, idb.ReadMode, work, false, configurers...) + return s.runRetriable(ctx, idb.ReadMode, work, false, telemetry.ExecuteQuery, configurers...) } -func (s *sessionWithContext) pipelinedWrite(ctx context.Context, +func (s *sessionWithContext) executeQueryWrite(ctx context.Context, work ManagedTransactionWork, configurers ...func(*TransactionConfig)) (any, error) { - return s.runRetriable(ctx, idb.WriteMode, work, false, configurers...) + return s.runRetriable(ctx, idb.WriteMode, work, false, telemetry.ExecuteQuery, configurers...) } func (s *sessionWithContext) runRetriable( @@ -383,6 +388,7 @@ func (s *sessionWithContext) runRetriable( mode idb.AccessMode, work ManagedTransactionWork, blockingTxBegin bool, + api telemetry.API, configurers ...func(*TransactionConfig)) (any, error) { // Guard for more than one transaction per session @@ -415,7 +421,7 @@ func (s *sessionWithContext) runRetriable( DatabaseName: s.config.DatabaseName, } for state.Continue() { - if hasCompleted, result := s.executeTransactionFunction(ctx, mode, config, &state, work, blockingTxBegin); hasCompleted { + if hasCompleted, result := s.executeTransactionFunction(ctx, mode, config, &state, work, blockingTxBegin, api); hasCompleted { return result, nil } } @@ -431,7 +437,8 @@ func (s *sessionWithContext) executeTransactionFunction( config TransactionConfig, state *retry.State, work ManagedTransactionWork, - blockingTxBegin bool) (bool, any) { + blockingTxBegin bool, + api telemetry.API) (bool, any) { conn, err := s.getConnection(ctx, mode, pool.DefaultLivenessCheckThreshold) if err != nil { @@ -439,6 +446,12 @@ func (s *sessionWithContext) executeTransactionFunction( return false, nil } + if !s.driverConfig.TelemetryDisabled && !state.TelemetrySent { + conn.Telemetry(api, func() { + state.TelemetrySent = true + }) + } + // handle transaction function panic as well defer func() { s.pool.Return(ctx, conn) @@ -596,6 +609,10 @@ func (s *sessionWithContext) Run(ctx context.Context, return nil, errorutil.WrapError(err) } + if !s.driverConfig.TelemetryDisabled { + conn.Telemetry(telemetry.AutoCommitTransaction, nil) + } + runBookmarks, err := s.getBookmarks(ctx) if err != nil { s.pool.Return(ctx, conn) @@ -771,10 +788,10 @@ func (s *erroredSessionWithContext) ExecuteRead(context.Context, ManagedTransact func (s *erroredSessionWithContext) ExecuteWrite(context.Context, ManagedTransactionWork, ...func(*TransactionConfig)) (any, error) { return nil, s.err } -func (s *erroredSessionWithContext) pipelinedRead(context.Context, ManagedTransactionWork, ...func(*TransactionConfig)) (any, error) { +func (s *erroredSessionWithContext) executeQueryRead(context.Context, ManagedTransactionWork, ...func(*TransactionConfig)) (any, error) { return nil, s.err } -func (s *erroredSessionWithContext) pipelinedWrite(context.Context, ManagedTransactionWork, ...func(*TransactionConfig)) (any, error) { +func (s *erroredSessionWithContext) executeQueryWrite(context.Context, ManagedTransactionWork, ...func(*TransactionConfig)) (any, error) { return nil, s.err } func (s *erroredSessionWithContext) Run(context.Context, string, map[string]any, ...func(*TransactionConfig)) (ResultWithContext, error) { diff --git a/testkit-backend/backend.go b/testkit-backend/backend.go index c4bf4723..8a94a575 100644 --- a/testkit-backend/backend.go +++ b/testkit-backend/backend.go @@ -519,6 +519,9 @@ func (b *backend) handleRequest(req map[string]any) { c.NotificationsDisabledCategories = notifications.DisableCategories(cats...) } } + if data["telemetryDisabled"] != nil { + c.TelemetryDisabled = data["telemetryDisabled"].(bool) + } }) if err != nil { b.writeError(err) @@ -1166,6 +1169,7 @@ func (b *backend) handleRequest(req map[string]any) { "Feature:Bolt:5.1", "Feature:Bolt:5.2", "Feature:Bolt:5.3", + "Feature:Bolt:5.4", "Feature:Bolt:Patch:UTC", "Feature:Impersonation", //"Feature:TLS:1.1",