TT-8227: Error handling in datasources #338

Draft · wants to merge 3 commits into base: master
Changes from all commits
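
A rough sketch of the error-propagation pattern this draft introduces, written as a self-contained Go illustration rather than the library's actual API (names such as errorAwareResolver, recordFetchResult, and resolveNullableField are hypothetical): each fetch records its error keyed by buffer ID behind a mutex, and a nullable field whose data is missing resolves to null while the recorded fetch error is surfaced instead of the generic "unable to resolve" message.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// errorAwareResolver is a hypothetical, self-contained illustration of the
// pattern in this draft PR: fetch errors are recorded per buffer ID behind a
// mutex, then consulted when a field backed by that buffer is resolved.
type errorAwareResolver struct {
	sync.Mutex
	fetchErrorMap map[int]error
}

// recordFetchResult stores the outcome of a (possibly concurrent) fetch.
func (r *errorAwareResolver) recordFetchResult(bufferID int, err error) {
	r.Lock()
	defer r.Unlock()
	r.fetchErrorMap[bufferID] = err
}

// resolveNullableField returns "null" plus a GraphQL-style error message when
// the value is missing because its fetch failed; otherwise it returns the value.
func (r *errorAwareResolver) resolveNullableField(bufferID int, value []byte) (string, []string) {
	r.Lock()
	fetchErr := r.fetchErrorMap[bufferID]
	r.Unlock()
	if value == nil && fetchErr != nil {
		return "null", []string{fmt.Sprintf("error resolving: %s", fetchErr)}
	}
	return string(value), nil
}

func main() {
	r := &errorAwareResolver{fetchErrorMap: make(map[int]error)}
	r.recordFetchResult(0, errors.New("upstream datasource returned status 500"))

	data, errs := r.resolveNullableField(0, nil)
	fmt.Println(data, errs) // null [error resolving: upstream datasource returned status 500]
}
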
113 changes: 78 additions & 35 deletions pkg/engine/resolve/resolve.go
@@ -10,6 +10,7 @@ import (
"io"
"net/http"
"strconv"
"strings"
"sync"
"time"

@@ -318,6 +319,7 @@ type SubscriptionDataSource interface {
}

type Resolver struct {
sync.Mutex
ctx context.Context
dataLoaderEnabled bool
resultSetPool sync.Pool
@@ -329,6 +331,7 @@ type Resolver struct {
hash64Pool sync.Pool
dataloaderFactory *dataLoaderFactory
fetcher *Fetcher
fetchErrorMap map[int]error
}

type inflightFetch struct {
@@ -388,23 +391,24 @@ func New(ctx context.Context, fetcher *Fetcher, enableDataLoader bool) *Resolver
dataloaderFactory: newDataloaderFactory(fetcher),
fetcher: fetcher,
dataLoaderEnabled: enableDataLoader,
fetchErrorMap: make(map[int]error),
}
}

func (r *Resolver) resolveNode(ctx *Context, node Node, data []byte, bufPair *BufPair) (err error) {
func (r *Resolver) resolveNode(ctx *Context, node Node, data []byte, fetchError error, bufPair *BufPair) (err error) {
switch n := node.(type) {
case *Object:
return r.resolveObject(ctx, n, data, bufPair)
case *Array:
return r.resolveArray(ctx, n, data, bufPair)
return r.resolveArray(ctx, n, data, fetchError, bufPair)
case *Null:
if n.Defer.Enabled {
r.preparePatch(ctx, n.Defer.PatchIndex, nil, data)
}
r.resolveNull(bufPair.Data)
return
case *String:
return r.resolveString(ctx, n, data, bufPair)
return r.resolveString(ctx, n, data, fetchError, bufPair)
case *Boolean:
return r.resolveBoolean(ctx, n, data, bufPair)
case *Integer:
@@ -496,7 +500,7 @@ func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLRespons
}

ignoreData := false
err = r.resolveNode(ctx, response.Data, responseBuf.Data.Bytes(), buf)
err = r.resolveNode(ctx, response.Data, responseBuf.Data.Bytes(), nil, buf)
if err != nil {
if !errors.Is(err, errNonNullableFieldValueIsNull) {
return
@@ -648,21 +652,22 @@ func (r *Resolver) ResolveGraphQLResponsePatch(ctx *Context, patch *GraphQLRespo

ctx.pathPrefix = append(path, extraPath...)

var fetchError error
if patch.Fetch != nil {
set := r.getResultSet()
defer r.freeResultSet(set)
err = r.resolveFetch(ctx, patch.Fetch, data, set)
if err != nil {
return err
}
fetchError = r.resolveFetch(ctx, patch.Fetch, data, set)
//if err != nil {
// return err
//}
_, ok := set.buffers[0]
if ok {
r.MergeBufPairErrors(set.buffers[0], buf)
data = set.buffers[0].Data.Bytes()
}
}

err = r.resolveNode(ctx, patch.Value, data, buf)
err = r.resolveNode(ctx, patch.Value, data, fetchError, buf)
if err != nil {
return
}
@@ -716,7 +721,7 @@ func (r *Resolver) resolveEmptyObject(b *fastbuffer.FastBuffer) {
b.WriteBytes(rBrace)
}

func (r *Resolver) resolveArray(ctx *Context, array *Array, data []byte, arrayBuf *BufPair) (err error) {
func (r *Resolver) resolveArray(ctx *Context, array *Array, data []byte, fetchError error, arrayBuf *BufPair) (err error) {
if len(array.Path) != 0 {
data, _, _, _ = jsonparser.Get(data, array.Path...)
}
@@ -753,12 +758,12 @@ func (r *Resolver) resolveArray(ctx *Context, array *Array, data []byte, arrayBu
defer func() { ctx.removeResponseArrayLastElements(array.Path) }()

if array.ResolveAsynchronous && !array.Stream.Enabled && !r.dataLoaderEnabled {
return r.resolveArrayAsynchronous(ctx, array, arrayItems, arrayBuf)
return r.resolveArrayAsynchronous(ctx, array, arrayItems, fetchError, arrayBuf)
}
return r.resolveArraySynchronous(ctx, array, arrayItems, arrayBuf)
return r.resolveArraySynchronous(ctx, array, arrayItems, fetchError, arrayBuf)
}

func (r *Resolver) resolveArraySynchronous(ctx *Context, array *Array, arrayItems *[][]byte, arrayBuf *BufPair) (err error) {
func (r *Resolver) resolveArraySynchronous(ctx *Context, array *Array, arrayItems *[][]byte, fetchError error, arrayBuf *BufPair) (err error) {

itemBuf := r.getBufPair()
defer r.freeBufPair(itemBuf)
@@ -780,9 +785,12 @@ func (r *Resolver) resolveArraySynchronous(ctx *Context, array *Array, arrayItem
}

ctx.addIntegerPathElement(i)
err = r.resolveNode(ctx, array.Item, (*arrayItems)[i], itemBuf)
err = r.resolveNode(ctx, array.Item, (*arrayItems)[i], fetchError, itemBuf)
ctx.removeLastPathElement()
if err != nil {
if itemBuf.HasErrors() {
r.MergeBufPairErrors(itemBuf, arrayBuf)
}
if errors.Is(err, errNonNullableFieldValueIsNull) && array.Nullable {
arrayBuf.Data.Reset()
r.resolveNull(arrayBuf.Data)
@@ -805,7 +813,7 @@ func (r *Resolver) resolveArraySynchronous(ctx *Context, array *Array, arrayItem
return
}

func (r *Resolver) resolveArrayAsynchronous(ctx *Context, array *Array, arrayItems *[][]byte, arrayBuf *BufPair) (err error) {
func (r *Resolver) resolveArrayAsynchronous(ctx *Context, array *Array, arrayItems *[][]byte, fetchError error, arrayBuf *BufPair) (err error) {

arrayBuf.Data.WriteBytes(lBrack)

@@ -827,7 +835,7 @@ func (r *Resolver) resolveArrayAsynchronous(ctx *Context, array *Array, arrayIte
cloned := ctx.Clone()
go func(ctx Context, i int) {
ctx.addPathElement([]byte(strconv.Itoa(i)))
if e := r.resolveNode(&ctx, array.Item, itemData, itemBuf); e != nil && !errors.Is(e, errTypeNameSkipped) {
if e := r.resolveNode(&ctx, array.Item, itemData, fetchError, itemBuf); e != nil && !errors.Is(e, errTypeNameSkipped) {
select {
case errCh <- e:
default:
@@ -922,14 +930,20 @@ func (r *Resolver) resolveBoolean(ctx *Context, boolean *Boolean, data []byte, b
return nil
}

func (r *Resolver) resolveString(ctx *Context, str *String, data []byte, stringBuf *BufPair) error {
func (r *Resolver) resolveString(ctx *Context, str *String, data []byte, fetchError error, stringBuf *BufPair) error {
var (
value []byte
valueType jsonparser.ValueType
err error
)

// TODO clean this up
value, valueType, _, err = jsonparser.Get(data, str.Path...)
if value == nil && fetchError != nil && str.Nullable {
r.resolveNull(stringBuf.Data)
r.addError(ctx, fetchError, stringBuf)
return nil
}
if err != nil || valueType != jsonparser.String {
if err == nil && str.UnescapeResponseJson {
switch valueType {
@@ -942,6 +956,9 @@ func (r *Resolver) resolveString(ctx *Context, str *String, data []byte, stringB
return fmt.Errorf("invalid value type '%s' for path %s, expecting string, got: %v. You can fix this by configuring this field as Int/Float/JSON Scalar", valueType, string(ctx.path()), string(value))
}
if !str.Nullable {
if fetchError != nil {
r.addError(ctx, fetchError, stringBuf)
}
return errNonNullableFieldValueIsNull
}
r.resolveNull(stringBuf.Data)
@@ -1001,7 +1018,17 @@ func (r *Resolver) resolveNull(b *fastbuffer.FastBuffer) {
b.WriteBytes(null)
}

func (r *Resolver) addError(ctx *Context, err error, objectBuf *BufPair) {
r.writeError(ctx, err.Error(), objectBuf)
}

func (r *Resolver) addResolveError(ctx *Context, objectBuf *BufPair) {
r.writeError(ctx, string(unableToResolveMsg), objectBuf)
}

func (r *Resolver) writeError(ctx *Context, msg string, objectBuf *BufPair) {
msg = strings.ReplaceAll(msg, `"`, `\"`)
msg = fmt.Sprintf("error resolving: %s", msg)
locations, path := pool.BytesBuffer.Get(), pool.BytesBuffer.Get()
defer pool.BytesBuffer.Put(locations)
defer pool.BytesBuffer.Put(path)
@@ -1034,7 +1061,7 @@ func (r *Resolver) addResolveError(ctx *Context, objectBuf *BufPair) {
pathBytes = path.Bytes()
}

objectBuf.WriteErr(unableToResolveMsg, locations.Bytes(), pathBytes, nil)
objectBuf.WriteErr([]byte(msg), locations.Bytes(), pathBytes, nil)
}

func (r *Resolver) resolveObject(ctx *Context, object *Object, data []byte, objectBuf *BufPair) (err error) {
@@ -1066,9 +1093,9 @@ func (r *Resolver) resolveObject(ctx *Context, object *Object, data []byte, obje
if object.Fetch != nil {
set = r.getResultSet()
defer r.freeResultSet(set)
err = r.resolveFetch(ctx, object.Fetch, data, set)
err := r.resolveFetch(ctx, object.Fetch, data, set)
if err != nil {
return
// TODO figure out what to do with error
}
for i := range set.buffers {
r.MergeBufPairErrors(set.buffers[i], objectBuf)
@@ -1085,7 +1112,6 @@ func (r *Resolver) resolveObject(ctx *Context, object *Object, data []byte, obje
first := true
skipCount := 0
for i := range object.Fields {

if object.Fields[i].SkipDirectiveDefined {
skip, err := jsonparser.GetBoolean(ctx.Variables, object.Fields[i].SkipVariableName)
if err == nil && skip {
@@ -1103,6 +1129,13 @@ func (r *Resolver) resolveObject(ctx *Context, object *Object, data []byte, obje
}

var fieldData []byte
var fetchError error
if object.Fields[i].HasBuffer {
fErr, ok := r.fetchErrorMap[object.Fields[i].BufferID]
if ok && fErr != nil {
fetchError = fErr
}
}
if set != nil && object.Fields[i].HasBuffer {
buffer, ok := set.buffers[object.Fields[i].BufferID]
if ok {
@@ -1152,7 +1185,7 @@ func (r *Resolver) resolveObject(ctx *Context, object *Object, data []byte, obje
objectBuf.Data.WriteBytes(colon)
ctx.addPathElement(object.Fields[i].Name)
ctx.setPosition(object.Fields[i].Position)
err = r.resolveNode(ctx, object.Fields[i].Value, fieldData, fieldBuf)
err = r.resolveNode(ctx, object.Fields[i].Value, fieldData, fetchError, fieldBuf)
ctx.removeLastPathElement()
ctx.responseElements = responseElements
ctx.lastFetchID = lastFetchID
@@ -1164,15 +1197,16 @@ func (r *Resolver) resolveObject(ctx *Context, object *Object, data []byte, obje
}
if errors.Is(err, errNonNullableFieldValueIsNull) {
objectBuf.Data.Reset()
fieldBufHasErrors := fieldBuf.HasErrors()
r.MergeBufPairErrors(fieldBuf, objectBuf)

if object.Nullable {
r.resolveNull(objectBuf.Data)
return nil
}

// if fied is of object type than we should not add resolve error here
if _, ok := object.Fields[i].Value.(*Object); !ok {
// if field is of object type then we should not add resolve error here
if _, ok := object.Fields[i].Value.(*Object); !ok && !fieldBufHasErrors {
r.addResolveError(ctx, objectBuf)
}
}
@@ -1266,7 +1300,7 @@ func (r *Resolver) resolveParallelFetch(ctx *Context, fetch *ParallelFetch, data
preparedInputs := r.getBufPairSlice()
defer r.freeBufPairSlice(preparedInputs)

resolvers := make([]func() error, 0, len(fetch.Fetches))
resolvers := make(map[int]func() error)

wg := r.getWaitGroup()
defer r.freeWaitGroup(wg)
@@ -1282,9 +1316,9 @@ func (r *Resolver) resolveParallelFetch(ctx *Context, fetch *ParallelFetch, data
}
*preparedInputs = append(*preparedInputs, preparedInput)
buf := set.buffers[f.BufferId]
resolvers = append(resolvers, func() error {
resolvers[f.BufferId] = func() error {
return r.resolveSingleFetch(ctx, f, preparedInput.Data, buf)
})
}
case *BatchFetch:
preparedInput := r.getBufPair()
err = r.prepareSingleFetch(ctx, f.Fetch, data, set, preparedInput.Data)
@@ -1293,17 +1327,21 @@ func (r *Resolver) resolveParallelFetch(ctx *Context, fetch *ParallelFetch, data
}
*preparedInputs = append(*preparedInputs, preparedInput)
buf := set.buffers[f.Fetch.BufferId]
resolvers = append(resolvers, func() error {
resolvers[f.Fetch.BufferId] = func() error {
return r.resolveBatchFetch(ctx, f, preparedInput.Data, buf)
})
}
}
}

for _, resolver := range resolvers {
go func(r func() error) {
_ = r()
for bufferID, resolver := range resolvers {
bufferID := bufferID
go func(r func() error, res *Resolver) {
err := r()
res.Lock()
defer res.Unlock()
res.fetchErrorMap[bufferID] = err
wg.Done()
}(resolver)
}(resolver, r)
}

wg.Wait()
@@ -1331,10 +1369,15 @@ func (r *Resolver) resolveBatchFetch(ctx *Context, fetch *BatchFetch, preparedIn
}

func (r *Resolver) resolveSingleFetch(ctx *Context, fetch *SingleFetch, preparedInput *fastbuffer.FastBuffer, buf *BufPair) error {
var err error
if r.dataLoaderEnabled && !fetch.DisableDataLoader {
return ctx.dataLoader.Load(ctx, fetch, buf)
err = ctx.dataLoader.Load(ctx, fetch, buf)
}
return r.fetcher.Fetch(ctx, fetch, preparedInput, buf)
err = r.fetcher.Fetch(ctx, fetch, preparedInput, buf)
r.Lock()
defer r.Unlock()
r.fetchErrorMap[fetch.BufferId] = err
return err
}

type Object struct {