From b7801f36c0bdf096c2db740cf57cd183bc5705e7 Mon Sep 17 00:00:00 2001 From: Christopher Wolff Date: Sat, 17 Dec 2022 08:05:29 -0800 Subject: [PATCH] fix: update iox.sql to detect midstream errors BREAKING CHANGE: Flux's abstraction for IOx's client is changing. --- dependencies/iox/client.go | 9 ++++++++- stdlib/experimental/iox/source.go | 26 +++++++++++++++++++++++++- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/dependencies/iox/client.go b/dependencies/iox/client.go index dd287475d6..e6c600ce9d 100644 --- a/dependencies/iox/client.go +++ b/dependencies/iox/client.go @@ -42,11 +42,18 @@ func GetProvider(ctx context.Context) Provider { return p.(Provider) } +// RecordReader is similar to the RecordReader interface provided by Arrow's array +// package, but includes a method for detecting errors that are sent mid-stream. +type RecordReader interface { + array.RecordReader + Err() error +} + // Client provides a way to query an iox instance. type Client interface { // Query will initiate a query using the given query string, parameters, and memory allocator // against the iox instance. It returns an array.RecordReader from the arrow flight api. - Query(ctx context.Context, query string, params []interface{}, mem memory.Allocator) (array.RecordReader, error) + Query(ctx context.Context, query string, params []interface{}, mem memory.Allocator) (RecordReader, error) // GetSchema will retrieve a schema for the given table if this client supports that capability. // If this Client doesn't support this capability, it should return a flux error with the code diff --git a/stdlib/experimental/iox/source.go b/stdlib/experimental/iox/source.go index 8590b53374..a34dcd1a16 100644 --- a/stdlib/experimental/iox/source.go +++ b/stdlib/experimental/iox/source.go @@ -17,6 +17,9 @@ import ( "github.com/influxdata/flux/internal/function" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/plan" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "github.com/opentracing/opentracing-go/log" ) const SqlKind = "experimental/iox.sql" @@ -116,6 +119,10 @@ func (s *sqlSource) createSchema(schema *stdarrow.Schema) ([]flux.ColMeta, error } func (s *sqlSource) run(ctx context.Context) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "sqlSouce.run") + defer span.Finish() + span.LogFields(log.String("query", s.query)) + // Note: query args are not actually supported yet, see // https://github.com/influxdata/influxdb_iox/issues/3718 rr, err := s.client.Query(ctx, s.query, nil, s.mem) @@ -130,14 +137,31 @@ func (s *sqlSource) run(ctx context.Context) error { } key := execute.NewGroupKey(nil, nil) - for rr.Next() { + hasMore, err := nextRecordBatch(rr) + for hasMore && err == nil { if err := s.produce(key, cols, rr.Record()); err != nil { + ext.LogError(span, err) return err } + hasMore, err = nextRecordBatch(rr) } + if err != nil { + ext.LogError(span, err) + return err + } + return nil } +func nextRecordBatch(rr iox.RecordReader) (bool, error) { + n := rr.Next() + if n { + return true, nil + } + + return false, rr.Err() +} + func (s *sqlSource) produce(key flux.GroupKey, cols []flux.ColMeta, record stdarrow.Record) error { buffer := arrow.TableBuffer{ GroupKey: key,