Skip to content

Commit

Permalink
Simplifying GetTableConfig (#1109)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Jan 27, 2025
1 parent 6b37fd6 commit 15ab27e
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 22 deletions.
6 changes: 3 additions & 3 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,15 @@ func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sq
return dialect.NewTableIdentifier(s.config.BigQuery.ProjectID, topicConfig.Database, table)
}

func (s *Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error) {
func (s *Store) GetTableConfig(tableID sql.TableIdentifier, dropDeletedColumns bool) (*types.DwhTableConfig, error) {
return shared.GetTableCfgArgs{
Dwh: s,
TableID: s.IdentifierFor(tableData.TopicConfig(), tableData.Name()),
TableID: tableID,
ConfigMap: s.configMap,
ColumnNameForName: "column_name",
ColumnNameForDataType: "data_type",
ColumnNameForComment: "description",
DropDeletedColumns: tableData.TopicConfig().DropDeletedColumns,
DropDeletedColumns: dropDeletedColumns,
}.GetTableConfig()
}

Expand Down
6 changes: 3 additions & 3 deletions clients/databricks/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ func (s Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, include
return nil
}

func (s Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error) {
func (s Store) GetTableConfig(tableID sql.TableIdentifier, dropDeletedColumns bool) (*types.DwhTableConfig, error) {
return shared.GetTableCfgArgs{
Dwh: s,
TableID: dialect.NewTableIdentifier(tableData.TopicConfig().Database, tableData.TopicConfig().Schema, tableData.Name()),
TableID: tableID,
ConfigMap: s.configMap,
ColumnNameForName: "col_name",
ColumnNameForDataType: "data_type",
ColumnNameForComment: "comment",
DropDeletedColumns: tableData.TopicConfig().DropDeletedColumns,
DropDeletedColumns: dropDeletedColumns,
}.GetTableConfig()
}

Expand Down
6 changes: 3 additions & 3 deletions clients/mssql/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ func (s *Store) Dedupe(_ sql.TableIdentifier, _ []string, _ bool) error {
return nil // dedupe is not necessary for MS SQL
}

func (s *Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error) {
func (s *Store) GetTableConfig(tableID sql.TableIdentifier, dropDeletedColumns bool) (*types.DwhTableConfig, error) {
return shared.GetTableCfgArgs{
Dwh: s,
TableID: s.specificIdentifierFor(tableData.TopicConfig(), tableData.Name()),
TableID: tableID,
ConfigMap: s.configMap,
ColumnNameForName: "column_name",
ColumnNameForDataType: "data_type",
ColumnNameForComment: "description",
DropDeletedColumns: tableData.TopicConfig().DropDeletedColumns,
DropDeletedColumns: dropDeletedColumns,
}.GetTableConfig()
}

Expand Down
6 changes: 3 additions & 3 deletions clients/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ func (s *Store) dialect() dialect.RedshiftDialect {
return dialect.RedshiftDialect{}
}

func (s *Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error) {
func (s *Store) GetTableConfig(tableID sql.TableIdentifier, dropDeletedColumns bool) (*types.DwhTableConfig, error) {
return shared.GetTableCfgArgs{
Dwh: s,
TableID: s.IdentifierFor(tableData.TopicConfig(), tableData.Name()),
TableID: tableID,
ConfigMap: s.configMap,
ColumnNameForName: "column_name",
ColumnNameForDataType: "data_type",
ColumnNameForComment: "description",
DropDeletedColumns: tableData.TopicConfig().DropDeletedColumns,
DropDeletedColumns: dropDeletedColumns,
}.GetTableConfig()
}

Expand Down
4 changes: 2 additions & 2 deletions clients/shared/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optim
return nil
}

tableConfig, err := dwh.GetTableConfig(tableData)
tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
tableConfig, err := dwh.GetTableConfig(tableID, tableData.TopicConfig().DropDeletedColumns)
if err != nil {
return fmt.Errorf("failed to get table config: %w", err)
}
Expand All @@ -30,7 +31,6 @@ func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optim
tableData.Mode(),
)

tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
if tableConfig.CreateTable() {
if err = CreateTable(ctx, dwh, tableData, tableConfig, opts.ColumnSettings, tableID, false, targetKeysMissing); err != nil {
return fmt.Errorf("failed to create table: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi
return nil
}

tableConfig, err := dwh.GetTableConfig(tableData)
tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
tableConfig, err := dwh.GetTableConfig(tableID, tableData.TopicConfig().DropDeletedColumns)
if err != nil {
return fmt.Errorf("failed to get table config: %w", err)
}
Expand All @@ -36,7 +37,6 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi
tableData.Mode(),
)

tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
if tableConfig.CreateTable() {
if err = CreateTable(ctx, dwh, tableData, tableConfig, opts.ColumnSettings, tableID, false, targetKeysMissing); err != nil {
return fmt.Errorf("failed to create table: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion clients/snowflake/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *SnowflakeTestSuite) TestGetTableConfig() {
tableData := optimization.NewTableData(nil, config.Replication, nil,
kafkalib.TopicConfig{Database: "customers", Schema: "public", TableName: "orders22"}, "foo")

tableConfig, err := s.stageStore.GetTableConfig(tableData)
tableConfig, err := s.stageStore.GetTableConfig(s.identifierFor(tableData), tableData.TopicConfig().DropDeletedColumns)
assert.NotNil(s.T(), tableConfig, "config is nil")
assert.NoError(s.T(), err)

Expand Down
7 changes: 3 additions & 4 deletions clients/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/artie-labs/transfer/lib/destination"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/sql"
)

Expand All @@ -27,15 +26,15 @@ func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sq
return dialect.NewTableIdentifier(topicConfig.Database, topicConfig.Schema, table)
}

func (s *Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error) {
func (s *Store) GetTableConfig(tableID sql.TableIdentifier, dropDeletedColumns bool) (*types.DwhTableConfig, error) {
return shared.GetTableCfgArgs{
Dwh: s,
TableID: s.IdentifierFor(tableData.TopicConfig(), tableData.Name()),
TableID: tableID,
ConfigMap: s.configMap,
ColumnNameForName: "name",
ColumnNameForDataType: "type",
ColumnNameForComment: "comment",
DropDeletedColumns: tableData.TopicConfig().DropDeletedColumns,
DropDeletedColumns: dropDeletedColumns,
}.GetTableConfig()
}

Expand Down
2 changes: 1 addition & 1 deletion lib/destination/dwh.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type DataWarehouse interface {
Begin() (*sql.Tx, error)

// Helper functions for merge
GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error)
GetTableConfig(tableID sqllib.TableIdentifier, dropDeletedColumns bool) (*types.DwhTableConfig, error)
PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sqllib.TableIdentifier, parentTableID sqllib.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error
}

Expand Down

0 comments on commit 15ab27e

Please sign in to comment.