From ddeedc7bc2deb28e4ae4985217c1f607cfdc2524 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Fri, 19 Feb 2016 16:05:36 -0500 Subject: [PATCH] Implement derivatives across intervals for aggregate queries For aggregate queries, derivatives will now alter the start time to one interval behind and will use that interval to find the derivative of the first point instead of giving no value for that interval. Null values will still be discarded so if the interval before the one you are querying is null, then it will be discarded like if it were in the middle of the query. You can use `fill(0)` to fill in these values. This does not apply to raw queries yet. Also modified the derivative and difference aggregates to use the stream iterator instead of the reduce slice iterator for space efficiency. Fixes #3247. Contributes to #5943. --- CHANGELOG.md | 1 + cmd/influxd/run/server_test.go | 80 ++++++++-------- influxql/call_iterator.go | 150 ++---------------------------- influxql/functions.go | 162 +++++++++++++++++++++++++++++++++ influxql/iterator.gen.go | 108 ++++++++++++++-------- influxql/iterator.gen.go.tmpl | 27 +++--- influxql/select.go | 15 +++ 7 files changed, 309 insertions(+), 234 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f1b0091a1b..9945d5b9220 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - [#6237](https://github.com/influxdata/influxdb/issues/6237): Enable continuous integration testing on Windows platform via AppVeyor. Thanks @mvadu - [#6263](https://github.com/influxdata/influxdb/pull/6263): Reduce UDP Service allocation size. - [#6228](https://github.com/influxdata/influxdb/pull/6228): Support for multiple listeners for collectd and OpenTSDB inputs. +- [#3247](https://github.com/influxdata/influxdb/issues/3247): Implement derivatives across intervals for aggregate queries. ### Bugfixes diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index 3c3ef886f6c..7ba9c674066 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -1547,12 +1547,12 @@ cpu value=25 1278010023000000000 &Query{ name: "calculate derivative of count with unit default (2s) group by time", command: `SELECT derivative(count(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",2],["2010-07-01T18:47:02Z",0]]}]}]}`, }, &Query{ name: "calculate derivative of count with unit 4s group by time", command: `SELECT derivative(count(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",4],["2010-07-01T18:47:02Z",0]]}]}]}`, }, &Query{ name: "calculate derivative of mean with unit default (2s) group by time", @@ -1671,12 +1671,12 @@ cpu value=20 1278010021000000000 &Query{ name: "calculate derivative of count with unit default (2s) group by time with fill 0", command: `SELECT derivative(count(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",-2]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",2],["2010-07-01T18:47:02Z",-2]]}]}]}`, }, &Query{ name: "calculate derivative of count with unit 4s group by time with fill 0", command: `SELECT derivative(count(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",-4]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",4],["2010-07-01T18:47:02Z",-4]]}]}]}`, }, &Query{ name: "calculate derivative of count with unit default (2s) group by time with fill previous", @@ -1691,12 +1691,12 @@ cpu value=20 1278010021000000000 &Query{ name: "calculate derivative of mean with unit default (2s) group by time with fill 0", command: `SELECT derivative(mean(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",-15]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",-15]]}]}]}`, }, &Query{ name: "calculate derivative of mean with unit 4s group by time with fill 0", command: `SELECT derivative(mean(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",-30]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",30],["2010-07-01T18:47:02Z",-30]]}]}]}`, }, &Query{ name: "calculate derivative of mean with unit default (2s) group by time with fill previous", @@ -1711,12 +1711,12 @@ cpu value=20 1278010021000000000 &Query{ name: "calculate derivative of median with unit default (2s) group by time with fill 0", command: `SELECT derivative(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",-15]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",-15]]}]}]}`, }, &Query{ name: "calculate derivative of median with unit 4s group by time with fill 0", command: `SELECT derivative(median(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",-30]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",30],["2010-07-01T18:47:02Z",-30]]}]}]}`, }, &Query{ name: "calculate derivative of median with unit default (2s) group by time with fill previous", @@ -1731,12 +1731,12 @@ cpu value=20 1278010021000000000 &Query{ name: "calculate derivative of sum with unit default (2s) group by time with fill 0", command: `SELECT derivative(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",-30]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",30],["2010-07-01T18:47:02Z",-30]]}]}]}`, }, &Query{ name: "calculate derivative of sum with unit 4s group by time with fill 0", command: `SELECT derivative(sum(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",-60]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",60],["2010-07-01T18:47:02Z",-60]]}]}]}`, }, &Query{ name: "calculate derivative of sum with unit default (2s) group by time with fill previous", @@ -1751,12 +1751,12 @@ cpu value=20 1278010021000000000 &Query{ name: "calculate derivative of first with unit default (2s) group by time with fill 0", command: `SELECT derivative(first(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",-10]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",-10]]}]}]}`, }, &Query{ name: "calculate derivative of first with unit 4s group by time with fill 0", command: `SELECT derivative(first(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",-20]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",-20]]}]}]}`, }, &Query{ name: "calculate derivative of first with unit default (2s) group by time with fill previous", @@ -1771,12 +1771,12 @@ cpu value=20 1278010021000000000 &Query{ name: "calculate derivative of last with unit default (2s) group by time with fill 0", command: `SELECT derivative(last(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",-20]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",-20]]}]}]}`, }, &Query{ name: "calculate derivative of last with unit 4s group by time with fill 0", command: `SELECT derivative(last(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",-40]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",40],["2010-07-01T18:47:02Z",-40]]}]}]}`, }, &Query{ name: "calculate derivative of last with unit default (2s) group by time with fill previous", @@ -1791,12 +1791,12 @@ cpu value=20 1278010021000000000 &Query{ name: "calculate derivative of min with unit default (2s) group by time with fill 0", command: `SELECT derivative(min(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",-10]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",-10]]}]}]}`, }, &Query{ name: "calculate derivative of min with unit 4s group by time with fill 0", command: `SELECT derivative(min(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",-20]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",-20]]}]}]}`, }, &Query{ name: "calculate derivative of min with unit default (2s) group by time with fill previous", @@ -1811,12 +1811,12 @@ cpu value=20 1278010021000000000 &Query{ name: "calculate derivative of max with unit default (2s) group by time with fill 0", command: `SELECT derivative(max(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",-20]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",-20]]}]}]}`, }, &Query{ name: "calculate derivative of max with unit 4s group by time with fill 0", command: `SELECT derivative(max(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",-40]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",40],["2010-07-01T18:47:02Z",-40]]}]}]}`, }, &Query{ name: "calculate derivative of max with unit default (2s) group by time with fill previous", @@ -1831,12 +1831,12 @@ cpu value=20 1278010021000000000 &Query{ name: "calculate derivative of percentile with unit default (2s) group by time with fill 0", command: `SELECT derivative(percentile(value, 50)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",-10]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",-10]]}]}]}`, }, &Query{ name: "calculate derivative of percentile with unit 4s group by time with fill 0", command: `SELECT derivative(percentile(value, 50), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",-20]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",-20]]}]}]}`, }, &Query{ name: "calculate derivative of percentile with unit default (2s) group by time with fill previous", @@ -1887,7 +1887,7 @@ cpu value=25 1278010023000000000 &Query{ name: "calculate difference of count", command: `SELECT difference(count(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:00Z",2],["2010-07-01T18:47:02Z",0]]}]}]}`, }, &Query{ name: "calculate difference of mean", @@ -1966,7 +1966,7 @@ cpu value=20 1278010021000000000 &Query{ name: "calculate difference of count with fill 0", command: `SELECT difference(count(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",-2]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:00Z",2],["2010-07-01T18:47:02Z",-2]]}]}]}`, }, &Query{ name: "calculate difference of count with fill previous", @@ -1976,7 +1976,7 @@ cpu value=20 1278010021000000000 &Query{ name: "calculate difference of mean with fill 0", command: `SELECT difference(mean(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",-15]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",-15]]}]}]}`, }, &Query{ name: "calculate difference of mean with fill previous", @@ -1986,7 +1986,7 @@ cpu value=20 1278010021000000000 &Query{ name: "calculate difference of median with fill 0", command: `SELECT difference(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",-15]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",-15]]}]}]}`, }, &Query{ name: "calculate difference of median with fill previous", @@ -1996,7 +1996,7 @@ cpu value=20 1278010021000000000 &Query{ name: "calculate difference of sum with fill 0", command: `SELECT difference(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",-30]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:00Z",30],["2010-07-01T18:47:02Z",-30]]}]}]}`, }, &Query{ name: "calculate difference of sum with fill previous", @@ -2006,7 +2006,7 @@ cpu value=20 1278010021000000000 &Query{ name: "calculate difference of first with fill 0", command: `SELECT difference(first(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",-10]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",-10]]}]}]}`, }, &Query{ name: "calculate difference of first with fill previous", @@ -2016,7 +2016,7 @@ cpu value=20 1278010021000000000 &Query{ name: "calculate difference of last with fill 0", command: `SELECT difference(last(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",-20]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",-20]]}]}]}`, }, &Query{ name: "calculate difference of last with fill previous", @@ -2026,7 +2026,7 @@ cpu value=20 1278010021000000000 &Query{ name: "calculate difference of min with fill 0", command: `SELECT difference(min(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",-10]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",-10]]}]}]}`, }, &Query{ name: "calculate difference of min with fill previous", @@ -2036,7 +2036,7 @@ cpu value=20 1278010021000000000 &Query{ name: "calculate difference of max with fill 0", command: `SELECT difference(max(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",-20]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",-20]]}]}]}`, }, &Query{ name: "calculate difference of max with fill previous", @@ -2046,7 +2046,7 @@ cpu value=20 1278010021000000000 &Query{ name: "calculate difference of percentile with fill 0", command: `SELECT difference(percentile(value, 50)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",-10]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",-10]]}]}]}`, }, &Query{ name: "calculate difference of percentile with fill previous", @@ -2094,7 +2094,7 @@ cpu value=35 1278010025000000000 &Query{ name: "calculate moving average of count", command: `SELECT moving_average(count(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",2],["2010-07-01T18:47:04Z",2]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:00Z",1],["2010-07-01T18:47:02Z",2],["2010-07-01T18:47:04Z",2]]}]}]}`, }, &Query{ name: "calculate moving average of mean", @@ -2175,7 +2175,7 @@ cpu value=35 1278010025000000000 &Query{ name: "calculate moving average of count with fill 0", command: `SELECT moving_average(count(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",1],["2010-07-01T18:47:04Z",1]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:00Z",1],["2010-07-01T18:47:02Z",1],["2010-07-01T18:47:04Z",1]]}]}]}`, }, &Query{ name: "calculate moving average of count with fill previous", @@ -2185,7 +2185,7 @@ cpu value=35 1278010025000000000 &Query{ name: "calculate moving average of mean with fill 0", command: `SELECT moving_average(mean(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",6.25],["2010-07-01T18:47:04Z",16.25]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:00Z",6.25],["2010-07-01T18:47:02Z",6.25],["2010-07-01T18:47:04Z",16.25]]}]}]}`, }, &Query{ name: "calculate moving average of mean with fill previous", @@ -2195,7 +2195,7 @@ cpu value=35 1278010025000000000 &Query{ name: "calculate moving average of median with fill 0", command: `SELECT moving_average(median(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",6.25],["2010-07-01T18:47:04Z",16.25]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:00Z",6.25],["2010-07-01T18:47:02Z",6.25],["2010-07-01T18:47:04Z",16.25]]}]}]}`, }, &Query{ name: "calculate moving average of median with fill previous", @@ -2205,7 +2205,7 @@ cpu value=35 1278010025000000000 &Query{ name: "calculate moving average of sum with fill 0", command: `SELECT moving_average(sum(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",12.5],["2010-07-01T18:47:04Z",32.5]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:00Z",12.5],["2010-07-01T18:47:02Z",12.5],["2010-07-01T18:47:04Z",32.5]]}]}]}`, }, &Query{ name: "calculate moving average of sum with fill previous", @@ -2215,7 +2215,7 @@ cpu value=35 1278010025000000000 &Query{ name: "calculate moving average of first with fill 0", command: `SELECT moving_average(first(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",5],["2010-07-01T18:47:04Z",15]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:00Z",5],["2010-07-01T18:47:02Z",5],["2010-07-01T18:47:04Z",15]]}]}]}`, }, &Query{ name: "calculate moving average of first with fill previous", @@ -2225,7 +2225,7 @@ cpu value=35 1278010025000000000 &Query{ name: "calculate moving average of last with fill 0", command: `SELECT moving_average(last(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",7.5],["2010-07-01T18:47:04Z",17.5]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:00Z",7.5],["2010-07-01T18:47:02Z",7.5],["2010-07-01T18:47:04Z",17.5]]}]}]}`, }, &Query{ name: "calculate moving average of last with fill previous", @@ -2235,7 +2235,7 @@ cpu value=35 1278010025000000000 &Query{ name: "calculate moving average of min with fill 0", command: `SELECT moving_average(min(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",5],["2010-07-01T18:47:04Z",15]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:00Z",5],["2010-07-01T18:47:02Z",5],["2010-07-01T18:47:04Z",15]]}]}]}`, }, &Query{ name: "calculate moving average of min with fill previous", @@ -2245,7 +2245,7 @@ cpu value=35 1278010025000000000 &Query{ name: "calculate moving average of max with fill 0", command: `SELECT moving_average(max(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",7.5],["2010-07-01T18:47:04Z",17.5]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:00Z",7.5],["2010-07-01T18:47:02Z",7.5],["2010-07-01T18:47:04Z",17.5]]}]}]}`, }, &Query{ name: "calculate moving average of max with fill previous", @@ -2255,7 +2255,7 @@ cpu value=35 1278010025000000000 &Query{ name: "calculate moving average of percentile with fill 0", command: `SELECT moving_average(percentile(value, 50), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`, - exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",5],["2010-07-01T18:47:04Z",15]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:00Z",5],["2010-07-01T18:47:02Z",5],["2010-07-01T18:47:04Z",15]]}]}]}`, }, &Query{ name: "calculate moving average of percentile with fill previous", diff --git a/influxql/call_iterator.go b/influxql/call_iterator.go index 3676ddec65a..d82a045b77e 100644 --- a/influxql/call_iterator.go +++ b/influxql/call_iterator.go @@ -941,178 +941,44 @@ func NewIntegerPercentileReduceSliceFunc(percentile float64) IntegerReduceSliceF // newDerivativeIterator returns an iterator for operating on a derivative() call. func newDerivativeIterator(input Iterator, opt IteratorOptions, interval Interval, isNonNegative bool) (Iterator, error) { - // Derivatives do not use GROUP BY intervals or time constraints, so clear these options. - opt.Interval = Interval{} - opt.StartTime, opt.EndTime = MinTime, MaxTime - switch input := input.(type) { case FloatIterator: - floatDerivativeReduceSlice := NewFloatDerivativeReduceSliceFunc(interval, isNonNegative) createFn := func() (FloatPointAggregator, FloatPointEmitter) { - fn := NewFloatSliceFuncReducer(floatDerivativeReduceSlice) + fn := NewFloatDerivativeReducer(interval, isNonNegative, opt.Ascending) return fn, fn } - return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil + return newFloatStreamFloatIterator(input, createFn, opt), nil case IntegerIterator: - integerDerivativeReduceSlice := NewIntegerDerivativeReduceSliceFunc(interval, isNonNegative) createFn := func() (IntegerPointAggregator, FloatPointEmitter) { - fn := NewIntegerSliceFuncFloatReducer(integerDerivativeReduceSlice) + fn := NewIntegerDerivativeReducer(interval, isNonNegative, opt.Ascending) return fn, fn } - return &integerReduceFloatIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil + return newIntegerStreamFloatIterator(input, createFn, opt), nil default: return nil, fmt.Errorf("unsupported derivative iterator type: %T", input) } } -// NewFloatDerivativeReduceSliceFunc returns the derivative value within a window. -func NewFloatDerivativeReduceSliceFunc(interval Interval, isNonNegative bool) FloatReduceSliceFunc { - prev := FloatPoint{Nil: true} - - return func(a []FloatPoint) []FloatPoint { - if len(a) == 0 { - return a - } else if len(a) == 1 { - return []FloatPoint{{Time: a[0].Time, Nil: true}} - } - - if prev.Nil { - prev = a[0] - } - - output := make([]FloatPoint, 0, len(a)-1) - for i := 1; i < len(a); i++ { - p := &a[i] - - // Calculate the derivative of successive points by dividing the - // difference of each value by the elapsed time normalized to the interval. - diff := p.Value - prev.Value - elapsed := p.Time - prev.Time - - value := 0.0 - if elapsed > 0 { - value = diff / (float64(elapsed) / float64(interval.Duration)) - } - - prev = *p - - // Drop negative values for non-negative derivatives. - if isNonNegative && diff < 0 { - continue - } - - output = append(output, FloatPoint{Time: p.Time, Value: value}) - } - return output - } -} - -// NewIntegerDerivativeReduceSliceFunc returns the derivative value within a window. -func NewIntegerDerivativeReduceSliceFunc(interval Interval, isNonNegative bool) IntegerReduceFloatSliceFunc { - prev := IntegerPoint{Nil: true} - - return func(a []IntegerPoint) []FloatPoint { - if len(a) == 0 { - return []FloatPoint{} - } else if len(a) == 1 { - return []FloatPoint{{Time: a[0].Time, Nil: true}} - } - - if prev.Nil { - prev = a[0] - } - - output := make([]FloatPoint, 0, len(a)-1) - for i := 1; i < len(a); i++ { - p := &a[i] - - // Calculate the derivative of successive points by dividing the - // difference of each value by the elapsed time normalized to the interval. - diff := float64(p.Value - prev.Value) - elapsed := p.Time - prev.Time - - value := 0.0 - if elapsed > 0 { - value = diff / (float64(elapsed) / float64(interval.Duration)) - } - - prev = *p - - // Drop negative values for non-negative derivatives. - if isNonNegative && diff < 0 { - continue - } - - output = append(output, FloatPoint{Time: p.Time, Value: value}) - } - return output - } -} - // newDifferenceIterator returns an iterator for operating on a difference() call. func newDifferenceIterator(input Iterator, opt IteratorOptions) (Iterator, error) { - // Differences do not use GROUP BY intervals or time constraints, so clear these options. - opt.Interval = Interval{} - opt.StartTime, opt.EndTime = MinTime, MaxTime - switch input := input.(type) { case FloatIterator: createFn := func() (FloatPointAggregator, FloatPointEmitter) { - fn := NewFloatSliceFuncReducer(FloatDifferenceReduceSlice) + fn := NewFloatDifferenceReducer() return fn, fn } - return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil + return newFloatStreamFloatIterator(input, createFn, opt), nil case IntegerIterator: createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { - fn := NewIntegerSliceFuncReducer(IntegerDifferenceReduceSlice) + fn := NewIntegerDifferenceReducer() return fn, fn } - return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil + return newIntegerStreamIntegerIterator(input, createFn, opt), nil default: return nil, fmt.Errorf("unsupported difference iterator type: %T", input) } } -// FloatDifferenceReduceSlice returns the difference values within a window. -func FloatDifferenceReduceSlice(a []FloatPoint) []FloatPoint { - if len(a) < 2 { - return []FloatPoint{} - } - prev := a[0] - - output := make([]FloatPoint, 0, len(a)-1) - for i := 1; i < len(a); i++ { - p := &a[i] - - // Calculate the difference of successive points. - value := p.Value - prev.Value - prev = *p - - output = append(output, FloatPoint{Time: p.Time, Value: value}) - } - return output -} - -// IntegerDifferenceReduceSlice returns the difference values within a window. -func IntegerDifferenceReduceSlice(a []IntegerPoint) []IntegerPoint { - if len(a) < 2 { - return []IntegerPoint{} - } - prev := a[0] - - output := make([]IntegerPoint, 0, len(a)-1) - for i := 1; i < len(a); i++ { - p := &a[i] - - // Calculate the difference of successive points. - value := p.Value - prev.Value - prev = *p - - output = append(output, IntegerPoint{Time: p.Time, Value: value}) - } - return output -} - // newMovingAverageIterator returns an iterator for operating on a moving_average() call. func newMovingAverageIterator(input Iterator, n int, opt IteratorOptions) (Iterator, error) { switch input := input.(type) { diff --git a/influxql/functions.go b/influxql/functions.go index 87d9a0ff881..ffd8402bc3e 100644 --- a/influxql/functions.go +++ b/influxql/functions.go @@ -62,6 +62,168 @@ func (r *IntegerMeanReducer) Emit() []FloatPoint { }} } +// FloatDerivativeReducer calculates the derivative of the aggregated points. +type FloatDerivativeReducer struct { + interval Interval + prev FloatPoint + curr FloatPoint + isNonNegative bool + ascending bool +} + +// NewFloatDerivativeReducer creates a new FloatDerivativeReducer. +func NewFloatDerivativeReducer(interval Interval, isNonNegative, ascending bool) *FloatDerivativeReducer { + return &FloatDerivativeReducer{ + interval: interval, + isNonNegative: isNonNegative, + ascending: ascending, + prev: FloatPoint{Nil: true}, + curr: FloatPoint{Nil: true}, + } +} + +// AggregateFloat aggregates a point into the reducer and updates the current window. +func (r *FloatDerivativeReducer) AggregateFloat(p *FloatPoint) { + r.prev = r.curr + r.curr = *p +} + +// Emit emits the derivative of the reducer at the current point. +func (r *FloatDerivativeReducer) Emit() []FloatPoint { + if !r.prev.Nil { + // Calculate the derivative of successive points by dividing the + // difference of each value by the elapsed time normalized to the interval. + diff := r.curr.Value - r.prev.Value + elapsed := r.curr.Time - r.prev.Time + if !r.ascending { + elapsed = -elapsed + } + + value := 0.0 + if elapsed > 0 { + value = diff / (float64(elapsed) / float64(r.interval.Duration)) + } + + // Drop negative values for non-negative derivatives. + if r.isNonNegative && diff < 0 { + return nil + } + return []FloatPoint{{Time: r.curr.Time, Value: value}} + } + return nil +} + +// IntegerDerivativeReducer calculates the derivative of the aggregated points. +type IntegerDerivativeReducer struct { + interval Interval + prev IntegerPoint + curr IntegerPoint + isNonNegative bool + ascending bool +} + +// NewIntegerDerivativeReducer creates a new IntegerDerivativeReducer. +func NewIntegerDerivativeReducer(interval Interval, isNonNegative, ascending bool) *IntegerDerivativeReducer { + return &IntegerDerivativeReducer{ + interval: interval, + isNonNegative: isNonNegative, + ascending: ascending, + prev: IntegerPoint{Nil: true}, + curr: IntegerPoint{Nil: true}, + } +} + +// AggregateInteger aggregates a point into the reducer and updates the current window. +func (r *IntegerDerivativeReducer) AggregateInteger(p *IntegerPoint) { + r.prev = r.curr + r.curr = *p +} + +// Emit emits the derivative of the reducer at the current point. +func (r *IntegerDerivativeReducer) Emit() []FloatPoint { + if !r.prev.Nil { + // Calculate the derivative of successive points by dividing the + // difference of each value by the elapsed time normalized to the interval. + diff := float64(r.curr.Value - r.prev.Value) + elapsed := r.curr.Time - r.prev.Time + if !r.ascending { + elapsed = -elapsed + } + + value := 0.0 + if elapsed > 0 { + value = diff / (float64(elapsed) / float64(r.interval.Duration)) + } + + // Drop negative values for non-negative derivatives. + if r.isNonNegative && diff < 0 { + return nil + } + return []FloatPoint{{Time: r.curr.Time, Value: value}} + } + return nil +} + +// FloatDifferenceReducer calculates the derivative of the aggregated points. +type FloatDifferenceReducer struct { + prev FloatPoint + curr FloatPoint +} + +// NewFloatDifferenceReducer creates a new FloatDifferenceReducer. +func NewFloatDifferenceReducer() *FloatDifferenceReducer { + return &FloatDifferenceReducer{ + prev: FloatPoint{Nil: true}, + curr: FloatPoint{Nil: true}, + } +} + +// AggregateFloat aggregates a point into the reducer and updates the current window. +func (r *FloatDifferenceReducer) AggregateFloat(p *FloatPoint) { + r.prev = r.curr + r.curr = *p +} + +// Emit emits the difference of the reducer at the current point. +func (r *FloatDifferenceReducer) Emit() []FloatPoint { + if !r.prev.Nil { + // Calculate the difference of successive points. + value := r.curr.Value - r.prev.Value + return []FloatPoint{{Time: r.curr.Time, Value: value}} + } + return nil +} + +// IntegerDifferenceReducer calculates the derivative of the aggregated points. +type IntegerDifferenceReducer struct { + prev IntegerPoint + curr IntegerPoint +} + +// NewIntegerDifferenceReducer creates a new IntegerDifferenceReducer. +func NewIntegerDifferenceReducer() *IntegerDifferenceReducer { + return &IntegerDifferenceReducer{ + prev: IntegerPoint{Nil: true}, + curr: IntegerPoint{Nil: true}, + } +} + +// AggregateInteger aggregates a point into the reducer and updates the current window. +func (r *IntegerDifferenceReducer) AggregateInteger(p *IntegerPoint) { + r.prev = r.curr + r.curr = *p +} + +// Emit emits the difference of the reducer at the current point. +func (r *IntegerDifferenceReducer) Emit() []IntegerPoint { + if !r.prev.Nil { + // Calculate the difference of successive points. + value := r.curr.Value - r.prev.Value + return []IntegerPoint{{Time: r.curr.Time, Value: value}} + } + return nil +} + // FloatMovingAverageReducer calculates the moving average of the aggregated points. type FloatMovingAverageReducer struct { pos int diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index 2e45d17ab4d..da57deae402 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -467,7 +467,6 @@ type floatFillIterator struct { startTime int64 endTime int64 auxFields []interface{} - done bool opt IteratorOptions window struct { @@ -488,9 +487,9 @@ func newFloatFillIterator(input FloatIterator, expr Expr, opt IteratorOptions) * var startTime, endTime int64 if opt.Ascending { startTime, _ = opt.Window(opt.StartTime) - _, endTime = opt.Window(opt.EndTime) + endTime, _ = opt.Window(opt.EndTime) } else { - _, startTime = opt.Window(opt.EndTime) + startTime, _ = opt.Window(opt.EndTime) endTime, _ = opt.Window(opt.StartTime) } @@ -512,7 +511,11 @@ func newFloatFillIterator(input FloatIterator, expr Expr, opt IteratorOptions) * itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime } else { - itr.window.time = itr.endTime + if opt.Ascending { + itr.window.time = itr.endTime + 1 + } else { + itr.window.time = itr.endTime - 1 + } } return itr } @@ -528,7 +531,7 @@ func (itr *floatFillIterator) Next() *FloatPoint { // If we are inside of an interval, unread the point and continue below to // constructing a new point. if itr.opt.Ascending { - if itr.window.time < itr.endTime { + if itr.window.time <= itr.endTime { itr.input.unread(p) p = nil break @@ -555,7 +558,7 @@ func (itr *floatFillIterator) Next() *FloatPoint { } // Check if the point is our next expected point. - if p == nil || p.Time > itr.window.time { + if p == nil || (itr.opt.Ascending && p.Time > itr.window.time) || (!itr.opt.Ascending && p.Time < itr.window.time) { if p != nil { itr.input.unread(p) } @@ -899,7 +902,7 @@ func (itr *floatReduceFloatIterator) reduce() []FloatPoint { return a } -// floatStreamFloatIterator +// floatStreamFloatIterator streams inputs into the iterator and emits points gradually. type floatStreamFloatIterator struct { input *bufFloatIterator create func() (FloatPointAggregator, FloatPointEmitter) @@ -908,6 +911,7 @@ type floatStreamFloatIterator struct { points []FloatPoint } +// newFloatStreamFloatIterator returns a new instance of floatStreamFloatIterator. func newFloatStreamFloatIterator(input FloatIterator, createFn func() (FloatPointAggregator, FloatPointEmitter), opt IteratorOptions) *floatStreamFloatIterator { return &floatStreamFloatIterator{ input: newBufFloatIterator(input), @@ -1123,7 +1127,7 @@ func (itr *floatReduceIntegerIterator) reduce() []IntegerPoint { return a } -// floatStreamIntegerIterator +// floatStreamIntegerIterator streams inputs into the iterator and emits points gradually. type floatStreamIntegerIterator struct { input *bufFloatIterator create func() (FloatPointAggregator, IntegerPointEmitter) @@ -1132,6 +1136,7 @@ type floatStreamIntegerIterator struct { points []IntegerPoint } +// newFloatStreamIntegerIterator returns a new instance of floatStreamIntegerIterator. func newFloatStreamIntegerIterator(input FloatIterator, createFn func() (FloatPointAggregator, IntegerPointEmitter), opt IteratorOptions) *floatStreamIntegerIterator { return &floatStreamIntegerIterator{ input: newBufFloatIterator(input), @@ -1347,7 +1352,7 @@ func (itr *floatReduceStringIterator) reduce() []StringPoint { return a } -// floatStreamStringIterator +// floatStreamStringIterator streams inputs into the iterator and emits points gradually. type floatStreamStringIterator struct { input *bufFloatIterator create func() (FloatPointAggregator, StringPointEmitter) @@ -1356,6 +1361,7 @@ type floatStreamStringIterator struct { points []StringPoint } +// newFloatStreamStringIterator returns a new instance of floatStreamStringIterator. func newFloatStreamStringIterator(input FloatIterator, createFn func() (FloatPointAggregator, StringPointEmitter), opt IteratorOptions) *floatStreamStringIterator { return &floatStreamStringIterator{ input: newBufFloatIterator(input), @@ -1571,7 +1577,7 @@ func (itr *floatReduceBooleanIterator) reduce() []BooleanPoint { return a } -// floatStreamBooleanIterator +// floatStreamBooleanIterator streams inputs into the iterator and emits points gradually. type floatStreamBooleanIterator struct { input *bufFloatIterator create func() (FloatPointAggregator, BooleanPointEmitter) @@ -1580,6 +1586,7 @@ type floatStreamBooleanIterator struct { points []BooleanPoint } +// newFloatStreamBooleanIterator returns a new instance of floatStreamBooleanIterator. func newFloatStreamBooleanIterator(input FloatIterator, createFn func() (FloatPointAggregator, BooleanPointEmitter), opt IteratorOptions) *floatStreamBooleanIterator { return &floatStreamBooleanIterator{ input: newBufFloatIterator(input), @@ -2279,7 +2286,6 @@ type integerFillIterator struct { startTime int64 endTime int64 auxFields []interface{} - done bool opt IteratorOptions window struct { @@ -2300,9 +2306,9 @@ func newIntegerFillIterator(input IntegerIterator, expr Expr, opt IteratorOption var startTime, endTime int64 if opt.Ascending { startTime, _ = opt.Window(opt.StartTime) - _, endTime = opt.Window(opt.EndTime) + endTime, _ = opt.Window(opt.EndTime) } else { - _, startTime = opt.Window(opt.EndTime) + startTime, _ = opt.Window(opt.EndTime) endTime, _ = opt.Window(opt.StartTime) } @@ -2324,7 +2330,11 @@ func newIntegerFillIterator(input IntegerIterator, expr Expr, opt IteratorOption itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime } else { - itr.window.time = itr.endTime + if opt.Ascending { + itr.window.time = itr.endTime + 1 + } else { + itr.window.time = itr.endTime - 1 + } } return itr } @@ -2340,7 +2350,7 @@ func (itr *integerFillIterator) Next() *IntegerPoint { // If we are inside of an interval, unread the point and continue below to // constructing a new point. if itr.opt.Ascending { - if itr.window.time < itr.endTime { + if itr.window.time <= itr.endTime { itr.input.unread(p) p = nil break @@ -2367,7 +2377,7 @@ func (itr *integerFillIterator) Next() *IntegerPoint { } // Check if the point is our next expected point. - if p == nil || p.Time > itr.window.time { + if p == nil || (itr.opt.Ascending && p.Time > itr.window.time) || (!itr.opt.Ascending && p.Time < itr.window.time) { if p != nil { itr.input.unread(p) } @@ -2708,7 +2718,7 @@ func (itr *integerReduceFloatIterator) reduce() []FloatPoint { return a } -// integerStreamFloatIterator +// integerStreamFloatIterator streams inputs into the iterator and emits points gradually. type integerStreamFloatIterator struct { input *bufIntegerIterator create func() (IntegerPointAggregator, FloatPointEmitter) @@ -2717,6 +2727,7 @@ type integerStreamFloatIterator struct { points []FloatPoint } +// newIntegerStreamFloatIterator returns a new instance of integerStreamFloatIterator. func newIntegerStreamFloatIterator(input IntegerIterator, createFn func() (IntegerPointAggregator, FloatPointEmitter), opt IteratorOptions) *integerStreamFloatIterator { return &integerStreamFloatIterator{ input: newBufIntegerIterator(input), @@ -2932,7 +2943,7 @@ func (itr *integerReduceIntegerIterator) reduce() []IntegerPoint { return a } -// integerStreamIntegerIterator +// integerStreamIntegerIterator streams inputs into the iterator and emits points gradually. type integerStreamIntegerIterator struct { input *bufIntegerIterator create func() (IntegerPointAggregator, IntegerPointEmitter) @@ -2941,6 +2952,7 @@ type integerStreamIntegerIterator struct { points []IntegerPoint } +// newIntegerStreamIntegerIterator returns a new instance of integerStreamIntegerIterator. func newIntegerStreamIntegerIterator(input IntegerIterator, createFn func() (IntegerPointAggregator, IntegerPointEmitter), opt IteratorOptions) *integerStreamIntegerIterator { return &integerStreamIntegerIterator{ input: newBufIntegerIterator(input), @@ -3156,7 +3168,7 @@ func (itr *integerReduceStringIterator) reduce() []StringPoint { return a } -// integerStreamStringIterator +// integerStreamStringIterator streams inputs into the iterator and emits points gradually. type integerStreamStringIterator struct { input *bufIntegerIterator create func() (IntegerPointAggregator, StringPointEmitter) @@ -3165,6 +3177,7 @@ type integerStreamStringIterator struct { points []StringPoint } +// newIntegerStreamStringIterator returns a new instance of integerStreamStringIterator. func newIntegerStreamStringIterator(input IntegerIterator, createFn func() (IntegerPointAggregator, StringPointEmitter), opt IteratorOptions) *integerStreamStringIterator { return &integerStreamStringIterator{ input: newBufIntegerIterator(input), @@ -3380,7 +3393,7 @@ func (itr *integerReduceBooleanIterator) reduce() []BooleanPoint { return a } -// integerStreamBooleanIterator +// integerStreamBooleanIterator streams inputs into the iterator and emits points gradually. type integerStreamBooleanIterator struct { input *bufIntegerIterator create func() (IntegerPointAggregator, BooleanPointEmitter) @@ -3389,6 +3402,7 @@ type integerStreamBooleanIterator struct { points []BooleanPoint } +// newIntegerStreamBooleanIterator returns a new instance of integerStreamBooleanIterator. func newIntegerStreamBooleanIterator(input IntegerIterator, createFn func() (IntegerPointAggregator, BooleanPointEmitter), opt IteratorOptions) *integerStreamBooleanIterator { return &integerStreamBooleanIterator{ input: newBufIntegerIterator(input), @@ -4088,7 +4102,6 @@ type stringFillIterator struct { startTime int64 endTime int64 auxFields []interface{} - done bool opt IteratorOptions window struct { @@ -4109,9 +4122,9 @@ func newStringFillIterator(input StringIterator, expr Expr, opt IteratorOptions) var startTime, endTime int64 if opt.Ascending { startTime, _ = opt.Window(opt.StartTime) - _, endTime = opt.Window(opt.EndTime) + endTime, _ = opt.Window(opt.EndTime) } else { - _, startTime = opt.Window(opt.EndTime) + startTime, _ = opt.Window(opt.EndTime) endTime, _ = opt.Window(opt.StartTime) } @@ -4133,7 +4146,11 @@ func newStringFillIterator(input StringIterator, expr Expr, opt IteratorOptions) itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime } else { - itr.window.time = itr.endTime + if opt.Ascending { + itr.window.time = itr.endTime + 1 + } else { + itr.window.time = itr.endTime - 1 + } } return itr } @@ -4149,7 +4166,7 @@ func (itr *stringFillIterator) Next() *StringPoint { // If we are inside of an interval, unread the point and continue below to // constructing a new point. if itr.opt.Ascending { - if itr.window.time < itr.endTime { + if itr.window.time <= itr.endTime { itr.input.unread(p) p = nil break @@ -4176,7 +4193,7 @@ func (itr *stringFillIterator) Next() *StringPoint { } // Check if the point is our next expected point. - if p == nil || p.Time > itr.window.time { + if p == nil || (itr.opt.Ascending && p.Time > itr.window.time) || (!itr.opt.Ascending && p.Time < itr.window.time) { if p != nil { itr.input.unread(p) } @@ -4517,7 +4534,7 @@ func (itr *stringReduceFloatIterator) reduce() []FloatPoint { return a } -// stringStreamFloatIterator +// stringStreamFloatIterator streams inputs into the iterator and emits points gradually. type stringStreamFloatIterator struct { input *bufStringIterator create func() (StringPointAggregator, FloatPointEmitter) @@ -4526,6 +4543,7 @@ type stringStreamFloatIterator struct { points []FloatPoint } +// newStringStreamFloatIterator returns a new instance of stringStreamFloatIterator. func newStringStreamFloatIterator(input StringIterator, createFn func() (StringPointAggregator, FloatPointEmitter), opt IteratorOptions) *stringStreamFloatIterator { return &stringStreamFloatIterator{ input: newBufStringIterator(input), @@ -4741,7 +4759,7 @@ func (itr *stringReduceIntegerIterator) reduce() []IntegerPoint { return a } -// stringStreamIntegerIterator +// stringStreamIntegerIterator streams inputs into the iterator and emits points gradually. type stringStreamIntegerIterator struct { input *bufStringIterator create func() (StringPointAggregator, IntegerPointEmitter) @@ -4750,6 +4768,7 @@ type stringStreamIntegerIterator struct { points []IntegerPoint } +// newStringStreamIntegerIterator returns a new instance of stringStreamIntegerIterator. func newStringStreamIntegerIterator(input StringIterator, createFn func() (StringPointAggregator, IntegerPointEmitter), opt IteratorOptions) *stringStreamIntegerIterator { return &stringStreamIntegerIterator{ input: newBufStringIterator(input), @@ -4965,7 +4984,7 @@ func (itr *stringReduceStringIterator) reduce() []StringPoint { return a } -// stringStreamStringIterator +// stringStreamStringIterator streams inputs into the iterator and emits points gradually. type stringStreamStringIterator struct { input *bufStringIterator create func() (StringPointAggregator, StringPointEmitter) @@ -4974,6 +4993,7 @@ type stringStreamStringIterator struct { points []StringPoint } +// newStringStreamStringIterator returns a new instance of stringStreamStringIterator. func newStringStreamStringIterator(input StringIterator, createFn func() (StringPointAggregator, StringPointEmitter), opt IteratorOptions) *stringStreamStringIterator { return &stringStreamStringIterator{ input: newBufStringIterator(input), @@ -5189,7 +5209,7 @@ func (itr *stringReduceBooleanIterator) reduce() []BooleanPoint { return a } -// stringStreamBooleanIterator +// stringStreamBooleanIterator streams inputs into the iterator and emits points gradually. type stringStreamBooleanIterator struct { input *bufStringIterator create func() (StringPointAggregator, BooleanPointEmitter) @@ -5198,6 +5218,7 @@ type stringStreamBooleanIterator struct { points []BooleanPoint } +// newStringStreamBooleanIterator returns a new instance of stringStreamBooleanIterator. func newStringStreamBooleanIterator(input StringIterator, createFn func() (StringPointAggregator, BooleanPointEmitter), opt IteratorOptions) *stringStreamBooleanIterator { return &stringStreamBooleanIterator{ input: newBufStringIterator(input), @@ -5897,7 +5918,6 @@ type booleanFillIterator struct { startTime int64 endTime int64 auxFields []interface{} - done bool opt IteratorOptions window struct { @@ -5918,9 +5938,9 @@ func newBooleanFillIterator(input BooleanIterator, expr Expr, opt IteratorOption var startTime, endTime int64 if opt.Ascending { startTime, _ = opt.Window(opt.StartTime) - _, endTime = opt.Window(opt.EndTime) + endTime, _ = opt.Window(opt.EndTime) } else { - _, startTime = opt.Window(opt.EndTime) + startTime, _ = opt.Window(opt.EndTime) endTime, _ = opt.Window(opt.StartTime) } @@ -5942,7 +5962,11 @@ func newBooleanFillIterator(input BooleanIterator, expr Expr, opt IteratorOption itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime } else { - itr.window.time = itr.endTime + if opt.Ascending { + itr.window.time = itr.endTime + 1 + } else { + itr.window.time = itr.endTime - 1 + } } return itr } @@ -5958,7 +5982,7 @@ func (itr *booleanFillIterator) Next() *BooleanPoint { // If we are inside of an interval, unread the point and continue below to // constructing a new point. if itr.opt.Ascending { - if itr.window.time < itr.endTime { + if itr.window.time <= itr.endTime { itr.input.unread(p) p = nil break @@ -5985,7 +6009,7 @@ func (itr *booleanFillIterator) Next() *BooleanPoint { } // Check if the point is our next expected point. - if p == nil || p.Time > itr.window.time { + if p == nil || (itr.opt.Ascending && p.Time > itr.window.time) || (!itr.opt.Ascending && p.Time < itr.window.time) { if p != nil { itr.input.unread(p) } @@ -6326,7 +6350,7 @@ func (itr *booleanReduceFloatIterator) reduce() []FloatPoint { return a } -// booleanStreamFloatIterator +// booleanStreamFloatIterator streams inputs into the iterator and emits points gradually. type booleanStreamFloatIterator struct { input *bufBooleanIterator create func() (BooleanPointAggregator, FloatPointEmitter) @@ -6335,6 +6359,7 @@ type booleanStreamFloatIterator struct { points []FloatPoint } +// newBooleanStreamFloatIterator returns a new instance of booleanStreamFloatIterator. func newBooleanStreamFloatIterator(input BooleanIterator, createFn func() (BooleanPointAggregator, FloatPointEmitter), opt IteratorOptions) *booleanStreamFloatIterator { return &booleanStreamFloatIterator{ input: newBufBooleanIterator(input), @@ -6550,7 +6575,7 @@ func (itr *booleanReduceIntegerIterator) reduce() []IntegerPoint { return a } -// booleanStreamIntegerIterator +// booleanStreamIntegerIterator streams inputs into the iterator and emits points gradually. type booleanStreamIntegerIterator struct { input *bufBooleanIterator create func() (BooleanPointAggregator, IntegerPointEmitter) @@ -6559,6 +6584,7 @@ type booleanStreamIntegerIterator struct { points []IntegerPoint } +// newBooleanStreamIntegerIterator returns a new instance of booleanStreamIntegerIterator. func newBooleanStreamIntegerIterator(input BooleanIterator, createFn func() (BooleanPointAggregator, IntegerPointEmitter), opt IteratorOptions) *booleanStreamIntegerIterator { return &booleanStreamIntegerIterator{ input: newBufBooleanIterator(input), @@ -6774,7 +6800,7 @@ func (itr *booleanReduceStringIterator) reduce() []StringPoint { return a } -// booleanStreamStringIterator +// booleanStreamStringIterator streams inputs into the iterator and emits points gradually. type booleanStreamStringIterator struct { input *bufBooleanIterator create func() (BooleanPointAggregator, StringPointEmitter) @@ -6783,6 +6809,7 @@ type booleanStreamStringIterator struct { points []StringPoint } +// newBooleanStreamStringIterator returns a new instance of booleanStreamStringIterator. func newBooleanStreamStringIterator(input BooleanIterator, createFn func() (BooleanPointAggregator, StringPointEmitter), opt IteratorOptions) *booleanStreamStringIterator { return &booleanStreamStringIterator{ input: newBufBooleanIterator(input), @@ -6998,7 +7025,7 @@ func (itr *booleanReduceBooleanIterator) reduce() []BooleanPoint { return a } -// booleanStreamBooleanIterator +// booleanStreamBooleanIterator streams inputs into the iterator and emits points gradually. type booleanStreamBooleanIterator struct { input *bufBooleanIterator create func() (BooleanPointAggregator, BooleanPointEmitter) @@ -7007,6 +7034,7 @@ type booleanStreamBooleanIterator struct { points []BooleanPoint } +// newBooleanStreamBooleanIterator returns a new instance of booleanStreamBooleanIterator. func newBooleanStreamBooleanIterator(input BooleanIterator, createFn func() (BooleanPointAggregator, BooleanPointEmitter), opt IteratorOptions) *booleanStreamBooleanIterator { return &booleanStreamBooleanIterator{ input: newBufBooleanIterator(input), diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index 397ed9e6022..d66bf70d97b 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -466,7 +466,6 @@ type {{$k.name}}FillIterator struct { startTime int64 endTime int64 auxFields []interface{} - done bool opt IteratorOptions window struct { @@ -487,9 +486,9 @@ func new{{$k.Name}}FillIterator(input {{$k.Name}}Iterator, expr Expr, opt Iterat var startTime, endTime int64 if opt.Ascending { startTime, _ = opt.Window(opt.StartTime) - _, endTime = opt.Window(opt.EndTime) + endTime, _ = opt.Window(opt.EndTime) } else { - _, startTime = opt.Window(opt.EndTime) + startTime, _ = opt.Window(opt.EndTime) endTime, _ = opt.Window(opt.StartTime) } @@ -511,7 +510,11 @@ func new{{$k.Name}}FillIterator(input {{$k.Name}}Iterator, expr Expr, opt Iterat itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime } else { - itr.window.time = itr.endTime + if opt.Ascending { + itr.window.time = itr.endTime + 1 + } else { + itr.window.time = itr.endTime - 1 + } } return itr } @@ -527,7 +530,7 @@ func (itr *{{$k.name}}FillIterator) Next() *{{$k.Name}}Point { // If we are inside of an interval, unread the point and continue below to // constructing a new point. if itr.opt.Ascending { - if itr.window.time < itr.endTime { + if itr.window.time <= itr.endTime { itr.input.unread(p) p = nil break @@ -554,7 +557,7 @@ func (itr *{{$k.name}}FillIterator) Next() *{{$k.Name}}Point { } // Check if the point is our next expected point. - if p == nil || p.Time > itr.window.time { + if p == nil || (itr.opt.Ascending && p.Time > itr.window.time) || (!itr.opt.Ascending && p.Time < itr.window.time) { if p != nil { itr.input.unread(p) } @@ -799,10 +802,10 @@ func (itr *{{$k.name}}ChanIterator) Next() *{{$k.Name}}Point { // {{$k.name}}Reduce{{$v.Name}}Iterator executes a reducer for every interval and buffers the result. type {{$k.name}}Reduce{{$v.Name}}Iterator struct { - input *buf{{$k.Name}}Iterator - create func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter) - opt IteratorOptions - points []{{$v.Name}}Point + input *buf{{$k.Name}}Iterator + create func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter) + opt IteratorOptions + points []{{$v.Name}}Point } // Stats returns stats from the input iterator. @@ -900,7 +903,7 @@ func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() []{{$v.Name}}Point { return a } -// {{$k.name}}Stream{{$v.Name}}Iterator +// {{$k.name}}Stream{{$v.Name}}Iterator streams inputs into the iterator and emits points gradually. type {{$k.name}}Stream{{$v.Name}}Iterator struct { input *buf{{$k.Name}}Iterator create func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter) @@ -909,6 +912,7 @@ type {{$k.name}}Stream{{$v.Name}}Iterator struct { points []{{$v.Name}}Point } +// new{{$k.Name}}Stream{{$v.Name}}Iterator returns a new instance of {{$k.name}}Stream{{$v.Name}}Iterator. func new{{$k.Name}}Stream{{$v.Name}}Iterator(input {{$k.Name}}Iterator, createFn func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter), opt IteratorOptions) *{{$k.name}}Stream{{$v.Name}}Iterator { return &{{$k.name}}Stream{{$v.Name}}Iterator{ input: newBuf{{$k.Name}}Iterator(input), @@ -1227,7 +1231,6 @@ func (enc *IteratorEncoder) encode{{.Name}}Iterator(itr {{.Name}}Iterator) error default: } - // Retrieve the next point from the iterator. p := itr.Next() if p == nil { diff --git a/influxql/select.go b/influxql/select.go index 9a050833dd5..f29974b26d1 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -231,6 +231,14 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions) (Iter } return NewIntervalIterator(input, opt), nil case "derivative", "non_negative_derivative", "difference", "moving_average": + if !opt.Interval.IsZero() { + if opt.Ascending { + opt.StartTime -= int64(opt.Interval.Duration) + } else { + opt.EndTime += int64(opt.Interval.Duration) + } + } + input, err := buildExprIterator(expr.Args[0], ic, opt) if err != nil { return nil, err @@ -245,6 +253,13 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions) (Iter return newDifferenceIterator(input, opt) case "moving_average": n := expr.Args[1].(*IntegerLiteral) + if n.Val > 1 && !opt.Interval.IsZero() { + if opt.Ascending { + opt.StartTime -= int64(opt.Interval.Duration) * (n.Val - 1) + } else { + opt.EndTime += int64(opt.Interval.Duration) * (n.Val - 1) + } + } return newMovingAverageIterator(input, int(n.Val), opt) } panic(fmt.Sprintf("invalid series aggregate function: %s", expr.Name))