Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Redshift] - Increasing string precision scaffold #932

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 29 additions & 11 deletions clients/redshift/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,46 +10,64 @@ import (

const maxRedshiftLength int32 = 65535

func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateExceededValue bool) string {
// canIncreasePrecision - returns true if column is a string, precision is specified and value length is less than [maxRedshiftLength]
func canIncreasePrecision(colKind typing.KindDetails, valueLength int32) bool {
if colKind.Kind == typing.String.Kind && colKind.OptionalStringPrecision != nil {
return maxRedshiftLength > *colKind.OptionalStringPrecision && valueLength <= maxRedshiftLength
}

return false
}

// replaceExceededValues replaces the value with a marker if it exceeds the maximum length
// Returns the value and boolean indicating whether the column should be increased or not.
func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateExceededValue bool, increaseStringPrecision bool) (string, bool) {
if colKind.Kind == typing.Struct.Kind || colKind.Kind == typing.String.Kind {
maxLength := maxRedshiftLength
// If the customer has specified the maximum string precision, let's use that as the max length.
if colKind.OptionalStringPrecision != nil {
maxLength = *colKind.OptionalStringPrecision
}

if shouldReplace := int32(len(colVal)) > maxLength; shouldReplace {
colValLength := int32(len(colVal))
if shouldReplace := colValLength > maxLength; shouldReplace {
if colKind.Kind == typing.Struct.Kind {
return fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker)
return fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), false
}

if increaseStringPrecision && canIncreasePrecision(colKind, colValLength) {
return colVal, true
}

if truncateExceededValue {
return colVal[:maxLength]
return colVal[:maxLength], false
} else {
return constants.ExceededValueMarker
return constants.ExceededValueMarker, false
}
}
}

return colVal
return colVal, false
}

func castColValStaging(colVal any, colKind typing.KindDetails, truncateExceededValue bool) (string, error) {
func castColValStaging(colVal any, colKind typing.KindDetails, truncateExceededValue bool, increaseStringPrecision bool) (string, bool, error) {
if colVal == nil {
if colKind == typing.Struct {
// Returning empty here because if it's a struct, it will go through JSON PARSE and JSON_PARSE("") = null
return "", nil
return "", false, nil
}

// This matches the COPY clause for NULL terminator.
return `\N`, nil
return `\N`, false, nil
}

colValString, err := values.ToString(colVal, colKind)
if err != nil {
return "", err
return "", false, err
}

// Checks for DDL overflow needs to be done at the end in case there are any conversions that need to be done.
return replaceExceededValues(colValString, colKind, truncateExceededValue), nil

colValue, shouldIncreaseColumn := replaceExceededValues(colValString, colKind, truncateExceededValue, increaseStringPrecision)
return colValue, shouldIncreaseColumn, nil
}
144 changes: 114 additions & 30 deletions clients/redshift/cast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,71 +3,124 @@ package redshift
import (
"fmt"

"github.com/artie-labs/transfer/lib/stringutil"

"github.com/artie-labs/transfer/lib/config/constants"

"github.com/artie-labs/transfer/lib/stringutil"
"github.com/artie-labs/transfer/lib/typing"
"github.com/stretchr/testify/assert"
)

func (r *RedshiftTestSuite) TestCanIncreasePrecision() {
{
// False
{
// Not a string
assert.False(r.T(), canIncreasePrecision(typing.Struct, 123))
}
{
// String, but precision not specified.
assert.False(r.T(), canIncreasePrecision(typing.String, 123))
}
{
// String and precision specified, but value length exceeds maxRedshiftLength
assert.False(r.T(), canIncreasePrecision(
typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(123)),
},
maxRedshiftLength+1),
)
}
}
{
// True
{
// String, precision is low and can be increased
assert.True(r.T(), canIncreasePrecision(
typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(123)),
},
123),
)
}
}
}

