Skip to content

Commit

Permalink
feat(stdlib/sql): add BigQuery support (#2925)
Browse files Browse the repository at this point in the history
  • Loading branch information
alespour authored Aug 4, 2020
1 parent 3fa170d commit 8d29781
Show file tree
Hide file tree
Showing 9 changed files with 788 additions and 27 deletions.
16 changes: 9 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ module github.com/influxdata/flux
go 1.12

require (
cloud.google.com/go v0.43.0
cloud.google.com/go v0.57.0
cloud.google.com/go/bigtable v1.3.0
github.com/Azure/go-autorest/autorest v0.10.1
github.com/Azure/go-autorest/autorest/adal v0.8.3
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2
github.com/DATA-DOG/go-sqlmock v1.4.1
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883
github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db
github.com/bonitoo-io/go-sql-bigquery v0.3.4
github.com/c-bata/go-prompt v0.2.2
github.com/cespare/xxhash v1.1.0
github.com/dave/jennifer v1.2.0
Expand All @@ -19,7 +21,7 @@ require (
github.com/golang/geo v0.0.0-20190916061304-5b978397cfec
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/google/flatbuffers v1.11.0
github.com/google/go-cmp v0.3.0
github.com/google/go-cmp v0.4.0
github.com/google/uuid v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e
Expand All @@ -38,7 +40,7 @@ require (
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 // indirect
github.com/pkg/errors v0.9.1
github.com/pkg/term v0.0.0-20180730021639-bffc007b7fd5 // indirect
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/prometheus/common v0.6.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/segmentio/kafka-go v0.1.0
Expand All @@ -48,9 +50,9 @@ require (
github.com/uber/athenadriver v1.1.4
go.uber.org/zap v1.14.0
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37 // indirect
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b
golang.org/x/tools v0.0.0-20200304024140-c4206d458c3f
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6
golang.org/x/net v0.0.0-20200513185701-a91f0712d120
golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca
google.golang.org/api v0.7.0
google.golang.org/api v0.24.0
)
234 changes: 214 additions & 20 deletions go.sum

Large diffs are not rendered by default.

174 changes: 174 additions & 0 deletions stdlib/sql/bigquery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package sql

import (
"database/sql"
"math/big"
"time"

"cloud.google.com/go/civil"
_ "github.com/bonitoo-io/go-sql-bigquery"
"github.com/influxdata/flux"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/values"
)

// Google BigQuery support.
// Notes:
// * data types
// https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
// * connection string
// https://github.com/bonitoo-io/go-sql-bigquery#connection-string

// BigQueryRowReader reads rows from a BigQuery result set and exposes them
// as flux values.
type BigQueryRowReader struct {
// Cursor is the result set that Next and Close operate on when NextFunc
// and CloseFunc are not set.
Cursor *sql.Rows
// columns holds the raw values of the current row; it is filled by Next
// or SetColumns and consumed by GetNextRow.
columns []interface{}
// columnTypes holds the flux type of each column; it is set by
// InitColumnTypes or SetColumnTypes.
columnTypes []flux.ColType
// columnNames holds the column names passed to InitColumnNames.
columnNames []string
// sqlTypes holds the SQL column types passed to InitColumnTypes.
sqlTypes []*sql.ColumnType
// NextFunc, when non-nil, replaces the default behavior of Next.
NextFunc func() bool
// CloseFunc, when non-nil, replaces the default behavior of Close.
CloseFunc func() error
}

// Next advances the reader to the next row and scans that row's raw values
// into m.columns so that GetNextRow can convert them.
//
// When NextFunc is set, the call is delegated to it and the cursor is not
// touched. Otherwise Next returns false when the cursor is exhausted, or when
// reading the column names or scanning the row fails.
//
// NOTE: the errors returned by Columns and Scan are not recorded anywhere;
// the caller only observes a false result.
func (m *BigQueryRowReader) Next() bool {
	if m.NextFunc != nil {
		return m.NextFunc()
	}
	if !m.Cursor.Next() {
		return false
	}
	names, err := m.Cursor.Columns()
	if err != nil {
		return false
	}
	width := len(names)
	m.columns = make([]interface{}, width)
	// Scan needs one destination pointer per column; point each of them at
	// the matching slot of m.columns.
	dest := make([]interface{}, width)
	for idx := range dest {
		dest[idx] = &m.columns[idx]
	}
	return m.Cursor.Scan(dest...) == nil
}

// GetNextRow converts the raw values of the current row (m.columns) into flux
// values.
//
// Conversions:
//   - bool, int64, float64 and string are passed to values.New as-is
//   - civil.Date, civil.DateTime and civil.Time become strings via String()
//   - time.Time becomes a flux time
//   - *big.Int becomes an int, *big.Float and *big.Rat become floats
//   - nil becomes a null of the column's flux type
//
// An error is returned, instead of panicking, when a value has an unsupported
// Go type, when a *big.Int does not fit into an int64, or when a null value is
// found in a column for which no flux type is known.
func (m *BigQueryRowReader) GetNextRow() ([]values.Value, error) {
	row := make([]values.Value, len(m.columns))
	for i, column := range m.columns {
		switch value := column.(type) {
		case bool, int64, float64, string:
			row[i] = values.New(value)
		case civil.Date:
			row[i] = values.NewString(value.String())
		case civil.DateTime:
			row[i] = values.NewString(value.String())
		case civil.Time:
			row[i] = values.NewString(value.String())
		case time.Time:
			row[i] = values.NewTime(values.ConvertTime(value))
		case *big.Int:
			// Int64 is undefined for values outside the int64 range, so
			// reject them rather than return a corrupted number.
			if !value.IsInt64() {
				return nil, errors.Newf(codes.Invalid, "BigQuery integer value %s in column %d overflows int64", value.String(), i)
			}
			row[i] = values.NewInt(value.Int64())
		case *big.Float:
			// The discarded result only reports rounding accuracy.
			f, _ := value.Float64()
			row[i] = values.NewFloat(f)
		case *big.Rat:
			// The discarded result only reports whether the conversion was exact.
			f, _ := value.Float64()
			row[i] = values.NewFloat(f)
		case nil:
			// A null needs the flux type of its column; guard against the
			// type list being shorter than the scanned row.
			if i >= len(m.columnTypes) {
				return nil, errors.Newf(codes.Internal, "no column type known for null value in column %d", i)
			}
			row[i] = values.NewNull(flux.SemanticType(m.columnTypes[i]))
		default:
			return nil, errors.Newf(codes.Internal, "unsupported BigQuery value of type %T in column %d", value, i)
		}
	}
	return row, nil
}

// InitColumnNames stores names as the column names of the result set.
// The slice is kept as-is, not copied.
func (m *BigQueryRowReader) InitColumnNames(names []string) {
m.columnNames = names
}

// InitColumnTypes derives the flux column type of every column from its
// database type name and stores both the derived types and the given SQL
// types on the reader.
//
// Mapping: INTEGER -> int, FLOAT and NUMERIC -> float, BOOLEAN -> bool,
// TIMESTAMP -> time. Every other type, including DATE, TIME and DATETIME
// (whose time zone is unknown), is represented as a string.
func (m *BigQueryRowReader) InitColumnTypes(types []*sql.ColumnType) {
	converted := make([]flux.ColType, 0, len(types))
	for _, sqlType := range types {
		// String is the fallback for every type without a closer flux match.
		colType := flux.TString
		switch sqlType.DatabaseTypeName() {
		case "INTEGER":
			colType = flux.TInt
		case "FLOAT", "NUMERIC":
			colType = flux.TFloat
		case "BOOLEAN":
			colType = flux.TBool
		case "TIMESTAMP":
			colType = flux.TTime
		}
		converted = append(converted, colType)
	}
	m.sqlTypes = types
	m.columnTypes = converted
}

// ColumnNames returns the column names stored by InitColumnNames.
func (m *BigQueryRowReader) ColumnNames() []string {
return m.columnNames
}

// ColumnTypes returns the flux column types stored by InitColumnTypes or
// SetColumnTypes.
func (m *BigQueryRowReader) ColumnTypes() []flux.ColType {
return m.columnTypes
}

// SetColumnTypes replaces the flux column types of the reader with types.
// The slice is kept as-is, not copied.
func (m *BigQueryRowReader) SetColumnTypes(types []flux.ColType) {
m.columnTypes = types
}

// SetColumns replaces the raw values of the current row, which GetNextRow
// converts, with i. The slice is kept as-is, not copied.
func (m *BigQueryRowReader) SetColumns(i []interface{}) {
m.columns = i
}

// Close releases the underlying cursor and reports any error that was
// encountered while iterating over it.
//
// When CloseFunc is set, the call is delegated to it and the cursor is not
// touched. Otherwise the cursor is always closed, even when iteration failed,
// so that it is never left open. An iteration error takes precedence over an
// error returned by closing the cursor.
func (m *BigQueryRowReader) Close() error {
	if m.CloseFunc != nil {
		return m.CloseFunc()
	}
	// Close first: sql.Rows.Err may be called after Close, and closing
	// unconditionally guarantees the cursor is released on every path.
	closeErr := m.Cursor.Close()
	if err := m.Cursor.Err(); err != nil {
		return err
	}
	return closeErr
}

// NewBigQueryRowReader creates a row reader on top of the result set r.
// The column names and column types are read from r up front; an error from
// either lookup is returned and no reader is created.
func NewBigQueryRowReader(r *sql.Rows) (execute.RowReader, error) {
	names, err := r.Columns()
	if err != nil {
		return nil, err
	}
	sqlTypes, err := r.ColumnTypes()
	if err != nil {
		return nil, err
	}

	// Both lookups succeeded, so the reader can be fully initialized.
	rowReader := &BigQueryRowReader{Cursor: r}
	rowReader.InitColumnNames(names)
	rowReader.InitColumnTypes(sqlTypes)
	return rowReader, nil
}

// fluxToBigQuery maps flux column types to the BigQuery type names used by
// BigQueryColumnTranslateFunc. Flux column types missing from this map are
// rejected by that function.
var fluxToBigQuery = map[flux.ColType]string{
flux.TFloat: "FLOAT64",
flux.TInt: "INT64",
flux.TString: "STRING",
flux.TBool: "BOOL",
flux.TTime: "TIMESTAMP",
}

// BigQueryColumnTranslateFunc returns a function that translates a flux column
// type and column name into a BigQuery column definition of the form
// "<name> <type>". The returned function fails with an internal error when the
// flux type has no entry in fluxToBigQuery.
func BigQueryColumnTranslateFunc() translationFunc {
	return func(f flux.ColType, colName string) (string, error) {
		if bqType, ok := fluxToBigQuery[f]; ok {
			return colName + " " + bqType, nil
		}
		return "", errors.Newf(codes.Internal, "BigQuery does not support column type %s", f.String())
	}
}
2 changes: 2 additions & 0 deletions stdlib/sql/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func createFromSQLSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a ex
newRowReader = NewMssqlRowReader
case "awsathena":
newRowReader = NewAwsAthenaRowReader
case "bigquery":
newRowReader = NewBigQueryRowReader
default:
return nil, errors.Newf(codes.Invalid, "sql driver %s not supported", spec.DriverName)
}
Expand Down
16 changes: 16 additions & 0 deletions stdlib/sql/from_private_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ func TestFromSqlUrlValidation(t *testing.T) {
Query: "",
},
ErrMsg: "",
}, {
Name: "ok bigquery",
Spec: &FromSQLProcedureSpec{
DriverName: "bigquery",
DataSourceName: "bigquery://project1/?dataset=dataset1",
Query: "",
},
ErrMsg: "",
}, {
Name: "invalid driver",
Spec: &FromSQLProcedureSpec{
Expand Down Expand Up @@ -107,6 +115,14 @@ func TestFromSqlUrlValidation(t *testing.T) {
},
V: url.PrivateIPValidator{},
ErrMsg: "it connects to a private IP",
}, {
Name: "invalid bigquery",
Spec: &FromSQLProcedureSpec{
DriverName: "bigquery",
DataSourceName: "biqquery://project1/?dataset=dataset1",
Query: "",
},
ErrMsg: "invalid prefix",
}, {
Name: "invalid sqlmock",
Spec: &FromSQLProcedureSpec{
Expand Down
12 changes: 12 additions & 0 deletions stdlib/sql/source_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
neturl "net/url"
"strings"

"github.com/bonitoo-io/go-sql-bigquery"
"github.com/go-sql-driver/mysql"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/dependencies/url"
Expand Down Expand Up @@ -90,6 +91,17 @@ func validateDataSource(validator url.Validator, driverName string, dataSourceNa
if err != nil {
return errors.Newf(codes.Invalid, "invalid data source url: %v", err)
}
case "bigquery":
// an example is: bigquery://projectid/location?dataset=datasetid
cfg, err := bigquery.ConfigFromConnString(dataSourceName)
if err != nil {
return errors.Newf(codes.Invalid, "invalid data source dsn: %v", err)
}
u = &neturl.URL{
Scheme: "bigquery",
Host: cfg.ProjectID,
Path: cfg.Location,
}
default:
return errors.Newf(codes.Invalid, "sql driver %s not supported", driverName)
}
Expand Down
Loading

0 comments on commit 8d29781

Please sign in to comment.