Skip to content

Commit

Permalink
fix: closing of keploy due to panic in any parser (keploy#1816)
Browse files Browse the repository at this point in the history
* fix: closing of keploy due to panic in any parser

Signed-off-by: gouravkrosx <gouravgreatkr@gmail.com>

* chore: move parser panic recoverer to proxy utils for more context

Signed-off-by: gouravkrosx <gouravgreatkr@gmail.com>

---------

Signed-off-by: gouravkrosx <gouravgreatkr@gmail.com>
  • Loading branch information
gouravkrosx authored Apr 12, 2024
1 parent aaa712e commit 9f240e3
Show file tree
Hide file tree
Showing 14 changed files with 120 additions and 67 deletions.
2 changes: 1 addition & 1 deletion pkg/core/proxy/integrations/generic/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func decodeGeneric(ctx context.Context, logger *zap.Logger, reqBuf []byte, clien
logger.Debug("Into the generic parser in test mode")
errCh := make(chan error, 1)
go func(errCh chan error, genericRequests [][]byte) {
defer utils.Recover(logger)
defer pUtil.Recover(logger, clientConn, nil)
defer close(errCh)
for {
// Since protocol packets have to be parsed for checking stream end,
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/proxy/integrations/generic/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ func encodeGeneric(ctx context.Context, logger *zap.Logger, reqBuf []byte, clien

// read requests from client
g.Go(func() error {
defer utils.Recover(logger)
defer pUtil.Recover(logger, clientConn, nil)
defer close(clientBuffChan)
pUtil.ReadBuffConn(ctx, logger, clientConn, clientBuffChan, errChan)
return nil
})
// read responses from destination
g.Go(func() error {
defer utils.Recover(logger)
defer pUtil.Recover(logger, nil, destConn)
defer close(destBuffChan)
pUtil.ReadBuffConn(ctx, logger, destConn, destBuffChan, errChan)
return nil
Expand Down
10 changes: 6 additions & 4 deletions pkg/core/proxy/integrations/grpc/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package grpc

import (
"context"
"io"
"net"

pUtil "go.keploy.io/server/v2/pkg/core/proxy/util"
"go.keploy.io/server/v2/pkg/models"
"go.keploy.io/server/v2/utils"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"io"
"net"
)

func encodeGrpc(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientConn, destConn net.Conn, mocks chan<- *models.Mock, _ models.OutgoingOptions) error {
Expand Down Expand Up @@ -35,7 +37,7 @@ func encodeGrpc(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientCo

// Route requests from the client to the server.
g.Go(func() error {
defer utils.Recover(logger)
defer pUtil.Recover(logger, clientConn, destConn)
err := transferFrame(ctx, destConn, clientConn, streamInfoCollection, reqFromClient, serverSideDecoder, mocks)
if err != nil {
// check for EOF error
Expand All @@ -55,7 +57,7 @@ func encodeGrpc(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientCo
// Route response from the server to the client.
clientSideDecoder := NewDecoder()
g.Go(func() error {
defer utils.Recover(logger)
defer pUtil.Recover(logger, clientConn, destConn)
err := transferFrame(ctx, clientConn, destConn, streamInfoCollection, !reqFromClient, clientSideDecoder, mocks)
if err != nil {
utils.LogError(logger, err, "failed to transfer frame from server to client")
Expand Down
11 changes: 7 additions & 4 deletions pkg/core/proxy/integrations/http/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

"go.keploy.io/server/v2/pkg"
"go.keploy.io/server/v2/pkg/core/proxy/integrations"
"go.keploy.io/server/v2/pkg/core/proxy/util"
pUtil "go.keploy.io/server/v2/pkg/core/proxy/util"
"go.keploy.io/server/v2/pkg/models"
"go.keploy.io/server/v2/utils"
"go.uber.org/zap"
Expand All @@ -31,6 +31,7 @@ func decodeHTTP(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientCo
errCh := make(chan error, 1)

go func(errCh chan error, reqBuf []byte, opts models.OutgoingOptions) {
defer pUtil.Recover(logger, clientConn, nil)
defer close(errCh)
for {
//Check if the expected header is present
Expand All @@ -48,7 +49,7 @@ func decodeHTTP(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientCo
}
logger.Debug("The 100 continue response has been sent to the user application")
//Read the request buffer again
newRequest, err := util.ReadBytes(ctx, logger, clientConn)
newRequest, err := pUtil.ReadBytes(ctx, logger, clientConn)
if err != nil {
utils.LogError(logger, err, "failed to read the request buffer from the user application")
errCh <- err
Expand Down Expand Up @@ -103,12 +104,14 @@ func decodeHTTP(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientCo
utils.LogError(logger, nil, "Didn't match any preExisting http mock", zap.Any("metadata", getReqMeta(request)))
}

_, err = util.PassThrough(ctx, logger, clientConn, dstCfg, [][]byte{reqBuf})
_, err = pUtil.PassThrough(ctx, logger, clientConn, dstCfg, [][]byte{reqBuf})
if err != nil {
utils.LogError(logger, err, "failed to passThrough http request", zap.Any("metadata", getReqMeta(request)))
errCh <- err
return
}
errCh <- nil
return
}

statusLine := fmt.Sprintf("HTTP/%d.%d %d %s\r\n", stub.Spec.HTTPReq.ProtoMajor, stub.Spec.HTTPReq.ProtoMinor, stub.Spec.HTTPResp.StatusCode, http.StatusText(stub.Spec.HTTPResp.StatusCode))
Expand Down Expand Up @@ -168,7 +171,7 @@ func decodeHTTP(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientCo
return
}

reqBuf, err = util.ReadBytes(ctx, logger, clientConn)
reqBuf, err = pUtil.ReadBytes(ctx, logger, clientConn)
if err != nil {
logger.Debug("failed to read the request buffer from the client", zap.Error(err))
logger.Debug("This was the last response from the server:\n" + string(responseString))
Expand Down
3 changes: 2 additions & 1 deletion pkg/core/proxy/integrations/http/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"golang.org/x/sync/errgroup"

"go.keploy.io/server/v2/pkg/core/proxy/util"
pUtil "go.keploy.io/server/v2/pkg/core/proxy/util"
"go.keploy.io/server/v2/pkg/models"
"go.keploy.io/server/v2/utils"
"go.uber.org/zap"
Expand Down Expand Up @@ -48,7 +49,7 @@ func encodeHTTP(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientCo

//for keeping conn alive
g.Go(func() error {
defer utils.Recover(logger)
defer pUtil.Recover(logger, clientConn, destConn)
defer close(errCh)
for {
//check if expect : 100-continue header is present
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/proxy/integrations/mongo/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
errCh := make(chan error, 1)

go func(errCh chan error, reqBuf []byte, startedDecoding time.Time, requestBuffers [][]byte) {
defer utils.Recover(logger)
defer util.Recover(logger, clientConn, nil)
defer close(errCh)
var readRequestDelay time.Duration
for {
Expand Down
14 changes: 7 additions & 7 deletions pkg/core/proxy/integrations/mongo/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"golang.org/x/sync/errgroup"

"go.keploy.io/server/v2/pkg/core/proxy/util"
pUtil "go.keploy.io/server/v2/pkg/core/proxy/util"
"go.keploy.io/server/v2/pkg/models"
"go.keploy.io/server/v2/utils"
"go.uber.org/zap"
Expand All @@ -27,7 +27,7 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
}

g.Go(func() error {
defer utils.Recover(logger)
defer pUtil.Recover(logger, clientConn, destConn)
defer close(errCh)
for {
var err error
Expand All @@ -38,7 +38,7 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
if string(reqBuf) == "read form client conn" {
// lstr := ""
started := time.Now()
reqBuf, err = util.ReadBytes(ctx, logger, clientConn)
reqBuf, err = pUtil.ReadBytes(ctx, logger, clientConn)
logger.Debug("reading from the mongo conn", zap.Any("", string(reqBuf)))
if err != nil {
if err == io.EOF {
Expand Down Expand Up @@ -85,7 +85,7 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
// logStr += fmt.Sprintln("after writing the request to the destination: ", time.Since(started))
if val, ok := mongoRequest.(*models.MongoOpMessage); ok && hasSecondSetBit(val.FlagBits) {
for {
requestBuffer1, err := util.ReadBytes(ctx, logger, clientConn)
requestBuffer1, err := pUtil.ReadBytes(ctx, logger, clientConn)

// logStr += tmpStr
if err != nil {
Expand Down Expand Up @@ -138,7 +138,7 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
// tmpStr := ""
reqTimestampMock := time.Now()
started := time.Now()
responsePckLengthBuffer, err := util.ReadRequiredBytes(ctx, logger, destConn, 4)
responsePckLengthBuffer, err := pUtil.ReadRequiredBytes(ctx, logger, destConn, 4)
if err != nil {
if err == io.EOF {
logger.Debug("recieved response buffer is empty in record mode for mongo call")
Expand All @@ -155,7 +155,7 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
pckLength := getPacketLength(responsePckLengthBuffer)
logger.Debug("received pck length ", zap.Any("packet length", pckLength))

responsePckDataBuffer, err := util.ReadRequiredBytes(ctx, logger, destConn, int(pckLength)-4)
responsePckDataBuffer, err := pUtil.ReadRequiredBytes(ctx, logger, destConn, int(pckLength)-4)

logger.Debug("recieved these packets", zap.Any("packets", responsePckDataBuffer))

Expand Down Expand Up @@ -204,7 +204,7 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
m.recordMessage(ctx, logger, mongoRequests, mongoResponses, opReq, reqTimestampMock, mocks)
}
started = time.Now()
responseBuffer, err = util.ReadBytes(ctx, logger, destConn)
responseBuffer, err = pUtil.ReadBytes(ctx, logger, destConn)
// logStr += tmpStr
if err != nil {
if err == io.EOF {
Expand Down
8 changes: 4 additions & 4 deletions pkg/core/proxy/integrations/mysql/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"

"go.keploy.io/server/v2/pkg/core/proxy/integrations"
"go.keploy.io/server/v2/pkg/core/proxy/util"
pUtil "go.keploy.io/server/v2/pkg/core/proxy/util"
"go.keploy.io/server/v2/pkg/models"
"go.keploy.io/server/v2/utils"
"go.uber.org/zap"
Expand All @@ -33,7 +33,7 @@ func decodeMySQL(ctx context.Context, logger *zap.Logger, clientConn net.Conn, d
errCh := make(chan error, 1)

go func(errCh chan error, configMocks []*models.Mock, tcsMocks []*models.Mock, prevRequest string, requestBuffers [][]byte) {
defer utils.Recover(logger)
defer pUtil.Recover(logger, clientConn, nil)
defer close(errCh)
for {
//log.Debug("Config and TCS Mocks", zap.Any("configMocks", configMocks), zap.Any("tcsMocks", tcsMocks))
Expand Down Expand Up @@ -98,7 +98,7 @@ func decodeMySQL(ctx context.Context, logger *zap.Logger, clientConn net.Conn, d
}

// Attempt to read from the client
requestBuffer, err := util.ReadBytes(ctx, logger, clientConn)
requestBuffer, err := pUtil.ReadBytes(ctx, logger, clientConn)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
// Timeout occurred, no data received from client
Expand Down Expand Up @@ -178,7 +178,7 @@ func decodeMySQL(ctx context.Context, logger *zap.Logger, clientConn net.Conn, d
if matchedIndex == -1 {
logger.Debug("No matching mock found")

responseBuffer, err := util.PassThrough(ctx, logger, clientConn, dstCfg, requestBuffers)
responseBuffer, err := pUtil.PassThrough(ctx, logger, clientConn, dstCfg, requestBuffers)
if err != nil {
utils.LogError(logger, err, "Failed to passthrough the mysql request to the actual database server")
errCh <- err
Expand Down
37 changes: 19 additions & 18 deletions pkg/core/proxy/integrations/mysql/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package mysql
import (
"context"
"errors"
"golang.org/x/sync/errgroup"
"net"
"time"

"go.keploy.io/server/v2/pkg/core/proxy/util"
"golang.org/x/sync/errgroup"

pUtil "go.keploy.io/server/v2/pkg/core/proxy/util"
"go.keploy.io/server/v2/pkg/models"
"go.keploy.io/server/v2/utils"
"go.uber.org/zap"
Expand All @@ -30,7 +31,7 @@ func encodeMySQL(ctx context.Context, logger *zap.Logger, clientConn, destConn n

//for keeping conn alive
g.Go(func() error {
defer utils.Recover(logger)
defer pUtil.Recover(logger, clientConn, destConn)
defer close(errCh)
for {
lastCommand = 0x00 //resetting last command for new loop
Expand All @@ -54,7 +55,7 @@ func encodeMySQL(ctx context.Context, logger *zap.Logger, clientConn, destConn n
errCh <- err
return nil
}
handshakeResponseFromClient, err := util.ReadBytes(ctx, logger, clientConn)
handshakeResponseFromClient, err := pUtil.ReadBytes(ctx, logger, clientConn)
if err != nil {
utils.LogError(logger, err, "failed to read handshake response from client")
errCh <- err
Expand All @@ -71,7 +72,7 @@ func encodeMySQL(ctx context.Context, logger *zap.Logger, clientConn, destConn n
}
//TODO: why is this sleep here?
time.Sleep(100 * time.Millisecond)
okPacket1, err := util.ReadBytes(ctx, logger, destConn)
okPacket1, err := pUtil.ReadBytes(ctx, logger, destConn)
if err != nil {
utils.LogError(logger, err, "failed to read packet from server after handshake")
errCh <- err
Expand Down Expand Up @@ -132,7 +133,7 @@ func encodeMySQL(ctx context.Context, logger *zap.Logger, clientConn, destConn n
})
if oprResponse2 == "AUTH_SWITCH_REQUEST" {

authSwitchResponse, err := util.ReadBytes(ctx, logger, clientConn)
authSwitchResponse, err := pUtil.ReadBytes(ctx, logger, clientConn)
if err != nil {
utils.LogError(logger, err, "failed to read AuthSwitchResponse from client")
errCh <- err
Expand All @@ -147,7 +148,7 @@ func encodeMySQL(ctx context.Context, logger *zap.Logger, clientConn, destConn n
errCh <- err
return nil
}
serverResponse, err := util.ReadBytes(ctx, logger, destConn)
serverResponse, err := pUtil.ReadBytes(ctx, logger, destConn)
if err != nil {
utils.LogError(logger, err, "failed to read response from server")
errCh <- err
Expand Down Expand Up @@ -203,7 +204,7 @@ func encodeMySQL(ctx context.Context, logger *zap.Logger, clientConn, destConn n
}
if pluginType == "cachingSha2PasswordPerformFullAuthentication" {

clientResponse, err := util.ReadBytes(ctx, logger, clientConn)
clientResponse, err := pUtil.ReadBytes(ctx, logger, clientConn)
if err != nil {
utils.LogError(logger, err, "failed to read response from client")
errCh <- err
Expand All @@ -218,7 +219,7 @@ func encodeMySQL(ctx context.Context, logger *zap.Logger, clientConn, destConn n
errCh <- err
return nil
}
finalServerResponse, err := util.ReadBytes(ctx, logger, destConn)
finalServerResponse, err := pUtil.ReadBytes(ctx, logger, destConn)
if err != nil {
utils.LogError(logger, err, "failed to read final response from server")
errCh <- err
Expand Down Expand Up @@ -263,7 +264,7 @@ func encodeMySQL(ctx context.Context, logger *zap.Logger, clientConn, destConn n
},
Message: mysqlRespFinal,
})
clientResponse1, err := util.ReadBytes(ctx, logger, clientConn)
clientResponse1, err := pUtil.ReadBytes(ctx, logger, clientConn)
if err != nil {
utils.LogError(logger, err, "failed to read response from client")
errCh <- err
Expand All @@ -278,7 +279,7 @@ func encodeMySQL(ctx context.Context, logger *zap.Logger, clientConn, destConn n
errCh <- err
return nil
}
finalServerResponse1, err := util.ReadBytes(ctx, logger, destConn)
finalServerResponse1, err := pUtil.ReadBytes(ctx, logger, destConn)
if err != nil {
utils.LogError(logger, err, "failed to read final response from server")
errCh <- err
Expand Down Expand Up @@ -328,7 +329,7 @@ func encodeMySQL(ctx context.Context, logger *zap.Logger, clientConn, destConn n
})
} else {
// time.Sleep(10 * time.Millisecond)
finalServerResponse, err := util.ReadBytes(ctx, logger, destConn)
finalServerResponse, err := pUtil.ReadBytes(ctx, logger, destConn)
if err != nil {
utils.LogError(logger, err, "failed to read final response from server")
errCh <- err
Expand Down Expand Up @@ -369,7 +370,7 @@ func encodeMySQL(ctx context.Context, logger *zap.Logger, clientConn, destConn n
}
if pluginType == "cachingSha2PasswordPerformFullAuthentication" {

clientResponse, err := util.ReadBytes(ctx, logger, clientConn)
clientResponse, err := pUtil.ReadBytes(ctx, logger, clientConn)
if err != nil {
utils.LogError(logger, err, "failed to read response from client")
errCh <- err
Expand All @@ -384,7 +385,7 @@ func encodeMySQL(ctx context.Context, logger *zap.Logger, clientConn, destConn n
errCh <- err
return nil
}
finalServerResponse, err := util.ReadBytes(ctx, logger, destConn)
finalServerResponse, err := pUtil.ReadBytes(ctx, logger, destConn)
if err != nil {
utils.LogError(logger, err, "failed to read final response from server")
errCh <- err
Expand Down Expand Up @@ -429,7 +430,7 @@ func encodeMySQL(ctx context.Context, logger *zap.Logger, clientConn, destConn n
},
Message: mysqlRespFinal,
})
clientResponse1, err := util.ReadBytes(ctx, logger, clientConn)
clientResponse1, err := pUtil.ReadBytes(ctx, logger, clientConn)
if err != nil {
utils.LogError(logger, err, "failed to read response from client")
errCh <- err
Expand All @@ -444,7 +445,7 @@ func encodeMySQL(ctx context.Context, logger *zap.Logger, clientConn, destConn n
errCh <- err
return nil
}
finalServerResponse1, err := util.ReadBytes(ctx, logger, destConn)
finalServerResponse1, err := pUtil.ReadBytes(ctx, logger, destConn)
if err != nil {
utils.LogError(logger, err, "failed to read final response from server")
errCh <- err
Expand Down Expand Up @@ -541,7 +542,7 @@ func handleClientQueries(ctx context.Context, logger *zap.Logger, initialBuffer
queryBuffer = initialBuffer
firstIteration = false
} else {
queryBuffer, err = util.ReadBytes(ctx, logger, clientConn)
queryBuffer, err = pUtil.ReadBytes(ctx, logger, clientConn)
if err != nil {
utils.LogError(logger, err, "failed to read query from the mysql client")
return err
Expand Down Expand Up @@ -574,7 +575,7 @@ func handleClientQueries(ctx context.Context, logger *zap.Logger, initialBuffer
if res == 9 {
return nil
}
queryResponse, err := util.ReadBytes(ctx, logger, destConn)
queryResponse, err := pUtil.ReadBytes(ctx, logger, destConn)
if err != nil {
utils.LogError(logger, err, "failed to read query response from mysql server")
return err
Expand Down
Loading

0 comments on commit 9f240e3

Please sign in to comment.