Skip to content

Commit

Permalink
[Redshift] Improve Sweep query (#428)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Apr 11, 2024
1 parent 61f6333 commit 64792a0
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 6 deletions.
3 changes: 2 additions & 1 deletion clients/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (s *Store) Sweep() error {
return err
}

// `relkind` will filter for only ordinary tables and exclude sequences, views, etc.
queryFunc := func(dbAndSchemaPair kafkalib.DatabaseSchemaPair) (string, []any) {
return `
SELECT
Expand All @@ -82,7 +83,7 @@ FROM
JOIN
PG_CATALOG.PG_NAMESPACE n ON n.oid = c.relnamespace
WHERE
n.nspname = $1 AND c.relname ILIKE $2;`, []any{dbAndSchemaPair.Schema, "%" + constants.ArtiePrefix + "%"}
n.nspname = $1 AND c.relname ILIKE $2 AND c.relkind = 'r';`, []any{dbAndSchemaPair.Schema, "%" + constants.ArtiePrefix + "%"}
}

return shared.Sweep(s, tcs, queryFunc)
Expand Down
7 changes: 2 additions & 5 deletions lib/destination/ddl/expiry.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@ func ShouldDeleteFromName(name string) bool {
return false
}

return shouldDeleteUnix(nameParts[len(nameParts)-1])
}

func shouldDeleteUnix(unixString string) bool {
unixString := nameParts[len(nameParts)-1]
unix, err := strconv.Atoi(unixString)
if err != nil {
slog.Warn("Failed to parse unix string", slog.Any("err", err), slog.String("unixString", unixString))
slog.Error("Failed to parse unix string", slog.Any("err", err), slog.String("tableName", name), slog.String("unixString", unixString))
return false
}

Expand Down

0 comments on commit 64792a0

Please sign in to comment.