diff --git a/bigquery/storage_integration_test.go b/bigquery/storage_integration_test.go index a532e69fcefc..88924ec758a9 100644 --- a/bigquery/storage_integration_test.go +++ b/bigquery/storage_integration_test.go @@ -22,6 +22,7 @@ import ( "time" "cloud.google.com/go/internal/testutil" + "github.com/google/go-cmp/cmp" "google.golang.org/api/iterator" ) @@ -233,10 +234,22 @@ func TestIntegration_StorageReadQueryOrdering(t *testing.T) { t.Fatal(err) } + var firstValue S + err = it.Next(&firstValue) + if err != nil { + t.Fatal(err) + } + + if cmp.Equal(firstValue, S{}) { + t.Fatalf("user defined struct was not filled with data") + } + total, err := countIteratorRows(it) if err != nil { t.Fatal(err) } + total++ // as we read the first value separately + bqSession := it.arrowIterator.session.bqSession if len(bqSession.Streams) == 0 { t.Fatalf("%s: expected to use at least one stream but found %d", tc.name, len(bqSession.Streams)) @@ -263,6 +276,56 @@ func TestIntegration_StorageReadQueryOrdering(t *testing.T) { } } +func TestIntegration_StorageReadQueryStruct(t *testing.T) { + if client == nil { + t.Skip("Integration tests skipped") + } + ctx := context.Background() + table := "`bigquery-public-data.samples.wikipedia`" + sql := fmt.Sprintf(`SELECT id, title, timestamp, comment FROM %s LIMIT 1000`, table) + q := storageOptimizedClient.Query(sql) + q.forceStorageAPI = true + q.DisableQueryCache = true + it, err := q.Read(ctx) + if err != nil { + t.Fatal(err) + } + if !it.IsAccelerated() { + t.Fatal("expected query to use Storage API") + } + + type S struct { + ID int64 + Title string + Timestamp int64 + Comment NullString + } + + total := uint64(0) + for { + var dst S + err := it.Next(&dst) + if err == iterator.Done { + break + } + if err != nil { + t.Fatalf("failed to fetch via storage API: %v", err) + } + if cmp.Equal(dst, S{}) { + t.Fatalf("user defined struct was not filled with data") + } + total++ + } + + bqSession := it.arrowIterator.session.bqSession + if len(bqSession.Streams) == 0 { + t.Fatalf("should use more than one stream but found %d", len(bqSession.Streams)) + } + if total != it.TotalRows { + t.Fatalf("should have read %d rows, but read %d", it.TotalRows, total) + } +} + func TestIntegration_StorageReadQueryMorePages(t *testing.T) { if client == nil { t.Skip("Integration tests skipped") @@ -287,10 +350,22 @@ func TestIntegration_StorageReadQueryMorePages(t *testing.T) { Forks NullInt64 } + var firstValue S + err = it.Next(&firstValue) + if err != nil { + t.Fatal(err) + } + + if cmp.Equal(firstValue, S{}) { + t.Fatalf("user defined struct was not filled with data") + } + total, err := countIteratorRows(it) if err != nil { t.Fatal(err) } + total++ // as we read the first value separately + bqSession := it.arrowIterator.session.bqSession if len(bqSession.Streams) == 0 { t.Fatalf("should use more than one stream but found %d", len(bqSession.Streams)) diff --git a/bigquery/storage_iterator.go b/bigquery/storage_iterator.go index 1c7fa3cc6c53..48f131be0c17 100644 --- a/bigquery/storage_iterator.go +++ b/bigquery/storage_iterator.go @@ -61,6 +61,7 @@ func newStorageRowIteratorFromTable(ctx context.Context, table *Table, ordered b return nil, err } it.arrowIterator.schema = md.Schema + it.Schema = md.Schema return it, nil } @@ -163,7 +164,9 @@ func nextFuncForStorageIterator(it *RowIterator) func() error { if err != nil { return err } - + if it.Schema == nil { + it.Schema = it.arrowIterator.schema + } rows, err := arrowIt.decoder.decodeArrowRecords(record) if err != nil { return err