Skip to content

Commit

Permalink
Add Bolt 5.4 support and Telemetry API (#533)
Browse files Browse the repository at this point in the history
* Add Bolt 5.4 support and Telemetry API
  • Loading branch information
StephenCathcart authored Sep 28, 2023
1 parent 3474208 commit 54efcda
Show file tree
Hide file tree
Showing 17 changed files with 189 additions and 53 deletions.
1 change: 1 addition & 0 deletions neo4j/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func defaultConfig() *Config {
FetchSize: FetchDefault,
NotificationsMinSeverity: notifications.DefaultLevel,
NotificationsDisabledCategories: notifications.NotificationDisabledCategories{},
TelemetryDisabled: false,
}
}

Expand Down
16 changes: 16 additions & 0 deletions neo4j/config/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,22 @@ type Config struct {
// By default, the server's settings are used.
// Disabling categories allows the server to skip analysis for those, which can speed up query execution.
NotificationsDisabledCategories notifications.NotificationDisabledCategories
// By default, if the server requests it, the driver will automatically transmit anonymous usage
// statistics to the server it is connected to.
//
// By configuring TelemetryDisabled=True, the driver will refrain from transmitting any telemetry data.
//
// Each time one of the specified APIs is utilized to execute a query for the first time, the driver
// informs the server of this action without providing additional details such as arguments or client identifiers:
//
// DriverWithContext.ExecuteQuery
// SessionWithContext.Run
// SessionWithContext.BeginTransaction
// SessionWithContext.ExecuteRead
// SessionWithContext.ExecuteWrite
//
// default: true
TelemetryDisabled bool
}

