Skip to content

Commit

Permalink
Don't treat errors in aggregates as Go errors
Browse files Browse the repository at this point in the history
  • Loading branch information
nussjustin authored and Brian Picciano committed Jan 10, 2022
1 parent c84f209 commit b2893cb
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 22 deletions.
4 changes: 4 additions & 0 deletions resp/resp.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ type Opts struct {
// deterministic results. This is largely used for ensuring map key/values
// are emitted in a deterministic order.
Deterministic bool

// TreatErrorsAsValues indicates that unmarshaled RESP errors should
// not be treated as actual errors but like other response type.
TreatErrorsAsValues bool
}

const defaultBytePoolThreshold = 10000000 // ~10MB
Expand Down
40 changes: 40 additions & 0 deletions resp/resp3/marshal_unmarshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1585,6 +1585,46 @@ func TestAnyUnmarshalMarshal(t *testing.T) {
return cases
},
},
{
descr: "blob error in aggregated",
ins: []in{
{
msg: "*2\r\n$5\r\nhello\r\n!10\r\nsome error\r\n",
flattened: []string{"hello", "some error"},
},
},
mkCases: func(in in) []kase {
return []kase{
{
r: true,
ie: ie{
[]interface{}{[]byte("hello"), &BlobError{B: []byte("some error")}},
[]interface{}{[]byte("hello"), errors.New("some error")},
},
},
}
},
},
{
descr: "simple error in aggregated",
ins: []in{
{
msg: "*2\r\n$5\r\nhello\r\n-ERR some error\r\n",
flattened: []string{"hello", "ERR some error"},
},
},
mkCases: func(in in) []kase {
return []kase{
{
r: false,
ie: ie{
[]interface{}{[]byte("hello"), &SimpleError{S: "ERR some error"}},
[]interface{}{[]byte("hello"), errors.New("ERR some error")},
},
},
}
},
},
}

opts := resp.NewOpts()
Expand Down
62 changes: 40 additions & 22 deletions resp/resp3/resp.go
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,8 @@ func saneDefault(prefix Prefix) (interface{}, error) {
// we don't handle ErrorPrefix because that always returns an error and
// doesn't touch I
switch prefix {
case BlobErrorPrefix, SimpleErrorPrefix:
return new(error), nil
case BlobStringPrefix:
bb := make([]byte, 16)
return &bb, nil
Expand Down Expand Up @@ -1677,27 +1679,29 @@ func Unmarshal(br resp.BufferedReader, rcv interface{}, o *resp.Opts) error {
}
prefix := Prefix(b[0])

// if the prefix is one of the error types then just parse and return that
// full message here using the actual unmarshalers, which is easier than
// re-implementing them.
switch prefix {
case SimpleErrorPrefix:
var into SimpleError
if err := into.UnmarshalRESP(br, o); err != nil {
return err
}
return resp.ErrConnUsable{Err: into}
case BlobErrorPrefix:
var into BlobError
if err := into.UnmarshalRESP(br, o); err != nil {
return err
}
return resp.ErrConnUsable{Err: into}
case AttributeHeaderPrefix:
if err := DiscardAttribute(br, o); err != nil {
return err
if o == nil || !o.TreatErrorsAsValues {
// if the prefix is one of the error types then just parse and return that
// full message here using the actual unmarshalers, which is easier than
// re-implementing them.
switch prefix {
case SimpleErrorPrefix:
var into SimpleError
if err := into.UnmarshalRESP(br, o); err != nil {
return err
}
return resp.ErrConnUsable{Err: into}
case BlobErrorPrefix:
var into BlobError
if err := into.UnmarshalRESP(br, o); err != nil {
return err
}
return resp.ErrConnUsable{Err: into}
case AttributeHeaderPrefix:
if err := DiscardAttribute(br, o); err != nil {
return err
}
return Unmarshal(br, rcv, o)
}
return Unmarshal(br, rcv, o)
}

// This is a super special case that _must_ be handled before we actually
Expand Down Expand Up @@ -1748,7 +1752,7 @@ func Unmarshal(br resp.BufferedReader, rcv interface{}, o *resp.Opts) error {
}
return unmarshalAgg(prefix, br, l, rcv, o)

case BlobStringPrefix, VerbatimStringPrefix:
case BlobErrorPrefix, BlobStringPrefix, VerbatimStringPrefix:
var l int64
if len(b) == 1 && b[0] == '?' {
l = -1
Expand Down Expand Up @@ -1821,7 +1825,7 @@ func Unmarshal(br resp.BufferedReader, rcv interface{}, o *resp.Opts) error {
}
fallthrough

case SimpleStringPrefix, NumberPrefix, DoublePrefix, BigNumberPrefix:
case SimpleErrorPrefix, SimpleStringPrefix, NumberPrefix, DoublePrefix, BigNumberPrefix:
// We used to have a pool for *bytes.Reader instances which was used
// here. This resulted in one fewer heap allocation than this does, but
// took longer per-op due to the locking around the Pool.
Expand Down Expand Up @@ -1853,6 +1857,9 @@ func unmarshalSingle(body io.Reader, n int, rcv interface{}, o *resp.Opts) error
*ai = []byte{}
}
*ai, err = bytesutil.ReadNAppend(body, (*ai)[:0], n)
case *error:
*scratch, err = bytesutil.ReadNAppend(body, *scratch, n)
*ai = errors.New(string(*scratch))
case *string:
*scratch, err = bytesutil.ReadNAppend(body, *scratch, n)
*ai = string(*scratch)
Expand Down Expand Up @@ -2004,6 +2011,17 @@ func unmarshalAgg(prefix Prefix, br resp.BufferedReader, l int64, rcv interface{
l *= 2
}

if o == nil || o.TreatErrorsAsValues == false {
if o == nil {
o = resp.NewOpts()
} else {
o1 := *o
o = &o1
}

o.TreatErrorsAsValues = true
}

size := int(l)
stream := size < 0
if rcv == nil {
Expand Down

0 comments on commit b2893cb

Please sign in to comment.