func (r *RedshiftTestSuite) TestReplaceExceededValues() {
{
// Irrelevant data type
{
// Integer
assert.Equal(r.T(), "123", replaceExceededValues("123", typing.Integer, false))

value, _ := replaceExceededValues("123", typing.Integer, false, false)
assert.Equal(r.T(), "123", value)
}
{
// Returns the full value since it's not a struct or string
// This is invalid and should not happen, but it's here to ensure we're only checking for structs and strings.
value := stringutil.Random(int(maxRedshiftLength + 1))
assert.Equal(r.T(), value, replaceExceededValues(value, typing.Integer, false))
input := stringutil.Random(int(maxRedshiftLength + 1))
value, _ := replaceExceededValues(input, typing.Integer, false, false)
assert.Equal(r.T(), input, value)
}
}
{
// Exceeded
{
// String
{
// TruncateExceededValue = false
assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false))
// TruncateExceededValue = false, IncreaseStringPrecision = false
value, shouldIncrease := replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false, false)
assert.Equal(r.T(), constants.ExceededValueMarker, value)
assert.False(r.T(), shouldIncrease)
}
{
// TruncateExceededValue = false, string precision specified
// TruncateExceededValue = false, string precision specified, IncreaseStringPrecision = false
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
}

assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues("hello", stringKd, false))
value, shouldIncrease := replaceExceededValues("hello", stringKd, false, false)
assert.Equal(r.T(), constants.ExceededValueMarker, value)
assert.False(r.T(), shouldIncrease)
}
{
// TruncateExceededValue = true
superLongString := stringutil.Random(int(maxRedshiftLength) + 1)
assert.Equal(r.T(), superLongString[:maxRedshiftLength], replaceExceededValues(superLongString, typing.String, true))
// TruncateExceededValue = true, IncreaseStringPrecision = false
input := stringutil.Random(int(maxRedshiftLength) + 1)
value, shouldIncrease := replaceExceededValues(input, typing.String, true, false)
assert.Equal(r.T(), input[:maxRedshiftLength], value)
assert.False(r.T(), shouldIncrease)
}
{
// TruncateExceededValue = true, string precision specified
// TruncateExceededValue = true, string precision specified, IncreaseStringPrecision = false
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
}

assert.Equal(r.T(), "hel", replaceExceededValues("hello", stringKd, true))
value, shouldIncrease := replaceExceededValues("hello", stringKd, true, false)
assert.Equal(r.T(), "hel", value)
assert.False(r.T(), shouldIncrease)
}
}
{
// Struct and masked
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false))
value, shouldIncrease := replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false, false)
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), value)
assert.False(r.T(), shouldIncrease)
}
}
{
// Valid
{
// Not masked
assert.Equal(r.T(), `{"foo": "bar"}`, replaceExceededValues(`{"foo": "bar"}`, typing.Struct, false))
assert.Equal(r.T(), "hello world", replaceExceededValues("hello world", typing.String, false))
{
value, shouldIncrease := replaceExceededValues(`{"foo": "bar"}`, typing.Struct, false, false)
assert.Equal(r.T(), `{"foo": "bar"}`, value)
assert.False(r.T(), shouldIncrease)
}
{
value, shouldIncrease := replaceExceededValues("hello world", typing.String, false, false)
assert.Equal(r.T(), "hello world", value)
assert.False(r.T(), shouldIncrease)
}
}
}
}
Expand All @@ -78,39 +131,70 @@ func (r *RedshiftTestSuite) TestCastColValStaging() {
{
// String
{
// TruncateExceededValue = false
value, err := castColValStaging(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false)
// TruncateExceededValue = false, IncreaseStringPrecision = false
value, shouldIncrease, err := castColValStaging(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), constants.ExceededValueMarker, value)
assert.False(r.T(), shouldIncrease)
}
{
// TruncateExceededValue = true, IncreaseStringPrecision = false
input := stringutil.Random(int(maxRedshiftLength) + 1)
value, shouldIncrease, err := castColValStaging(input, typing.String, true, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), input[:maxRedshiftLength], value)
assert.False(r.T(), shouldIncrease)
}
{
// TruncateExceededValue = true
value := stringutil.Random(int(maxRedshiftLength) + 1)
value, err := castColValStaging(value, typing.String, true)
// TruncateExceededValue = false, IncreaseStringPrecision = true
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
}

