Skip to content

Commit

Permalink
fix: connection errors (keploy#1727)
Browse files Browse the repository at this point in the history
Signed-off-by: charankamarapu <kamarapucharan@gmail.com>
  • Loading branch information
charankamarapu authored Mar 27, 2024
1 parent 0ce495f commit 2ed1b79
Show file tree
Hide file tree
Showing 11 changed files with 33 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/sample-run.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: "1.20"
go-version: "1.21"

- name: Build Keploy
run: |
Expand Down
4 changes: 4 additions & 0 deletions pkg/core/proxy/integrations/generic/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package generic

import (
"context"
"io"
"net"
"time"

Expand Down Expand Up @@ -111,6 +112,9 @@ func decodeGeneric(ctx context.Context, logger *zap.Logger, reqBuf []byte, clien
case <-ctx.Done():
return ctx.Err()
case err := <-errCh:
if err == io.EOF {
return nil
}
return err
}
}
14 changes: 6 additions & 8 deletions pkg/core/proxy/integrations/generic/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"
"encoding/base64"
"errors"
"golang.org/x/sync/errgroup"
"io"
"net"
"strconv"
"time"

"golang.org/x/sync/errgroup"

"go.keploy.io/server/v2/pkg/core/proxy/integrations/util"
pUtil "go.keploy.io/server/v2/pkg/core/proxy/util"
"go.keploy.io/server/v2/pkg/models"
Expand All @@ -17,13 +19,6 @@ import (
)

func encodeGeneric(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientConn, destConn net.Conn, mocks chan<- *models.Mock, _ models.OutgoingOptions) error {
//closing the destination conn
defer func(destConn net.Conn) {
err := destConn.Close()
if err != nil {
utils.LogError(logger, err, "failed to close the destination connection")
}
}(destConn)

var genericRequests []models.GenericPayload

Expand Down Expand Up @@ -203,6 +198,9 @@ func encodeGeneric(ctx context.Context, logger *zap.Logger, reqBuf []byte, clien
logger.Debug("the iteration for the generic response ends with no of genericReqs:" + strconv.Itoa(len(genericRequests)) + " and genericResps: " + strconv.Itoa(len(genericResponses)))
prevChunkWasReq = false
case err := <-errChan:
if err == io.EOF {
return nil
}
return err
}
}
Expand Down
10 changes: 3 additions & 7 deletions pkg/core/proxy/integrations/grpc/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,6 @@ import (
)

func encodeGrpc(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientConn, destConn net.Conn, mocks chan<- *models.Mock, _ models.OutgoingOptions) error {
//closing the destination conn
defer func(destConn net.Conn) {
err := destConn.Close()
if err != nil {
utils.LogError(logger, err, "failed to close the destination connection")
}
}(destConn)

// Send the client preface to the server. This should be the first thing sent from the client.
_, err := destConn.Write(reqBuf)
Expand Down Expand Up @@ -78,6 +71,9 @@ func encodeGrpc(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientCo
case <-ctx.Done():
return ctx.Err()
case err := <-errCh:
if err == io.EOF {
return nil
}
return err
}
// This would practically be an infinite loop, unless the client closes the grpc conn
Expand Down
3 changes: 3 additions & 0 deletions pkg/core/proxy/integrations/http/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ func decodeHTTP(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientCo
case <-ctx.Done():
return ctx.Err()
case err := <-errCh:
if err == io.EOF {
return nil
}
return err
}
}
10 changes: 3 additions & 7 deletions pkg/core/proxy/integrations/http/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@ import (

// encodeHTTP function parses the HTTP request and response text messages to capture outgoing network calls as mocks.
func encodeHTTP(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientConn, destConn net.Conn, mocks chan<- *models.Mock, opts models.OutgoingOptions) error {
//closing the destination conn
defer func(destConn net.Conn) {
err := destConn.Close()
if err != nil {
utils.LogError(logger, err, "failed to close the destination connection")
}
}(destConn)

remoteAddr := destConn.RemoteAddr().(*net.TCPAddr)
destPort := uint(remoteAddr.Port)
Expand Down Expand Up @@ -255,6 +248,9 @@ func encodeHTTP(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientCo
case <-ctx.Done():
return ctx.Err()
case err := <-errCh:
if err == io.EOF {
return nil
}
return err
}
}
10 changes: 3 additions & 7 deletions pkg/core/proxy/integrations/mongo/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@ import (
)

func encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientConn, destConn net.Conn, mocks chan<- *models.Mock, _ models.OutgoingOptions) error {
//closing the destination conn
defer func(destConn net.Conn) {
err := destConn.Close()
if err != nil {
utils.LogError(logger, err, "failed to close the destination connection")
}
}(destConn)

errCh := make(chan error, 1)

Expand Down Expand Up @@ -272,6 +265,9 @@ func encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
case <-ctx.Done():
return ctx.Err()
case err := <-errCh:
if err == io.EOF {
return nil
}
return err
}
}
Expand Down
7 changes: 0 additions & 7 deletions pkg/core/proxy/integrations/mysql/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,6 @@ import (
)

func encodeMySQL(ctx context.Context, logger *zap.Logger, clientConn, destConn net.Conn, mocks chan<- *models.Mock, _ models.OutgoingOptions) error {
//closing the destination conn
defer func(destConn net.Conn) {
err := destConn.Close()
if err != nil {
utils.LogError(logger, err, "failed to close the destination connection")
}
}(destConn)

var (
mysqlRequests []models.MySQLRequest
Expand Down
3 changes: 3 additions & 0 deletions pkg/core/proxy/integrations/postgres/v1/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ func decodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clie
case <-ctx.Done():
return ctx.Err()
case err := <-errCh:
if err == io.EOF {
return nil
}
return err
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/core/proxy/integrations/postgres/v1/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v1
import (
"context"
"encoding/binary"
"io"
"net"
"strconv"
"time"
Expand Down Expand Up @@ -387,6 +388,9 @@ func encodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clie
logger.Debug("the iteration for the postgres response ends with no of postgresReqs:" + strconv.Itoa(len(pgRequests)) + " and pgResps: " + strconv.Itoa(len(pgResponses)))
prevChunkWasReq = false
case err := <-errChan:
if err == io.EOF {
return nil
}
return err
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/core/proxy/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ func ReadBuffConn(ctx context.Context, logger *zap.Logger, conn net.Conn, buffer
if ctx.Err() != nil { // to avoid sending buffer to closed channel if the context is cancelled
return
}
utils.LogError(logger, err, "failed to read the packet message in proxy")
if err != io.EOF {
utils.LogError(logger, err, "failed to read the packet message in proxy")
}
errChannel <- err
return
}
Expand Down

0 comments on commit 2ed1b79

Please sign in to comment.