Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Aug 21, 2024
1 parent 6f8c141 commit aec9973
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 10 deletions.
10 changes: 7 additions & 3 deletions clients/bigquery/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ import (
func (s *Store) Merge(tableData *optimization.TableData) error {
var additionalEqualityStrings []string
if tableData.TopicConfig().BigQueryPartitionSettings != nil {
additionalDateFmts := s.config.SharedTransferConfig.TypingSettings.AdditionalDateFormats
distinctDates, err := tableData.DistinctDates(tableData.TopicConfig().BigQueryPartitionSettings.PartitionField, additionalDateFmts)
distinctValues, err := tableData.DistinctTimes(
tableData.TopicConfig().BigQueryPartitionSettings.PartitionField,
s.config.SharedTransferConfig.TypingSettings.AdditionalDateFormats,
tableData.TopicConfig().BigQueryPartitionSettings.PartitionBy.PartitionFormat(),
)

if err != nil {
return fmt.Errorf("failed to generate distinct dates: %w", err)
}

mergeString, err := generateMergeString(tableData.TopicConfig().BigQueryPartitionSettings, s.Dialect(), distinctDates)
mergeString, err := generateMergeString(tableData.TopicConfig().BigQueryPartitionSettings, s.Dialect(), distinctValues)
if err != nil {
return fmt.Errorf("failed to generate merge string: %w", err)
}
Expand Down
16 changes: 16 additions & 0 deletions lib/kafkalib/partition/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,22 @@ const (
Yearly PartitioningType = "yearly"
)

func (p PartitioningType) PartitionFormat() string {
switch p {
case Hourly:
return "2006-01-02 15"
case Daily:
return "2006-01-02"
case Monthly:
return "2006-01"
case Yearly:
return "2006"
}

return ""

}

var ValidPartitionBy = []PartitioningType{Hourly, Daily, Monthly, Yearly}

// We need the JSON annotations here for our dashboard to import the settings correctly.
Expand Down
18 changes: 11 additions & 7 deletions lib/optimization/table_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,28 +193,32 @@ func (t *TableData) NumberOfRows() uint {
return uint(len(t.rowsData))
}

func (t *TableData) DistinctDates(colName string, additionalDateFmts []string) ([]string, error) {
func (t *TableData) DistinctTimes(colName string, additionalDateFmts []string, format string) ([]string, error) {
if format == "" {
return nil, fmt.Errorf("format cannot be empty")
}

retMap := make(map[string]bool)
for _, row := range t.rowsData {
val, isOk := row[colName]
if !isOk {
return nil, fmt.Errorf("col: %v does not exist on row: %v", colName, row)
return nil, fmt.Errorf("col %q does not exist on row: %v", colName, row)
}

extTime, err := ext.ParseFromInterface(val, additionalDateFmts)
if err != nil {
return nil, fmt.Errorf("col: %v is not a time column, value: %v, err: %w", colName, val, err)
return nil, fmt.Errorf("col %q is not a time column, value: %v, err: %w", colName, val, err)
}

retMap[extTime.String(ext.PostgresDateFormat)] = true
retMap[extTime.String(format)] = true
}

var distinctDates []string
var distinctValues []string
for key := range retMap {
distinctDates = append(distinctDates, key)
distinctValues = append(distinctValues, key)
}

return distinctDates, nil
return distinctValues, nil
}

func (t *TableData) ResetTempTableSuffix() {
Expand Down

0 comments on commit aec9973

Please sign in to comment.