Skip to content

Commit

Permalink
Improving Transfer's comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Apr 4, 2024
1 parent 40edcae commit 33420eb
Showing 1 changed file with 5 additions and 11 deletions.
16 changes: 5 additions & 11 deletions clients/snowflake/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ func castColValStaging(colVal any, colKind columns.Column, additionalDateFmts []
return values.ToString(colVal, colKind, additionalDateFmts)
}

// PrepareTemporaryTable does the following:
// 1) Create the temporary table
// 2) Load in-memory table -> CSV
// 3) Runs PUT to upload CSV to Snowflake staging (auto-compression with GZIP)
// 4) Runs COPY INTO with the columns specified into temporary table
// 5) Deletes CSV generated from (2)
func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableName string, additionalSettings types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Expand All @@ -51,6 +45,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo
}
}

// Write data into CSV
fp, err := s.loadTemporaryTable(tableData, tempTableName)
if err != nil {
return fmt.Errorf("failed to load temporary table: %w", err)
Expand All @@ -63,26 +58,25 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo
}
}()

// Upload the CSV file to Snowflake
if _, err = s.Exec(fmt.Sprintf("PUT file://%s @%s AUTO_COMPRESS=TRUE", fp, addPrefixToTableName(tempTableName, "%"))); err != nil {
return fmt.Errorf("failed to run PUT for temporary table: %w", err)
}

// COPY the CSV file (in Snowflake) into a table
copyCommand := fmt.Sprintf("COPY INTO %s (%s) FROM (SELECT %s FROM @%s)",
// Copy into temporary tables (column ...)
tempTableName, strings.Join(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(s.config.SharedDestinationConfig.UppercaseEscapedNames, &sql.NameArgs{
Escape: true,
DestKind: s.Label(),
}), ","),
// Escaped columns, TABLE NAME
escapeColumns(tableData.ReadOnlyInMemoryCols(), ","), addPrefixToTableName(tempTableName, "%"))

if additionalSettings.AdditionalCopyClause != "" {
copyCommand += " " + additionalSettings.AdditionalCopyClause
}

_, err = s.Exec(copyCommand)
if err != nil {
return fmt.Errorf("failed to load staging file into temporary table: %w", err)
if _, err = s.Exec(copyCommand); err != nil {
return fmt.Errorf("failed to run copy into temporary table: %w", err)
}

return nil
Expand Down

0 comments on commit 33420eb

Please sign in to comment.