Skip to content

Commit

Permalink
render: merge cached before enter limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
msaf1980 committed Jan 31, 2023
1 parent 03535f6 commit 6629996
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 158 deletions.
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"regexp"
"sort"
"strconv"
"strings"
"time"

Expand All @@ -31,7 +32,9 @@ type CacheConfig struct {
Size int `toml:"size-mb" json:"size-mb" comment:"cache size"`
MemcachedServers []string `toml:"memcached-servers" json:"memcached-servers" comment:"memcached servers"`
DefaultTimeoutSec int32 `toml:"default-timeout" json:"default-timeout" comment:"default cache ttl"`
DefaultTimeoutStr string `toml:"-" json:"-"`
ShortTimeoutSec int32 `toml:"short-timeout" json:"short-timeout" comment:"short-time cache ttl"`
ShortTimeoutStr string `toml:"-" json:"-"`
FindTimeoutSec int32 `toml:"find-timeout" json:"find-timeout" comment:"finder/tags autocompleter cache ttl"`
ShortDuration time.Duration `toml:"short-duration" json:"short-duration" comment:"maximum diration, used with short_timeout"`
ShortUntilOffsetSec int64 `toml:"short-offset" json:"short-offset" comment:"offset beetween now and until for select short cache timeout"`
Expand Down Expand Up @@ -716,6 +719,9 @@ func CreateCache(cacheName string, cacheConfig *CacheConfig) (cache.BytesCache,
if cacheConfig.ShortUntilOffsetSec == 0 {
cacheConfig.ShortUntilOffsetSec = 120
}
cacheConfig.DefaultTimeoutStr = strconv.Itoa(int(cacheConfig.DefaultTimeoutSec))
cacheConfig.ShortTimeoutStr = strconv.Itoa(int(cacheConfig.ShortTimeoutSec))

switch cacheConfig.Type {
case "memcache":
if len(cacheConfig.MemcachedServers) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion prometheus/querier_select.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (q *Querier) Select(sortSeries bool, hints *storage.SelectHints, labelsMatc
From: from,
Until: until,
MaxDataPoints: maxDataPoints,
}: &data.Targets{List: []string{}, AM: am},
}: data.NewTargets([]string{}, am),
}
reply, err := multiTarget.Fetch(q.ctx, q.config, config.ContextPrometheus)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions render/data/multi_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ func MFRToMultiTarget(v3Request *v3pb.MultiFetchRequest) MultiTarget {
}
if _, ok := multiTarget[tf]; ok {
target := multiTarget[tf]
target.List = append(multiTarget[tf].List, m.PathExpression)
target.Append(m.PathExpression)
} else {
multiTarget[tf] = &Targets{List: []string{m.PathExpression}, AM: alias.New()}
multiTarget[tf] = NewTargetsOne(m.PathExpression, len(v3Request.Metrics), alias.New())
}
}
}
Expand Down
11 changes: 4 additions & 7 deletions render/data/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,10 @@ func ageToTimestamp(age int64) int64 {
// fromAge and untilAge are relative age of timeframe
func newCondition(fromAge, untilAge, maxDataPoints int64) *conditions {
tf := TimeFrame{ageToTimestamp(fromAge), ageToTimestamp(untilAge), maxDataPoints}
tt := Targets{
List: []string{"*.name.*"},
AM: newAM(),
pointsTable: "graphite.data",
rollupRules: newRules(false),
}
return &conditions{TimeFrame: &tf, Targets: &tt}
tt := NewTargets([]string{"*.name.*"}, newAM())
tt.pointsTable = "graphite.data"
tt.rollupRules = newRules(false)
return &conditions{TimeFrame: &tf, Targets: tt}
}

func extTableString(et map[string]*strings.Builder) map[string]string {
Expand Down
35 changes: 34 additions & 1 deletion render/data/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,20 @@ import (
"github.com/lomik/graphite-clickhouse/pkg/alias"
)

type Cache struct {
Cached bool
TS int64 // cached timestamp
Timeout int32
TimeoutStr string
Key string // cache key
M *metrics.CacheMetric
}

// Targets represents requested metrics
type Targets struct {
// List contains queried metrics, e.g. [metric.{name1,name2}, metric.name[3-9]]
List []string
List []string
Cache []Cache
// AM stores found expanded metrics
AM *alias.Map
pointsTable string
Expand All @@ -23,6 +33,29 @@ type Targets struct {
queryMetrics *metrics.QueryMetrics
}

func NewTargets(list []string, am *alias.Map) *Targets {
return &Targets{
List: list,
Cache: make([]Cache, len(list)),
AM: am,
}
}

func NewTargetsOne(target string, capacity int, am *alias.Map) *Targets {
list := make([]string, 1, capacity)
list[0] = target
return &Targets{
List: list,
Cache: make([]Cache, 1, capacity),
AM: am,
}
}

func (tt *Targets) Append(target string) {
tt.List = append(tt.List, target)
tt.Cache = append(tt.Cache, Cache{})
}

func (tt *Targets) selectDataTable(cfg *config.Config, tf *TimeFrame, context string) error {
now := time.Now().Unix()

Expand Down
8 changes: 4 additions & 4 deletions render/data/targets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestSelectDataTableTime(t *testing.T) {
}
err := cfg.ProcessDataTables()
assert.NoError(t, err)
tg := Targets{List: []string{"metric"}}
tg := NewTargets([]string{"metric"}, nil)

tests := []struct {
*TimeFrame
Expand Down Expand Up @@ -125,17 +125,17 @@ func TestSelectDataTableMatch(t *testing.T) {
err error
}{
{
&Targets{List: []string{"allinclucive.in.avg", "all.metrics.for.avg"}},
NewTargets([]string{"allinclucive.in.avg", "all.metrics.for.avg"}, nil),
cfg.DataTable[0],
nil,
},
{
&Targets{List: []string{"allinclucive.in.avg", "any.metrics.for.avg"}},
NewTargets([]string{"allinclucive.in.avg", "any.metrics.for.avg"}, nil),
cfg.DataTable[1],
nil,
},
{
&Targets{List: []string{"allinclucive.in.avg", "some.metrics.for.avg"}},
NewTargets([]string{"allinclucive.in.avg", "some.metrics.for.avg"}, nil),
cfg.DataTable[2],
nil,
},
Expand Down
Loading

0 comments on commit 6629996

Please sign in to comment.