From 86f1e4788833342056c62b103f144b89ada72fbe Mon Sep 17 00:00:00 2001 From: Pete Kruckenberg Date: Sat, 18 Jul 2015 07:36:15 -0600 Subject: [PATCH] compute continuous queries back to 'recompute-no-older-than', bucketing with GROUP BY clause --- etc/config.sample.toml | 1 - services/continuous_querier/config.go | 9 --------- services/continuous_querier/config_test.go | 5 +---- services/continuous_querier/service.go | 23 +--------------------- 4 files changed, 2 insertions(+), 36 deletions(-) diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 7aa78980f1c..990f44ed6b0 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -172,7 +172,6 @@ reporting-disabled = false [continuous_queries] enabled = true - recompute-previous-n = 2 recompute-no-older-than = "10m" compute-runs-per-interval = 10 compute-no-more-than = "2m" diff --git a/services/continuous_querier/config.go b/services/continuous_querier/config.go index d6f08aea77a..3806d527f24 100644 --- a/services/continuous_querier/config.go +++ b/services/continuous_querier/config.go @@ -7,8 +7,6 @@ import ( ) const ( - DefaultRecomputePreviousN = 2 - DefaultRecomputeNoOlderThan = 10 * time.Minute DefaultComputeRunsPerInterval = 10 @@ -21,12 +19,6 @@ type Config struct { // If this flag is set to false, both the brokers and data nodes should ignore any CQ processing. Enabled bool `toml:"enabled"` - // when continuous queries are run we'll automatically recompute previous intervals - // in case lagged data came in. Set to zero if you never have lagged data. We do - // it this way because invalidating previously computed intervals would be insanely hard - // and expensive. - RecomputePreviousN int `toml:"recompute-previous-n"` - // The RecomputePreviousN setting provides guidance for how far back to recompute, the RecomputeNoOlderThan // setting sets a ceiling on how far back in time it will go. For example, if you have 2 PreviousN // and have this set to 10m, then we'd only compute the previous two intervals for any @@ -53,7 +45,6 @@ type Config struct { func NewConfig() Config { return Config{ Enabled: true, - RecomputePreviousN: DefaultRecomputePreviousN, RecomputeNoOlderThan: toml.Duration(DefaultRecomputeNoOlderThan), ComputeRunsPerInterval: DefaultComputeRunsPerInterval, ComputeNoMoreThan: toml.Duration(DefaultComputeNoMoreThan), diff --git a/services/continuous_querier/config_test.go b/services/continuous_querier/config_test.go index 2a0edc4f2d7..c1f7317fed7 100644 --- a/services/continuous_querier/config_test.go +++ b/services/continuous_querier/config_test.go @@ -12,7 +12,6 @@ func TestConfig_Parse(t *testing.T) { // Parse configuration. var c continuous_querier.Config if _, err := toml.Decode(` -recompute-previous-n = 1 recompute-no-older-than = "10s" compute-runs-per-interval = 2 compute-no-more-than = "20s" @@ -22,9 +21,7 @@ enabled = true } // Validate configuration. - if c.RecomputePreviousN != 1 { - t.Fatalf("unexpected recompute previous n: %d", c.RecomputePreviousN) - } else if time.Duration(c.RecomputeNoOlderThan) != 10*time.Second { + if time.Duration(c.RecomputeNoOlderThan) != 10*time.Second { t.Fatalf("unexpected recompute no older than: %v", c.RecomputeNoOlderThan) } else if c.ComputeRunsPerInterval != 2 { t.Fatalf("unexpected compute runs per interval: %d", c.ComputeRunsPerInterval) diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index df5d640552e..62c55944c05 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -230,7 +230,7 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti startTime = startTime.Add(-interval) } - if err := cq.q.SetTimeRange(startTime, startTime.Add(interval)); err != nil { + if err := cq.q.SetTimeRange(now.Add(-time.Duration(s.Config.RecomputeNoOlderThan)), startTime.Add(interval)); err != nil { s.Logger.Printf("error setting time range: %s\n", err) } @@ -240,27 +240,6 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti return err } - recomputeNoOlderThan := time.Duration(s.Config.RecomputeNoOlderThan) - - for i := 0; i < s.Config.RecomputePreviousN; i++ { - // if we're already more time past the previous window than we're going to look back, stop - if now.Sub(startTime) > recomputeNoOlderThan { - return nil - } - newStartTime := startTime.Add(-interval) - - if err := cq.q.SetTimeRange(newStartTime, startTime); err != nil { - s.Logger.Printf("error setting time range: %s\n", err) - return err - } - - if err := s.runContinuousQueryAndWriteResult(cq); err != nil { - s.Logger.Printf("error during recompute previous: %s. running: %s\n", err, cq.q.String()) - return err - } - - startTime = newStartTime - } return nil }