diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index c7fb9562e..7803965ad 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -27,12 +27,6 @@ func castColValStaging(colVal any, colKind columns.Column, additionalDateFmts [] return values.ToString(colVal, colKind, additionalDateFmts) } -// PrepareTemporaryTable does the following: -// 1) Create the temporary table -// 2) Load in-memory table -> CSV -// 3) Runs PUT to upload CSV to Snowflake staging (auto-compression with GZIP) -// 4) Runs COPY INTO with the columns specified into temporary table -// 5) Deletes CSV generated from (2) func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableName string, additionalSettings types.AdditionalSettings, createTempTable bool) error { if createTempTable { tempAlterTableArgs := ddl.AlterTableArgs{ @@ -51,6 +45,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo } } + // Write data into CSV fp, err := s.loadTemporaryTable(tableData, tempTableName) if err != nil { return fmt.Errorf("failed to load temporary table: %w", err) @@ -63,26 +58,25 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo } }() + // Upload the CSV file to Snowflake if _, err = s.Exec(fmt.Sprintf("PUT file://%s @%s AUTO_COMPRESS=TRUE", fp, addPrefixToTableName(tempTableName, "%"))); err != nil { return fmt.Errorf("failed to run PUT for temporary table: %w", err) } + // COPY the CSV file (in Snowflake) into a table copyCommand := fmt.Sprintf("COPY INTO %s (%s) FROM (SELECT %s FROM @%s)", - // Copy into temporary tables (column ...) tempTableName, strings.Join(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(s.config.SharedDestinationConfig.UppercaseEscapedNames, &sql.NameArgs{ Escape: true, DestKind: s.Label(), }), ","), - // Escaped columns, TABLE NAME escapeColumns(tableData.ReadOnlyInMemoryCols(), ","), addPrefixToTableName(tempTableName, "%")) if additionalSettings.AdditionalCopyClause != "" { copyCommand += " " + additionalSettings.AdditionalCopyClause } - _, err = s.Exec(copyCommand) - if err != nil { - return fmt.Errorf("failed to load staging file into temporary table: %w", err) + if _, err = s.Exec(copyCommand); err != nil { + return fmt.Errorf("failed to run copy into temporary table: %w", err) } return nil