Skip to content

Commit

Permalink
Add min/max time filter to blockscopy and change min duration default…
Browse files Browse the repository at this point in the history
… to 0 (#261)

* Add min/max time filter to blockscopy and change min duration default to 0

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Explain the time format for min/max time params

Signed-off-by: Marco Pracucci <marco@pracucci.com>

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci authored Sep 28, 2022
1 parent 57cffa0 commit c88f1fa
Showing 1 changed file with 31 additions and 12 deletions.
43 changes: 31 additions & 12 deletions cmd/blockscopy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type config struct {
sourceBucket string
destBucket string
minBlockDuration time.Duration
minTime flagext.Time
maxTime flagext.Time
tenantConcurrency int
blocksConcurrency int
copyPeriod time.Duration
Expand All @@ -50,7 +52,9 @@ type config struct {
func (c *config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&c.sourceBucket, "source-bucket", "", "Source GCS bucket with blocks.")
f.StringVar(&c.destBucket, "destination-bucket", "", "Destination GCS bucket with blocks.")
f.DurationVar(&c.minBlockDuration, "min-block-duration", 24*time.Hour, "If non-zero, ignore blocks that cover block range smaller than this.")
f.DurationVar(&c.minBlockDuration, "min-block-duration", 0, "If non-zero, ignore blocks that cover block range smaller than this.")
f.Var(&c.minTime, "min-time", fmt.Sprintf("If set, only blocks with MinTime >= this value are copied. The supported time format is %q.", time.RFC3339))
f.Var(&c.maxTime, "max-time", fmt.Sprintf("If set, only blocks with MaxTime <= this value are copied. The supported time format is %q.", time.RFC3339))
f.IntVar(&c.tenantConcurrency, "tenant-concurrency", 5, "How many tenants to process at once.")
f.IntVar(&c.blocksConcurrency, "block-concurrency", 5, "How many blocks to copy at once per tenant.")
f.DurationVar(&c.copyPeriod, "copy-period", 0, "How often to repeat the copy. If set to 0, copy is done once, and program stops. Otherwise program keeps running and copying blocks until terminated.")
Expand Down Expand Up @@ -207,29 +211,44 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics)

logger := log.With(logger, "block", blockID)

// Skip if the block was already copied.
if markers[blockID].copied {
level.Debug(logger).Log("msg", "skipping block because it has been copied already")
return nil
}

blockMeta, err := loadMetaJSONFile(ctx, sourceBucket, tenantID, blockID)
if err != nil {
level.Error(logger).Log("msg", "skipping block, failed to read meta.json file", "err", err)
return err
}

// Add min/max time to each log entry. This is useful for debugging purposes.
blockMinTime := time.Unix(0, blockMeta.MinTime*int64(time.Millisecond)).UTC()
blockMaxTime := time.Unix(0, blockMeta.MaxTime*int64(time.Millisecond)).UTC()
logger = log.With(logger, "block_min_time", blockMinTime, "block_max_time", blockMaxTime)

if markers[blockID].deletion {
level.Debug(logger).Log("msg", "skipping block because it is marked for deletion")
return nil
}

if cfg.minBlockDuration > 0 {
meta, err := loadMetaJSONFile(ctx, sourceBucket, tenantID, blockID)
if err != nil {
level.Error(logger).Log("msg", "skipping block, failed to read meta.json file", "err", err)
return err
}
// If the min time filter is set, only blocks with MinTime >= the configured value are copied.
if filterMinTime := time.Time(cfg.minTime); !filterMinTime.IsZero() && blockMinTime.Before(filterMinTime) {
level.Debug(logger).Log("msg", "skipping block, block min time is lower than the configured min time filter", "configured_min_time", filterMinTime)
return nil
}

blockDuration := time.Millisecond * time.Duration(meta.MaxTime-meta.MinTime)
if blockDuration < cfg.minBlockDuration {
minTime := time.Unix(0, meta.MinTime*int64(time.Millisecond)).UTC()
maxTime := time.Unix(0, meta.MaxTime*int64(time.Millisecond)).UTC()
// If the max time filter is set, only blocks with MaxTime <= the configured value are copied.
if filterMaxTime := time.Time(cfg.maxTime); !filterMaxTime.IsZero() && blockMaxTime.After(filterMaxTime) {
level.Debug(logger).Log("msg", "skipping block, block max time is greater than the configured max time filter", "configured_max_time", filterMaxTime)
return nil
}

level.Debug(logger).Log("msg", "skipping block, block duration is smaller than minimum duration", "blockDuration", blockDuration, "minimumDuration", cfg.minBlockDuration, "min_time", minTime, "max_time", maxTime)
if cfg.minBlockDuration > 0 {
blockDuration := time.Millisecond * time.Duration(blockMeta.MaxTime-blockMeta.MinTime)
if blockDuration < cfg.minBlockDuration {
level.Debug(logger).Log("msg", "skipping block, block duration is smaller than minimum duration", "block_duration", blockDuration, "configured_min_duration", cfg.minBlockDuration)
return nil
}
}
Expand Down

0 comments on commit c88f1fa

Please sign in to comment.