From 14cbf6915c396cee7b92cad373e2761618f2d212 Mon Sep 17 00:00:00 2001 From: grant lodge <6323995+thelonelyvulpes@users.noreply.github.com> Date: Tue, 2 Jul 2024 09:31:53 +0100 Subject: [PATCH] Buffered reading. (#581) * add bufio in between conn and racing reader * receiver name correction * fixing tests * adding ReadBufferSize configuration * racing reader over client not server * Refactor * move `DefaultReadBufferSize` to `bolt/connections` * wrap reader in buffered reader before passing it into the bolt implementations * Revert re-using channel inside RacingReader Avoiding potential race condition where message from previous call could linger in the channel. * Code clean-up --------- Co-authored-by: Stephen Cathcart Co-authored-by: Robsdedude --- neo4j/config.go | 2 ++ neo4j/config/driver.go | 5 +++++ neo4j/internal/bolt/bolt3.go | 6 ++--- neo4j/internal/bolt/bolt3_test.go | 2 ++ neo4j/internal/bolt/bolt3server_test.go | 4 ++++ neo4j/internal/bolt/bolt4.go | 6 ++--- neo4j/internal/bolt/bolt4_test.go | 5 +++++ neo4j/internal/bolt/bolt5.go | 6 ++--- neo4j/internal/bolt/bolt5_test.go | 9 ++++++++ neo4j/internal/bolt/connect.go | 9 +++++--- neo4j/internal/bolt/connect_test.go | 2 ++ neo4j/internal/bolt/connections.go | 29 +++++++++++++++++++++++-- neo4j/internal/bolt/dechunker.go | 7 +++--- neo4j/internal/bolt/incoming.go | 6 ++--- neo4j/internal/bolt/message_queue.go | 7 +++--- neo4j/internal/bolt/outgoing_test.go | 4 ++-- neo4j/internal/connector/connector.go | 2 ++ neo4j/internal/testutil/connfake.go | 6 ++--- neo4j/test-integration/dbconn_test.go | 1 + 19 files changed, 90 insertions(+), 28 deletions(-) diff --git a/neo4j/config.go b/neo4j/config.go index f438d0f5..7dc4ee63 100644 --- a/neo4j/config.go +++ b/neo4j/config.go @@ -19,6 +19,7 @@ package neo4j import ( "github.com/neo4j/neo4j-go-driver/v5/neo4j/config" + "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/bolt" "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/pool" "github.com/neo4j/neo4j-go-driver/v5/neo4j/notifications" "math" @@ -51,6 +52,7 @@ func defaultConfig() *Config { NotificationsMinSeverity: notifications.DefaultLevel, NotificationsDisabledCategories: notifications.NotificationDisabledCategories{}, TelemetryDisabled: false, + ReadBufferSize: bolt.DefaultReadBufferSize, } } diff --git a/neo4j/config/driver.go b/neo4j/config/driver.go index af515688..8bce202c 100644 --- a/neo4j/config/driver.go +++ b/neo4j/config/driver.go @@ -202,6 +202,11 @@ type Config struct { // // default: true TelemetryDisabled bool + // ReadBufferSize defines the size of the buffer used for reading data from the network connection. + // A larger buffer size can improve performance by reducing the number of read operations required + // for large data transfers. Currently, the default value is 8 KiB, but may change in the future. + // Set to 0 or below to disable buffering. + ReadBufferSize int } // ServerAddressResolver is a function type that defines the resolver function used by the routing driver to diff --git a/neo4j/internal/bolt/bolt3.go b/neo4j/internal/bolt/bolt3.go index 0ff8e772..81efdf31 100644 --- a/neo4j/internal/bolt/bolt3.go +++ b/neo4j/internal/bolt/bolt3.go @@ -27,7 +27,7 @@ import ( "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil" "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/telemetry" itime "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/time" - "net" + "io" "reflect" "time" @@ -79,7 +79,7 @@ type bolt3 struct { state int txId idb.TxHandle currStream *stream - conn net.Conn + conn io.ReadWriteCloser serverName string out *outgoing in *incoming @@ -100,7 +100,7 @@ type bolt3 struct { func NewBolt3( serverName string, - conn net.Conn, + conn io.ReadWriteCloser, errorListener ConnectionErrorListener, logger log.Logger, boltLog log.BoltLogger, diff --git a/neo4j/internal/bolt/bolt3_test.go b/neo4j/internal/bolt/bolt3_test.go index f16a272e..e8bad80b 100644 --- a/neo4j/internal/bolt/bolt3_test.go +++ b/neo4j/internal/bolt/bolt3_test.go @@ -115,6 +115,7 @@ func TestBolt3(outer *testing.T) { logger, nil, idb.NotificationConfig{}, + DefaultReadBufferSize, ) if err != nil { t.Fatal(err) @@ -167,6 +168,7 @@ func TestBolt3(outer *testing.T) { logger, nil, idb.NotificationConfig{}, + DefaultReadBufferSize, ) AssertNil(t, bolt) AssertError(t, err) diff --git a/neo4j/internal/bolt/bolt3server_test.go b/neo4j/internal/bolt/bolt3server_test.go index 007ad0c9..00c4dccb 100644 --- a/neo4j/internal/bolt/bolt3server_test.go +++ b/neo4j/internal/bolt/bolt3server_test.go @@ -18,6 +18,7 @@ package bolt import ( + "bufio" "context" "fmt" "io" @@ -25,6 +26,7 @@ import ( "testing" "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/packstream" + "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/racing" ) // Fake of bolt3 server. @@ -35,6 +37,7 @@ type bolt3server struct { conn net.Conn unpacker *packstream.Unpacker out *outgoing + reader racing.RacingReader } func newBolt3Server(conn net.Conn) *bolt3server { @@ -45,6 +48,7 @@ func newBolt3Server(conn net.Conn) *bolt3server { chunker: newChunker(), packer: packstream.Packer{}, }, + reader: racing.NewRacingReader(bufio.NewReaderSize(conn, DefaultReadBufferSize)), } } diff --git a/neo4j/internal/bolt/bolt4.go b/neo4j/internal/bolt/bolt4.go index f5a77bd7..54b526bf 100644 --- a/neo4j/internal/bolt/bolt4.go +++ b/neo4j/internal/bolt/bolt4.go @@ -21,7 +21,7 @@ import ( "context" "errors" "fmt" - "net" + "io" "reflect" "time" @@ -94,7 +94,7 @@ type bolt4 struct { state int txId idb.TxHandle streams openstreams - conn net.Conn + conn io.ReadWriteCloser serverName string connId string logId string @@ -116,7 +116,7 @@ type bolt4 struct { func NewBolt4( serverName string, - conn net.Conn, + conn io.ReadWriteCloser, errorListener ConnectionErrorListener, logger log.Logger, boltLog log.BoltLogger, diff --git a/neo4j/internal/bolt/bolt4_test.go b/neo4j/internal/bolt/bolt4_test.go index 1816aca0..900977fb 100644 --- a/neo4j/internal/bolt/bolt4_test.go +++ b/neo4j/internal/bolt/bolt4_test.go @@ -118,6 +118,7 @@ func TestBolt4(outer *testing.T) { logger, nil, idb.NotificationConfig{}, + DefaultReadBufferSize, ) if err != nil { t.Fatal(err) @@ -230,6 +231,7 @@ func TestBolt4(outer *testing.T) { logger, nil, idb.NotificationConfig{}, + DefaultReadBufferSize, ) AssertNoError(t, err) bolt.Close(context.Background()) @@ -259,6 +261,7 @@ func TestBolt4(outer *testing.T) { logger, nil, idb.NotificationConfig{}, + DefaultReadBufferSize, ) AssertNoError(t, err) bolt.Close(context.Background()) @@ -289,6 +292,7 @@ func TestBolt4(outer *testing.T) { logger, nil, idb.NotificationConfig{}, + DefaultReadBufferSize, ) AssertNoError(t, err) bolt.Close(context.Background()) @@ -315,6 +319,7 @@ func TestBolt4(outer *testing.T) { logger, nil, idb.NotificationConfig{}, + DefaultReadBufferSize, ) AssertNil(t, bolt) AssertError(t, err) diff --git a/neo4j/internal/bolt/bolt5.go b/neo4j/internal/bolt/bolt5.go index db054a00..471e004a 100644 --- a/neo4j/internal/bolt/bolt5.go +++ b/neo4j/internal/bolt/bolt5.go @@ -21,7 +21,7 @@ import ( "context" "errors" "fmt" - "net" + "io" "reflect" "time" @@ -96,7 +96,7 @@ type bolt5 struct { state int txId idb.TxHandle streams openstreams - conn net.Conn + conn io.ReadWriteCloser serverName string queue messageQueue connId string @@ -119,7 +119,7 @@ type bolt5 struct { func NewBolt5( serverName string, - conn net.Conn, + conn io.ReadWriteCloser, errorListener ConnectionErrorListener, logger log.Logger, boltLog log.BoltLogger, diff --git a/neo4j/internal/bolt/bolt5_test.go b/neo4j/internal/bolt/bolt5_test.go index 9d3a5bcf..2457ed91 100644 --- a/neo4j/internal/bolt/bolt5_test.go +++ b/neo4j/internal/bolt/bolt5_test.go @@ -136,6 +136,7 @@ func TestBolt5(outer *testing.T) { logger, nil, idb.NotificationConfig{}, + DefaultReadBufferSize, ) if err != nil { t.Fatal(err) @@ -304,6 +305,7 @@ func TestBolt5(outer *testing.T) { logger, nil, idb.NotificationConfig{}, + DefaultReadBufferSize, ) AssertNoError(t, err) bolt.Close(context.Background()) @@ -336,6 +338,7 @@ func TestBolt5(outer *testing.T) { logger, nil, idb.NotificationConfig{}, + DefaultReadBufferSize, ) AssertNoError(t, err) bolt.Close(context.Background()) @@ -365,6 +368,7 @@ func TestBolt5(outer *testing.T) { logger, nil, idb.NotificationConfig{}, + DefaultReadBufferSize, ) AssertNoError(t, err) bolt.Close(context.Background()) @@ -396,6 +400,7 @@ func TestBolt5(outer *testing.T) { logger, nil, idb.NotificationConfig{}, + DefaultReadBufferSize, ) AssertNoError(t, err) bolt.Close(context.Background()) @@ -422,6 +427,7 @@ func TestBolt5(outer *testing.T) { logger, nil, idb.NotificationConfig{}, + DefaultReadBufferSize, ) AssertNil(t, bolt) AssertError(t, err) @@ -457,6 +463,7 @@ func TestBolt5(outer *testing.T) { logger, nil, idb.NotificationConfig{}, + DefaultReadBufferSize, ) AssertNil(t, bolt) AssertError(t, err) @@ -1771,6 +1778,7 @@ func TestBolt5(outer *testing.T) { logger, &boltLogger, idb.NotificationConfig{}, + DefaultReadBufferSize, ) if err != nil { t.Error(err) @@ -1847,6 +1855,7 @@ func TestBolt5(outer *testing.T) { logger, &boltLogger, idb.NotificationConfig{}, + DefaultReadBufferSize, ) if err != nil { t.Error(err) diff --git a/neo4j/internal/bolt/connect.go b/neo4j/internal/bolt/connect.go index 9036d118..8a43e4d6 100644 --- a/neo4j/internal/bolt/connect.go +++ b/neo4j/internal/bolt/connect.go @@ -55,6 +55,7 @@ func Connect(ctx context.Context, logger log.Logger, boltLogger log.BoltLogger, notificationConfig db.NotificationConfig, + readBufferSize int, ) (db.Connection, error) { // Perform Bolt handshake to negotiate version // Send handshake to server @@ -89,14 +90,16 @@ func Connect(ctx context.Context, major := buf[3] minor := buf[2] + + bufferedConn := bufferedConnection(conn, readBufferSize) var boltConn db.Connection switch major { case 3: - boltConn = NewBolt3(serverName, conn, errorListener, logger, boltLogger) + boltConn = NewBolt3(serverName, bufferedConn, errorListener, logger, boltLogger) case 4: - boltConn = NewBolt4(serverName, conn, errorListener, logger, boltLogger) + boltConn = NewBolt4(serverName, bufferedConn, errorListener, logger, boltLogger) case 5: - boltConn = NewBolt5(serverName, conn, errorListener, logger, boltLogger) + boltConn = NewBolt5(serverName, bufferedConn, errorListener, logger, boltLogger) case 0: return nil, fmt.Errorf("server did not accept any of the requested Bolt versions (%#v)", versions) default: diff --git a/neo4j/internal/bolt/connect_test.go b/neo4j/internal/bolt/connect_test.go index 0e497b81..c9fa7bcd 100644 --- a/neo4j/internal/bolt/connect_test.go +++ b/neo4j/internal/bolt/connect_test.go @@ -64,6 +64,7 @@ func TestConnect(ot *testing.T) { logger, nil, idb.NotificationConfig{}, + DefaultReadBufferSize, ) AssertError(t, err) }) @@ -90,6 +91,7 @@ func TestConnect(ot *testing.T) { logger, nil, idb.NotificationConfig{}, + DefaultReadBufferSize, ) AssertError(t, err) if boltconn != nil { diff --git a/neo4j/internal/bolt/connections.go b/neo4j/internal/bolt/connections.go index 992cd7f1..5f1639bd 100644 --- a/neo4j/internal/bolt/connections.go +++ b/neo4j/internal/bolt/connections.go @@ -18,20 +18,45 @@ package bolt import ( + "bufio" "context" + "io" + "net" + "github.com/neo4j/neo4j-go-driver/v5/neo4j/db" idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db" "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil" - "net" ) +// DefaultReadBufferSize specifies the default size (in bytes) of the buffer used for reading data from the network connection. +const DefaultReadBufferSize = 8192 + +func bufferedConnection(conn net.Conn, readBufferSize int) io.ReadWriteCloser { + var reader io.Reader + if readBufferSize > 0 { + reader = bufio.NewReaderSize(conn, readBufferSize) + } else { + reader = conn + } + + return struct { + io.Reader + io.Writer + io.Closer + }{ + Reader: reader, + Writer: conn, + Closer: conn, + } +} + type ConnectionErrorListener interface { OnNeo4jError(context.Context, idb.Connection, *db.Neo4jError) error OnIoError(context.Context, idb.Connection, error) OnDialError(context.Context, string, error) } -func handleTerminatedContextError(err error, connection net.Conn) error { +func handleTerminatedContextError(err error, connection io.Closer) error { if !contextTerminatedErr(err) { return nil } diff --git a/neo4j/internal/bolt/dechunker.go b/neo4j/internal/bolt/dechunker.go index 6992cf6b..f3d7b065 100644 --- a/neo4j/internal/bolt/dechunker.go +++ b/neo4j/internal/bolt/dechunker.go @@ -20,10 +20,11 @@ package bolt import ( "context" "encoding/binary" + "io" + "time" + "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil" rio "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/racing" - "net" - "time" ) // dechunkMessage takes a buffer to be reused and returns the reusable buffer @@ -32,7 +33,7 @@ import ( // Reads will race against the provided context ctx // If the server provides the connection read timeout hint readTimeout, a new context will be created from that timeout // and the user-provided context ctx before every read -func dechunkMessage(ctx context.Context, conn net.Conn, msgBuf []byte, readTimeout time.Duration) ([]byte, []byte, error) { +func dechunkMessage(ctx context.Context, conn io.Reader, msgBuf []byte, readTimeout time.Duration) ([]byte, []byte, error) { sizeBuf := []byte{0x00, 0x00} off := 0 diff --git a/neo4j/internal/bolt/incoming.go b/neo4j/internal/bolt/incoming.go index 7a40d5ef..76486bd1 100644 --- a/neo4j/internal/bolt/incoming.go +++ b/neo4j/internal/bolt/incoming.go @@ -19,7 +19,7 @@ package bolt import ( "context" - "net" + "io" "time" ) @@ -29,11 +29,11 @@ type incoming struct { connReadTimeout time.Duration } -func (i *incoming) next(ctx context.Context, rd net.Conn) (any, error) { +func (i *incoming) next(ctx context.Context, reader io.Reader) (any, error) { // Get next message from transport layer var err error var msg []byte - i.buf, msg, err = dechunkMessage(ctx, rd, i.buf, i.connReadTimeout) + i.buf, msg, err = dechunkMessage(ctx, reader, i.buf, i.connReadTimeout) if err != nil { return nil, err } diff --git a/neo4j/internal/bolt/message_queue.go b/neo4j/internal/bolt/message_queue.go index c5b99c2e..dedcc687 100644 --- a/neo4j/internal/bolt/message_queue.go +++ b/neo4j/internal/bolt/message_queue.go @@ -22,16 +22,17 @@ import ( "context" "errors" "fmt" + "io" + "github.com/neo4j/neo4j-go-driver/v5/neo4j/db" "github.com/neo4j/neo4j-go-driver/v5/neo4j/log" - "net" ) type messageQueue struct { in *incoming out *outgoing handlers list.List // List[responseHandler] - targetConnection net.Conn + targetConnection io.ReadWriteCloser err error onNextMessage func() @@ -39,7 +40,7 @@ type messageQueue struct { } func newMessageQueue( - target net.Conn, + target io.ReadWriteCloser, in *incoming, out *outgoing, onNext func(), onIoErr func(context.Context, error), diff --git a/neo4j/internal/bolt/outgoing_test.go b/neo4j/internal/bolt/outgoing_test.go index 896d0aa4..25b2ad57 100644 --- a/neo4j/internal/bolt/outgoing_test.go +++ b/neo4j/internal/bolt/outgoing_test.go @@ -20,8 +20,6 @@ package bolt import ( "context" "fmt" - idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db" - . "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/testutil" "net" "reflect" "strings" @@ -30,7 +28,9 @@ import ( "github.com/neo4j/neo4j-go-driver/v5/neo4j/db" "github.com/neo4j/neo4j-go-driver/v5/neo4j/dbtype" + idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db" "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/packstream" + . "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/testutil" ) // Utility to dehydrate/unpack diff --git a/neo4j/internal/connector/connector.go b/neo4j/internal/connector/connector.go index ad481fe2..8cf6b613 100644 --- a/neo4j/internal/connector/connector.go +++ b/neo4j/internal/connector/connector.go @@ -86,6 +86,7 @@ func (c Connector) Connect( c.Log, boltLogger, notificationConfig, + c.Config.ReadBufferSize, ) if err != nil { return nil, err @@ -122,6 +123,7 @@ func (c Connector) Connect( c.Log, boltLogger, notificationConfig, + c.Config.ReadBufferSize, ) if err != nil { return nil, err diff --git a/neo4j/internal/testutil/connfake.go b/neo4j/internal/testutil/connfake.go index 6c5384e0..249055a9 100644 --- a/neo4j/internal/testutil/connfake.go +++ b/neo4j/internal/testutil/connfake.go @@ -19,14 +19,14 @@ package testutil import ( "context" + "time" + "github.com/neo4j/neo4j-go-driver/v5/neo4j/auth" + "github.com/neo4j/neo4j-go-driver/v5/neo4j/db" iauth "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/auth" idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db" "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/telemetry" "github.com/neo4j/neo4j-go-driver/v5/neo4j/log" - "time" - - "github.com/neo4j/neo4j-go-driver/v5/neo4j/db" ) type Next struct { diff --git a/neo4j/test-integration/dbconn_test.go b/neo4j/test-integration/dbconn_test.go index c970eb23..3d7a2f40 100644 --- a/neo4j/test-integration/dbconn_test.go +++ b/neo4j/test-integration/dbconn_test.go @@ -89,6 +89,7 @@ func makeRawConnection(ctx context.Context, logger log.Logger, boltLogger log.Bo logger, boltLogger, idb.NotificationConfig{}, + bolt.DefaultReadBufferSize, ) if err != nil { panic(err)