diff --git a/go.mod b/go.mod
index 96f1394c47..0c1ac81de5 100644
--- a/go.mod
+++ b/go.mod
@@ -9,6 +9,7 @@ require (
 	github.com/Azure/go-autorest/autorest/adal v0.8.3
 	github.com/Azure/go-autorest/autorest/azure/auth v0.4.2
 	github.com/DATA-DOG/go-sqlmock v1.4.1
+	github.com/SAP/go-hdb v0.14.1
 	github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883
 	github.com/apache/arrow/go/arrow v0.0.0-20200923215132-ac86123a3f01
 	github.com/bonitoo-io/go-sql-bigquery v0.3.4-1.4.0
diff --git a/go.sum b/go.sum
index ea64f80e98..3f95eeccce 100644
--- a/go.sum
+++ b/go.sum
@@ -58,6 +58,8 @@ github.com/Masterminds/semver v1.4.2 h1:WBLTQ37jOCzSLtXNdoo8bNM8876KhNqOKvrlGITg
 github.com/Masterminds/semver v1.4.2/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
 github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
+github.com/SAP/go-hdb v0.14.1 h1:hkw4ozGZ/i4eak7ZuGkY5e0hxiXFdNUBNhr4AvZVNFE=
+github.com/SAP/go-hdb v0.14.1/go.mod h1:7fdQLVC2lER3urZLjZCm0AuMQfApof92n3aylBPEkMo=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
 github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ=
diff --git a/stdlib/sql/from.go b/stdlib/sql/from.go
index 975f540925..4f9043b81a 100644
--- a/stdlib/sql/from.go
+++ b/stdlib/sql/from.go
@@ -127,6 +127,8 @@ func createFromSQLSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a ex
 		newRowReader = NewAwsAthenaRowReader
 	case "bigquery":
 		newRowReader = NewBigQueryRowReader
+	case "hdb":
+		newRowReader = NewHdbRowReader
 	default:
 		return nil, errors.Newf(codes.Invalid, "sql driver %s not supported", spec.DriverName)
 	}
diff --git a/stdlib/sql/from_private_test.go b/stdlib/sql/from_private_test.go
index 61bc13f25d..323e43cdef 100644
--- a/stdlib/sql/from_private_test.go
+++ b/stdlib/sql/from_private_test.go
@@ -81,6 +81,14 @@ func TestFromSqlUrlValidation(t *testing.T) {
 			Query:          "",
 		},
 		ErrMsg: "",
