Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stdlib/sql): add BigQuery support (with no protobuf dep update in influxdb) #3116

Merged
merged 9 commits into from
Aug 31, 2020
Merged
17 changes: 9 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ module github.com/influxdata/flux
go 1.12

require (
cloud.google.com/go v0.43.0
cloud.google.com/go v0.52.0
cloud.google.com/go/bigtable v1.3.0
github.com/Azure/go-autorest/autorest v0.10.1
github.com/Azure/go-autorest/autorest/adal v0.8.3
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2
github.com/DATA-DOG/go-sqlmock v1.4.1
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883
github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db
github.com/bonitoo-io/go-sql-bigquery v0.3.4-1.4.0
github.com/c-bata/go-prompt v0.2.2
github.com/cespare/xxhash v1.1.0
github.com/dave/jennifer v1.2.0
Expand All @@ -19,7 +21,7 @@ require (
github.com/golang/geo v0.0.0-20190916061304-5b978397cfec
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/google/flatbuffers v1.11.0
github.com/google/go-cmp v0.3.0
github.com/google/go-cmp v0.4.0
github.com/google/uuid v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e
Expand All @@ -38,7 +40,7 @@ require (
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 // indirect
github.com/pkg/errors v0.9.1
github.com/pkg/term v0.0.0-20180730021639-bffc007b7fd5 // indirect
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/prometheus/common v0.6.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/segmentio/kafka-go v0.1.0
Expand All @@ -47,10 +49,9 @@ require (
github.com/spf13/cobra v0.0.3
github.com/uber/athenadriver v1.1.4
go.uber.org/zap v1.14.0
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37 // indirect
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b
golang.org/x/tools v0.0.0-20200304024140-c4206d458c3f
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6
golang.org/x/net v0.0.0-20200625001655-4c5254603344
golang.org/x/tools v0.0.0-20200721032237-77f530d86f9a
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca
google.golang.org/api v0.7.0
google.golang.org/api v0.17.0
)
144 changes: 138 additions & 6 deletions go.sum

Large diffs are not rendered by default.

174 changes: 174 additions & 0 deletions stdlib/sql/bigquery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package sql

import (
"database/sql"
"math/big"
"time"

"cloud.google.com/go/civil"
_ "github.com/bonitoo-io/go-sql-bigquery"
"github.com/influxdata/flux"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/values"
)

// Google BigQuery support.
// Notes:
// * data types
// https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
// * connection string
// https://github.com/bonitoo-io/go-sql-bigquery#connection-string

// BigQueryRowReader reads rows from a database/sql result set and exposes
// them as flux values. NewBigQueryRowReader returns it as an
// execute.RowReader.
type BigQueryRowReader struct {
// Cursor is the result set that Next scans from and Close closes.
Cursor *sql.Rows
// columns holds the raw values of the current row, as filled in by Next
// (or injected through SetColumns).
columns []interface{}
// columnTypes holds the flux type chosen for each column; it is set by
// InitColumnTypes or SetColumnTypes.
columnTypes []flux.ColType
// columnNames holds the column names given to InitColumnNames.
columnNames []string
// sqlTypes keeps the driver-reported column types given to InitColumnTypes.
sqlTypes []*sql.ColumnType
// NextFunc, when non-nil, is called by Next instead of using Cursor.
NextFunc func() bool
// CloseFunc, when non-nil, is called by Close instead of using Cursor.
CloseFunc func() error
}

// Next advances the reader to the next row and scans its raw values into
// m.columns, returning true when a row is available.
//
// When NextFunc is set it is called instead and its result is returned as is.
// A failure to list the columns or to scan the row is reported as false; the
// error itself is not recorded by this method.
func (m *BigQueryRowReader) Next() bool {
	if m.NextFunc != nil {
		return m.NextFunc()
	}
	if !m.Cursor.Next() {
		return false
	}
	names, err := m.Cursor.Columns()
	if err != nil {
		return false
	}
	width := len(names)
	m.columns = make([]interface{}, width)
	// Scan needs one destination pointer per column; each one points at the
	// matching slot of m.columns so the values land there directly.
	dest := make([]interface{}, width)
	for idx := range dest {
		dest[idx] = &m.columns[idx]
	}
	return m.Cursor.Scan(dest...) == nil
}

// GetNextRow converts the raw values of the current row (m.columns, as filled
// in by Next or SetColumns) into flux values.
//
// Conversions:
//   - bool, int64, float64 and string are wrapped with values.New
//   - civil.Date, civil.DateTime and civil.Time become strings
//   - time.Time becomes a flux time
//   - *big.Int becomes an int, *big.Float and *big.Rat become floats
//   - nil becomes a null of the column's flux type
//
// It returns a codes.Internal error, rather than panicking, when a value has
// a Go type that is not handled or when a null value is found in a column
// for which no flux type is known.
func (m *BigQueryRowReader) GetNextRow() ([]values.Value, error) {
	row := make([]values.Value, len(m.columns))
	for i, column := range m.columns {
		switch value := column.(type) {
		case bool, int64, float64, string:
			row[i] = values.New(value)
		case civil.Date:
			row[i] = values.NewString(value.String())
		case civil.DateTime:
			row[i] = values.NewString(value.String())
		case civil.Time:
			row[i] = values.NewString(value.String())
		case time.Time:
			row[i] = values.NewTime(values.ConvertTime(value))
		case *big.Int:
			row[i] = values.NewInt(value.Int64())
		case *big.Float:
			// The accuracy result is dropped on purpose: the nearest float64
			// is the best representation available for a flux float.
			f, _ := value.Float64()
			row[i] = values.NewFloat(f)
		case *big.Rat:
			// The exactness flag is dropped on purpose, for the same reason.
			f, _ := value.Float64()
			row[i] = values.NewFloat(f)
		case nil:
			// A null needs the column's flux type; guard the lookup so that a
			// columns/columnTypes length mismatch is an error, not a panic.
			if i >= len(m.columnTypes) {
				return nil, errors.Newf(codes.Internal, "no column type known for null value in column %d", i)
			}
			row[i] = values.NewNull(flux.SemanticType(m.columnTypes[i]))
		default:
			// Report the actual Go type so the unsupported column can be identified.
			return nil, errors.Newf(codes.Internal, "unsupported BigQuery value type %T in column %d", value, i)
		}
	}
	return row, nil
}

// InitColumnNames stores names as the column names of the result set.
// The slice is kept as is, not copied.
func (m *BigQueryRowReader) InitColumnNames(names []string) {
	m.columnNames = names
}

// InitColumnTypes derives the flux column type of every column from the
// database type name reported by the driver and stores both the derived
// types and the original driver types on the reader.
//
// INTEGER maps to int, FLOAT and NUMERIC to float, BOOLEAN to bool and
// TIMESTAMP to time. Every other type name maps to string; this includes
// DATE, TIME and DATETIME, which stay strings because their time zone is
// unknown.
func (m *BigQueryRowReader) InitColumnTypes(types []*sql.ColumnType) {
	converted := make([]flux.ColType, 0, len(types))
	for _, sqlType := range types {
		// String is the fallback for anything not listed below.
		colType := flux.TString
		switch sqlType.DatabaseTypeName() {
		case "INTEGER":
			colType = flux.TInt
		case "FLOAT", "NUMERIC":
			colType = flux.TFloat
		case "BOOLEAN":
			colType = flux.TBool
		case "TIMESTAMP":
			colType = flux.TTime
		}
		converted = append(converted, colType)
	}
	m.sqlTypes = types
	m.columnTypes = converted
}

// ColumnNames returns the column names stored by InitColumnNames.
func (m *BigQueryRowReader) ColumnNames() []string {
	return m.columnNames
}

// ColumnTypes returns the flux column types currently held by the reader,
// as set by InitColumnTypes or SetColumnTypes.
func (m *BigQueryRowReader) ColumnTypes() []flux.ColType {
	return m.columnTypes
}

// SetColumnTypes replaces the reader's flux column types with types.
// The slice is kept as is, not copied.
func (m *BigQueryRowReader) SetColumnTypes(types []flux.ColType) {
	m.columnTypes = types
}

// SetColumns replaces the raw values of the current row, which GetNextRow
// converts into flux values. The slice is kept as is, not copied.
func (m *BigQueryRowReader) SetColumns(i []interface{}) {
	m.columns = i
}

// Close releases the reader.
//
// When CloseFunc is set it is called instead and its result is returned.
// Otherwise a pending Cursor error is returned if there is one; only when
// there is none is the Cursor closed and the result of that returned.
func (m *BigQueryRowReader) Close() error {
	if closeFn := m.CloseFunc; closeFn != nil {
		return closeFn()
	}
	err := m.Cursor.Err()
	if err == nil {
		err = m.Cursor.Close()
	}
	return err
}

// NewBigQueryRowReader builds a row reader over r, initialised with the
// column names and column types reported by r. It returns the first error
// encountered while asking r for either of them.
func NewBigQueryRowReader(r *sql.Rows) (execute.RowReader, error) {
	names, err := r.Columns()
	if err != nil {
		return nil, err
	}
	sqlTypes, err := r.ColumnTypes()
	if err != nil {
		return nil, err
	}

	reader := &BigQueryRowReader{Cursor: r}
	reader.InitColumnNames(names)
	reader.InitColumnTypes(sqlTypes)
	return reader, nil
}

// fluxToBigQuery gives the BigQuery column type name used for each flux
// column type. Flux column types without an entry are treated as
// unsupported by BigQueryColumnTranslateFunc.
var fluxToBigQuery = map[flux.ColType]string{
	flux.TBool:   "BOOL",
	flux.TInt:    "INT64",
	flux.TFloat:  "FLOAT64",
	flux.TString: "STRING",
	flux.TTime:   "TIMESTAMP",
}

// BigQueryColumnTranslateFunc returns a function that translates a flux
// column type and a column name into a BigQuery column definition of the
// form "<name> <type>". The returned function fails with a codes.Internal
// error when the flux column type has no entry in fluxToBigQuery.
func BigQueryColumnTranslateFunc() translationFunc {
	return func(f flux.ColType, colName string) (string, error) {
		if typeName, ok := fluxToBigQuery[f]; ok {
			return colName + " " + typeName, nil
		}
		return "", errors.Newf(codes.Internal, "BigQuery does not support column type %s", f.String())
	}
}
2 changes: 2 additions & 0 deletions stdlib/sql/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func createFromSQLSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a ex
newRowReader = NewMssqlRowReader
case "awsathena":
newRowReader = NewAwsAthenaRowReader
case "bigquery":
newRowReader = NewBigQueryRowReader
default:
return nil, errors.Newf(codes.Invalid, "sql driver %s not supported", spec.DriverName)
}
Expand Down
16 changes: 16 additions & 0 deletions stdlib/sql/from_private_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ func TestFromSqlUrlValidation(t *testing.T) {
Query: "",
},
ErrMsg: "",
}, {
Name: "ok bigquery",
Spec: &FromSQLProcedureSpec{
DriverName: "bigquery",
DataSourceName: "bigquery://project1/?dataset=dataset1",
Query: "",
},
ErrMsg: "",
}, {
Name: "invalid driver",
Spec: &FromSQLProcedureSpec{
Expand Down Expand Up @@ -107,6 +115,14 @@ func TestFromSqlUrlValidation(t *testing.T) {
},
V: url.PrivateIPValidator{},
ErrMsg: "it connects to a private IP",
}, {
Name: "invalid bigquery",
Spec: &FromSQLProcedureSpec{
DriverName: "bigquery",
DataSourceName: "biqquery://project1/?dataset=dataset1",
Query: "",
},
ErrMsg: "invalid prefix",
}, {
Name: "invalid sqlmock",
Spec: &FromSQLProcedureSpec{
Expand Down
12 changes: 12 additions & 0 deletions stdlib/sql/source_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
neturl "net/url"
"strings"

"github.com/bonitoo-io/go-sql-bigquery"
"github.com/go-sql-driver/mysql"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/dependencies/url"
Expand Down Expand Up @@ -90,6 +91,17 @@ func validateDataSource(validator url.Validator, driverName string, dataSourceNa
if err != nil {
return errors.Newf(codes.Invalid, "invalid data source url: %v", err)
}
case "bigquery":
// an example is: bigquery://projectid/location?dataset=datasetid
cfg, err := bigquery.ConfigFromConnString(dataSourceName)
if err != nil {
return errors.Newf(codes.Invalid, "invalid data source dsn: %v", err)
}
u = &neturl.URL{
Scheme: "bigquery",
Host: cfg.ProjectID,
Path: cfg.Location,
}
default:
return errors.Newf(codes.Invalid, "sql driver %s not supported", driverName)
}
Expand Down
Loading