value, shouldIncrease, err := castColValStaging("hello", stringKd, false, true)
assert.NoError(r.T(), err)
assert.Equal(r.T(), "hello", value)
assert.True(r.T(), shouldIncrease)
}
{
// TruncateExceededValue = true, IncreaseStringPrecision = true
input := stringutil.Random(int(maxRedshiftLength) + 1)
stringPrecision := int32(3)
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(stringPrecision),
}

value, shouldIncrease, err := castColValStaging(input, stringKd, true, true)
assert.NoError(r.T(), err)
assert.Equal(r.T(), value[:maxRedshiftLength], value)
assert.Equal(r.T(), input[:stringPrecision], value)
assert.False(r.T(), shouldIncrease)
}
}
{
// Masked struct
value, err := castColValStaging(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false)
value, shouldIncrease, err := castColValStaging(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), value)
assert.False(r.T(), shouldIncrease)
}
}
{
// Not exceeded
{
// Valid string
value, err := castColValStaging("thisissuperlongbutnotlongenoughtogetmasked", typing.String, false)
value, shouldIncrease, err := castColValStaging("thisissuperlongbutnotlongenoughtogetmasked", typing.String, false, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), "thisissuperlongbutnotlongenoughtogetmasked", value)
assert.False(r.T(), shouldIncrease)
}
{
// Valid struct
value, err := castColValStaging(`{"foo": "bar"}`, typing.Struct, false)
value, shouldIncrease, err := castColValStaging(`{"foo": "bar"}`, typing.Struct, false, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), `{"foo": "bar"}`, value)
assert.False(r.T(), shouldIncrease)
}
}
}
4 changes: 4 additions & 0 deletions clients/redshift/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ func (RedshiftDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, column
return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart)
}

func (rd RedshiftDialect) BuildIncreaseStringPrecisionQuery(tableID sql.TableIdentifier, column columns.Column, newPrecision int32) string {
return fmt.Sprintf("ALTER TABLE %s ALTER COLUMN %s TYPE VARCHAR(%d)", tableID.FullyQualifiedName(), rd.QuoteIdentifier(column.Name()), newPrecision)
}

func (rd RedshiftDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string {
colName := sql.QuoteTableAliasColumn(tableAlias, column, rd)
if column.KindDetails == typing.Struct {
Expand Down
10 changes: 10 additions & 0 deletions clients/redshift/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,16 @@ func TestRedshiftDialect_BuildAlterColumnQuery(t *testing.T) {
)
}

func TestRedshiftDialect_BuildIncreaseStringPrecisionQuery(t *testing.T) {
fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("{TABLE}")

assert.Equal(t,
`ALTER TABLE {TABLE} ALTER COLUMN "{column}" TYPE VARCHAR(12345)`,
RedshiftDialect{}.BuildIncreaseStringPrecisionQuery(fakeTableID, columns.NewColumn("{COLUMN}", typing.String), 12345),
)
}

func TestRedshiftDialect_BuildIsNotToastValueExpression(t *testing.T) {
assert.Equal(t,
`COALESCE(tbl."bar" != '__debezium_unavailable_value', true)`,
Expand Down
8 changes: 7 additions & 1 deletion clients/redshift/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,13 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID
for _, value := range tableData.Rows() {
var row []string
for _, col := range columns {
castedValue, castErr := castColValStaging(value[col.Name()], col.KindDetails, s.config.SharedDestinationSettings.TruncateExceededValues)
// TODO: Implement
castedValue, _, castErr := castColValStaging(
value[col.Name()],
col.KindDetails,
s.config.SharedDestinationSettings.TruncateExceededValues,
s.config.SharedDestinationSettings.IncreaseStringPrecision,
)
if castErr != nil {
return "", castErr
}
Expand Down
2 changes: 2 additions & 0 deletions lib/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type Kafka struct {
type SharedDestinationSettings struct {
// TruncateExceededValues - This will truncate exceeded values instead of replacing it with `__artie_exceeded_value`
TruncateExceededValues bool `yaml:"truncateExceededValues"`
// IncreaseStringPrecision - This will increase the string precision to the maximum allowed by the destination.
IncreaseStringPrecision bool `yaml:"increaseStringPrecision"`
}

type Reporting struct {
Expand Down
Loading