// ServerAddressResolver is a function type that defines the resolver function used by the routing driver to
Expand Down
4 changes: 2 additions & 2 deletions neo4j/driver_with_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,9 +703,9 @@ type transactionFunction func(context.Context, ManagedTransactionWork, ...func(*
func (c *ExecuteQueryConfiguration) selectTxFunctionApi(session SessionWithContext) (transactionFunction, error) {
switch c.Routing {
case Read:
return session.pipelinedRead, nil
return session.executeQueryRead, nil
case Write:
return session.pipelinedWrite, nil
return session.executeQueryWrite, nil
}
return nil, fmt.Errorf("unsupported routing control, expected %d (Write) or %d (Read) "+
"but got: %d", Write, Read, c.Routing)
Expand Down
4 changes: 2 additions & 2 deletions neo4j/driver_with_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,14 +591,14 @@ func (s *fakeSession) ExecuteWrite(_ context.Context, callback ManagedTransactio
}
return callback(&fakeManagedTransaction{result: result, err: err})
}
func (s *fakeSession) pipelinedRead(_ context.Context, callback ManagedTransactionWork, _ ...func(*TransactionConfig)) (any, error) {
func (s *fakeSession) executeQueryRead(_ context.Context, callback ManagedTransactionWork, _ ...func(*TransactionConfig)) (any, error) {
return callback(&fakeManagedTransaction{
result: s.executeReadTransactionResult,
err: s.executeReadErr,
})
}

func (s *fakeSession) pipelinedWrite(_ context.Context, callback ManagedTransactionWork, _ ...func(*TransactionConfig)) (any, error) {
func (s *fakeSession) executeQueryWrite(_ context.Context, callback ManagedTransactionWork, _ ...func(*TransactionConfig)) (any, error) {
result := s.executeWriteTransactionResult
err := s.executeWriteErr
if s.executeWriteErrs != nil {
Expand Down
5 changes: 5 additions & 0 deletions neo4j/internal/bolt/bolt3.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
iauth "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/auth"
idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/telemetry"
"net"
"reflect"
"time"
Expand Down Expand Up @@ -869,3 +870,7 @@ func (b *bolt3) GetCurrentAuth() (auth.TokenManager, iauth.Token) {
token := iauth.Token{Tokens: b.auth}
return b.authManager, token
}

func (b *bolt3) Telemetry(telemetry.API, func()) {
// TELEMETRY not support by this protocol version, so we ignore it.
}
5 changes: 5 additions & 0 deletions neo4j/internal/bolt/bolt4.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/collections"
idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/telemetry"
"net"
"reflect"
"time"
Expand Down Expand Up @@ -987,6 +988,10 @@ func (b *bolt4) GetCurrentAuth() (auth.TokenManager, iauth.Token) {
return b.authManager, token
}

func (b *bolt4) Telemetry(telemetry.API, func()) {
// TELEMETRY not support by this protocol version, so we ignore it.
}

func (b *bolt4) helloResponseHandler(checkUtcPatch bool) responseHandler {
return b.expectedSuccessHandler(b.onHelloSuccess(checkUtcPatch))
}
Expand Down
76 changes: 54 additions & 22 deletions neo4j/internal/bolt/bolt5.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/boltagent"
idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/telemetry"
"net"
"reflect"
"time"
Expand Down Expand Up @@ -93,28 +94,29 @@ func (i *internalTx5) toMeta(logger log.Logger, logId string) map[string]any {
}

type bolt5 struct {
state int
txId idb.TxHandle
streams openstreams
conn net.Conn
serverName string
queue messageQueue
connId string
logId string
serverVersion string
bookmark string // Last bookmark
birthDate time.Time
log log.Logger
databaseName string
err error // Last fatal error
minor int
lastQid int64 // Last seen qid
idleDate time.Time
auth map[string]any
authManager auth.TokenManager
resetAuth bool
errorListener ConnectionErrorListener
now *func() time.Time
state int
txId idb.TxHandle
streams openstreams
conn net.Conn
serverName string
queue messageQueue
connId string
logId string
serverVersion string
bookmark string // Last bookmark
birthDate time.Time
log log.Logger
databaseName string
err error // Last fatal error
minor int
lastQid int64 // Last seen qid
idleDate time.Time
auth map[string]any
authManager auth.TokenManager
resetAuth bool
errorListener ConnectionErrorListener
now *func() time.Time
telemetryEnabled bool
}

func NewBolt5(
Expand Down Expand Up @@ -959,6 +961,16 @@ func (b *bolt5) GetCurrentAuth() (auth.TokenManager, iauth.Token) {
return b.authManager, token
}

func (b *bolt5) Telemetry(api telemetry.API, onSuccess func()) {
if b.telemetryEnabled && b.Version().Minor >= 4 {
b.queue.appendTelemetry(api.AsInt(), b.telemetryResponseHandler(func(*success) {
if onSuccess != nil {
onSuccess()
}
}))
}
}

func (b *bolt5) appendPullN(stream *stream) {
if b.state == bolt5Streaming {
b.queue.appendPullN(stream.fetchSize, b.pullResponseHandler(stream))
Expand Down Expand Up @@ -1092,6 +1104,10 @@ func (b *bolt5) resetResponseHandler() responseHandler {
}
}

func (b *bolt5) telemetryResponseHandler(onSuccess func(*success)) responseHandler {
return b.expectedSuccessHandler(onSuccess)
}

func (b *bolt5) expectedSuccessHandler(onSuccess func(*success)) responseHandler {
return responseHandler{
onSuccess: onSuccess,
Expand All @@ -1108,6 +1124,7 @@ func (b *bolt5) onHelloSuccess(helloSuccess *success) {
b.logId = connectionLogId
b.queue.setLogId(connectionLogId)
b.initializeReadTimeoutHint(helloSuccess.configurationHints)
b.initializeTelemetryHint(helloSuccess.configurationHints)
}

func (b *bolt5) onCommitSuccess(commitSuccess *success) {
Expand Down Expand Up @@ -1155,6 +1172,21 @@ func (b *bolt5) initializeReadTimeoutHint(hints map[string]any) {
b.queue.in.connReadTimeout = time.Duration(readTimeout) * time.Second
}

const readTelemetryHintName = "telemetry.enabled"

func (b *bolt5) initializeTelemetryHint(hints map[string]any) {
readTelemetryHint, ok := hints[readTelemetryHintName]
if !ok {
return
}
readTelemetry, ok := readTelemetryHint.(bool)
if !ok {
b.log.Infof(log.Bolt5, b.logId, `invalid %q value: %v, ignoring hint. Only boolean values are accepted`, readTelemetryHintName, readTelemetryHint)
return
}
b.telemetryEnabled = readTelemetry
}

func (b *bolt5) extractSummary(success *success, stream *stream) *db.Summary {
summary := success.summary()
summary.Agent = b.serverVersion
Expand Down
2 changes: 1 addition & 1 deletion neo4j/internal/bolt/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type protocolVersion struct {

// Supported versions in priority order
var versions = [4]protocolVersion{
{major: 5, minor: 3, back: 3},
{major: 5, minor: 4, back: 4},
{major: 4, minor: 4, back: 2},
{major: 4, minor: 1},
{major: 3, minor: 0},
Expand Down
5 changes: 5 additions & 0 deletions neo4j/internal/bolt/message_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ func (q *messageQueue) appendGoodbye() {
// no response expected here
}

func (q *messageQueue) appendTelemetry(api int, handler responseHandler) {
q.out.appendTelemetry(api)
q.enqueueCallback(handler)
}

func (q *messageQueue) send(ctx context.Context) {
q.out.send(ctx, q.targetConnection)
}
Expand Down
1 change: 1 addition & 0 deletions neo4j/internal/bolt/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ const (
msgCommit byte = 0x12
msgRollback byte = 0x13
msgRoute byte = 0x66 // > 4.2
msgTelemetry byte = 0x54
)
38 changes: 24 additions & 14 deletions neo4j/internal/bolt/outgoing.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (o *outgoing) appendHello(hello map[string]any) {
o.boltLogger.LogClientMessage(o.logId, "HELLO %s", loggableDictionary(hello))
}
o.begin()
o.packer.StructHeader(byte(msgHello), 1)
o.packer.StructHeader(msgHello, 1)
o.packMap(hello)
o.end()
}
Expand Down Expand Up @@ -90,7 +90,7 @@ func (o *outgoing) appendBegin(meta map[string]any) {
o.boltLogger.LogClientMessage(o.logId, "BEGIN %s", loggableDictionary(meta))
}
o.begin()
o.packer.StructHeader(byte(msgBegin), 1)
o.packer.StructHeader(msgBegin, 1)
o.packMap(meta)
o.end()
}
Expand All @@ -100,7 +100,7 @@ func (o *outgoing) appendCommit() {
o.boltLogger.LogClientMessage(o.logId, "COMMIT")
}
o.begin()
o.packer.StructHeader(byte(msgCommit), 0)
o.packer.StructHeader(msgCommit, 0)
o.end()
}

Expand All @@ -109,7 +109,7 @@ func (o *outgoing) appendRollback() {
o.boltLogger.LogClientMessage(o.logId, "ROLLBACK")
}
o.begin()
o.packer.StructHeader(byte(msgRollback), 0)
o.packer.StructHeader(msgRollback, 0)
o.end()
}

Expand All @@ -118,7 +118,7 @@ func (o *outgoing) appendRun(cypher string, params, meta map[string]any) {
o.boltLogger.LogClientMessage(o.logId, "RUN %q %s %s", cypher, loggableDictionary(params), loggableDictionary(meta))
}
o.begin()
o.packer.StructHeader(byte(msgRun), 3)
o.packer.StructHeader(msgRun, 3)
o.packer.String(cypher)
o.packMap(params)
o.packMap(meta)
Expand All @@ -130,7 +130,7 @@ func (o *outgoing) appendPullN(n int) {
o.boltLogger.LogClientMessage(o.logId, "PULL %s", loggableDictionary{"n": n})
}
o.begin()
o.packer.StructHeader(byte(msgPullN), 1)
o.packer.StructHeader(msgPullN, 1)
o.packer.MapHeader(1)
o.packer.String("n")
o.packer.Int(n)
Expand All @@ -142,7 +142,7 @@ func (o *outgoing) appendPullNQid(n int, qid int64) {
o.boltLogger.LogClientMessage(o.logId, "PULL %s", loggableDictionary{"n": n, "qid": qid})
}
o.begin()
o.packer.StructHeader(byte(msgPullN), 1)
o.packer.StructHeader(msgPullN, 1)
o.packer.MapHeader(2)
o.packer.String("n")
o.packer.Int(n)
Expand All @@ -156,7 +156,7 @@ func (o *outgoing) appendDiscardN(n int) {
o.boltLogger.LogClientMessage(o.logId, "DISCARD %s", loggableDictionary{"n": n})
}
o.begin()
o.packer.StructHeader(byte(msgDiscardN), 1)
o.packer.StructHeader(msgDiscardN, 1)
o.packer.MapHeader(1)
o.packer.String("n")
o.packer.Int(n)
Expand All @@ -168,7 +168,7 @@ func (o *outgoing) appendDiscardNQid(n int, qid int64) {
o.boltLogger.LogClientMessage(o.logId, "DISCARD %s", loggableDictionary{"n": n, "qid": qid})
}
o.begin()
o.packer.StructHeader(byte(msgDiscardN), 1)
o.packer.StructHeader(msgDiscardN, 1)
o.packer.MapHeader(2)
o.packer.String("n")
o.packer.Int(n)
Expand All @@ -182,7 +182,7 @@ func (o *outgoing) appendPullAll() {
o.boltLogger.LogClientMessage(o.logId, "PULL ALL")
}
o.begin()
o.packer.StructHeader(byte(msgPullAll), 0)
o.packer.StructHeader(msgPullAll, 0)
o.end()
}

Expand All @@ -192,7 +192,7 @@ func (o *outgoing) appendRouteToV43(context map[string]string, bookmarks []strin
o.boltLogger.LogClientMessage(o.logId, "ROUTE %s %s %q", loggableStringDictionary(context), loggableStringList(bookmarks), database)
}
o.begin()
o.packer.StructHeader(byte(msgRoute), 3)
o.packer.StructHeader(msgRoute, 3)
o.packer.MapHeader(len(context))
for k, v := range context {
o.packer.String(k)
Expand All @@ -215,7 +215,7 @@ func (o *outgoing) appendRoute(context map[string]string, bookmarks []string, wh
o.boltLogger.LogClientMessage(o.logId, "ROUTE %s %s %s", loggableStringDictionary(context), loggableStringList(bookmarks), loggableDictionary(what))
}
o.begin()
o.packer.StructHeader(byte(msgRoute), 3)
o.packer.StructHeader(msgRoute, 3)
o.packer.MapHeader(len(context))
for k, v := range context {
o.packer.String(k)
Expand All @@ -234,7 +234,7 @@ func (o *outgoing) appendReset() {
o.boltLogger.LogClientMessage(o.logId, "RESET")
}
o.begin()
o.packer.StructHeader(byte(msgReset), 0)
o.packer.StructHeader(msgReset, 0)
o.end()
}

Expand All @@ -243,7 +243,17 @@ func (o *outgoing) appendGoodbye() {
o.boltLogger.LogClientMessage(o.logId, "GOODBYE")
}
o.begin()
o.packer.StructHeader(byte(msgGoodbye), 0)
o.packer.StructHeader(msgGoodbye, 0)
o.end()
}

func (o *outgoing) appendTelemetry(api int) {
if o.boltLogger != nil {
o.boltLogger.LogClientMessage(o.logId, "TELEMETRY %d", api)
}
o.begin()
o.packer.StructHeader(msgTelemetry, 1)
o.packer.Int(api)
o.end()
}

Expand Down
3 changes: 3 additions & 0 deletions neo4j/internal/db/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/neo4j/neo4j-go-driver/v5/neo4j/auth"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/db"
iauth "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/auth"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/telemetry"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/log"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/notifications"
"math"
Expand Down Expand Up @@ -165,6 +166,8 @@ type Connection interface {
ResetAuth()
// GetCurrentAuth returns the current authentication manager and token that this connection is authenticated with
GetCurrentAuth() (auth.TokenManager, iauth.Token)
// Telemetry sends telemetry information about the API usage to the server.
Telemetry(api telemetry.API, onSuccess func())
}

type RoutingTable struct {
Expand Down
Loading

0 comments on commit 54efcda

Please sign in to comment.