Skip to content

Commit

Permalink
Reduce use of goroutines
Browse files Browse the repository at this point in the history
Previously processResponse function would be run as a go routine
which in turn starts another go routine for processSingleResponse
function.
First layer of go routine was responsible for handling cancellation.
While second layer was responsible for reading and parsing tokens
from the response.
In this version I replace first go routine with an iterator.
New version runs slightly faster according to mssql_perf_test.
This change addresses issue denisenkom#454
  • Loading branch information
denisenkom committed Nov 12, 2020
1 parent 2b7eb88 commit 162e654
Show file tree
Hide file tree
Showing 8 changed files with 313 additions and 431 deletions.
129 changes: 40 additions & 89 deletions bad_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,77 +115,54 @@ func TestBadServerPreLoginPacketWithJustEncryptionField(t *testing.T) {
})
}

func TestBadServerNoLoginResponse(t *testing.T) {
testBadServer(t, func(conn net.Conn) {
inBuf := newTdsBuffer(defaultPacketSize, conn)
outBuf := newTdsBuffer(defaultPacketSize, conn)
func goodPreloginSequence(t *testing.T, buf *tdsBuffer) {
// read prelogin request
packetType, err := buf.BeginRead()
if err != nil {
t.Fatal("Failed to read PRELOGIN request", err)
}
if packetType != packPrelogin {
t.Fatal("Client sent non PRELOGIN request packet type", packetType)
}

// read prelogin request
packetType, err := inBuf.BeginRead()
if err != nil {
t.Fatal("Failed to read PRELOGIN request", err)
}
if packetType != packPrelogin {
t.Fatal("Client sent non PRELOGIN request packet type", packetType)
}
// write prelogin response
fields := map[uint8][]byte{
preloginENCRYPTION: {encryptNotSup},
}
err = writePrelogin(packReply, buf, fields)
if err != nil {
t.Fatal("Writing PRELOGIN packet failed", err)
}

// write prelogin response
fields := map[uint8][]byte{
preloginENCRYPTION: {encryptNotSup},
}
err = writePrelogin(packReply, outBuf, fields)
if err != nil {
t.Fatal("Writing PRELOGIN packet failed", err)
}
// read login request
packetType, err = buf.BeginRead()
if err != nil {
t.Fatal("Failed to read LOGIN request", err)
}
if packetType != packLogin7 {
t.Fatal("Client sent non LOGIN request packet type", packetType)
}
}

// read login request
packetType, err = inBuf.BeginRead()
if err != nil {
t.Fatal("Failed to read LOGIN request", err)
}
if packetType != packLogin7 {
t.Fatal("Client sent non LOGIN request packet type", packetType)
}
func TestBadServerNoLoginResponse(t *testing.T) {
testBadServer(t, func(conn net.Conn) {
buf := newTdsBuffer(defaultPacketSize, conn)

goodPreloginSequence(t, buf)

// not sending login response
})
}

func TestBadServerIncorrectLoginResponseType(t *testing.T) {
testBadServer(t, func(conn net.Conn) {
inBuf := newTdsBuffer(defaultPacketSize, conn)
outBuf := newTdsBuffer(defaultPacketSize, conn)

// read prelogin request
packetType, err := inBuf.BeginRead()
if err != nil {
t.Fatal("Failed to read PRELOGIN request", err)
}
if packetType != packPrelogin {
t.Fatal("Client sent non PRELOGIN request packet type", packetType)
}

// write prelogin response
fields := map[uint8][]byte{
preloginENCRYPTION: {encryptNotSup},
}
err = writePrelogin(packReply, outBuf, fields)
if err != nil {
t.Fatal("Writing PRELOGIN packet failed", err)
}
buf := newTdsBuffer(defaultPacketSize, conn)

// read login request
packetType, err = inBuf.BeginRead()
if err != nil {
t.Fatal("Failed to read LOGIN request", err)
}
if packetType != packLogin7 {
t.Fatal("Client sent non LOGIN request packet type", packetType)
}
goodPreloginSequence(t, buf)

// sending incorrect packet type
outBuf.BeginPacket(packPrelogin, false)
err = outBuf.flush()
buf.BeginPacket(packPrelogin, false)
err := buf.flush()
if err != nil {
t.Fatal(err)
}
Expand All @@ -194,44 +171,18 @@ func TestBadServerIncorrectLoginResponseType(t *testing.T) {

func TestBadServerInvalidTokenId(t *testing.T) {
testBadServer(t, func(conn net.Conn) {
inBuf := newTdsBuffer(defaultPacketSize, conn)
outBuf := newTdsBuffer(defaultPacketSize, conn)

// read prelogin request
packetType, err := inBuf.BeginRead()
if err != nil {
t.Fatal("Failed to read PRELOGIN request", err)
}
if packetType != packPrelogin {
t.Fatal("Client sent non PRELOGIN request packet type", packetType)
}

// write prelogin response
fields := map[uint8][]byte{
preloginENCRYPTION: {encryptNotSup},
}
err = writePrelogin(packReply, outBuf, fields)
if err != nil {
t.Fatal("Writing PRELOGIN packet failed", err)
}
buf := newTdsBuffer(defaultPacketSize, conn)

// read login request
packetType, err = inBuf.BeginRead()
if err != nil {
t.Fatal("Failed to read LOGIN request", err)
}
if packetType != packLogin7 {
t.Fatal("Client sent non LOGIN request packet type", packetType)
}
goodPreloginSequence(t, buf)

// sending reply to LOGIN request
outBuf.BeginPacket(packReply, false)
buf.BeginPacket(packReply, false)
// this is invalid token id
err = outBuf.WriteByte(0)
err := buf.WriteByte(0)
if err != nil {
t.Fatal(err)
}
err = outBuf.flush()
err = buf.flush()
if err != nil {
t.Fatal(err)
}
Expand Down
15 changes: 12 additions & 3 deletions buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,28 @@ func (w *tdsBuffer) FinishPacket() error {
var headerSize = binary.Size(header{})

func (r *tdsBuffer) readNextPacket() error {
h := header{}
var err error
err = binary.Read(r.transport, binary.BigEndian, &h)
buf := r.rbuf[:headerSize]
_, err := io.ReadFull(r.transport, buf)
if err != nil {
return err
}
h := header{
PacketType: packetType(buf[0]),
Status: buf[1],
Size: binary.BigEndian.Uint16(buf[2:4]),
Spid: binary.BigEndian.Uint16(buf[4:6]),
PacketNo: buf[6],
Pad: buf[7],
}
if int(h.Size) > r.packetSize {
return errors.New("invalid packet size, it is longer than buffer size")
}
if headerSize > int(h.Size) {
return errors.New("invalid packet size, it is shorter than header size")
}
_, err = io.ReadFull(r.transport, r.rbuf[headerSize:h.Size])
//s := base64.StdEncoding.EncodeToString(r.rbuf[headerSize:h.Size])
//fmt.Print(s)
if err != nil {
return err
}
Expand Down
23 changes: 6 additions & 17 deletions bulkcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,24 +234,13 @@ func (b *Bulk) Done() (rowcount int64, err error) {

buf.FinishPacket()

tokchan := make(chan tokenStruct, 5)
go processResponse(b.ctx, b.cn.sess, tokchan, nil)

var rowCount int64
for token := range tokchan {
switch token := token.(type) {
case doneStruct:
if token.Status&doneCount != 0 {
rowCount = int64(token.RowCount)
}
if token.isError() {
return 0, token.getError()
}
case error:
return 0, b.cn.checkBadConn(token)
}
reader := startReading(b.cn.sess, b.ctx, nil)
err = reader.iterateResponse()
if err != nil {
return 0, b.cn.checkBadConn(err)
}
return rowCount, nil

return reader.rowCount, nil
}

func (b *Bulk) createColMetadata() []byte {
Expand Down
Loading

0 comments on commit 162e654

Please sign in to comment.