diff --git a/pkg/ruler/appender.go b/pkg/ruler/appender.go index 80e6c3e31d099..e3dae2140a305 100644 --- a/pkg/ruler/appender.go +++ b/pkg/ruler/appender.go @@ -105,9 +105,9 @@ func (a *RemoteWriteAppendable) onEvict(userID, groupKey string) func() { } func (a *RemoteWriteAppender) Append(_ uint64, l labels.Labels, t int64, v float64) (uint64, error) { - a.queue.Append(queueEntry{ - labels: l, - sample: cortexpb.Sample{ + a.queue.Append(TimeSeriesEntry{ + Labels: l, + Sample: cortexpb.Sample{ Value: v, TimestampMs: t, }, diff --git a/pkg/ruler/appender_test.go b/pkg/ruler/appender_test.go index 34e959af72489..b4a4d0a29098f 100644 --- a/pkg/ruler/appender_test.go +++ b/pkg/ruler/appender_test.go @@ -128,9 +128,9 @@ func TestAppendSample(t *testing.T) { ts := time.Now().Unix() val := 91.2 - sample := queueEntry{ - labels: labels, - sample: cortexpb.Sample{ + sample := TimeSeriesEntry{ + Labels: labels, + Sample: cortexpb.Sample{ Value: val, TimestampMs: ts, }, @@ -257,8 +257,8 @@ func TestAppenderEvictOldest(t *testing.T) { require.Equal(t, capacity, appender.queue.Length()) // only two newest samples are kept - require.Equal(t, appender.queue.Entries()[0].(queueEntry).sample.Value, 11.3) - require.Equal(t, appender.queue.Entries()[1].(queueEntry).sample.Value, 11.4) + require.Equal(t, appender.queue.Entries()[0].(TimeSeriesEntry).Sample.Value, 11.3) + require.Equal(t, appender.queue.Entries()[1].(TimeSeriesEntry).Sample.Value, 11.4) } // context is created by ruler, passing along details of the rule being executed @@ -321,7 +321,7 @@ func (c *MockRemoteWriteClient) Name() string { return "" } // Endpoint is the remote read or write endpoint for the storage client. func (c *MockRemoteWriteClient) Endpoint() string { return "" } -func (c *MockRemoteWriteClient) PrepareRequest(queue *util.EvictingQueue) ([]byte, error) { +func (c *MockRemoteWriteClient) PrepareRequest(queue util.Queue) ([]byte, error) { args := c.Called(queue) return args.Get(0).([]byte), args.Error(1) } diff --git a/pkg/ruler/remote_write.go b/pkg/ruler/remote_write.go index 1c4df44857207..e85b4f30b64cb 100644 --- a/pkg/ruler/remote_write.go +++ b/pkg/ruler/remote_write.go @@ -17,15 +17,15 @@ import ( var UserAgent = fmt.Sprintf("loki-remote-write/%s", build.Version) -type queueEntry struct { - labels labels.Labels - sample cortexpb.Sample +type TimeSeriesEntry struct { + Labels labels.Labels + Sample cortexpb.Sample } type RemoteWriter interface { remote.WriteClient - PrepareRequest(queue *util.EvictingQueue) ([]byte, error) + PrepareRequest(queue util.Queue) ([]byte, error) } type RemoteWriteClient struct { @@ -62,7 +62,7 @@ func NewRemoteWriter(cfg Config, userID string) (RemoteWriter, error) { }, nil } -func (r *RemoteWriteClient) prepare(queue *util.EvictingQueue) error { +func (r *RemoteWriteClient) prepare(queue util.Queue) error { // reuse slices, resize if they are not big enough if cap(r.labels) < queue.Length() { r.labels = make([]labels.Labels, 0, queue.Length()) @@ -75,13 +75,13 @@ func (r *RemoteWriteClient) prepare(queue *util.EvictingQueue) error { r.samples = r.samples[:0] for _, entry := range queue.Entries() { - entry, ok := entry.(queueEntry) + entry, ok := entry.(TimeSeriesEntry) if !ok { return fmt.Errorf("queue contains invalid entry of type: %T", entry) } - r.labels = append(r.labels, entry.labels) - r.samples = append(r.samples, entry.sample) + r.labels = append(r.labels, entry.Labels) + r.samples = append(r.samples, entry.Sample) } return nil @@ -89,7 +89,7 @@ func (r *RemoteWriteClient) prepare(queue *util.EvictingQueue) error { // PrepareRequest takes the given queue and serializes it into a compressed // proto write request that will be sent to Cortex -func (r *RemoteWriteClient) PrepareRequest(queue *util.EvictingQueue) ([]byte, error) { +func (r *RemoteWriteClient) PrepareRequest(queue util.Queue) ([]byte, error) { // prepare labels and samples from queue err := r.prepare(queue) if err != nil { diff --git a/pkg/ruler/remote_write_test.go b/pkg/ruler/remote_write_test.go index 9a5efa2460fcb..5716702c3d5a5 100644 --- a/pkg/ruler/remote_write_test.go +++ b/pkg/ruler/remote_write_test.go @@ -32,7 +32,7 @@ func TestPrepare(t *testing.T) { // first start with 10 items for i := 0; i < 10; i++ { - queue.Append(queueEntry{labels: lbs, sample: sample}) + queue.Append(TimeSeriesEntry{Labels: lbs, Sample: sample}) } require.Nil(t, client.prepare(queue)) @@ -45,7 +45,7 @@ func TestPrepare(t *testing.T) { // then resize the internal slices to 100 for i := 0; i < 100; i++ { - queue.Append(queueEntry{labels: lbs, sample: sample}) + queue.Append(TimeSeriesEntry{Labels: lbs, Sample: sample}) } require.Nil(t, client.prepare(queue)) @@ -58,7 +58,7 @@ func TestPrepare(t *testing.T) { // then reuse the existing slice (no resize necessary since 5 < 100) for i := 0; i < 5; i++ { - queue.Append(queueEntry{labels: lbs, sample: sample}) + queue.Append(TimeSeriesEntry{Labels: lbs, Sample: sample}) } require.Nil(t, client.prepare(queue)) diff --git a/pkg/util/queue.go b/pkg/util/queue.go new file mode 100644 index 0000000000000..cd44bb9941703 --- /dev/null +++ b/pkg/util/queue.go @@ -0,0 +1,8 @@ +package util + +type Queue interface { + Append(entry interface{}) + Entries() []interface{} + Length() int + Clear() +}