Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stdlib/sql): add SAP HANA db support #3098

Merged
merged 15 commits into from
Oct 6, 2020
Merged
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/Azure/go-autorest/autorest/adal v0.8.3
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2
github.com/DATA-DOG/go-sqlmock v1.4.1
github.com/SAP/go-hdb v0.14.1
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883
github.com/apache/arrow/go/arrow v0.0.0-20200923215132-ac86123a3f01
github.com/bonitoo-io/go-sql-bigquery v0.3.4-1.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ github.com/Masterminds/semver v1.4.2 h1:WBLTQ37jOCzSLtXNdoo8bNM8876KhNqOKvrlGITg
github.com/Masterminds/semver v1.4.2/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/SAP/go-hdb v0.14.1 h1:hkw4ozGZ/i4eak7ZuGkY5e0hxiXFdNUBNhr4AvZVNFE=
github.com/SAP/go-hdb v0.14.1/go.mod h1:7fdQLVC2lER3urZLjZCm0AuMQfApof92n3aylBPEkMo=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ=
Expand Down
2 changes: 2 additions & 0 deletions stdlib/sql/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ func createFromSQLSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a ex
newRowReader = NewAwsAthenaRowReader
case "bigquery":
newRowReader = NewBigQueryRowReader
case "hdb":
newRowReader = NewHdbRowReader
default:
return nil, errors.Newf(codes.Invalid, "sql driver %s not supported", spec.DriverName)
}
Expand Down
8 changes: 8 additions & 0 deletions stdlib/sql/from_private_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ func TestFromSqlUrlValidation(t *testing.T) {
Query: "",
},
ErrMsg: "",
}, {
Name: "ok hdb",
Spec: &FromSQLProcedureSpec{
DriverName: "hdb",
DataSourceName: "hdb://user:password@localhost:39013",
Query: "",
},
ErrMsg: "",
}, {
Name: "invalid driver",
Spec: &FromSQLProcedureSpec{
Expand Down
245 changes: 245 additions & 0 deletions stdlib/sql/hdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
package sql

import (
"database/sql"
"fmt"
"math/big"
"strconv"
"strings"
"time"

_ "github.com/SAP/go-hdb/driver"
hdb "github.com/SAP/go-hdb/driver"
"github.com/influxdata/flux"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/values"
)

// SAP HANA DB support.
// Notes:
// * the last version compatible with Go 1.12 is v0.14.1. In v0.14.3, they started using 1.13 type sql.NullTime
// and added `go 1.14` directive to go.mod (https://github.com/SAP/go-hdb/releases/tag/v0.14.3)
// * HDB returns BOOLEAN as TINYINT
// * TIMESTAMP does not TZ info stored, but:
// - it is "strongly discouraged" to store data in local time zone: https://blogs.sap.com/2018/03/28/trouble-with-time/
// - more on timestamps in HDB: https://help.sap.com/viewer/f1b440ded6144a54ada97ff95dac7adf/2.4/en-US/a394f75dcbe64b42b7a887231af8f15f.html
// Therefore TIMESTAMP is mapped to TTime and vice-versa here.
// * the hdb driver is rather strict, eg. does not convert date- or time-formatted string values to time.Time,
// or float64 to Decimal on its own and just throws "unsupported conversion" error
// * Per naming conventions rules (https://documentation.sas.com/?cdcId=pgmsascdc&cdcVersion=9.4_3.5&docsetId=acreldb&docsetTarget=p1k98908uh9ovsn1jwzl3jg05exr.htm&locale=en),
// `sql.to` target table and column names are assumed in / converted to uppercase.

// HdbRowReader wraps an *sql.Rows cursor over an SAP HANA result set and
// adapts it to the execute.RowReader interface used by sql.from().
type HdbRowReader struct {
	Cursor      *sql.Rows
	columns     []interface{}     // values of the row most recently scanned by Next
	columnTypes []flux.ColType    // Flux column types derived from sqlTypes in InitColumnTypes
	columnNames []string          // column names captured in InitColumnNames
	sqlTypes    []*sql.ColumnType // raw driver column metadata, kept for DatabaseTypeName checks
	NextFunc    func() bool       // optional override of Next (test hook)
	CloseFunc   func() error      // optional override of Close (test hook)
}

// Next advances the cursor and scans the next row into the reader's internal
// column buffer, reporting false when no row is available or scanning fails.
// NOTE(review): errors from Columns/Scan are swallowed here (the interface
// returns only bool); iteration errors surface later via Close, which checks
// Cursor.Err().
func (m *HdbRowReader) Next() bool {
	if m.NextFunc != nil {
		return m.NextFunc()
	}
	if !m.Cursor.Next() {
		return false
	}
	names, err := m.Cursor.Columns()
	if err != nil {
		return false
	}
	m.columns = make([]interface{}, len(names))
	dests := make([]interface{}, len(names))
	for i := range dests {
		dests[i] = &m.columns[i]
	}
	return m.Cursor.Scan(dests...) == nil
}

// GetNextRow materializes the row scanned by the preceding Next call as a
// slice of Flux values, converting the driver-native Go types to their Flux
// counterparts. It returns an error for DECIMAL values that fail to decode
// and for raw binary columns, which Flux cannot represent.
func (m *HdbRowReader) GetNextRow() ([]values.Value, error) {
	row := make([]values.Value, len(m.columns))
	for i, column := range m.columns {
		switch value := column.(type) {
		case bool, int64, float64, string:
			row[i] = values.New(value)
		case time.Time:
			// DATE, TIME types get scanned to time.Time by the driver too,
			// but they have no counterpart in Flux therefore will be represented as string
			switch m.sqlTypes[i].DatabaseTypeName() {
			case "DATE":
				row[i] = values.NewString(value.Format(layoutDate))
			case "TIME":
				row[i] = values.NewString(value.Format(layoutTime))
			default:
				// TIMESTAMP (which HDB stores without TZ info) maps to Flux time — see package notes
				row[i] = values.NewTime(values.ConvertTime(value))
			}
		case []uint8:
			switch m.columnTypes[i] {
			case flux.TFloat:
				// DECIMAL arrives as raw bytes; decode via the driver's Decimal
				// (a big.Rat) and down-convert to float64, accepting precision loss
				var out hdb.Decimal
				err := out.Scan(value)
				if err != nil {
					return nil, err
				}
				newFloat, _ := (*big.Rat)(&out).Float64()
				row[i] = values.NewFloat(newFloat)
			default: // flux.TString
				// reject true binary columns; everything else is treated as text
				switch m.sqlTypes[i].DatabaseTypeName() {
				case "BINARY", "VARBINARY":
					return nil, errors.Newf(codes.Invalid, "Flux does not support column type %s", m.sqlTypes[i].DatabaseTypeName())
				}
				row[i] = values.NewString(string(value))
			}
		case nil:
			row[i] = values.NewNull(flux.SemanticType(m.columnTypes[i]))
		default:
			execute.PanicUnknownType(flux.TInvalid)
		}
	}
	return row, nil
}

// InitColumnNames records the result-set column names reported by the cursor.
func (m *HdbRowReader) InitColumnNames(names []string) {
	m.columnNames = names
}

// InitColumnTypes stores the driver column metadata and derives the matching
// Flux column type for each column (unrecognized HDB types fall back to
// string).
func (m *HdbRowReader) InitColumnTypes(types []*sql.ColumnType) {
	fluxTypes := make([]flux.ColType, len(types))
	for i, colType := range types {
		switch colType.DatabaseTypeName() {
		case "TINYINT", "SMALLINT", "INTEGER", "BIGINT":
			fluxTypes[i] = flux.TInt
		case "REAL", "DOUBLE", "DECIMAL":
			fluxTypes[i] = flux.TFloat
		case "TIMESTAMP": // not exactly correct (see Notes)
			fluxTypes[i] = flux.TTime
		default:
			fluxTypes[i] = flux.TString
		}
	}
	m.columnTypes = fluxTypes
	m.sqlTypes = types
}

// ColumnNames returns the column names captured by InitColumnNames.
func (m *HdbRowReader) ColumnNames() []string {
	return m.columnNames
}

// ColumnTypes returns the Flux column types derived by InitColumnTypes.
func (m *HdbRowReader) ColumnTypes() []flux.ColType {
	return m.columnTypes
}

// SetColumnTypes overrides the derived Flux column types.
func (m *HdbRowReader) SetColumnTypes(types []flux.ColType) {
	m.columnTypes = types
}

// SetColumns overrides the internal row-value buffer read by GetNextRow.
func (m *HdbRowReader) SetColumns(i []interface{}) {
	m.columns = i
}

// Close releases the underlying cursor. It reports any error accumulated
// during iteration (Cursor.Err) but, unlike the previous version, always
// attempts to close the cursor so it is not leaked when an iteration error
// occurred (sql.Rows.Close is safe to call regardless of state).
func (m *HdbRowReader) Close() error {
	if m.CloseFunc != nil {
		return m.CloseFunc()
	}
	err := m.Cursor.Err()
	if cerr := m.Cursor.Close(); err == nil {
		err = cerr
	}
	return err
}

// NewHdbRowReader creates an execute.RowReader over an SAP HANA result set,
// pre-populating the reader with column names and type information taken from
// the cursor metadata.
func NewHdbRowReader(r *sql.Rows) (execute.RowReader, error) {
	names, err := r.Columns()
	if err != nil {
		return nil, err
	}
	types, err := r.ColumnTypes()
	if err != nil {
		return nil, err
	}
	reader := &HdbRowReader{Cursor: r}
	reader.InitColumnNames(names)
	reader.InitColumnTypes(types)
	return reader, nil
}

// fluxToHdb maps each Flux column type supported by sql.to onto the SAP HANA
// column type used when creating the target table.
var fluxToHdb = map[flux.ColType]string{
	flux.TFloat:  "DOUBLE",
	flux.TInt:    "BIGINT",
	flux.TString: "NVARCHAR(5000)", // 5000 is the max
	flux.TBool:   "BOOLEAN",
	flux.TTime:   "TIMESTAMP", // not exactly correct (see Notes)
}

// HdbColumnTranslateFunc returns a translationFunc that renders a Flux column
// as its SAP HANA column definition (quoted, uppercased name plus HDB type),
// or an error for Flux types SAP HANA cannot store.
func HdbColumnTranslateFunc() translationFunc {
	return func(f flux.ColType, colName string) (string, error) {
		hdbType, ok := fluxToHdb[f]
		if !ok {
			return "", errors.Newf(codes.Invalid, "SAP HANA does not support column type %s", f.String())
		}
		// quote the column name for safety also convert to uppercase per HDB naming conventions
		return hdbEscapeName(colName, true) + " " + hdbType, nil
	}
}

// Template for conditional query by table existence check
var hdbDoIfTableNotExistsTemplate = `DO
BEGIN
DECLARE SCHEMA_NAME NVARCHAR(%d) = '%s';
DECLARE TABLE_NAME NVARCHAR(%d) = '%s';
DECLARE X_EXISTS INT = 0;
SELECT COUNT(*) INTO X_EXISTS FROM TABLES %s;
IF :X_EXISTS = 0
THEN
%s;
END IF;
END;
`

// hdbAddIfNotExist adds SAP HANA specific table existence check to CREATE TABLE statement.
func hdbAddIfNotExist(table string, query string) string {
var where string
var args []interface{}
parts := strings.SplitN(strings.ToUpper(table), ".", 2) // schema and table name assumed uppercase in HDB by default (see Notes)
if len(parts) == 2 { // fully-qualified table name
where = "WHERE SCHEMA_NAME=ESCAPE_DOUBLE_QUOTES(:SCHEMA_NAME) AND TABLE_NAME=ESCAPE_DOUBLE_QUOTES(:TABLE_NAME)"
args = append(args, len(parts[0]))
args = append(args, parts[0])
args = append(args, len(parts[1]))
args = append(args, parts[1])
} else { // table in user default schema
where = "WHERE TABLE_NAME=ESCAPE_DOUBLE_QUOTES(:TABLE_NAME)"
args = append(args, len("default"))
args = append(args, "default")
args = append(args, len(table))
args = append(args, table)
}
args = append(args, where)
args = append(args, query)

return fmt.Sprintf(hdbDoIfTableNotExistsTemplate, args...)
}

// hdbEscapeName wraps every dot-separated segment of name in double quotes,
// stripping any quotes already present, and optionally uppercases each
// segment per HDB naming conventions.
func hdbEscapeName(name string, toUpper bool) string {
	segments := strings.Split(name, ".")
	quoted := make([]string, 0, len(segments))
	for _, seg := range segments {
		seg = strings.Trim(seg, `"`)
		if toUpper {
			seg = strings.ToUpper(seg)
		}
		quoted = append(quoted, strconv.Quote(seg))
	}
	return strings.Join(quoted, ".")
}
19 changes: 19 additions & 0 deletions stdlib/sql/hdb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package sql

import (
"strings"
"testing"
)

// TestHdb_IfNoExist verifies that hdbAddIfNotExist wraps a CREATE TABLE
// statement in the existence-check DO block, filtering on SCHEMA_NAME only
// when the table name is fully qualified.
// Fix: bare t.Fail() calls replaced with t.Errorf so a failure shows the
// offending generated query.
func TestHdb_IfNoExist(t *testing.T) {
	// test table with fqn
	q := hdbAddIfNotExist("stores.orders_copy", "CREATE TABLE stores.orders_copy (ORDER_ID BIGINT)")
	if !strings.HasPrefix(q, "DO") || !strings.Contains(q, "(:SCHEMA_NAME)") || !strings.Contains(q, "(:TABLE_NAME)") {
		t.Errorf("unexpected query for fully-qualified table:\n%s", q)
	}
	// test table in default schema
	q = hdbAddIfNotExist("orders_copy", "CREATE TABLE orders_copy (ORDER_ID BIGINT)")
	if !strings.HasPrefix(q, "DO") || strings.Contains(q, "(:SCHEMA_NAME)") || !strings.Contains(q, "(:TABLE_NAME)") {
		t.Errorf("unexpected query for default-schema table:\n%s", q)
	}
}
6 changes: 6 additions & 0 deletions stdlib/sql/source_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ func validateDataSource(validator url.Validator, driverName string, dataSourceNa
Host: cfg.ProjectID,
Path: cfg.Location,
}
case "hdb": // SAP HANA
// an example is: hdb://user:password@host:port
u, err = neturl.Parse(dataSourceName)
if err != nil {
return errors.Newf(codes.Invalid, "invalid data source url: %v", err)
}
default:
return errors.Newf(codes.Invalid, "sql driver %s not supported", driverName)
}
Expand Down
61 changes: 61 additions & 0 deletions stdlib/sql/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,67 @@ func TestBigQueryParsing(t *testing.T) {
}
}

