Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send metrics in FIFO order #7814

Merged
merged 1 commit into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
- [#7558](https://github.com/influxdata/telegraf/issues/7558): Remove trailing backslash from tag keys/values in influx serializer.
- [#7715](https://github.com/influxdata/telegraf/issues/7715): Fix incorrect Azure SQL DB server properties.
- [#7431](https://github.com/influxdata/telegraf/issues/7431): Fix json unmarshal error in the kibana input.
- [#5633](https://github.com/influxdata/telegraf/issues/5633): Send metrics in FIFO order.

## v1.14.5 [2020-06-30]

Expand Down
60 changes: 27 additions & 33 deletions models/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (b *Buffer) add(m telegraf.Metric) int {
b.metricDropped(b.buf[b.last])
dropped++

if b.last == b.batchFirst && b.batchSize > 0 {
if b.batchSize > 0 {
b.batchSize--
b.batchFirst = b.next(b.batchFirst)
}
Expand Down Expand Up @@ -146,8 +146,8 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) int {
return dropped
}

// Batch returns a slice containing up to batchSize of the most recently added
// metrics. Metrics are ordered from newest to oldest in the batch. The
// Batch returns a slice containing up to batchSize of the oldest metrics not
// yet dropped. Metrics are ordered from oldest to newest in the batch. The
// batch must not be modified by the client.
func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
b.Lock()
Expand All @@ -159,18 +159,17 @@ func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
return out
}

b.batchFirst = b.cap + b.last - outLen
b.batchFirst %= b.cap
b.batchFirst = b.first
b.batchSize = outLen

batchIndex := b.batchFirst
for i := range out {
out[len(out)-1-i] = b.buf[batchIndex]
out[i] = b.buf[batchIndex]
b.buf[batchIndex] = nil
batchIndex = b.next(batchIndex)
}

b.last = b.batchFirst
b.first = b.nextby(b.first, b.batchSize)
b.size -= outLen
return out
}
Expand Down Expand Up @@ -198,38 +197,22 @@ func (b *Buffer) Reject(batch []telegraf.Metric) {
return
}

older := b.dist(b.first, b.batchFirst)
free := b.cap - b.size
restore := min(len(batch), free+older)
restore := min(len(batch), free)
skip := len(batch) - restore

// Rotate newer metrics forward the number of metrics that we can restore.
rb := b.batchFirst
rp := b.last
re := b.nextby(rp, restore)
b.last = re
b.first = b.prevby(b.first, restore)
b.size = min(b.size+restore, b.cap)

for rb != rp && rp != re {
rp = b.prev(rp)
re = b.prev(re)
re := b.first

if b.buf[re] != nil {
b.metricDropped(b.buf[re])
b.first = b.next(b.first)
}

b.buf[re] = b.buf[rp]
b.buf[rp] = nil
}

// Copy metrics from the batch back into the buffer; recall that the
// batch is in reverse order compared to b.buf
// Copy metrics from the batch back into the buffer
for i := range batch {
if i < restore {
re = b.prev(re)
b.buf[re] = batch[i]
b.size = min(b.size+1, b.cap)
} else {
if i < skip {
b.metricDropped(batch[i])
} else {
b.buf[re] = batch[i]
re = b.next(re)
}
}

Expand Down Expand Up @@ -273,6 +256,17 @@ func (b *Buffer) prev(index int) int {
return index
}

// prevby returns the index that is count older with wrapping.
func (b *Buffer) prevby(index, count int) int {
index -= count
for index < 0 {
index += b.cap
}

index %= b.cap
return index
}

func (b *Buffer) resetBatch() {
b.batchFirst = 0
b.batchSize = 0
Expand Down
98 changes: 49 additions & 49 deletions models/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestBuffer_BatchLatest(t *testing.T) {

testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(3),
MetricTime(1),
MetricTime(2),
}, batch)
}
Expand All @@ -177,8 +177,8 @@ func TestBuffer_BatchLatestWrap(t *testing.T) {

testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(5),
MetricTime(4),
MetricTime(2),
MetricTime(3),
}, batch)
}

Expand All @@ -193,17 +193,17 @@ func TestBuffer_MultipleBatch(t *testing.T) {
batch := b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(6),
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(1),
MetricTime(2),
MetricTime(3),
MetricTime(4),
MetricTime(5),
}, batch)
b.Accept(batch)
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(1),
MetricTime(6),
}, batch)
b.Accept(batch)
}
Expand All @@ -223,11 +223,11 @@ func TestBuffer_RejectWithRoom(t *testing.T) {
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
MetricTime(1),
MetricTime(2),
MetricTime(3),
MetricTime(4),
MetricTime(5),
}, batch)
}

