Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance Improvement in tds and ucs2 parsing - V1-Candidate #14

Merged
merged 5 commits into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 56 additions & 12 deletions buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/binary"
"errors"
"io"
"sync"
)

type packetType uint8
Expand All @@ -17,6 +18,16 @@ type header struct {
Pad uint8
}

// bufpool provides buffers which are used for reading and writing in the tdsBuffer instances
var bufpool = sync.Pool{
New: func() interface{} {
shueybubbles marked this conversation as resolved.
Show resolved Hide resolved
b := make([]byte, 1<<16)
// If the return value is not a pointer, any conversion from interface{} will
// involve an allocation.
return &b
},
}

// tdsBuffer reads and writes TDS packets of data to the transport.
// The write and read buffers are separate to make sending attn signals
// possible without locks. Currently attn signals are only sent during
Expand All @@ -26,6 +37,9 @@ type tdsBuffer struct {

packetSize int

// bufClose is responsible for returning the buffer back to the pool
bufClose func()

// Write fields.
wbuf []byte
wpos int
Expand All @@ -46,10 +60,15 @@ type tdsBuffer struct {
}

func newTdsBuffer(bufsize uint16, transport io.ReadWriteCloser) *tdsBuffer {

// pull an existing buf if one is available or get and add a new buf to the bufpool
buf := bufpool.Get().(*[]byte)

return &tdsBuffer{
packetSize: int(bufsize),
wbuf: make([]byte, bufsize),
rbuf: make([]byte, bufsize),
wbuf: (*buf)[:1<<15],
rbuf: (*buf)[1<<15:],
bufClose: func() { bufpool.Put(buf) },
rpos: 8,
transport: transport,
}
Expand Down Expand Up @@ -201,32 +220,57 @@ func (r *tdsBuffer) byte() byte {
}

func (r *tdsBuffer) ReadFull(buf []byte) {
_, err := io.ReadFull(r, buf[:])
_, err := io.ReadFull(r, buf)
if err != nil {
badStreamPanic(err)
}
}

func (r *tdsBuffer) uint64() uint64 {
var buf [8]byte
r.ReadFull(buf[:])
return binary.LittleEndian.Uint64(buf[:])
// have we got enough room in the buffer to read 8 bytes, if not, do a ReadFull, else read directly from r.rbuf
if r.rpos+7 >= r.rsize {
var buf [8]byte
r.ReadFull(buf[:])

return uint64(buf[0]) | uint64(buf[1])<<8 | uint64(buf[2])<<16 | uint64(buf[3])<<24 |
uint64(buf[4])<<32 | uint64(buf[5])<<40 | uint64(buf[6])<<48 | uint64(buf[7])<<56
}

res := uint64(r.rbuf[r.rpos]) | uint64(r.rbuf[r.rpos+1])<<8 | uint64(r.rbuf[r.rpos+2])<<16 | uint64(r.rbuf[r.rpos+3])<<24 |
uint64(r.rbuf[r.rpos+4])<<32 | uint64(r.rbuf[r.rpos+5])<<40 | uint64(r.rbuf[r.rpos+6])<<48 | uint64(r.rbuf[r.rpos+7])<<56

r.rpos += 8
return res
}

func (r *tdsBuffer) int32() int32 {
return int32(r.uint32())
}

func (r *tdsBuffer) uint32() uint32 {
var buf [4]byte
r.ReadFull(buf[:])
return binary.LittleEndian.Uint32(buf[:])
// have we got enough room in the buffer to read 4 bytes, if not, do a ReadFull, else read directly from r.rbuf
if r.rpos+3 >= r.rsize {
var buf [4]byte
r.ReadFull(buf[:])
return uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
}

res := uint32(r.rbuf[r.rpos]) | uint32(r.rbuf[r.rpos+1])<<8 | uint32(r.rbuf[r.rpos+2])<<16 | uint32(r.rbuf[r.rpos+3])<<24
r.rpos += 4
return res
}

func (r *tdsBuffer) uint16() uint16 {
var buf [2]byte
r.ReadFull(buf[:])
return binary.LittleEndian.Uint16(buf[:])
// have we got enough room in the buffer to read 2 bytes, if not, do a ReadFull, else read directly from r.rbuf
if r.rpos+1 >= r.rsize {
var buf [2]byte
r.ReadFull(buf[:])
return uint16(buf[0]) | uint16(buf[1])<<8
}

res := uint16(r.rbuf[r.rpos]) | uint16(r.rbuf[r.rpos+1])<<8
r.rpos += 2
return res
}

func (r *tdsBuffer) BVarChar() string {
Expand Down
243 changes: 243 additions & 0 deletions buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package mssql

import (
"bytes"
"encoding/binary"
"errors"
"testing"
"unicode/utf16"
)

type closableBuffer struct {
Expand Down Expand Up @@ -118,6 +120,165 @@ func TestBeginReadSucceeds(t *testing.T) {
}
}

func makeLargeDataBuffer() []byte {
data := make([]byte, 1<<15)

for i := 0; i < len(data); i += 4 {
data[i] = 0xFE
data[i+1] = 0xDC
data[i+2] = 0xBA
data[i+3] = 0x89
}

return data
}

func TestReadUint16Succeeds(t *testing.T) {

data := makeLargeDataBuffer()
size := 0x9 + (1 << 14)
buffer := makeBuf(uint16(size), append([]byte{0x01 /*id*/, 0xFF /*status*/, byte((size >> 8) & 0xFF), byte(size & 0xFF) /*size*/, 0xff, 0xff, 0xff, 0xff, 0xff /* byte pattern data to follow */}, data...))

id, err := buffer.BeginRead()
if err != nil {
t.Fatal("BeginRead failed:", err.Error())
}
if id != 1 {
t.Fatalf("Expected id to be 1 but it is %d", id)
}

buffer.byte()

iterations := 0

defer func() {

if iterations != (1<<14)/4 {
t.Fatalf("Expected to read all data, but only read %v", iterations*4)
}

v := recover()
if v == nil {
t.Fatalf("Expected EOF but got nil")
}

if err, ok := v.(error); ok {
if err.Error() != "Invalid TDS stream: EOF" {
t.Fatalf("Expected EOF but got %v", err)
}
} else {
t.Fatalf("Expected EOF but got %v", v)
}
}()

for {

a := buffer.uint16()
if a != 0xdcfe {
t.Fatalf("Expected read uint16 to be 0xfedc but it is %d", a)
}

b := buffer.uint16()
if b != 0x89ba {
t.Fatalf("Expected read uint16 to be 0x89ba but it is %d", a)
}

iterations++
}

}

func TestReadUint32Succeeds(t *testing.T) {

data := makeLargeDataBuffer()
size := 0x9 + (1 << 14)
buffer := makeBuf(uint16(size), append([]byte{0x01 /*id*/, 0xFF /*status*/, byte((size >> 8) & 0xFF), byte(size & 0xFF) /*size*/, 0xff, 0xff, 0xff, 0xff, 0xff /* byte pattern data to follow */}, data...))

id, err := buffer.BeginRead()
if err != nil {
t.Fatal("BeginRead failed:", err.Error())
}
if id != 1 {
t.Fatalf("Expected id to be 1 but it is %d", id)
}

buffer.byte()

iterations := 0
defer func() {
if iterations != (1<<14)/4 {
t.Fatalf("Expected to read all data, but only read %v", iterations*4)
}

v := recover()
if v == nil {
t.Fatalf("Expected EOF but got nil")
}

if err, ok := v.(error); ok {
if err.Error() != "Invalid TDS stream: EOF" {
t.Fatalf("Expected EOF but got %v", err)
}
} else {
t.Fatalf("Expected EOF but got %v", v)
}
}()
for {
a := buffer.uint32()
if a != 0x89badcfe {
t.Fatalf("Expected read uint16 to be 0x89badcfe but it is %d", a)
}

iterations++
}
}

func TestReadUint64Succeeds(t *testing.T) {

data := makeLargeDataBuffer()
size := 0x9 + (1 << 14)
buffer := makeBuf(uint16(size), append([]byte{0x01 /*id*/, 0xFF /*status*/, byte((size >> 8) & 0xFF), byte(size & 0xFF) /*size*/, 0xff, 0xff, 0xff, 0xff, 0xff /* byte pattern data to follow */}, data...))

id, err := buffer.BeginRead()
if err != nil {
t.Fatal("BeginRead failed:", err.Error())
}
if id != 1 {
t.Fatalf("Expected id to be 1 but it is %d", id)
}

buffer.byte()

iterations := 0
defer func() {
if iterations != (1<<14)/8 {
t.Fatalf("Expected to read all data, but only read %v", iterations*4)
}

v := recover()
if v == nil {
t.Fatalf("Expected EOF but got nil")
}

if err, ok := v.(error); ok {
if err.Error() != "Invalid TDS stream: EOF" {
t.Fatalf("Expected EOF but got %v", err)
}
} else {
t.Fatalf("Expected EOF but got %v", v)
}
}()

for {
a := buffer.uint64()
if a != 0x89badcfe89badcfe {
t.Fatalf("Expected read uint16 to be 0x89badcfe89badcfe but it is %d", a)
}

iterations++
}
}

func TestReadByteFailsOnSecondPacket(t *testing.T) {
buffer := makeBuf(9, []byte{
0x01 /*id*/, 0x0 /*not final*/, 0x0, 0x9 /*size*/, 0xff, 0xff, 0xff, 0xff, 0x02, /*test byte*/
Expand Down Expand Up @@ -299,6 +460,88 @@ func TestReadUsVarCharOrPanic(t *testing.T) {
t.Fatal("UsVarChar() should panic, but it didn't")
}

func TestReadUsVarCharOrPanicWideChars(t *testing.T) {
str := "百度一下,你就知道"
runes := utf16.Encode([]rune(str))
encodedBytes := make([]byte, len(runes)*2)

for i := 0; i < len(runes); i++ {
binary.LittleEndian.PutUint16(encodedBytes[i*2:], runes[i])
}

memBuf := bytes.NewBuffer(append([]byte{byte(len(runes)), 0}, encodedBytes...))

s := readUsVarCharOrPanic(memBuf)
if s != str {
t.Errorf("UsVarChar expected to return %s but it returned %s", str, s)
}
}

func TestReadBVarCharOrPanicWideChars(t *testing.T) {
str := "百度一下,你就知道"
runes := utf16.Encode([]rune(str))
encodedBytes := make([]byte, len(runes)*2)

for i := 0; i < len(runes); i++ {
binary.LittleEndian.PutUint16(encodedBytes[i*2:], runes[i])
}

memBuf := bytes.NewBuffer(append([]byte{byte(len(runes))}, encodedBytes...))

s := readBVarCharOrPanic(memBuf)
if s != str {
t.Errorf("UsVarChar expected to return %s but it returned %s", str, s)
}
}

var sideeffectstring string

func BenchmarkReadBVarCharOrPanicWideChars(b *testing.B) {
str := "百度一下,你就知道"

runes := utf16.Encode([]rune(str))
encodedBytes := make([]byte, len(runes)*2)

for i := 0; i < len(runes); i++ {
binary.LittleEndian.PutUint16(encodedBytes[i*2:], runes[i])
}

encodedBytes = append([]byte{byte(len(runes))}, encodedBytes...)

memBuf := bytes.NewReader(encodedBytes)

for n := 0; n < b.N; n++ {

s := readBVarCharOrPanic(memBuf)
sideeffectstring = s

memBuf.Reset(encodedBytes)
}
}

func BenchmarkReadBVarCharOrPanicOnly1WideChar(b *testing.B) {
str := "abcdefghijklmno百p"

runes := utf16.Encode([]rune(str))
encodedBytes := make([]byte, len(runes)*2)

for i := 0; i < len(runes); i++ {
binary.LittleEndian.PutUint16(encodedBytes[i*2:], runes[i])
}

encodedBytes = append([]byte{byte(len(runes))}, encodedBytes...)

memBuf := bytes.NewReader(encodedBytes)

for n := 0; n < b.N; n++ {

s := readBVarCharOrPanic(memBuf)
sideeffectstring = s

memBuf.Reset(encodedBytes)
}
}

func TestReadBVarCharOrPanic(t *testing.T) {
memBuf := bytes.NewBuffer([]byte{3, 0x31, 0, 0x32, 0, 0x33, 0})
s := readBVarCharOrPanic(memBuf)
Expand Down
1 change: 1 addition & 0 deletions mssql.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ func (d *Driver) connect(ctx context.Context, c *Connector, params msdsn.Config)
}

func (c *Conn) Close() error {
c.sess.buf.bufClose()
return c.sess.buf.transport.Close()
}

Expand Down
Loading