Skip to content

Commit

Permalink
Order result-set columns by query projection ordering (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
flobernd authored Jun 21, 2023
1 parent 9ba7905 commit 05f8e19
Showing 1 changed file with 41 additions and 12 deletions.
53 changes: 41 additions & 12 deletions pkg/plugin/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/SnellerInc/sneller/ion"
"github.com/grafana/grafana-plugin-sdk-go/data"
"golang.org/x/exp/slices"
)

// frameFromSnellerResult builds a Grafana data frame from a raw Sneller query result.
Expand Down Expand Up @@ -362,10 +363,11 @@ type snellerColumn struct {
}

type snellerFinalStatus struct {
Hits int64 `ion:"hits"`
Misses int64 `ion:"misses"`
Scanned int64 `ion:"scanned"`
Error string `ion:"error"`
Hits int64 `ion:"hits"`
Misses int64 `ion:"misses"`
Scanned int64 `ion:"scanned"`
Error string `ion:"error"`
ResultSet ion.Datum `ion:"result_set"`
}

type snellerQueryError struct {
Expand All @@ -374,20 +376,21 @@ type snellerQueryError struct {

// snellerSchema represents the derived schema of a Sneller query result-set.
type snellerSchema struct {
RowCount int // The total number of rows returned by the query
Columns map[string]*snellerColumn // The individual columns indexed by their field names
FinalStatus *snellerFinalStatus // The final query status
RowCount int // The total number of rows returned by the query
Columns []*snellerColumn // The individual columns
FinalStatus *snellerFinalStatus // The final query status
}

func deriveSchema(buf []byte) (*snellerSchema, error) {
schema := snellerSchema{
RowCount: 0,
Columns: map[string]*snellerColumn{},
Columns: []*snellerColumn{},
}
lookup := map[string]*snellerColumn{}

status, err := iterateRows(buf, func(reader *IonReader, index int) error {
schema.RowCount += 1
return analyzeRow(reader, &schema)
return analyzeRow(reader, &schema, lookup)
})
if err != nil {
return nil, err
Expand All @@ -402,10 +405,34 @@ func deriveSchema(buf []byte) (*snellerSchema, error) {
}
}

if status.ResultSet.IsEmpty() {
return &schema, nil
}

// Restore column order
index := 0
err = status.ResultSet.UnpackStruct(func(field ion.Field) error {
for _, col := range schema.Columns {
if col.Name == field.Label {
col.Index = index
break
}
}
index++
return nil
})
if err != nil {
return nil, err
}

slices.SortFunc(schema.Columns, func(a, b *snellerColumn) bool {
return a.Index < b.Index
})

return &schema, nil
}

func analyzeRow(reader *IonReader, schema *snellerSchema) error {
func analyzeRow(reader *IonReader, schema *snellerSchema, lookup map[string]*snellerColumn) error {
index := 0
for reader.Next() {
name, err := reader.FieldName()
Expand All @@ -416,7 +443,7 @@ func analyzeRow(reader *IonReader, schema *snellerSchema) error {
ionType := reader.Type()
snellerType := snellerType(ionType)

col, ok := schema.Columns[name]
col, ok := lookup[name]
if !ok {
col = &snellerColumn{
Index: index,
Expand All @@ -427,7 +454,8 @@ func analyzeRow(reader *IonReader, schema *snellerSchema) error {
Optional: schema.RowCount != 1,
Count: 0,
}
schema.Columns[name] = col
lookup[name] = col
schema.Columns = append(schema.Columns, col)
}

if index != col.Index {
Expand Down Expand Up @@ -464,6 +492,7 @@ func analyzeRow(reader *IonReader, schema *snellerSchema) error {

index++
}

return reader.Error()
}

Expand Down

0 comments on commit 05f8e19

Please sign in to comment.