Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stdlib/sql): add BigQuery support #2925

Merged
merged 8 commits into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ module github.com/influxdata/flux
go 1.12

require (
cloud.google.com/go v0.43.0
cloud.google.com/go v0.57.0
cloud.google.com/go/bigtable v1.3.0
github.com/Azure/go-autorest/autorest v0.10.1
github.com/Azure/go-autorest/autorest/adal v0.8.3
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2
github.com/DATA-DOG/go-sqlmock v1.4.1
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883
github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db
github.com/bonitoo-io/go-sql-bigquery v0.3.4
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that you forked solcates repository, but also merged some changes to there, does that mean we can use the original github.com/solcates/go-sql-bigquery now?

Copy link
Contributor Author

@alespour alespour Jul 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I wasn't getting a response from the maintainer who seem to have abandoned the project, I gave up on hoping on the merge and continued with modifications in the bonitoo-io fork. The owner then commented later that he _ had totally forgotten about this project_ and merged the PR, but the fork has diverted more from the original before that, into more pure SQL golang driver unlike the original which also contained some ORM stuff etc.

The original project does not seem to be active (no commits since Oct 27, 2019) and maintained (eg. PR was merge despite failing check due to some missing 3rd party lib).

github.com/c-bata/go-prompt v0.2.2
github.com/cespare/xxhash v1.1.0
github.com/dave/jennifer v1.2.0
Expand All @@ -19,7 +21,7 @@ require (
github.com/golang/geo v0.0.0-20190916061304-5b978397cfec
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/google/flatbuffers v1.11.0
github.com/google/go-cmp v0.3.0
github.com/google/go-cmp v0.4.0
github.com/google/uuid v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e
Expand All @@ -38,7 +40,7 @@ require (
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 // indirect
github.com/pkg/errors v0.9.1
github.com/pkg/term v0.0.0-20180730021639-bffc007b7fd5 // indirect
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious why you needed to bump the version of this? Maybe as I review it will become clear.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was not intentional, I do not know how to make mod tidy not update it... :(

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I guess that maybe one of the new dependencies needed a newer version of it then.

github.com/prometheus/common v0.6.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/segmentio/kafka-go v0.1.0
Expand All @@ -48,9 +50,9 @@ require (
github.com/uber/athenadriver v1.1.4
go.uber.org/zap v1.14.0
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37 // indirect
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b
golang.org/x/tools v0.0.0-20200304024140-c4206d458c3f
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6
golang.org/x/net v0.0.0-20200513185701-a91f0712d120
golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca
google.golang.org/api v0.7.0
google.golang.org/api v0.24.0
)
234 changes: 214 additions & 20 deletions go.sum

Large diffs are not rendered by default.

174 changes: 174 additions & 0 deletions stdlib/sql/bigquery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package sql

import (
"database/sql"
"math/big"
"time"

"cloud.google.com/go/civil"
_ "github.com/bonitoo-io/go-sql-bigquery"
"github.com/influxdata/flux"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/values"
)

// Google BigQuery support.
// Notes:
// * data types
// https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
// * connection string
// https://github.com/bonitoo-io/go-sql-bigquery#connection-string

type BigQueryRowReader struct {
Cursor *sql.Rows
columns []interface{}
columnTypes []flux.ColType
columnNames []string
sqlTypes []*sql.ColumnType
NextFunc func() bool
CloseFunc func() error
}

// Next prepares BigQueryRowReader to return rows
func (m *BigQueryRowReader) Next() bool {
if m.NextFunc != nil {
return m.NextFunc()
}
next := m.Cursor.Next()
if next {
columnNames, err := m.Cursor.Columns()
if err != nil {
return false
}
m.columns = make([]interface{}, len(columnNames))
columnPointers := make([]interface{}, len(columnNames))
for i := 0; i < len(columnNames); i++ {
columnPointers[i] = &m.columns[i]
}
if err := m.Cursor.Scan(columnPointers...); err != nil {
return false
}
}
return next
}

func (m *BigQueryRowReader) GetNextRow() ([]values.Value, error) {
row := make([]values.Value, len(m.columns))
for i, column := range m.columns {
switch value := column.(type) {
case bool, int64, float64, string:
row[i] = values.New(value)
case civil.Date:
row[i] = values.NewString(value.String())
case civil.DateTime:
row[i] = values.NewString(value.String())
case civil.Time:
row[i] = values.NewString(value.String())
case time.Time:
row[i] = values.NewTime(values.ConvertTime(value))
case *big.Int:
row[i] = values.NewInt(value.Int64())
case *big.Float:
f, _ := value.Float64()
row[i] = values.NewFloat(f)
case *big.Rat:
f, _ := value.Float64()
row[i] = values.NewFloat(f)
case nil:
row[i] = values.NewNull(flux.SemanticType(m.columnTypes[i]))
default:
execute.PanicUnknownType(flux.TInvalid)
}
}
return row, nil
}

func (m *BigQueryRowReader) InitColumnNames(names []string) {
m.columnNames = names
}

func (m *BigQueryRowReader) InitColumnTypes(types []*sql.ColumnType) {
fluxTypes := make([]flux.ColType, len(types))
for i := 0; i < len(types); i++ {
switch types[i].DatabaseTypeName() {
case "INTEGER":
fluxTypes[i] = flux.TInt
case "FLOAT", "NUMERIC":
fluxTypes[i] = flux.TFloat
case "BOOLEAN":
fluxTypes[i] = flux.TBool
case "TIMESTAMP": // "DATE", "TIME" and "DATETIME" will be represented as string because TZ is unknown
fluxTypes[i] = flux.TTime
default:
fluxTypes[i] = flux.TString
}
}
m.columnTypes = fluxTypes
m.sqlTypes = types
}

func (m *BigQueryRowReader) ColumnNames() []string {
return m.columnNames
}

func (m *BigQueryRowReader) ColumnTypes() []flux.ColType {
return m.columnTypes
}

func (m *BigQueryRowReader) SetColumnTypes(types []flux.ColType) {
m.columnTypes = types
}

func (m *BigQueryRowReader) SetColumns(i []interface{}) {
m.columns = i
}

func (m *BigQueryRowReader) Close() error {
if m.CloseFunc != nil {
return m.CloseFunc()
}
if err := m.Cursor.Err(); err != nil {
return err
}
return m.Cursor.Close()
}

func NewBigQueryRowReader(r *sql.Rows) (execute.RowReader, error) {
reader := &BigQueryRowReader{
Cursor: r,
}
cols, err := r.Columns()
if err != nil {
return nil, err
}
reader.InitColumnNames(cols)

types, err := r.ColumnTypes()
if err != nil {
return nil, err
}
reader.InitColumnTypes(types)

return reader, nil
}

var fluxToBigQuery = map[flux.ColType]string{
flux.TFloat: "FLOAT64",
flux.TInt: "INT64",
flux.TString: "STRING",
flux.TBool: "BOOL",
flux.TTime: "TIMESTAMP",
}

// BigQueryTranslateColumn translates flux colTypes into their corresponding BigQuery column type
func BigQueryColumnTranslateFunc() translationFunc {
return func(f flux.ColType, colName string) (string, error) {
s, found := fluxToBigQuery[f]
if !found {
return "", errors.Newf(codes.Internal, "BigQuery does not support column type %s", f.String())
}
return colName + " " + s, nil
}
}
2 changes: 2 additions & 0 deletions stdlib/sql/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func createFromSQLSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a ex
newRowReader = NewMssqlRowReader
case "awsathena":
newRowReader = NewAwsAthenaRowReader
case "bigquery":
newRowReader = NewBigQueryRowReader
default:
return nil, errors.Newf(codes.Invalid, "sql driver %s not supported", spec.DriverName)
}
Expand Down
16 changes: 16 additions & 0 deletions stdlib/sql/from_private_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ func TestFromSqlUrlValidation(t *testing.T) {
Query: "",
},
ErrMsg: "",
}, {
Name: "ok bigquery",
Spec: &FromSQLProcedureSpec{
DriverName: "bigquery",
DataSourceName: "bigquery://project1/?dataset=dataset1",
Query: "",
},
ErrMsg: "",
}, {
Name: "invalid driver",
Spec: &FromSQLProcedureSpec{
Expand Down Expand Up @@ -107,6 +115,14 @@ func TestFromSqlUrlValidation(t *testing.T) {
},
V: url.PrivateIPValidator{},
ErrMsg: "it connects to a private IP",
}, {
Name: "invalid bigquery",
Spec: &FromSQLProcedureSpec{
DriverName: "bigquery",
DataSourceName: "biqquery://project1/?dataset=dataset1",
Query: "",
},
ErrMsg: "invalid prefix",
}, {
Name: "invalid sqlmock",
Spec: &FromSQLProcedureSpec{
Expand Down
12 changes: 12 additions & 0 deletions stdlib/sql/source_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
neturl "net/url"
"strings"

"github.com/bonitoo-io/go-sql-bigquery"
"github.com/go-sql-driver/mysql"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/dependencies/url"
Expand Down Expand Up @@ -90,6 +91,17 @@ func validateDataSource(validator url.Validator, driverName string, dataSourceNa
if err != nil {
return errors.Newf(codes.Invalid, "invalid data source url: %v", err)
}
case "bigquery":
// an example is: bigquery://projectid/location?dataset=datasetid
cfg, err := bigquery.ConfigFromConnString(dataSourceName)
if err != nil {
return errors.Newf(codes.Invalid, "invalid data source dsn: %v", err)
}
u = &neturl.URL{
Scheme: "bigquery",
Host: cfg.ProjectID,
Path: cfg.Location,
}
default:
return errors.Newf(codes.Invalid, "sql driver %s not supported", driverName)
}
Expand Down
Loading