+	}, {
+		Name: "ok hdb",
+		Spec: &FromSQLProcedureSpec{
+			DriverName:     "hdb",
+			DataSourceName: "hdb://user:password@localhost:39013",
+			Query:          "",
+		},
+		ErrMsg: "",
 	}, {
 		Name: "invalid driver",
 		Spec: &FromSQLProcedureSpec{
diff --git a/stdlib/sql/hdb.go b/stdlib/sql/hdb.go
new file mode 100644
index 0000000000..d6eb6e43f6
--- /dev/null
+++ b/stdlib/sql/hdb.go
@@ -0,0 +1,245 @@
+package sql
+
+import (
+	"database/sql"
+	"fmt"
+	"math/big"
+	"strconv"
+	"strings"
+	"time"
+
+	_ "github.com/SAP/go-hdb/driver"
+	hdb "github.com/SAP/go-hdb/driver"
+	"github.com/influxdata/flux"
+	"github.com/influxdata/flux/codes"
+	"github.com/influxdata/flux/execute"
+	"github.com/influxdata/flux/internal/errors"
+	"github.com/influxdata/flux/values"
+)
+
+// SAP HANA DB support.
+// Notes:
+// * the last version compatible with Go 1.12 is v0.14.1. In v0.14.3, they started using 1.13 type sql.NullTime
+//   and added `go 1.14` directive to go.mod (https://github.com/SAP/go-hdb/releases/tag/v0.14.3)
+// * HDB returns BOOLEAN as TINYINT
+// * TIMESTAMP does not have TZ info stored, but:
+//   - it is "strongly discouraged" to store data in local time zone: https://blogs.sap.com/2018/03/28/trouble-with-time/
+//   - more on timestamps in HDB: https://help.sap.com/viewer/f1b440ded6144a54ada97ff95dac7adf/2.4/en-US/a394f75dcbe64b42b7a887231af8f15f.html
+//   Therefore TIMESTAMP is mapped to TTime and vice-versa here.
+// * the hdb driver is rather strict, eg. does not convert date- or time-formatted string values to time.Time,
+//   or float64 to Decimal on its own and just throws "unsupported conversion" error
+// * Per naming conventions rules (https://documentation.sas.com/?cdcId=pgmsascdc&cdcVersion=9.4_3.5&docsetId=acreldb&docsetTarget=p1k98908uh9ovsn1jwzl3jg05exr.htm&locale=en),
+//   `sql.to` target table and column names are assumed in / converted to uppercase.
+
+type HdbRowReader struct {
+	Cursor      *sql.Rows
+	columns     []interface{}
+	columnTypes []flux.ColType
+	columnNames []string
+	sqlTypes    []*sql.ColumnType
+	NextFunc    func() bool
+	CloseFunc   func() error
+}
+
+// Next prepares HdbRowReader to return rows
+func (m *HdbRowReader) Next() bool {
+	if m.NextFunc != nil {
+		return m.NextFunc()
+	}
+	next := m.Cursor.Next()
+	if next {
+		columnNames, err := m.Cursor.Columns()
+		if err != nil {
+			return false
+		}
+		m.columns = make([]interface{}, len(columnNames))
+		columnPointers := make([]interface{}, len(columnNames))
+		for i := 0; i < len(columnNames); i++ {
+			columnPointers[i] = &m.columns[i]
+		}
+		if err := m.Cursor.Scan(columnPointers...); err != nil {
+			return false
+		}
+	}
+	return next
+}
+
+func (m *HdbRowReader) GetNextRow() ([]values.Value, error) {
+	row := make([]values.Value, len(m.columns))
+	for i, column := range m.columns {
+		switch value := column.(type) {
+		case bool, int64, float64, string:
+			row[i] = values.New(value)
+		case time.Time:
+			// DATE, TIME types get scanned to time.Time by the driver too,
+			// but they have no counterpart in Flux therefore will be represented as string
+			switch m.sqlTypes[i].DatabaseTypeName() {
+			case "DATE":
+				row[i] = values.NewString(value.Format(layoutDate))
+			case "TIME":
+				row[i] = values.NewString(value.Format(layoutTime))
+			default:
+				row[i] = values.NewTime(values.ConvertTime(value))
+			}
+		case []uint8:
+			switch m.columnTypes[i] {
+			case flux.TFloat:
+				var out hdb.Decimal
+				err := out.Scan(value)
+				if err != nil {
+					return nil, err
+				}
+				newFloat, _ := (*big.Rat)(&out).Float64()
+				row[i] = values.NewFloat(newFloat)
+			default: // flux.TString
+				switch m.sqlTypes[i].DatabaseTypeName() {
+				case "BINARY", "VARBINARY":
+					return nil, errors.Newf(codes.Invalid, "Flux does not support column type %s", m.sqlTypes[i].DatabaseTypeName())
+				}
+				row[i] = values.NewString(string(value))
+			}
+		case nil:
+			row[i] = values.NewNull(flux.SemanticType(m.columnTypes[i]))
+		default:
+			execute.PanicUnknownType(flux.TInvalid)
+		}
+	}
+	return row, nil
+}
+
+func (m *HdbRowReader) InitColumnNames(names []string) {
+	m.columnNames = names
+}
+
+func (m *HdbRowReader) InitColumnTypes(types []*sql.ColumnType) {
+	fluxTypes := make([]flux.ColType, len(types))
+	for i := 0; i < len(types); i++ {
+		switch types[i].DatabaseTypeName() {
+		case "TINYINT", "SMALLINT", "INTEGER", "BIGINT":
+			fluxTypes[i] = flux.TInt
+		case "REAL", "DOUBLE", "DECIMAL":
+			fluxTypes[i] = flux.TFloat
+		case "TIMESTAMP": // not exactly correct (see Notes)
+			fluxTypes[i] = flux.TTime
+		default:
+			fluxTypes[i] = flux.TString
+		}
+	}
+	m.columnTypes = fluxTypes
+	m.sqlTypes = types
+}
+
+func (m *HdbRowReader) ColumnNames() []string {
+	return m.columnNames
+}
+
+func (m *HdbRowReader) ColumnTypes() []flux.ColType {
+	return m.columnTypes
+}
+
+func (m *HdbRowReader) SetColumnTypes(types []flux.ColType) {
+	m.columnTypes = types
+}
+
+func (m *HdbRowReader) SetColumns(i []interface{}) {
+	m.columns = i
+}
+
+func (m *HdbRowReader) Close() error {
+	if m.CloseFunc != nil {
+		return m.CloseFunc()
+	}
+	if err := m.Cursor.Err(); err != nil {
+		return err
+	}
+	return m.Cursor.Close()
+}
+
+func NewHdbRowReader(r *sql.Rows) (execute.RowReader, error) {
+	reader := &HdbRowReader{
+		Cursor: r,
+	}
+	cols, err := r.Columns()
+	if err != nil {
+		return nil, err
+	}
+	reader.InitColumnNames(cols)
+
+	types, err := r.ColumnTypes()
+	if err != nil {
+		return nil, err
+	}
+	reader.InitColumnTypes(types)
+
+	return reader, nil
+}
+
+var fluxToHdb = map[flux.ColType]string{
+	flux.TFloat:  "DOUBLE",
+	flux.TInt:    "BIGINT",
+	flux.TString: "NVARCHAR(5000)", // 5000 is the max
+	flux.TBool:   "BOOLEAN",
+	flux.TTime:   "TIMESTAMP", // not exactly correct (see Notes)
+}
+
+// HdbColumnTranslateFunc translates flux colTypes into their corresponding SAP HANA column type
+func HdbColumnTranslateFunc() translationFunc {
+	return func(f flux.ColType, colName string) (string, error) {
+		s, found := fluxToHdb[f]
+		if !found {
+			return "", errors.Newf(codes.Invalid, "SAP HANA does not support column type %s", f.String())
+		}
+		// quote the column name for safety also convert to uppercase per HDB naming conventions
+		return hdbEscapeName(colName, true) + " " + s, nil
+	}
+}
+
+// Template for conditional query by table existence check
+var hdbDoIfTableNotExistsTemplate = `DO
+BEGIN
+	DECLARE SCHEMA_NAME NVARCHAR(%d) = '%s';
+	DECLARE TABLE_NAME NVARCHAR(%d) = '%s';
+	DECLARE X_EXISTS INT = 0;
+	SELECT COUNT(*) INTO X_EXISTS FROM TABLES %s;
+	IF :X_EXISTS = 0
+	THEN
+		%s;
+	END IF;
+END;
+`
+
+// hdbAddIfNotExist adds SAP HANA specific table existence check to CREATE TABLE statement.
+func hdbAddIfNotExist(table string, query string) string {
+	var where string
+	var args []interface{}
+	parts := strings.SplitN(strings.ToUpper(table), ".", 2) // schema and table name assumed uppercase in HDB by default (see Notes)
+	if len(parts) == 2 { // fully-qualified table name
+		where = "WHERE SCHEMA_NAME=ESCAPE_DOUBLE_QUOTES(:SCHEMA_NAME) AND TABLE_NAME=ESCAPE_DOUBLE_QUOTES(:TABLE_NAME)"
+		args = append(args, len(parts[0]))
+		args = append(args, parts[0])
+		args = append(args, len(parts[1]))
+		args = append(args, parts[1])
+	} else { // table in user default schema
+		where = "WHERE TABLE_NAME=ESCAPE_DOUBLE_QUOTES(:TABLE_NAME)"
+		args = append(args, len("default"))
+		args = append(args, "default")
+		args = append(args, len(table))
+		args = append(args, table)
+	}
+	args = append(args, where)
+	args = append(args, query)
+
+	return fmt.Sprintf(hdbDoIfTableNotExistsTemplate, args...)
+}
+
+// hdbEscapeName escapes name in double quotes and converts it to uppercase per HDB naming conventions
+func hdbEscapeName(name string, toUpper bool) string {
+	parts := strings.Split(name, ".")
+	for i := range parts {
+		if toUpper {
+			parts[i] = strings.ToUpper(parts[i])
+		}
+		parts[i] = strconv.Quote(strings.Trim(parts[i], "\""))
+	}
+	return strings.Join(parts, ".")
+}
diff --git a/stdlib/sql/hdb_test.go b/stdlib/sql/hdb_test.go
new file mode 100644
index 0000000000..f400c21c99
--- /dev/null
+++ b/stdlib/sql/hdb_test.go
@@ -0,0 +1,19 @@
+package sql
+
+import (
+	"strings"
+	"testing"
+)
+
+func TestHdb_IfNoExist(t *testing.T) {
+	// test table with fqn
+	q := hdbAddIfNotExist("stores.orders_copy", "CREATE TABLE stores.orders_copy (ORDER_ID BIGINT)")
+	if !strings.HasPrefix(q, "DO") || !strings.Contains(q, "(:SCHEMA_NAME)") || !strings.Contains(q, "(:TABLE_NAME)") {
+		t.Fail()
+	}
+	// test table in default schema
+	q = hdbAddIfNotExist("orders_copy", "CREATE TABLE orders_copy (ORDER_ID BIGINT)")
+	if !strings.HasPrefix(q, "DO") || strings.Contains(q, "(:SCHEMA_NAME)") || !strings.Contains(q, "(:TABLE_NAME)") {
+		t.Fail()
+	}
+}
diff --git a/stdlib/sql/source_validator.go b/stdlib/sql/source_validator.go
index 3e07526f99..7a0a1c9316 100644
--- a/stdlib/sql/source_validator.go
+++ b/stdlib/sql/source_validator.go
@@ -102,6 +102,12 @@ func validateDataSource(validator url.Validator, driverName string, dataSourceNa
 			Host: cfg.ProjectID,
 			Path: cfg.Location,
 		}
+	case "hdb": // SAP HANA
+		// an example is: hdb://user:password@host:port
+		u, err = neturl.Parse(dataSourceName)
+		if err != nil {
+			return errors.Newf(codes.Invalid, "invalid data source url: %v", err)
+		}
 	default:
 		return errors.Newf(codes.Invalid, "sql driver %s not supported", driverName)
 	}
diff --git a/stdlib/sql/sql_test.go b/stdlib/sql/sql_test.go
index aba8f6e8ce..feb0f37396 100644
--- a/stdlib/sql/sql_test.go
+++ b/stdlib/sql/sql_test.go
@@ -586,6 +586,67 @@ func TestBigQueryParsing(t *testing.T) {
 	}
 }
 
