Skip to content

Commit

Permalink
fix: update iox.sql to detect midstream errors
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Flux's abstraction for IOx's client is changing.
  • Loading branch information
Christopher Wolff committed Dec 17, 2022
1 parent b9d6eb6 commit b7801f3
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
9 changes: 8 additions & 1 deletion dependencies/iox/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,18 @@ func GetProvider(ctx context.Context) Provider {
return p.(Provider)
}

// RecordReader is similar to the RecordReader interface provided by Arrow's array
// package, but includes a method for detecting errors that are sent mid-stream.
type RecordReader interface {
array.RecordReader
Err() error
}

// Client provides a way to query an iox instance.
type Client interface {
// Query will initiate a query using the given query string, parameters, and memory allocator
// against the iox instance. It returns an array.RecordReader from the arrow flight api.
Query(ctx context.Context, query string, params []interface{}, mem memory.Allocator) (array.RecordReader, error)
Query(ctx context.Context, query string, params []interface{}, mem memory.Allocator) (RecordReader, error)

// GetSchema will retrieve a schema for the given table if this client supports that capability.
// If this Client doesn't support this capability, it should return a flux error with the code
Expand Down
26 changes: 25 additions & 1 deletion stdlib/experimental/iox/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
"github.com/influxdata/flux/internal/function"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/plan"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
)

const SqlKind = "experimental/iox.sql"
Expand Down Expand Up @@ -116,6 +119,10 @@ func (s *sqlSource) createSchema(schema *stdarrow.Schema) ([]flux.ColMeta, error
}

func (s *sqlSource) run(ctx context.Context) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "sqlSouce.run")
defer span.Finish()
span.LogFields(log.String("query", s.query))

// Note: query args are not actually supported yet, see
// https://github.com/influxdata/influxdb_iox/issues/3718
rr, err := s.client.Query(ctx, s.query, nil, s.mem)
Expand All @@ -130,14 +137,31 @@ func (s *sqlSource) run(ctx context.Context) error {
}
key := execute.NewGroupKey(nil, nil)

for rr.Next() {
hasMore, err := nextRecordBatch(rr)
for hasMore && err == nil {
if err := s.produce(key, cols, rr.Record()); err != nil {
ext.LogError(span, err)
return err
}
hasMore, err = nextRecordBatch(rr)
}
if err != nil {
ext.LogError(span, err)
return err
}

return nil
}

func nextRecordBatch(rr iox.RecordReader) (bool, error) {
n := rr.Next()
if n {
return true, nil
}

return false, rr.Err()
}

func (s *sqlSource) produce(key flux.GroupKey, cols []flux.ColMeta, record stdarrow.Record) error {
buffer := arrow.TableBuffer{
GroupKey: key,
Expand Down

0 comments on commit b7801f3

Please sign in to comment.