Skip to content

Commit

Permalink
Fixing lint errors
Browse files Browse the repository at this point in the history
Clean up WAL dir after test

Signed-off-by: Danny Kopping <danny.kopping@grafana.com>
  • Loading branch information
Danny Kopping committed Sep 19, 2021
1 parent d61540b commit 104b810
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 34 deletions.
4 changes: 2 additions & 2 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type RulesLimits interface {

// engineQueryFunc returns a new query function using the rules.EngineQueryFunc function
// and passing an altered timestamp.
func engineQueryFunc(logger log.Logger, engine *logql.Engine, overrides RulesLimits, checker readyChecker, userID string) rules.QueryFunc {
func engineQueryFunc(engine *logql.Engine, overrides RulesLimits, checker readyChecker, userID string) rules.QueryFunc {
return rules.QueryFunc(func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
// check if storage instance is ready; if not, fail the rule evaluation;
// we do this to prevent an attempt to append new samples before the WAL appender is ready
Expand Down Expand Up @@ -141,7 +141,7 @@ func MultiTenantRuleManager(cfg Config, engine *logql.Engine, overrides RulesLim
registry.configureTenantStorage(userID)

logger = log.With(logger, "user", userID)
queryFunc := engineQueryFunc(logger, engine, overrides, registry, userID)
queryFunc := engineQueryFunc(engine, overrides, registry, userID)
memStore := NewMemStore(userID, queryFunc, newMemstoreMetrics(reg), 5*time.Minute, log.With(logger, "subcomponent", "MemStore"))

mgr := rules.NewManager(&rules.ManagerOptions{
Expand Down
3 changes: 1 addition & 2 deletions pkg/ruler/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/cortexproject/cortex/pkg/ruler"
"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/config"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -306,7 +305,7 @@ func TestNonMetricQuery(t *testing.T) {
require.Nil(t, err)

engine := logql.NewEngine(logql.EngineOpts{}, &FakeQuerier{}, overrides)
queryFunc := engineQueryFunc(log.NewNopLogger(), engine, overrides, fakeChecker{}, "fake")
queryFunc := engineQueryFunc(engine, overrides, fakeChecker{}, "fake")

_, err = queryFunc(context.TODO(), `{job="nginx"}`, time.Now())
require.Error(t, err, "rule result is not a vector or scalar")
Expand Down
47 changes: 34 additions & 13 deletions pkg/ruler/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ package ruler

import (
"context"
"fmt"
"io/ioutil"
"net/url"
"os"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/util/test"
"github.com/go-kit/log"
promConfig "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
Expand Down Expand Up @@ -40,13 +42,13 @@ func newFakeLimits() fakeLimits {
}
}

func setupRegistry(t *testing.T) *walRegistry {
func setupRegistry(t *testing.T, dir string) *walRegistry {
u, _ := url.Parse("http://remote-write")

cfg := Config{
RemoteWrite: RemoteWriteConfig{
Client: config.RemoteWriteConfig{
URL: &promConfig.URL{u},
URL: &promConfig.URL{URL: u},
QueueConfig: config.QueueConfig{
Capacity: defaultCapacity,
},
Expand All @@ -62,7 +64,7 @@ func setupRegistry(t *testing.T) *walRegistry {
ConfigRefreshPeriod: 5 * time.Second,
},
WAL: instance.Config{
Dir: os.TempDir(),
Dir: dir,
},
}

Expand All @@ -75,8 +77,15 @@ func setupRegistry(t *testing.T) *walRegistry {
return reg.(*walRegistry)
}

func createTempWALDir() (string, error) {
return ioutil.TempDir(os.TempDir(), "wal")
}

func TestTenantRemoteWriteConfigWithOverride(t *testing.T) {
reg := setupRegistry(t)
walDir, err := createTempWALDir()
require.NoError(t, err)
reg := setupRegistry(t, walDir)
defer os.RemoveAll(walDir)

tenantCfg, err := reg.getTenantConfig(enabledRWTenant)
require.NoError(t, err)
Expand All @@ -90,7 +99,10 @@ func TestTenantRemoteWriteConfigWithOverride(t *testing.T) {
}

func TestTenantRemoteWriteConfigWithoutOverride(t *testing.T) {
reg := setupRegistry(t)
walDir, err := createTempWALDir()
require.NoError(t, err)
reg := setupRegistry(t, walDir)
defer os.RemoveAll(walDir)

// this tenant has no overrides, so will get defaults
tenantCfg, err := reg.getTenantConfig("unknown")
Expand All @@ -103,7 +115,10 @@ func TestTenantRemoteWriteConfigWithoutOverride(t *testing.T) {
}

func TestTenantRemoteWriteConfigDisabled(t *testing.T) {
reg := setupRegistry(t)
walDir, err := createTempWALDir()
require.NoError(t, err)
reg := setupRegistry(t, walDir)
defer os.RemoveAll(walDir)

tenantCfg, err := reg.getTenantConfig(disabledRWTenant)
require.NoError(t, err)
Expand Down Expand Up @@ -137,22 +152,28 @@ func TestWALRegistryCreation(t *testing.T) {
}

func TestStorageSetup(t *testing.T) {
reg := setupRegistry(t)
walDir, err := createTempWALDir()
require.NoError(t, err)
reg := setupRegistry(t, walDir)
defer os.RemoveAll(walDir)

// once the registry is setup and we configure the tenant storage, we should be able
// to acquire an appender for the WAL storage
reg.configureTenantStorage(enabledRWTenant)

// give the manager some time to spawn its processes
time.Sleep(2 * time.Second)
test.Poll(t, 2*time.Second, true, func() interface{} {
return reg.isReady(enabledRWTenant)
})

app := reg.Appender(user.InjectOrgID(context.Background(), enabledRWTenant))
_, ok := app.(storage.Appender)
assert.Truef(t, ok, "instance is not of expected type")
assert.Equalf(t, "*storage.fanoutAppender", fmt.Sprintf("%T", app), "instance is not of expected type")
}

func TestStorageSetupWithRemoteWriteDisabled(t *testing.T) {
reg := setupRegistry(t)
walDir, err := createTempWALDir()
require.NoError(t, err)
reg := setupRegistry(t, walDir)
defer os.RemoveAll(walDir)

// once the registry is setup and we configure the tenant storage, we should be able
// to acquire an appender for the WAL storage
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/storage/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (n noopScrapeManager) Get() (*scrape.Manager, error) {
// settings. initialize will be called each time the Instance is run. Prometheus
// components cannot be reused after they are stopped so we need to recreate them
// each run.
func (i *Instance) initialize(ctx context.Context, reg prometheus.Registerer, cfg *Config) error {
func (i *Instance) initialize(_ context.Context, reg prometheus.Registerer, cfg *Config) error {
// explicitly set this in case this function is called multiple times
i.initialized = false

Expand Down
50 changes: 50 additions & 0 deletions pkg/ruler/storage/instance/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,22 @@ package instance
import (
"context"
"fmt"
"io/ioutil"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/util/test"
"github.com/go-kit/kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -150,6 +157,49 @@ func TestRemoteWriteMetricInterceptor_AllValues(t *testing.T) {
require.Equal(t, []float64{12345, 67890}, vals)
}

// TestInstance tests that discovery and scraping are working by using a mock
// instance of the WAL storage and testing that samples get written to it.
// This test touches most of Instance and is enough for a basic integration test.
func TestInstance(t *testing.T) {
walDir, err := ioutil.TempDir(os.TempDir(), "wal")
require.NoError(t, err)
defer os.RemoveAll(walDir)

mockStorage := mockWalStorage{
series: make(map[uint64]int),
directory: walDir,
}
newWal := func(_ prometheus.Registerer) (walStorage, error) { return &mockStorage, nil }

logger := level.NewFilter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), level.AllowInfo())
cfg := DefaultConfig
cfg.Dir = walDir
inst, err := newInstance(cfg, nil, logger, newWal, "12345")
require.NoError(t, err)
runInstance(t, inst)

// Wait until mockWalStorage is initialized.
test.Poll(t, 10*time.Second, true, func() interface{} {
mockStorage.mut.Lock()
defer mockStorage.mut.Unlock()
return inst.Ready()
})

app := inst.Appender(context.TODO())
refTime := time.Now().UnixNano()

count := 3
for i := 0; i < count; i++ {
_, err := app.Append(0, labels.Labels{
labels.Label{Name: "__name__", Value: "test"},
labels.Label{Name: "iter", Value: fmt.Sprintf("%v", i)},
}, refTime-int64(i), float64(i))

require.NoError(t, err)
}
assert.Len(t, mockStorage.series, count)
}

type mockWalStorage struct {
storage.Queryable
storage.ChunkQueryable
Expand Down
8 changes: 1 addition & 7 deletions pkg/ruler/storage/wal/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@ package wal
import (
"sync"

//"github.com/prometheus/prometheus/pkg/intern"
"github.com/prometheus/prometheus/pkg/labels"
)

// NOTE:
// interning has been disabled since it has not been upstreamed to prometheus/prometheus
// the agent (from which this file was copied) replaces the prometheus/prometheus lib:
// replace github.com/prometheus/prometheus => github.com/grafana/prometheus v1.8.2-0.20210608193638-7b78de4ccffc
// TODO(dannyk): add label set interning

type memSeries struct {
sync.Mutex
Expand Down Expand Up @@ -78,8 +74,6 @@ func (m seriesHashmap) del(hash uint64, ref uint64) {
for _, s := range m[hash] {
if s.ref != ref {
rem = append(rem, s)
} else {
//intern.ReleaseLabels(intern.Global, s.lset)
}
}
if len(rem) == 0 {
Expand Down
15 changes: 6 additions & 9 deletions pkg/ruler/storage/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,16 +500,13 @@ func (w *Storage) recordSize() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
size, err := dirSize(w.path)
if err != nil {
level.Debug(w.logger).Log("msg", "could not calculate WAL disk size", "path", w.path, "err", err)
continue
}
w.metrics.DiskSize.Set(float64(size))
for range ticker.C {
size, err := dirSize(w.path)
if err != nil {
level.Debug(w.logger).Log("msg", "could not calculate WAL disk size", "path", w.path, "err", err)
continue
}
w.metrics.DiskSize.Set(float64(size))
}
}

Expand Down

0 comments on commit 104b810

Please sign in to comment.