+func TestHdbParsing(t *testing.T) {
+	// here we want to build a mocked representation of what's in our SAP HANA db, and then run our RowReader over it, then verify that the results
+	// are as expected.
+	// NOTE: no meaningful test for reading bools, because the DB doesn't support them, and we already know that we can read INT types
+	testCases := []struct {
+		name       string
+		columnName string
+		data       *sql.Rows
+		want       [][]values.Value
+	}{
+		{
+			name:       "ints",
+			columnName: "_int",
+			data:       mockRowsToSQLRows(sqlmock.NewRows([]string{"column"}).AddRow(int64(6)).AddRow(int64(1)).AddRow(int64(643)).AddRow(int64(42)).AddRow(int64(1283))),
+			want:       [][]values.Value{{values.NewInt(6)}, {values.NewInt(1)}, {values.NewInt(643)}, {values.NewInt(42)}, {values.NewInt(1283)}},
+		},
+		{
+			name:       "floats",
+			columnName: "_float",
+			data:       mockRowsToSQLRows(sqlmock.NewRows([]string{"column"}).AddRow(float64(6)).AddRow(float64(1)).AddRow(float64(643)).AddRow(float64(42)).AddRow(float64(1283))),
+			want:       [][]values.Value{{values.NewFloat(6)}, {values.NewFloat(1)}, {values.NewFloat(643)}, {values.NewFloat(42)}, {values.NewFloat(1283)}},
+		},
+		{
+			name:       "strings",
+			columnName: "_string",
+			data:       mockRowsToSQLRows(sqlmock.NewRows([]string{"column"}).AddRow(string("6")).AddRow(string("1")).AddRow(string("643")).AddRow(string("42")).AddRow(string("1283"))),
+			want:       [][]values.Value{{values.NewString("6")}, {values.NewString("1")}, {values.NewString("643")}, {values.NewString("42")}, {values.NewString("1283")}},
+		},
+		{
+			name:       "timestamp",
+			columnName: "_timestamp",
+			data:       mockRowsToSQLRows(sqlmock.NewRows([]string{"column"}).AddRow(createTestTimes()[0].(time.Time)).AddRow(createTestTimes()[1].(time.Time)).AddRow(createTestTimes()[2].(time.Time)).AddRow(createTestTimes()[3].(time.Time)).AddRow(createTestTimes()[4].(time.Time))),
+			want: [][]values.Value{
+				{values.NewTime(values.ConvertTime(createTestTimes()[0].(time.Time)))},
+				{values.NewTime(values.ConvertTime(createTestTimes()[1].(time.Time)))},
+				{values.NewTime(values.ConvertTime(createTestTimes()[2].(time.Time)))},
+				{values.NewTime(values.ConvertTime(createTestTimes()[3].(time.Time)))},
+				{values.NewTime(values.ConvertTime(createTestTimes()[4].(time.Time)))},
+			},
+		},
+	}
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+
+			TestReader, err := NewHdbRowReader(tc.data)
+			if !cmp.Equal(nil, err) {
+				t.Fatalf("unexpected result -want/+got\n\n%s\n\n", cmp.Diff(nil, err))
+			}
+			i := 0
+			for TestReader.Next() {
+				row, _ := TestReader.GetNextRow()
+				if !cmp.Equal(tc.want[i], row) {
+					t.Fatalf("unexpected result -want/+got\n\n%s\n\n", cmp.Diff(tc.want[i], row))
+				}
+				i++
+			}
+		})
+	}
+}
+
 func createTestTimes() []interface{} {
 	str := []string{"2019-06-03 13:59:00", "2019-06-03 13:59:01",
diff --git a/stdlib/sql/to.go b/stdlib/sql/to.go
index d6b7659b1d..981a57b9ec 100644
--- a/stdlib/sql/to.go
+++ b/stdlib/sql/to.go
@@ -275,6 +275,8 @@ func getTranslationFunc(driverName string) (func() translationFunc, error) {
 		return nil, errors.Newf(codes.Invalid, "writing is not supported for %s", driverName)
 	case "bigquery":
 		return BigQueryColumnTranslateFunc, nil
+	case "hdb":
+		return HdbColumnTranslateFunc, nil
 	default:
 		return nil, errors.Newf(codes.Internal, "invalid driverName: %s", driverName)
 	}
@@ -335,10 +337,16 @@ func CreateInsertComponents(t *ToSQLTransformation, tbl flux.Table) (colNames []
 
 	if t.spec.Spec.DriverName != "sqlmock" {
 		var q string
-		if !isMssqlDriver(t.spec.Spec.DriverName) {
-			q = fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", t.spec.Spec.Table, strings.Join(newSQLTableCols, ","))
-		} else { // SQL Server does not support IF NOT EXIST
+		if isMssqlDriver(t.spec.Spec.DriverName) { // SQL Server does not support IF NOT EXIST
 			q = fmt.Sprintf("IF OBJECT_ID('%s', 'U') IS NULL BEGIN CREATE TABLE %s (%s) END", t.spec.Spec.Table, t.spec.Spec.Table, strings.Join(newSQLTableCols, ","))
+		} else if t.spec.Spec.DriverName == "hdb" { // SAP HANA does not support IF NOT EXIST
+			// wrap CREATE TABLE statement with HDB-specific "if not exists" SQLScript check
+			q = fmt.Sprintf("CREATE TABLE %s (%s)", hdbEscapeName(t.spec.Spec.Table, true), strings.Join(newSQLTableCols, ","))
+			q = hdbAddIfNotExist(t.spec.Spec.Table, q)
+			// SAP HANA does not support INSERT/UPDATE batching via a single SQL command
+			batchSize = 1
+		} else {
+			q = fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", t.spec.Spec.Table, strings.Join(newSQLTableCols, ","))
 		}
 		_, err = t.tx.Exec(q)
 		if err != nil {
@@ -395,7 +403,7 @@ func CreateInsertComponents(t *ToSQLTransformation, tbl flux.Table) (colNames []
 			return err
 		}
 
-		if i != 0 && i%batchSize == 0 {
+		if (i != 0 && i%batchSize == 0) || (batchSize == 1) {
 			// create "mini batches" of values - each one represents a single db.Exec to SQL
 			valArgsArray = append(valArgsArray, valueArgs)
 			valStringArray = append(valStringArray, valueStrings)
diff --git a/stdlib/sql/to_privates_test.go b/stdlib/sql/to_privates_test.go
index 6ea1c03368..8dd3ac7c0f 100644
--- a/stdlib/sql/to_privates_test.go
+++ b/stdlib/sql/to_privates_test.go
@@ -1,6 +1,8 @@
 package sql
 
 import (
+	"strconv"
+	"strings"
 	"testing"
 
 	"github.com/google/go-cmp/cmp"
@@ -90,6 +92,21 @@ func TestTranslationDriverReturn(t *testing.T) {
 		t.Fail()
 	}
 
+	// verify that valid returns expected error for AWS Athena (yes, error)
+	expectedErr := errors.Newf(codes.Invalid, "writing is not supported for awsathena")
+	_, err = getTranslationFunc("awsathena")
+	if !cmp.Equal(expectedErr, err) {
+		t.Log(cmp.Diff(expectedErr, err))
+		t.Fail()
+	}
+
+	// verify that valid returns expected happiness for SAP HANA
+	_, err = getTranslationFunc("hdb")
+	if !cmp.Equal(nil, err) {
+		t.Log(cmp.Diff(nil, err))
+		t.Fail()
+	}
+
 }
 
 func TestSqliteTranslation(t *testing.T) {
@@ -335,3 +352,45 @@ func TestBigQueryTranslation(t *testing.T) {
 		t.Fail()
 	}
 }
+
+func TestHdbTranslation(t *testing.T) {
+	hdbTypeTranslations := map[string]flux.ColType{
+		"DOUBLE":         flux.TFloat,
+		"BIGINT":         flux.TInt,
+		"NVARCHAR(5000)": flux.TString,
+		"TIMESTAMP":      flux.TTime,
+		"BOOLEAN":        flux.TBool,
+	}
+
+	columnLabel := "apples"
+	// verify that valid returns expected happiness for hdb
+	sqlT, err := getTranslationFunc("hdb")
+	if !cmp.Equal(nil, err) {
+		t.Log(cmp.Diff(nil, err))
+		t.Fail()
+	}
+
+	for dbTypeString, fluxType := range hdbTypeTranslations {
+		v, err := sqlT()(fluxType, columnLabel)
+		if !cmp.Equal(nil, err) {
+			t.Log(cmp.Diff(nil, err))
+			t.Fail()
+		}
+		if !cmp.Equal(strconv.Quote(strings.ToUpper(columnLabel))+" "+dbTypeString, v) {
+			t.Log(cmp.Diff(strconv.Quote(strings.ToUpper(columnLabel))+" "+dbTypeString, v))
+			t.Fail()
+		}
+	}
+
+	// test no match
+	var _unsupportedType flux.ColType = 666
+	_, err = sqlT()(_unsupportedType, columnLabel)
+	if cmp.Equal(nil, err) {
+		t.Log(cmp.Diff(nil, err))
+		t.Fail()
+	}
+	if !cmp.Equal("SAP HANA does not support column type unknown", err.Error()) {
+		t.Log(cmp.Diff("SAP HANA does not support column type unknown", err.Error()))
+		t.Fail()
+	}
+}
\ No newline at end of file
diff --git a/stdlib/sql/to_test.go b/stdlib/sql/to_test.go
index 1a382ab50c..248d5c62c2 100644
--- a/stdlib/sql/to_test.go
+++ b/stdlib/sql/to_test.go
@@ -334,6 +334,15 @@ func TestToSql_NewTransformation(t *testing.T) {
 			},
 		},
 		WantErr: "connection refused",
+	}, {
+		Name: "ok hdb",
+		Spec: &fsql.ToSQLProcedureSpec{
+			Spec: &fsql.ToSQLOpSpec{
+				DriverName:     "hdb",
+				DataSourceName: "hdb://user:password@localhost:39013",
+			},
+		},
+		WantErr: "connection refused",
 	}, {
 		Name: "invalid driver",
 		Spec: &fsql.ToSQLProcedureSpec{