Skip to content

Commit

Permalink
Performance Improvement in tds and ucs2 parsing - V1-Candidate (#14)
Browse files Browse the repository at this point in the history
* Perf improfement tds and decoding ucs2 strings

* Reduce pooled buffer size

* fix unit test expected value message

* fix typo in comment

* fix linting issues
  • Loading branch information
PeteBassettBet365 authored Jun 21, 2022
1 parent f6444fb commit f386d5f
Show file tree
Hide file tree
Showing 5 changed files with 597 additions and 17 deletions.
68 changes: 56 additions & 12 deletions buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/binary"
"errors"
"io"
"sync"
)

type packetType uint8
Expand All @@ -17,6 +18,16 @@ type header struct {
Pad uint8
}

// bufpool provides buffers which are used for reading and writing in the tdsBuffer instances
var bufpool = sync.Pool{
New: func() interface{} {
b := make([]byte, 1<<16)
// If the return value is not a pointer, any conversion from interface{} will
// involve an allocation.
return &b
},
}

// tdsBuffer reads and writes TDS packets of data to the transport.
// The write and read buffers are separate to make sending attn signals
// possible without locks. Currently attn signals are only sent during
Expand All @@ -26,6 +37,9 @@ type tdsBuffer struct {

packetSize int

// bufClose is responsible for returning the buffer back to the pool
bufClose func()

// Write fields.
wbuf []byte
wpos int
Expand All @@ -46,10 +60,15 @@ type tdsBuffer struct {
}

func newTdsBuffer(bufsize uint16, transport io.ReadWriteCloser) *tdsBuffer {

// pull an existing buf if one is available or get and add a new buf to the bufpool
buf := bufpool.Get().(*[]byte)

return &tdsBuffer{
packetSize: int(bufsize),
wbuf: make([]byte, bufsize),
rbuf: make([]byte, bufsize),
wbuf: (*buf)[:1<<15],
rbuf: (*buf)[1<<15:],
bufClose: func() { bufpool.Put(buf) },
rpos: 8,
transport: transport,
}
Expand Down Expand Up @@ -201,32 +220,57 @@ func (r *tdsBuffer) byte() byte {
}

func (r *tdsBuffer) ReadFull(buf []byte) {
_, err := io.ReadFull(r, buf[:])
_, err := io.ReadFull(r, buf)
if err != nil {
badStreamPanic(err)
}
}

func (r *tdsBuffer) uint64() uint64 {
var buf [8]byte
r.ReadFull(buf[:])
return binary.LittleEndian.Uint64(buf[:])
// have we got enough room in the buffer to read 8 bytes, if not, do a ReadFull, else read directly from r.rbuf
if r.rpos+7 >= r.rsize {
var buf [8]byte
r.ReadFull(buf[:])

return uint64(buf[0]) | uint64(buf[1])<<8 | uint64(buf[2])<<16 | uint64(buf[3])<<24 |
uint64(buf[4])<<32 | uint64(buf[5])<<40 | uint64(buf[6])<<48 | uint64(buf[7])<<56
}

res := uint64(r.rbuf[r.rpos]) | uint64(r.rbuf[r.rpos+1])<<8 | uint64(r.rbuf[r.rpos+2])<<16 | uint64(r.rbuf[r.rpos+3])<<24 |
uint64(r.rbuf[r.rpos+4])<<32 | uint64(r.rbuf[r.rpos+5])<<40 | uint64(r.rbuf[r.rpos+6])<<48 | uint64(r.rbuf[r.rpos+7])<<56

r.rpos += 8
return res
}

func (r *tdsBuffer) int32() int32 {
return int32(r.uint32())
}

func (r *tdsBuffer) uint32() uint32 {
var buf [4]byte
r.ReadFull(buf[:])
return binary.LittleEndian.Uint32(buf[:])
// have we got enough room in the buffer to read 4 bytes, if not, do a ReadFull, else read directly from r.rbuf
if r.rpos+3 >= r.rsize {
var buf [4]byte
r.ReadFull(buf[:])
return uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
}

res := uint32(r.rbuf[r.rpos]) | uint32(r.rbuf[r.rpos+1])<<8 | uint32(r.rbuf[r.rpos+2])<<16 | uint32(r.rbuf[r.rpos+3])<<24
r.rpos += 4
return res
}

func (r *tdsBuffer) uint16() uint16 {
var buf [2]byte
r.ReadFull(buf[:])
return binary.LittleEndian.Uint16(buf[:])
// have we got enough room in the buffer to read 2 bytes, if not, do a ReadFull, else read directly from r.rbuf
if r.rpos+1 >= r.rsize {
var buf [2]byte
r.ReadFull(buf[:])
return uint16(buf[0]) | uint16(buf[1])<<8
}

res := uint16(r.rbuf[r.rpos]) | uint16(r.rbuf[r.rpos+1])<<8
r.rpos += 2
return res
}

func (r *tdsBuffer) BVarChar() string {
Expand Down
243 changes: 243 additions & 0 deletions buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package mssql

import (
"bytes"
"encoding/binary"
"errors"
"testing"
"unicode/utf16"
)

type closableBuffer struct {
Expand Down Expand Up @@ -118,6 +120,165 @@ func TestBeginReadSucceeds(t *testing.T) {
}
}

func makeLargeDataBuffer() []byte {
data := make([]byte, 1<<15)

for i := 0; i < len(data); i += 4 {
data[i] = 0xFE
data[i+1] = 0xDC
data[i+2] = 0xBA
data[i+3] = 0x89
}

return data
}

func TestReadUint16Succeeds(t *testing.T) {

data := makeLargeDataBuffer()
size := 0x9 + (1 << 14)
buffer := makeBuf(uint16(size), append([]byte{0x01 /*id*/, 0xFF /*status*/, byte((size >> 8) & 0xFF), byte(size & 0xFF) /*size*/, 0xff, 0xff, 0xff, 0xff, 0xff /* byte pattern data to follow */}, data...))

id, err := buffer.BeginRead()
if err != nil {
t.Fatal("BeginRead failed:", err.Error())
}
if id != 1 {
t.Fatalf("Expected id to be 1 but it is %d", id)
}

buffer.byte()

iterations := 0

defer func() {

if iterations != (1<<14)/4 {
t.Fatalf("Expected to read all data, but only read %v", iterations*4)
}

v := recover()
if v == nil {
t.Fatalf("Expected EOF but got nil")
}

if err, ok := v.(error); ok {
if err.Error() != "Invalid TDS stream: EOF" {
t.Fatalf("Expected EOF but got %v", err)
}
} else {
t.Fatalf("Expected EOF but got %v", v)
}
}()

for {

a := buffer.uint16()
if a != 0xdcfe {
t.Fatalf("Expected read uint16 to be 0xfedc but it is %d", a)
}

b := buffer.uint16()
if b != 0x89ba {
t.Fatalf("Expected read uint16 to be 0x89ba but it is %d", a)
}

iterations++
}

}

func TestReadUint32Succeeds(t *testing.T) {

data := makeLargeDataBuffer()
size := 0x9 + (1 << 14)
buffer := makeBuf(uint16(size), append([]byte{0x01 /*id*/, 0xFF /*status*/, byte((size >> 8) & 0xFF), byte(size & 0xFF) /*size*/, 0xff, 0xff, 0xff, 0xff, 0xff /* byte pattern data to follow */}, data...))

id, err := buffer.BeginRead()
if err != nil {
t.Fatal("BeginRead failed:", err.Error())
}
if id != 1 {
t.Fatalf("Expected id to be 1 but it is %d", id)
}

buffer.byte()

iterations := 0
defer func() {
if iterations != (1<<14)/4 {
t.Fatalf("Expected to read all data, but only read %v", iterations*4)
}

v := recover()
if v == nil {
t.Fatalf("Expected EOF but got nil")
}

if err, ok := v.(error); ok {
if err.Error() != "Invalid TDS stream: EOF" {
t.Fatalf("Expected EOF but got %v", err)
}
} else {
t.Fatalf("Expected EOF but got %v", v)
}
}()
for {
a := buffer.uint32()
if a != 0x89badcfe {
t.Fatalf("Expected read uint16 to be 0x89badcfe but it is %d", a)
}

iterations++
}
}

func TestReadUint64Succeeds(t *testing.T) {

data := makeLargeDataBuffer()
size := 0x9 + (1 << 14)
buffer := makeBuf(uint16(size), append([]byte{0x01 /*id*/, 0xFF /*status*/, byte((size >> 8) & 0xFF), byte(size & 0xFF) /*size*/, 0xff, 0xff, 0xff, 0xff, 0xff /* byte pattern data to follow */}, data...))

id, err := buffer.BeginRead()
if err != nil {
t.Fatal("BeginRead failed:", err.Error())
}
if id != 1 {
t.Fatalf("Expected id to be 1 but it is %d", id)
}

buffer.byte()

iterations := 0
defer func() {
if iterations != (1<<14)/8 {
t.Fatalf("Expected to read all data, but only read %v", iterations*4)
}

v := recover()
if v == nil {
t.Fatalf("Expected EOF but got nil")
}

if err, ok := v.(error); ok {
if err.Error() != "Invalid TDS stream: EOF" {
t.Fatalf("Expected EOF but got %v", err)
}
} else {
t.Fatalf("Expected EOF but got %v", v)
}
}()

for {
a := buffer.uint64()
if a != 0x89badcfe89badcfe {
t.Fatalf("Expected read uint16 to be 0x89badcfe89badcfe but it is %d", a)
}

iterations++
}
}

func TestReadByteFailsOnSecondPacket(t *testing.T) {
buffer := makeBuf(9, []byte{
0x01 /*id*/, 0x0 /*not final*/, 0x0, 0x9 /*size*/, 0xff, 0xff, 0xff, 0xff, 0x02, /*test byte*/
Expand Down Expand Up @@ -299,6 +460,88 @@ func TestReadUsVarCharOrPanic(t *testing.T) {
t.Fatal("UsVarChar() should panic, but it didn't")
}

func TestReadUsVarCharOrPanicWideChars(t *testing.T) {
str := "百度一下,你就知道"
runes := utf16.Encode([]rune(str))
encodedBytes := make([]byte, len(runes)*2)

for i := 0; i < len(runes); i++ {
binary.LittleEndian.PutUint16(encodedBytes[i*2:], runes[i])
}

memBuf := bytes.NewBuffer(append([]byte{byte(len(runes)), 0}, encodedBytes...))

s := readUsVarCharOrPanic(memBuf)
if s != str {
t.Errorf("UsVarChar expected to return %s but it returned %s", str, s)
}
}

func TestReadBVarCharOrPanicWideChars(t *testing.T) {
str := "百度一下,你就知道"
runes := utf16.Encode([]rune(str))
encodedBytes := make([]byte, len(runes)*2)

for i := 0; i < len(runes); i++ {
binary.LittleEndian.PutUint16(encodedBytes[i*2:], runes[i])
}

memBuf := bytes.NewBuffer(append([]byte{byte(len(runes))}, encodedBytes...))

s := readBVarCharOrPanic(memBuf)
if s != str {
t.Errorf("UsVarChar expected to return %s but it returned %s", str, s)
}
}

var sideeffectstring string

func BenchmarkReadBVarCharOrPanicWideChars(b *testing.B) {
str := "百度一下,你就知道"

runes := utf16.Encode([]rune(str))
encodedBytes := make([]byte, len(runes)*2)

for i := 0; i < len(runes); i++ {
binary.LittleEndian.PutUint16(encodedBytes[i*2:], runes[i])
}

encodedBytes = append([]byte{byte(len(runes))}, encodedBytes...)

memBuf := bytes.NewReader(encodedBytes)

for n := 0; n < b.N; n++ {

s := readBVarCharOrPanic(memBuf)
sideeffectstring = s

memBuf.Reset(encodedBytes)
}
}

func BenchmarkReadBVarCharOrPanicOnly1WideChar(b *testing.B) {
str := "abcdefghijklmno百p"

runes := utf16.Encode([]rune(str))
encodedBytes := make([]byte, len(runes)*2)

for i := 0; i < len(runes); i++ {
binary.LittleEndian.PutUint16(encodedBytes[i*2:], runes[i])
}

encodedBytes = append([]byte{byte(len(runes))}, encodedBytes...)

memBuf := bytes.NewReader(encodedBytes)

for n := 0; n < b.N; n++ {

s := readBVarCharOrPanic(memBuf)
sideeffectstring = s

memBuf.Reset(encodedBytes)
}
}

func TestReadBVarCharOrPanic(t *testing.T) {
memBuf := bytes.NewBuffer([]byte{3, 0x31, 0, 0x32, 0, 0x33, 0})
s := readBVarCharOrPanic(memBuf)
Expand Down
1 change: 1 addition & 0 deletions mssql.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ func (d *Driver) connect(ctx context.Context, c *Connector, params msdsn.Config)
}

func (c *Conn) Close() error {
c.sess.buf.bufClose()
return c.sess.buf.transport.Close()
}

Expand Down
Loading

0 comments on commit f386d5f

Please sign in to comment.