Expand All @@ -246,11 +246,11 @@ func TestBuffer_RejectNothingNewFull(t *testing.T) {
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
MetricTime(1),
MetricTime(2),
MetricTime(3),
MetricTime(4),
MetricTime(5),
}, batch)
}

Expand All @@ -275,11 +275,11 @@ func TestBuffer_RejectNoRoom(t *testing.T) {
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(8),
MetricTime(7),
MetricTime(6),
MetricTime(5),
MetricTime(4),
MetricTime(5),
MetricTime(6),
MetricTime(7),
MetricTime(8),
}, batch)
}

Expand All @@ -299,11 +299,11 @@ func TestBuffer_RejectRoomExact(t *testing.T) {
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
MetricTime(1),
MetricTime(2),
MetricTime(3),
MetricTime(4),
MetricTime(5),
}, batch)
}

Expand All @@ -324,11 +324,11 @@ func TestBuffer_RejectRoomOverwriteOld(t *testing.T) {
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(6),
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
MetricTime(3),
MetricTime(4),
MetricTime(5),
MetricTime(6),
}, batch)
}

Expand All @@ -351,11 +351,11 @@ func TestBuffer_RejectPartialRoom(t *testing.T) {
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(7),
MetricTime(6),
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(4),
MetricTime(5),
MetricTime(6),
MetricTime(7),
}, batch)
}

Expand Down Expand Up @@ -394,11 +394,11 @@ func TestBuffer_RejectNewMetricsWrapped(t *testing.T) {
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(15),
MetricTime(14),
MetricTime(13),
MetricTime(12),
MetricTime(11),
MetricTime(12),
MetricTime(13),
MetricTime(14),
MetricTime(15),
}, batch)
}

Expand All @@ -425,11 +425,11 @@ func TestBuffer_RejectWrapped(t *testing.T) {
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(12),
MetricTime(11),
MetricTime(10),
MetricTime(9),
MetricTime(8),
MetricTime(9),
MetricTime(10),
MetricTime(11),
MetricTime(12),
}, batch)
}

Expand Down Expand Up @@ -467,16 +467,16 @@ func TestBuffer_RejectAdjustFirst(t *testing.T) {
batch = b.Batch(10)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(19),
MetricTime(18),
MetricTime(17),
MetricTime(16),
MetricTime(15),
MetricTime(14),
MetricTime(13),
MetricTime(12),
MetricTime(11),
MetricTime(10),
MetricTime(11),
MetricTime(12),
MetricTime(13),
MetricTime(14),
MetricTime(15),
MetricTime(16),
MetricTime(17),
MetricTime(18),
MetricTime(19),
}, batch)
}

Expand Down
10 changes: 5 additions & 5 deletions models/running_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func TestRunningOutputWriteFailOrder(t *testing.T) {
// Verify that 10 metrics were written
assert.Len(t, m.Metrics(), 10)
// Verify that they are in order
expected := append(reverse(next5), reverse(first5)...)
expected := append(first5, next5...)
assert.Equal(t, expected, m.Metrics())
}

Expand Down Expand Up @@ -421,9 +421,9 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) {
// Verify that 20 metrics were written
assert.Len(t, m.Metrics(), 20)
// Verify that they are in order
expected := append(reverse(next5), reverse(first5)...)
expected = append(expected, reverse(next5)...)
expected = append(expected, reverse(first5)...)
expected := append(first5, next5...)
expected = append(expected, first5...)
expected = append(expected, next5...)
assert.Equal(t, expected, m.Metrics())
}

Expand Down Expand Up @@ -464,7 +464,7 @@ func TestRunningOutputWriteFailOrder3(t *testing.T) {
// Verify that 6 metrics were written
assert.Len(t, m.Metrics(), 6)
// Verify that they are in order
expected := []telegraf.Metric{next5[0], first5[4], first5[3], first5[2], first5[1], first5[0]}
expected := []telegraf.Metric{first5[0], first5[1], first5[2], first5[3], first5[4], next5[0]}
assert.Equal(t, expected, m.Metrics())
}

Expand Down