diff --git a/clients/redshift/cast.go b/clients/redshift/cast.go index 52af74d7f..e317be7a3 100644 --- a/clients/redshift/cast.go +++ b/clients/redshift/cast.go @@ -10,7 +10,18 @@ import ( const maxRedshiftLength int32 = 65535 -func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateExceededValue bool) string { +// canIncreasePrecision - returns true if column is a string, precision is specified and value length is less than [maxRedshiftLength] +func canIncreasePrecision(colKind typing.KindDetails, valueLength int32) bool { + if colKind.Kind == typing.String.Kind && colKind.OptionalStringPrecision != nil { + return maxRedshiftLength > *colKind.OptionalStringPrecision && valueLength <= maxRedshiftLength + } + + return false +} + +// replaceExceededValues replaces the value with a marker if it exceeds the maximum length +// Returns the value and boolean indicating whether the column should be increased or not. +func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateExceededValue bool, increaseStringPrecision bool) (string, bool) { if colKind.Kind == typing.Struct.Kind || colKind.Kind == typing.String.Kind { maxLength := maxRedshiftLength // If the customer has specified the maximum string precision, let's use that as the max length. @@ -18,38 +29,45 @@ func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateEx maxLength = *colKind.OptionalStringPrecision } - if shouldReplace := int32(len(colVal)) > maxLength; shouldReplace { + colValLength := int32(len(colVal)) + if shouldReplace := colValLength > maxLength; shouldReplace { if colKind.Kind == typing.Struct.Kind { - return fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker) + return fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), false + } + + if increaseStringPrecision && canIncreasePrecision(colKind, colValLength) { + return colVal, true } if truncateExceededValue { - return colVal[:maxLength] + return colVal[:maxLength], false } else { - return constants.ExceededValueMarker + return constants.ExceededValueMarker, false } } } - return colVal + return colVal, false } -func castColValStaging(colVal any, colKind typing.KindDetails, truncateExceededValue bool) (string, error) { +func castColValStaging(colVal any, colKind typing.KindDetails, truncateExceededValue bool, increaseStringPrecision bool) (string, bool, error) { if colVal == nil { if colKind == typing.Struct { // Returning empty here because if it's a struct, it will go through JSON PARSE and JSON_PARSE("") = null - return "", nil + return "", false, nil } // This matches the COPY clause for NULL terminator. - return `\N`, nil + return `\N`, false, nil } colValString, err := values.ToString(colVal, colKind) if err != nil { - return "", err + return "", false, err } // Checks for DDL overflow needs to be done at the end in case there are any conversions that need to be done. - return replaceExceededValues(colValString, colKind, truncateExceededValue), nil + + colValue, shouldIncreaseColumn := replaceExceededValues(colValString, colKind, truncateExceededValue, increaseStringPrecision) + return colValue, shouldIncreaseColumn, nil } diff --git a/clients/redshift/cast_test.go b/clients/redshift/cast_test.go index 7867e164f..42cecb6ff 100644 --- a/clients/redshift/cast_test.go +++ b/clients/redshift/cast_test.go @@ -3,71 +3,124 @@ package redshift import ( "fmt" - "github.com/artie-labs/transfer/lib/stringutil" - "github.com/artie-labs/transfer/lib/config/constants" - + "github.com/artie-labs/transfer/lib/stringutil" "github.com/artie-labs/transfer/lib/typing" "github.com/stretchr/testify/assert" ) +func (r *RedshiftTestSuite) TestCanIncreasePrecision() { + { + // False + { + // Not a string + assert.False(r.T(), canIncreasePrecision(typing.Struct, 123)) + } + { + // String, but precision not specified. + assert.False(r.T(), canIncreasePrecision(typing.String, 123)) + } + { + // String and precision specified, but value length exceeds maxRedshiftLength + assert.False(r.T(), canIncreasePrecision( + typing.KindDetails{ + Kind: typing.String.Kind, + OptionalStringPrecision: typing.ToPtr(int32(123)), + }, + maxRedshiftLength+1), + ) + } + } + { + // True + { + // String, precision is low and can be increased + assert.True(r.T(), canIncreasePrecision( + typing.KindDetails{ + Kind: typing.String.Kind, + OptionalStringPrecision: typing.ToPtr(int32(123)), + }, + 123), + ) + } + } +} + func (r *RedshiftTestSuite) TestReplaceExceededValues() { { // Irrelevant data type { // Integer - assert.Equal(r.T(), "123", replaceExceededValues("123", typing.Integer, false)) + + value, _ := replaceExceededValues("123", typing.Integer, false, false) + assert.Equal(r.T(), "123", value) } { // Returns the full value since it's not a struct or string // This is invalid and should not happen, but it's here to ensure we're only checking for structs and strings. - value := stringutil.Random(int(maxRedshiftLength + 1)) - assert.Equal(r.T(), value, replaceExceededValues(value, typing.Integer, false)) + input := stringutil.Random(int(maxRedshiftLength + 1)) + value, _ := replaceExceededValues(input, typing.Integer, false, false) + assert.Equal(r.T(), input, value) } } { // Exceeded { - // String { - // TruncateExceededValue = false - assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false)) + // TruncateExceededValue = false, IncreaseStringPrecision = false + value, shouldIncrease := replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false, false) + assert.Equal(r.T(), constants.ExceededValueMarker, value) + assert.False(r.T(), shouldIncrease) } { - // TruncateExceededValue = false, string precision specified + // TruncateExceededValue = false, string precision specified, IncreaseStringPrecision = false stringKd := typing.KindDetails{ Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(3)), } - - assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues("hello", stringKd, false)) + value, shouldIncrease := replaceExceededValues("hello", stringKd, false, false) + assert.Equal(r.T(), constants.ExceededValueMarker, value) + assert.False(r.T(), shouldIncrease) } { - // TruncateExceededValue = true - superLongString := stringutil.Random(int(maxRedshiftLength) + 1) - assert.Equal(r.T(), superLongString[:maxRedshiftLength], replaceExceededValues(superLongString, typing.String, true)) + // TruncateExceededValue = true, IncreaseStringPrecision = false + input := stringutil.Random(int(maxRedshiftLength) + 1) + value, shouldIncrease := replaceExceededValues(input, typing.String, true, false) + assert.Equal(r.T(), input[:maxRedshiftLength], value) + assert.False(r.T(), shouldIncrease) } { - // TruncateExceededValue = true, string precision specified + // TruncateExceededValue = true, string precision specified, IncreaseStringPrecision = false stringKd := typing.KindDetails{ Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(3)), } - - assert.Equal(r.T(), "hel", replaceExceededValues("hello", stringKd, true)) + value, shouldIncrease := replaceExceededValues("hello", stringKd, true, false) + assert.Equal(r.T(), "hel", value) + assert.False(r.T(), shouldIncrease) } } { // Struct and masked - assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false)) + value, shouldIncrease := replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false, false) + assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), value) + assert.False(r.T(), shouldIncrease) } } { // Valid { // Not masked - assert.Equal(r.T(), `{"foo": "bar"}`, replaceExceededValues(`{"foo": "bar"}`, typing.Struct, false)) - assert.Equal(r.T(), "hello world", replaceExceededValues("hello world", typing.String, false)) + { + value, shouldIncrease := replaceExceededValues(`{"foo": "bar"}`, typing.Struct, false, false) + assert.Equal(r.T(), `{"foo": "bar"}`, value) + assert.False(r.T(), shouldIncrease) + } + { + value, shouldIncrease := replaceExceededValues("hello world", typing.String, false, false) + assert.Equal(r.T(), "hello world", value) + assert.False(r.T(), shouldIncrease) + } } } } @@ -78,39 +131,70 @@ func (r *RedshiftTestSuite) TestCastColValStaging() { { // String { - // TruncateExceededValue = false - value, err := castColValStaging(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false) + // TruncateExceededValue = false, IncreaseStringPrecision = false + value, shouldIncrease, err := castColValStaging(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false, false) assert.NoError(r.T(), err) assert.Equal(r.T(), constants.ExceededValueMarker, value) + assert.False(r.T(), shouldIncrease) + } + { + // TruncateExceededValue = true, IncreaseStringPrecision = false + input := stringutil.Random(int(maxRedshiftLength) + 1) + value, shouldIncrease, err := castColValStaging(input, typing.String, true, false) + assert.NoError(r.T(), err) + assert.Equal(r.T(), input[:maxRedshiftLength], value) + assert.False(r.T(), shouldIncrease) } { - // TruncateExceededValue = true - value := stringutil.Random(int(maxRedshiftLength) + 1) - value, err := castColValStaging(value, typing.String, true) + // TruncateExceededValue = false, IncreaseStringPrecision = true + stringKd := typing.KindDetails{ + Kind: typing.String.Kind, + OptionalStringPrecision: typing.ToPtr(int32(3)), + } + + value, shouldIncrease, err := castColValStaging("hello", stringKd, false, true) + assert.NoError(r.T(), err) + assert.Equal(r.T(), "hello", value) + assert.True(r.T(), shouldIncrease) + } + { + // TruncateExceededValue = true, IncreaseStringPrecision = true + input := stringutil.Random(int(maxRedshiftLength) + 1) + stringPrecision := int32(3) + stringKd := typing.KindDetails{ + Kind: typing.String.Kind, + OptionalStringPrecision: typing.ToPtr(stringPrecision), + } + + value, shouldIncrease, err := castColValStaging(input, stringKd, true, true) assert.NoError(r.T(), err) - assert.Equal(r.T(), value[:maxRedshiftLength], value) + assert.Equal(r.T(), input[:stringPrecision], value) + assert.False(r.T(), shouldIncrease) } } { // Masked struct - value, err := castColValStaging(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false) + value, shouldIncrease, err := castColValStaging(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false, false) assert.NoError(r.T(), err) assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), value) + assert.False(r.T(), shouldIncrease) } } { // Not exceeded { // Valid string - value, err := castColValStaging("thisissuperlongbutnotlongenoughtogetmasked", typing.String, false) + value, shouldIncrease, err := castColValStaging("thisissuperlongbutnotlongenoughtogetmasked", typing.String, false, false) assert.NoError(r.T(), err) assert.Equal(r.T(), "thisissuperlongbutnotlongenoughtogetmasked", value) + assert.False(r.T(), shouldIncrease) } { // Valid struct - value, err := castColValStaging(`{"foo": "bar"}`, typing.Struct, false) + value, shouldIncrease, err := castColValStaging(`{"foo": "bar"}`, typing.Struct, false, false) assert.NoError(r.T(), err) assert.Equal(r.T(), `{"foo": "bar"}`, value) + assert.False(r.T(), shouldIncrease) } } } diff --git a/clients/redshift/dialect/dialect.go b/clients/redshift/dialect/dialect.go index 9cb35bc20..9db4187c3 100644 --- a/clients/redshift/dialect/dialect.go +++ b/clients/redshift/dialect/dialect.go @@ -147,6 +147,10 @@ func (RedshiftDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, column return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart) } +func (rd RedshiftDialect) BuildIncreaseStringPrecisionQuery(tableID sql.TableIdentifier, column columns.Column, newPrecision int32) string { + return fmt.Sprintf("ALTER TABLE %s ALTER COLUMN %s TYPE VARCHAR(%d)", tableID.FullyQualifiedName(), rd.QuoteIdentifier(column.Name()), newPrecision) +} + func (rd RedshiftDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string { colName := sql.QuoteTableAliasColumn(tableAlias, column, rd) if column.KindDetails == typing.Struct { diff --git a/clients/redshift/dialect/dialect_test.go b/clients/redshift/dialect/dialect_test.go index f285c7671..e620b7362 100644 --- a/clients/redshift/dialect/dialect_test.go +++ b/clients/redshift/dialect/dialect_test.go @@ -195,6 +195,16 @@ func TestRedshiftDialect_BuildAlterColumnQuery(t *testing.T) { ) } +func TestRedshiftDialect_BuildIncreaseStringPrecisionQuery(t *testing.T) { + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("{TABLE}") + + assert.Equal(t, + `ALTER TABLE {TABLE} ALTER COLUMN "{column}" TYPE VARCHAR(12345)`, + RedshiftDialect{}.BuildIncreaseStringPrecisionQuery(fakeTableID, columns.NewColumn("{COLUMN}", typing.String), 12345), + ) +} + func TestRedshiftDialect_BuildIsNotToastValueExpression(t *testing.T) { assert.Equal(t, `COALESCE(tbl."bar" != '__debezium_unavailable_value', true)`, diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go index 85de9e39b..f5d008a6d 100644 --- a/clients/redshift/staging.go +++ b/clients/redshift/staging.go @@ -94,7 +94,13 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID for _, value := range tableData.Rows() { var row []string for _, col := range columns { - castedValue, castErr := castColValStaging(value[col.Name()], col.KindDetails, s.config.SharedDestinationSettings.TruncateExceededValues) + // TODO: Implement + castedValue, _, castErr := castColValStaging( + value[col.Name()], + col.KindDetails, + s.config.SharedDestinationSettings.TruncateExceededValues, + s.config.SharedDestinationSettings.IncreaseStringPrecision, + ) if castErr != nil { return "", castErr } diff --git a/lib/config/types.go b/lib/config/types.go index d9247b22f..79d11c17b 100644 --- a/lib/config/types.go +++ b/lib/config/types.go @@ -39,6 +39,8 @@ type Kafka struct { type SharedDestinationSettings struct { // TruncateExceededValues - This will truncate exceeded values instead of replacing it with `__artie_exceeded_value` TruncateExceededValues bool `yaml:"truncateExceededValues"` + // IncreaseStringPrecision - This will increase the string precision to the maximum allowed by the destination. + IncreaseStringPrecision bool `yaml:"increaseStringPrecision"` } type Reporting struct {