Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extracting queue interface #3816

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/ruler/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ func (a *RemoteWriteAppendable) onEvict(userID, groupKey string) func() {
}

func (a *RemoteWriteAppender) Append(_ uint64, l labels.Labels, t int64, v float64) (uint64, error) {
a.queue.Append(queueEntry{
labels: l,
sample: cortexpb.Sample{
a.queue.Append(TimeSeriesEntry{
Labels: l,
Sample: cortexpb.Sample{
Value: v,
TimestampMs: t,
},
Expand Down
12 changes: 6 additions & 6 deletions pkg/ruler/appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ func TestAppendSample(t *testing.T) {
ts := time.Now().Unix()
val := 91.2

sample := queueEntry{
labels: labels,
sample: cortexpb.Sample{
sample := TimeSeriesEntry{
Labels: labels,
Sample: cortexpb.Sample{
Value: val,
TimestampMs: ts,
},
Expand Down Expand Up @@ -257,8 +257,8 @@ func TestAppenderEvictOldest(t *testing.T) {
require.Equal(t, capacity, appender.queue.Length())

// only two newest samples are kept
require.Equal(t, appender.queue.Entries()[0].(queueEntry).sample.Value, 11.3)
require.Equal(t, appender.queue.Entries()[1].(queueEntry).sample.Value, 11.4)
require.Equal(t, appender.queue.Entries()[0].(TimeSeriesEntry).Sample.Value, 11.3)
require.Equal(t, appender.queue.Entries()[1].(TimeSeriesEntry).Sample.Value, 11.4)
}

// context is created by ruler, passing along details of the rule being executed
Expand Down Expand Up @@ -321,7 +321,7 @@ func (c *MockRemoteWriteClient) Name() string { return "" }
// Endpoint is the remote read or write endpoint for the storage client.
func (c *MockRemoteWriteClient) Endpoint() string { return "" }

func (c *MockRemoteWriteClient) PrepareRequest(queue *util.EvictingQueue) ([]byte, error) {
func (c *MockRemoteWriteClient) PrepareRequest(queue util.Queue) ([]byte, error) {
args := c.Called(queue)
return args.Get(0).([]byte), args.Error(1)
}
18 changes: 9 additions & 9 deletions pkg/ruler/remote_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ import (

var UserAgent = fmt.Sprintf("loki-remote-write/%s", build.Version)

type queueEntry struct {
labels labels.Labels
sample cortexpb.Sample
type TimeSeriesEntry struct {
Labels labels.Labels
Sample cortexpb.Sample
}

type RemoteWriter interface {
remote.WriteClient

PrepareRequest(queue *util.EvictingQueue) ([]byte, error)
PrepareRequest(queue util.Queue) ([]byte, error)
}

type RemoteWriteClient struct {
Expand Down Expand Up @@ -62,7 +62,7 @@ func NewRemoteWriter(cfg Config, userID string) (RemoteWriter, error) {
}, nil
}

func (r *RemoteWriteClient) prepare(queue *util.EvictingQueue) error {
func (r *RemoteWriteClient) prepare(queue util.Queue) error {
// reuse slices, resize if they are not big enough
if cap(r.labels) < queue.Length() {
r.labels = make([]labels.Labels, 0, queue.Length())
Expand All @@ -75,21 +75,21 @@ func (r *RemoteWriteClient) prepare(queue *util.EvictingQueue) error {
r.samples = r.samples[:0]

for _, entry := range queue.Entries() {
entry, ok := entry.(queueEntry)
entry, ok := entry.(TimeSeriesEntry)
if !ok {
return fmt.Errorf("queue contains invalid entry of type: %T", entry)
}

r.labels = append(r.labels, entry.labels)
r.samples = append(r.samples, entry.sample)
r.labels = append(r.labels, entry.Labels)
r.samples = append(r.samples, entry.Sample)
}

return nil
}

// PrepareRequest takes the given queue and serializes it into a compressed
// proto write request that will be sent to Cortex
func (r *RemoteWriteClient) PrepareRequest(queue *util.EvictingQueue) ([]byte, error) {
func (r *RemoteWriteClient) PrepareRequest(queue util.Queue) ([]byte, error) {
// prepare labels and samples from queue
err := r.prepare(queue)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/ruler/remote_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestPrepare(t *testing.T) {

// first start with 10 items
for i := 0; i < 10; i++ {
queue.Append(queueEntry{labels: lbs, sample: sample})
queue.Append(TimeSeriesEntry{Labels: lbs, Sample: sample})
}
require.Nil(t, client.prepare(queue))

Expand All @@ -45,7 +45,7 @@ func TestPrepare(t *testing.T) {

// then resize the internal slices to 100
for i := 0; i < 100; i++ {
queue.Append(queueEntry{labels: lbs, sample: sample})
queue.Append(TimeSeriesEntry{Labels: lbs, Sample: sample})
}
require.Nil(t, client.prepare(queue))

Expand All @@ -58,7 +58,7 @@ func TestPrepare(t *testing.T) {

// then reuse the existing slice (no resize necessary since 5 < 100)
for i := 0; i < 5; i++ {
queue.Append(queueEntry{labels: lbs, sample: sample})
queue.Append(TimeSeriesEntry{Labels: lbs, Sample: sample})
}
require.Nil(t, client.prepare(queue))

Expand Down
8 changes: 8 additions & 0 deletions pkg/util/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package util

type Queue interface {
Append(entry interface{})
Entries() []interface{}
Length() int
Clear()
}