Skip to content

Commit

Permalink
feat: add initialize setting to dynamic table
Browse files Browse the repository at this point in the history
Dynamic tables have an INITIALIZE parameter that can be specified on
table creation. This parameter determines when the first refresh of the
dynamic table occurs. This option defaults to ON_CREATE, which causes
the dynamic table to execute a refresh immediately upon creation.

There are cases where this behavior is not desirable. For example, if
refreshing takes a long time, we may not want terraform to wait. There
are also some cases where a refresh ON_CREATE fails due to time travel
issues. In these cases, setting the initialize option to ON_SCHEDULE may
be the solution for these issues.

This value does not have its own column in SHOW DYNAMIC TABLES and can
only be parsed from DDL.

This commit also changes the data_timestamp field to a nullable type.
This is necessary because when dynamic tables are set to INITIALIZE =
ON_SCHEDULE, the dynamic table's data_timestamp is null at creation
time, and is only populated when the table refreshes.
  • Loading branch information
sonya committed Jan 27, 2024
1 parent 8c1905f commit 6f7bedd
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 4 deletions.
1 change: 1 addition & 0 deletions docs/resources/dynamic_table.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ resource "snowflake_dynamic_table" "dt" {
- `comment` (String) Specifies a comment for the dynamic table.
- `or_replace` (Boolean) Specifies whether to replace the dynamic table if it already exists.
- `refresh_mode` (String) INCREMENTAL if the dynamic table will use incremental refreshes, or FULL if it will recompute the whole table on every refresh.
- `initialize` (String) Specifies the behavior of the initial refresh of the dynamic table. This property cannot be altered after you create the dynamic table.

### Read-Only

Expand Down
21 changes: 21 additions & 0 deletions pkg/resources/dynamic_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ var dynamicTableShema = map[string]*schema.Schema{
},
DiffSuppressOnRefresh: true,
},
"initialize": {
Type: schema.TypeString,
Optional: true,
Description: "Initialize trigger for the dynamic table. Can only be set on creation.",
ForceNew: true,
DiffSuppressFunc: func(k, oldValue, newValue string, d *schema.ResourceData) bool {
return oldValue == "ON_CREATE" && newValue == ""
},
},
"cluster_by": {
Type: schema.TypeString,
Description: "The clustering key for the dynamic table.",
Expand Down Expand Up @@ -214,6 +223,15 @@ func ReadDynamicTable(d *schema.ResourceData, meta interface{}) error {
return err
}
}
if strings.Contains(dynamicTable.Text, "initialize = 'ON_CREATE'") {
if err := d.Set("initialize", "ON_CREATE"); err != nil {
return err
}
} else if strings.Contains(dynamicTable.Text, "initialize = 'ON_SCHEDULE'") {
if err := d.Set("initialize", "ON_SCHEDULE"); err != nil {
return err
}
}
if err := d.Set("cluster_by", dynamicTable.ClusterBy); err != nil {
return err
}
Expand Down Expand Up @@ -332,6 +350,9 @@ func CreateDynamicTable(d *schema.ResourceData, meta interface{}) error {
if v, ok := d.GetOk("refresh_mode"); ok {
request.WithRefreshMode(sdk.String(v.(string)))
}
if v, ok := d.GetOk("initialize"); ok {
request.WithInitialize(sdk.String(v.(string)))
}
if err := client.DynamicTables.Create(context.Background(), request); err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/resources/dynamic_table_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestAcc_DynamicTable_basic(t *testing.T) {
"database": config.StringVariable(acc.TestDatabaseName),
"schema": config.StringVariable(acc.TestSchemaName),
"warehouse": config.StringVariable(acc.TestWarehouseName),
"initialize": config.StringVariable("ON_SCHEDULE"),
"refresh_mode": config.StringVariable("FULL"),
"query": config.StringVariable(fmt.Sprintf(`select "id" from "%v"."%v"."%v"`, acc.TestDatabaseName, acc.TestSchemaName, tableName)),
"comment": config.StringVariable("Terraform acceptance test"),
Expand All @@ -54,6 +55,7 @@ func TestAcc_DynamicTable_basic(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, "database", acc.TestDatabaseName),
resource.TestCheckResourceAttr(resourceName, "schema", acc.TestSchemaName),
resource.TestCheckResourceAttr(resourceName, "warehouse", acc.TestWarehouseName),
resource.TestCheckResourceAttr(resourceName, "initialize", "ON_SCHEDULE"),
resource.TestCheckResourceAttr(resourceName, "refresh_mode", "FULL"),
resource.TestCheckResourceAttr(resourceName, "target_lag.#", "1"),
resource.TestCheckResourceAttr(resourceName, "target_lag.0.maximum_duration", "2 minutes"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ resource "snowflake_dynamic_table" "dt" {
query = var.query
comment = var.comment
refresh_mode = var.refresh_mode
initialize = var.initialize
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ variable "refresh_mode" {
type = string
}

variable "initialize" {
type = string
}

variable "table_name" {
type = string
}
7 changes: 5 additions & 2 deletions pkg/sdk/dynamic_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type createDynamicTableOptions struct {
dynamicTable bool `ddl:"static" sql:"DYNAMIC TABLE"`
name SchemaObjectIdentifier `ddl:"identifier"`
targetLag TargetLag `ddl:"parameter,no_quotes" sql:"TARGET_LAG"`
Initialize *string `ddl:"parameter,no_quotes" sql:"INITIALIZE"`
RefreshMode *string `ddl:"parameter,no_quotes" sql:"REFRESH_MODE"`
warehouse AccountObjectIdentifier `ddl:"identifier,equals" sql:"WAREHOUSE"`
Comment *string `ddl:"parameter,single_quotes" sql:"COMMENT"`
Expand Down Expand Up @@ -130,7 +131,7 @@ type dynamicTableRow struct {
LastSuspendedOn sql.NullTime `db:"last_suspended_on"`
IsClone bool `db:"is_clone"`
IsReplica bool `db:"is_replica"`
DataTimestamp time.Time `db:"data_timestamp"`
DataTimestamp sql.NullTime `db:"data_timestamp"`
}

func (dtr dynamicTableRow) convert() *DynamicTable {
Expand All @@ -153,11 +154,13 @@ func (dtr dynamicTableRow) convert() *DynamicTable {
SchedulingState: DynamicTableSchedulingState(dtr.SchedulingState),
IsClone: dtr.IsClone,
IsReplica: dtr.IsReplica,
DataTimestamp: dtr.DataTimestamp,
}
if dtr.RefreshModeReason.Valid {
dt.RefreshModeReason = dtr.RefreshModeReason.String
}
if dtr.DataTimestamp.Valid {
dt.DataTimestamp = dtr.DataTimestamp.Time
}
if dtr.LastSuspendedOn.Valid {
dt.LastSuspendedOn = dtr.LastSuspendedOn.Time
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sdk/dynamic_table_dto.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type CreateDynamicTableRequest struct {

comment *string
refreshMode *string
initialize *string
}

type AlterDynamicTableRequest struct {
Expand Down
5 changes: 5 additions & 0 deletions pkg/sdk/dynamic_table_dto_builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ func (s *CreateDynamicTableRequest) WithRefreshMode(refreshMode *string) *Create
return s
}

func (s *CreateDynamicTableRequest) WithInitialize(initialize *string) *CreateDynamicTableRequest {
s.initialize = initialize
return s
}

func NewAlterDynamicTableRequest(
name SchemaObjectIdentifier,
) *AlterDynamicTableRequest {
Expand Down
1 change: 1 addition & 0 deletions pkg/sdk/dynamic_table_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (s *CreateDynamicTableRequest) toOpts() *createDynamicTableOptions {
query: s.query,
Comment: s.comment,
RefreshMode: s.refreshMode,
Initialize: s.initialize,
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/sdk/dynamic_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ func TestDynamicTableCreate(t *testing.T) {
opts.OrReplace = Bool(true)
opts.Comment = String("comment")
opts.RefreshMode = String("FULL")
assertOptsValidAndSQLEquals(t, opts, `CREATE OR REPLACE DYNAMIC TABLE %s TARGET_LAG = '1 minutes' REFRESH_MODE = FULL WAREHOUSE = "warehouse_name" COMMENT = 'comment' AS SELECT product_id, product_name FROM staging_table`, id.FullyQualifiedName())
opts.Initialize = String("ON_SCHEDULE")
assertOptsValidAndSQLEquals(t, opts, `CREATE OR REPLACE DYNAMIC TABLE %s TARGET_LAG = '1 minutes' INITIALIZE = ON_SCHEDULE REFRESH_MODE = FULL WAREHOUSE = "warehouse_name" COMMENT = 'comment' AS SELECT product_id, product_name FROM staging_table`, id.FullyQualifiedName())
})
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/sdk/testint/dynamic_table_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func TestInt_DynamicTableCreateAndDrop(t *testing.T) {
query := "select id from " + tableTest.ID().FullyQualifiedName()
comment := random.Comment()
refreshMode := "FULL"
err := client.DynamicTables.Create(ctx, sdk.NewCreateDynamicTableRequest(name, testWarehouse(t).ID(), targetLag, query).WithOrReplace(true).WithRefreshMode(&refreshMode).WithComment(&comment))
initialize := "ON_SCHEDULE"
err := client.DynamicTables.Create(ctx, sdk.NewCreateDynamicTableRequest(name, testWarehouse(t).ID(), targetLag, query).WithOrReplace(true).WithInitialize(&initialize).WithRefreshMode(&refreshMode).WithComment(&comment))
require.NoError(t, err)
t.Cleanup(func() {
err = client.DynamicTables.Drop(ctx, sdk.NewDropDynamicTableRequest(name))
Expand All @@ -70,6 +71,7 @@ func TestInt_DynamicTableCreateAndDrop(t *testing.T) {
require.Equal(t, testWarehouse(t).ID().Name(), entity.Warehouse)
require.Equal(t, "DOWNSTREAM", entity.TargetLag)
require.Equal(t, "FULL", entity.RefreshMode)
require.Contains(t, entity.Text, "ON_SCHEDULE")
})
}

Expand Down

0 comments on commit 6f7bedd

Please sign in to comment.