// TestHdbParsing builds a mocked representation of an SAP HANA result set,
// runs HdbRowReader over it, and verifies the rows come back as the expected
// Flux values.
// NOTE: no meaningful test for reading bools, because HDB returns BOOLEAN as
// TINYINT (see notes in hdb.go), and reading INT types is already covered.
func TestHdbParsing(t *testing.T) {
	testCases := []struct {
		name       string
		columnName string
		data       *sql.Rows
		want       [][]values.Value
	}{
		{
			name:       "ints",
			columnName: "_int",
			data:       mockRowsToSQLRows(sqlmock.NewRows([]string{"column"}).AddRow(int64(6)).AddRow(int64(1)).AddRow(int64(643)).AddRow(int64(42)).AddRow(int64(1283))),
			want:       [][]values.Value{{values.NewInt(6)}, {values.NewInt(1)}, {values.NewInt(643)}, {values.NewInt(42)}, {values.NewInt(1283)}},
		},
		{
			name:       "floats",
			columnName: "_float",
			data:       mockRowsToSQLRows(sqlmock.NewRows([]string{"column"}).AddRow(float64(6)).AddRow(float64(1)).AddRow(float64(643)).AddRow(float64(42)).AddRow(float64(1283))),
			want:       [][]values.Value{{values.NewFloat(6)}, {values.NewFloat(1)}, {values.NewFloat(643)}, {values.NewFloat(42)}, {values.NewFloat(1283)}},
		},
		{
			name:       "strings",
			columnName: "_string",
			data:       mockRowsToSQLRows(sqlmock.NewRows([]string{"column"}).AddRow(string("6")).AddRow(string("1")).AddRow(string("643")).AddRow(string("42")).AddRow(string("1283"))),
			want:       [][]values.Value{{values.NewString("6")}, {values.NewString("1")}, {values.NewString("643")}, {values.NewString("42")}, {values.NewString("1283")}},
		},
		{
			name:       "timestamp",
			columnName: "_timestamp",
			data:       mockRowsToSQLRows(sqlmock.NewRows([]string{"column"}).AddRow(createTestTimes()[0].(time.Time)).AddRow(createTestTimes()[1].(time.Time)).AddRow(createTestTimes()[2].(time.Time)).AddRow(createTestTimes()[3].(time.Time)).AddRow(createTestTimes()[4].(time.Time))),
			want: [][]values.Value{
				{values.NewTime(values.ConvertTime(createTestTimes()[0].(time.Time)))},
				{values.NewTime(values.ConvertTime(createTestTimes()[1].(time.Time)))},
				{values.NewTime(values.ConvertTime(createTestTimes()[2].(time.Time)))},
				{values.NewTime(values.ConvertTime(createTestTimes()[3].(time.Time)))},
				{values.NewTime(values.ConvertTime(createTestTimes()[4].(time.Time)))},
			},
		},
	}
	for _, tc := range testCases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {

			TestReader, err := NewHdbRowReader(tc.data)
			if !cmp.Equal(nil, err) {
				t.Fatalf("unexpected result -want/+got\n\n%s\n\n", cmp.Diff(nil, err))
			}
			i := 0
			for TestReader.Next() {
				row, _ := TestReader.GetNextRow()
				if !cmp.Equal(tc.want[i], row) {
					t.Fatalf("unexpected result -want/+got\n\n%s\n\n", cmp.Diff(tc.want[i], row))
				}
				i++
			}
		})
	}
}

func createTestTimes() []interface{} {
str := []string{"2019-06-03 13:59:00",
"2019-06-03 13:59:01",
Expand Down
Loading