diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index f014df97b..c92fbf235 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -49,7 +49,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo Mode: tableData.Mode(), } - if err := tempAlterTableArgs.Alter(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { + if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { return fmt.Errorf("failed to create temp table: %w", err) } } diff --git a/clients/mssql/staging.go b/clients/mssql/staging.go index 3e1a90bbd..0369052aa 100644 --- a/clients/mssql/staging.go +++ b/clients/mssql/staging.go @@ -24,7 +24,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo Mode: tableData.Mode(), } - if err := tempAlterTableArgs.Alter(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { + if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { return fmt.Errorf("failed to create temp table: %w", err) } } diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go index bfe30c67b..63e6f1552 100644 --- a/clients/redshift/staging.go +++ b/clients/redshift/staging.go @@ -28,7 +28,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo Mode: tableData.Mode(), } - if err := tempAlterTableArgs.Alter(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { + if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { return fmt.Errorf("failed to create temp table: %w", err) } diff --git a/clients/shared/append.go b/clients/shared/append.go index ff360e0a9..e910b4fbf 100644 --- a/clients/shared/append.go +++ b/clients/shared/append.go @@ -45,7 +45,7 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, cf } // Keys that exist in CDC stream, but not in DWH - err = createAlterTableArgs.Alter(targetKeysMissing...) + err = createAlterTableArgs.AlterTable(targetKeysMissing...) if err != nil { slog.Warn("Failed to apply alter table", slog.Any("err", err)) return err diff --git a/clients/shared/merge.go b/clients/shared/merge.go index e4bfe7638..5a7de04e0 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -47,7 +47,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg } // Columns that are missing in DWH, but exist in our CDC stream. - err = createAlterTableArgs.Alter(targetKeysMissing...) + err = createAlterTableArgs.AlterTable(targetKeysMissing...) if err != nil { slog.Warn("Failed to apply alter table", slog.Any("err", err)) return err @@ -66,7 +66,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg Mode: tableData.Mode(), } - if err = deleteAlterTableArgs.Alter(srcKeysMissing...); err != nil { + if err = deleteAlterTableArgs.AlterTable(srcKeysMissing...); err != nil { slog.Warn("Failed to apply alter table", slog.Any("err", err)) return err } diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index 51eb66436..448b01d40 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -41,7 +41,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo Mode: tableData.Mode(), } - if err := tempAlterTableArgs.Alter(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { + if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { return fmt.Errorf("failed to create temp table: %w", err) } } diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index e532ca214..9c0c0121b 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -79,7 +79,7 @@ func (a *AlterTableArgs) Validate() error { return nil } -func (a *AlterTableArgs) Alter(cols ...columns.Column) error { +func (a *AlterTableArgs) AlterTable(cols ...columns.Column) error { if err := a.Validate(); err != nil { return err } diff --git a/lib/destination/ddl/ddl_alter_delete_test.go b/lib/destination/ddl/ddl_alter_delete_test.go index a63eb8ea0..ac3650138 100644 --- a/lib/destination/ddl/ddl_alter_delete_test.go +++ b/lib/destination/ddl/ddl_alter_delete_test.go @@ -67,7 +67,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(column)) + assert.NoError(d.T(), alterTableArgs.AlterTable(column)) } // Never actually deleted. @@ -88,7 +88,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { Mode: config.Replication, } - err := alterTableArgs.Alter(column) + err := alterTableArgs.AlterTable(column) assert.NoError(d.T(), err) } @@ -110,7 +110,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(column)) + assert.NoError(d.T(), alterTableArgs.AlterTable(column)) } // Never actually deleted. @@ -145,7 +145,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(column)) + assert.NoError(d.T(), alterTableArgs.AlterTable(column)) } // Never actually deleted. @@ -166,7 +166,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(column)) + assert.NoError(d.T(), alterTableArgs.AlterTable(column)) } // Never actually deleted. @@ -187,7 +187,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(column)) + assert.NoError(d.T(), alterTableArgs.AlterTable(column)) } // Never actually deleted. @@ -224,7 +224,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(column)) + assert.NoError(d.T(), alterTableArgs.AlterTable(column)) } // BigQuery @@ -241,7 +241,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(column)) + assert.NoError(d.T(), alterTableArgs.AlterTable(column)) } // Redshift @@ -258,7 +258,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(column)) + assert.NoError(d.T(), alterTableArgs.AlterTable(column)) } // Nothing has been deleted, but it is all added to the permissions table. @@ -283,7 +283,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(column)) + assert.NoError(d.T(), alterTableArgs.AlterTable(column)) // BigQuery alterTableArgs = ddl.AlterTableArgs{ @@ -298,7 +298,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(column)) + assert.NoError(d.T(), alterTableArgs.AlterTable(column)) // Redshift alterTableArgs = ddl.AlterTableArgs{ @@ -313,7 +313,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(column)) + assert.NoError(d.T(), alterTableArgs.AlterTable(column)) } // Everything has been deleted. diff --git a/lib/destination/ddl/ddl_bq_test.go b/lib/destination/ddl/ddl_bq_test.go index 922d6df9b..68292688c 100644 --- a/lib/destination/ddl/ddl_bq_test.go +++ b/lib/destination/ddl/ddl_bq_test.go @@ -67,7 +67,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(column)) + assert.NoError(d.T(), alterTableArgs.AlterTable(column)) } // Have not deleted, but tried to! @@ -90,7 +90,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(column)) + assert.NoError(d.T(), alterTableArgs.AlterTable(column)) query, _ := d.fakeBigQueryStore.ExecArgsForCall(callIdx) assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s drop COLUMN %s", fqName, column.Name(false, &artieSQL.NameArgs{ Escape: true, @@ -150,7 +150,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() { col := columns.NewColumn(name, kind) - assert.NoError(d.T(), alterTableArgs.Alter(col)) + assert.NoError(d.T(), alterTableArgs.AlterTable(col)) query, _ := d.fakeBigQueryStore.ExecArgsForCall(callIdx) assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, col.Name(false, &artieSQL.NameArgs{ Escape: true, @@ -211,7 +211,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(column)) + assert.NoError(d.T(), alterTableArgs.AlterTable(column)) query, _ := d.fakeBigQueryStore.ExecArgsForCall(callIdx) assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, column.Name(false, &artieSQL.NameArgs{ Escape: true, @@ -268,7 +268,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuerySafety() { UppercaseEscNames: ptr.ToBool(false), Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(column)) + assert.NoError(d.T(), alterTableArgs.AlterTable(column)) } assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfig(fqName).ReadOnlyColumnsToDelete())) @@ -287,7 +287,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuerySafety() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(column)) + assert.NoError(d.T(), alterTableArgs.AlterTable(column)) assert.Equal(d.T(), 0, d.fakeBigQueryStore.ExecCallCount()) } diff --git a/lib/destination/ddl/ddl_create_table_test.go b/lib/destination/ddl/ddl_create_table_test.go index c0f8f25b4..da6b0a197 100644 --- a/lib/destination/ddl/ddl_create_table_test.go +++ b/lib/destination/ddl/ddl_create_table_test.go @@ -55,7 +55,7 @@ func (d *DDLTestSuite) Test_CreateTable() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(columns.NewColumn("name", typing.String))) + assert.NoError(d.T(), alterTableArgs.AlterTable(columns.NewColumn("name", typing.String))) assert.Equal(d.T(), 1, dwhTc._fakeStore.ExecCallCount()) query, _ := dwhTc._fakeStore.ExecArgsForCall(0) @@ -122,7 +122,7 @@ func (d *DDLTestSuite) TestCreateTable() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(testCase.cols...), testCase.name) + assert.NoError(d.T(), alterTableArgs.AlterTable(testCase.cols...), testCase.name) execQuery, _ := d.fakeSnowflakeStagesStore.ExecArgsForCall(index) assert.Equal(d.T(), testCase.expectedQuery, execQuery, testCase.name) diff --git a/lib/destination/ddl/ddl_sflk_test.go b/lib/destination/ddl/ddl_sflk_test.go index 45eb275bb..c9cde14eb 100644 --- a/lib/destination/ddl/ddl_sflk_test.go +++ b/lib/destination/ddl/ddl_sflk_test.go @@ -44,7 +44,7 @@ func (d *DDLTestSuite) TestAlterComplexObjects() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(cols...)) + assert.NoError(d.T(), alterTableArgs.AlterTable(cols...)) for i := 0; i < len(cols); i++ { execQuery, _ := d.fakeSnowflakeStagesStore.ExecArgsForCall(i) assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s add COLUMN %s %s", fqTable, cols[i].Name(false, &sql.NameArgs{ @@ -80,11 +80,11 @@ func (d *DDLTestSuite) TestAlterIdempotency() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(cols...)) + assert.NoError(d.T(), alterTableArgs.AlterTable(cols...)) assert.Equal(d.T(), len(cols), d.fakeSnowflakeStagesStore.ExecCallCount(), "called SFLK the same amt to create cols") d.fakeSnowflakeStagesStore.ExecReturns(nil, errors.New("table does not exist")) - assert.Error(d.T(), alterTableArgs.Alter(cols...)) + assert.Error(d.T(), alterTableArgs.AlterTable(cols...)) } func (d *DDLTestSuite) TestAlterTableAdd() { @@ -110,7 +110,7 @@ func (d *DDLTestSuite) TestAlterTableAdd() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(cols...)) + assert.NoError(d.T(), alterTableArgs.AlterTable(cols...)) assert.Equal(d.T(), len(cols), d.fakeSnowflakeStagesStore.ExecCallCount(), "called SFLK the same amt to create cols") // Check the table config @@ -153,7 +153,7 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(cols...)) + assert.NoError(d.T(), alterTableArgs.AlterTable(cols...)) assert.Equal(d.T(), 0, d.fakeSnowflakeStagesStore.ExecCallCount(), "tried to delete, but not yet.") // Check the table config @@ -178,7 +178,7 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() { // Now let's actually try to dial the time back, and it should actually try to delete. tableConfig.AddColumnsToDelete(colToActuallyDelete, time.Now().Add(-1*time.Hour)) - assert.NoError(d.T(), alterTableArgs.Alter(cols...)) + assert.NoError(d.T(), alterTableArgs.AlterTable(cols...)) assert.Equal(d.T(), i+1, d.fakeSnowflakeStagesStore.ExecCallCount(), "tried to delete one column") execArg, _ := d.fakeSnowflakeStagesStore.ExecArgsForCall(i) @@ -218,7 +218,7 @@ func (d *DDLTestSuite) TestAlterTableDelete() { Mode: config.Replication, } - assert.NoError(d.T(), alterTableArgs.Alter(cols...)) + assert.NoError(d.T(), alterTableArgs.AlterTable(cols...)) assert.Equal(d.T(), 3, d.fakeSnowflakeStagesStore.ExecCallCount(), "tried to delete, but not yet.") // Check the table config diff --git a/lib/destination/ddl/ddl_temp_test.go b/lib/destination/ddl/ddl_temp_test.go index 33ac804c9..0ff6502a6 100644 --- a/lib/destination/ddl/ddl_temp_test.go +++ b/lib/destination/ddl/ddl_temp_test.go @@ -49,10 +49,10 @@ func (d *DDLTestSuite) TestCreateTemporaryTable_Errors() { } // No columns. - assert.NoError(d.T(), args.Alter()) + assert.NoError(d.T(), args.AlterTable()) args.ColumnOp = constants.Delete - assert.ErrorContains(d.T(), args.Alter(), "incompatible operation - cannot drop columns and create table at the same time") + assert.ErrorContains(d.T(), args.AlterTable(), "incompatible operation - cannot drop columns and create table at the same time") // Change it to SFLK + Stage d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(fqName, types.NewDwhTableConfig(&columns.Columns{}, nil, true, true)) @@ -61,7 +61,7 @@ func (d *DDLTestSuite) TestCreateTemporaryTable_Errors() { args.Tc = snowflakeStagesTc args.CreateTable = false - assert.ErrorContains(d.T(), args.Alter(), "incompatible operation - we should not be altering temporary tables, only create") + assert.ErrorContains(d.T(), args.AlterTable(), "incompatible operation - we should not be altering temporary tables, only create") } func (d *DDLTestSuite) TestCreateTemporaryTable() { @@ -81,7 +81,7 @@ func (d *DDLTestSuite) TestCreateTemporaryTable() { Mode: config.Replication, } - assert.NoError(d.T(), args.Alter(columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("start", typing.String))) + assert.NoError(d.T(), args.AlterTable(columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("start", typing.String))) assert.Equal(d.T(), 1, d.fakeSnowflakeStagesStore.ExecCallCount()) query, _ := d.fakeSnowflakeStagesStore.ExecArgsForCall(0) @@ -96,7 +96,7 @@ func (d *DDLTestSuite) TestCreateTemporaryTable() { args.Dwh = d.bigQueryStore args.Tc = bqTc - assert.NoError(d.T(), args.Alter(columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("select", typing.String))) + assert.NoError(d.T(), args.AlterTable(columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("select", typing.String))) assert.Equal(d.T(), 1, d.fakeBigQueryStore.ExecCallCount()) bqQuery, _ := d.fakeBigQueryStore.ExecArgsForCall(0) // Cutting off the expiration_timestamp since it's time based.