diff --git a/cmd/tsbs_generate_queries/databases/cassandra/devops.go b/cmd/tsbs_generate_queries/databases/cassandra/devops.go index 3db17649c..8ef525be5 100644 --- a/cmd/tsbs_generate_queries/databases/cassandra/devops.go +++ b/cmd/tsbs_generate_queries/databases/cassandra/devops.go @@ -6,7 +6,7 @@ import ( "time" "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" - "github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils" + "github.com/timescale/tsbs/internal/utils" "github.com/timescale/tsbs/query" ) @@ -50,7 +50,8 @@ func (d *Devops) getHostWhere(nHosts int) []string { // AND time >= '$HOUR_START' AND time < '$HOUR_END' // GROUP BY minute ORDER BY minute ASC func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) { - interval := d.Interval.RandWindow(timeRange) + interval := d.Interval.MustRandWindow(timeRange) + metrics := devops.GetCPUMetricsSlice(numMetrics) tagSet := d.getHostWhere(nHosts) @@ -70,8 +71,12 @@ func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange t // GROUP BY t ORDER BY t DESC // LIMIT $LIMIT func (d *Devops) GroupByOrderByLimit(qi query.Query) { - interval := d.Interval.RandWindow(time.Hour) - interval.Start = d.Interval.Start + interval := d.Interval.MustRandWindow(time.Hour) + + interval, err := utils.NewTimeInterval(d.Interval.Start(), interval.End()) + if err != nil { + panic(err.Error()) + } humanLabel := "Cassandra max cpu over last 5 min-intervals (random end)" humanDesc := fmt.Sprintf("%s: %s", humanLabel, d.Interval.StartString()) @@ -90,7 +95,8 @@ func (d *Devops) GroupByOrderByLimit(qi query.Query) { // WHERE time >= '$HOUR_START' AND time < '$HOUR_END' // GROUP BY hour, hostname ORDER BY hour func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { - interval := d.Interval.RandWindow(devops.DoubleGroupByDuration) + interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration) + metrics := devops.GetCPUMetricsSlice(numMetrics) 
humanLabel := devops.GetDoubleGroupByLabel("Cassandra", numMetrics) @@ -108,7 +114,8 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { // AND time >= '$HOUR_START' AND time < '$HOUR_END' // GROUP BY hour ORDER BY hour func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) { - interval := d.Interval.RandWindow(devops.MaxAllDuration) + interval := d.Interval.MustRandWindow(devops.MaxAllDuration) + tagSet := d.getHostWhere(nHosts) tagSets := [][]string{} @@ -140,7 +147,8 @@ func (d *Devops) LastPointPerHost(qi query.Query) { // AND time >= '$TIME_START' AND time < '$TIME_END' // AND (hostname = '$HOST' OR hostname = '$HOST2'...) func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) { - interval := d.Interval.RandWindow(devops.HighCPUDuration) + interval := d.Interval.MustRandWindow(devops.HighCPUDuration) + tagSet := d.getHostWhere(nHosts) tagSets := [][]string{} @@ -156,7 +164,7 @@ func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) { q.WhereClause = []byte("usage_user,>,90.0") } -func (d *Devops) fillInQuery(qi query.Query, humanLabel, humanDesc, aggType string, fields []string, interval utils.TimeInterval, tagSets [][]string) { +func (d *Devops) fillInQuery(qi query.Query, humanLabel, humanDesc, aggType string, fields []string, interval *utils.TimeInterval, tagSets [][]string) { q := qi.(*query.Cassandra) q.HumanLabel = []byte(humanLabel) q.HumanDescription = []byte(humanDesc) @@ -165,8 +173,8 @@ func (d *Devops) fillInQuery(qi query.Query, humanLabel, humanDesc, aggType stri q.MeasurementName = []byte("cpu") q.FieldName = []byte(strings.Join(fields, ",")) - q.TimeStart = interval.Start - q.TimeEnd = interval.End + q.TimeStart = interval.Start() + q.TimeEnd = interval.End() q.TagSets = tagSets } diff --git a/cmd/tsbs_generate_queries/databases/clickhouse/devops.go b/cmd/tsbs_generate_queries/databases/clickhouse/devops.go index b0ec6e2b9..fb0432401 100644 --- a/cmd/tsbs_generate_queries/databases/clickhouse/devops.go +++ 
b/cmd/tsbs_generate_queries/databases/clickhouse/devops.go @@ -91,7 +91,7 @@ const clickhouseTimeStringFormat = "2006-01-02 15:04:05" // cpu-max-all-1 // cpu-max-all-8 func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) { - interval := d.Interval.RandWindow(devops.MaxAllDuration) + interval := d.Interval.MustRandWindow(devops.MaxAllDuration) metrics := devops.GetAllCPUMetrics() selectClauses := d.getSelectClausesAggMetrics("max", metrics) @@ -106,8 +106,8 @@ func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) { `, strings.Join(selectClauses, ", "), d.getHostWhereString(nHosts), - interval.Start.Format(clickhouseTimeStringFormat), - interval.End.Format(clickhouseTimeStringFormat)) + interval.Start().Format(clickhouseTimeStringFormat), + interval.End().Format(clickhouseTimeStringFormat)) humanLabel := devops.GetMaxAllLabel("ClickHouse", nHosts) humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) @@ -129,7 +129,7 @@ func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) { // double-groupby-all func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { metrics := devops.GetCPUMetricsSlice(numMetrics) - interval := d.Interval.RandWindow(devops.DoubleGroupByDuration) + interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration) selectClauses := make([]string, numMetrics) meanClauses := make([]string, numMetrics) @@ -149,7 +149,7 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { hour, %s, %s - FROM + FROM ( SELECT toStartOfHour(created_at) AS hour, @@ -161,16 +161,16 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { hour, id ) AS cpu_avg - %s + %s ORDER BY hour ASC, %s `, - hostnameField, // main SELECT %s, - strings.Join(meanClauses, ", "), // main SELECT %s - strings.Join(selectClauses, ", "), // cpu_avg SELECT %s - interval.Start.Format(clickhouseTimeStringFormat), // cpu_avg time >= '%s' - interval.End.Format(clickhouseTimeStringFormat), // cpu_avg time < '%s' + 
hostnameField, // main SELECT %s, + strings.Join(meanClauses, ", "), // main SELECT %s + strings.Join(selectClauses, ", "), // cpu_avg SELECT %s + interval.Start().Format(clickhouseTimeStringFormat), // cpu_avg time >= '%s' + interval.End().Format(clickhouseTimeStringFormat), // cpu_avg time < '%s' joinClause, // JOIN clause hostnameField) // ORDER BY %s @@ -190,7 +190,8 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { // Resultsets: // groupby-orderby-limit func (d *Devops) GroupByOrderByLimit(qi query.Query) { - interval := d.Interval.RandWindow(time.Hour) + interval := d.Interval.MustRandWindow(time.Hour) + sql := fmt.Sprintf(` SELECT toStartOfMinute(created_at) AS minute, @@ -201,7 +202,7 @@ func (d *Devops) GroupByOrderByLimit(qi query.Query) { ORDER BY minute DESC LIMIT 5 `, - interval.End.Format(clickhouseTimeStringFormat)) + interval.End().Format(clickhouseTimeStringFormat)) humanLabel := "ClickHouse max cpu over last 5 min-intervals (random end)" humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.EndString()) @@ -227,15 +228,15 @@ func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) { } else { hostWhereClause = fmt.Sprintf("AND (%s)", d.getHostWhereString(nHosts)) } - interval := d.Interval.RandWindow(devops.HighCPUDuration) + interval := d.Interval.MustRandWindow(devops.HighCPUDuration) sql := fmt.Sprintf(` SELECT * FROM cpu PREWHERE (usage_user > 90.0) AND (created_at >= '%s') AND (created_at < '%s') %s `, - interval.Start.Format(clickhouseTimeStringFormat), - interval.End.Format(clickhouseTimeStringFormat), + interval.Start().Format(clickhouseTimeStringFormat), + interval.End().Format(clickhouseTimeStringFormat), hostWhereClause) humanLabel := devops.GetHighCPULabel("ClickHouse", nHosts) @@ -255,8 +256,8 @@ func (d *Devops) LastPointPerHost(qi query.Query) { FROM ( SELECT * - FROM cpu - WHERE (tags_id, created_at) IN + FROM cpu + WHERE (tags_id, created_at) IN ( SELECT tags_id, @@ -306,7 +307,8 @@ func (d 
*Devops) LastPointPerHost(qi query.Query) { // single-groupby-5-1-1 // single-groupby-5-8-1 func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) { - interval := d.Interval.RandWindow(timeRange) + interval := d.Interval.MustRandWindow(timeRange) + metrics := devops.GetCPUMetricsSlice(numMetrics) selectClauses := d.getSelectClausesAggMetrics("max", metrics) @@ -321,8 +323,8 @@ func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange t `, strings.Join(selectClauses, ", "), d.getHostWhereString(nHosts), - interval.Start.Format(clickhouseTimeStringFormat), - interval.End.Format(clickhouseTimeStringFormat)) + interval.Start().Format(clickhouseTimeStringFormat), + interval.End().Format(clickhouseTimeStringFormat)) humanLabel := fmt.Sprintf("ClickHouse %d cpu metric(s), random %4d hosts, random %s by 1m", numMetrics, nHosts, timeRange) humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) diff --git a/cmd/tsbs_generate_queries/databases/influx/devops.go b/cmd/tsbs_generate_queries/databases/influx/devops.go index f62a5f529..d96d052f6 100644 --- a/cmd/tsbs_generate_queries/databases/influx/devops.go +++ b/cmd/tsbs_generate_queries/databases/influx/devops.go @@ -59,7 +59,7 @@ func (d *Devops) getSelectClausesAggMetrics(agg string, metrics []string) []stri // AND time >= '$HOUR_START' AND time < '$HOUR_END' // GROUP BY minute ORDER BY minute ASC func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) { - interval := d.Interval.RandWindow(timeRange) + interval := d.Interval.MustRandWindow(timeRange) metrics := devops.GetCPUMetricsSlice(numMetrics) selectClauses := d.getSelectClausesAggMetrics("max", metrics) whereHosts := d.getHostWhereString(nHosts) @@ -76,7 +76,7 @@ func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange t // GROUP BY t ORDER BY t DESC // LIMIT $LIMIT func (d *Devops) GroupByOrderByLimit(qi query.Query) { - interval := 
d.Interval.RandWindow(time.Hour) + interval := d.Interval.MustRandWindow(time.Hour) where := fmt.Sprintf("WHERE time < '%s'", interval.EndString()) humanLabel := "Influx max cpu over last 5 min-intervals (random end)" @@ -94,7 +94,7 @@ func (d *Devops) GroupByOrderByLimit(qi query.Query) { // GROUP BY hour, hostname ORDER BY hour, hostname func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { metrics := devops.GetCPUMetricsSlice(numMetrics) - interval := d.Interval.RandWindow(devops.DoubleGroupByDuration) + interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration) selectClauses := d.getSelectClausesAggMetrics("mean", metrics) humanLabel := devops.GetDoubleGroupByLabel("Influx", numMetrics) @@ -111,7 +111,7 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { // AND time >= '$HOUR_START' AND time < '$HOUR_END' // GROUP BY hour ORDER BY hour func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) { - interval := d.Interval.RandWindow(devops.MaxAllDuration) + interval := d.Interval.MustRandWindow(devops.MaxAllDuration) whereHosts := d.getHostWhereString(nHosts) selectClauses := d.getSelectClausesAggMetrics("max", devops.GetAllCPUMetrics()) @@ -138,7 +138,8 @@ func (d *Devops) LastPointPerHost(qi query.Query) { // AND time >= '$TIME_START' AND time < '$TIME_END' // AND (hostname = '$HOST' OR hostname = '$HOST2'...) 
func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) { - interval := d.Interval.RandWindow(devops.HighCPUDuration) + interval := d.Interval.MustRandWindow(devops.HighCPUDuration) + var hostWhereClause string if nHosts == 0 { hostWhereClause = "" @@ -147,7 +148,7 @@ func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) { } humanLabel := devops.GetHighCPULabel("Influx", nHosts) - humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) influxql := fmt.Sprintf("SELECT * from cpu where usage_user > 90.0 %s and time >= '%s' and time < '%s'", hostWhereClause, interval.StartString(), interval.EndString()) d.fillInQuery(qi, humanLabel, humanDesc, influxql) } diff --git a/cmd/tsbs_generate_queries/databases/mongo/devops-naive.go b/cmd/tsbs_generate_queries/databases/mongo/devops-naive.go index 20cc0c613..9c68ef97e 100644 --- a/cmd/tsbs_generate_queries/databases/mongo/devops-naive.go +++ b/cmd/tsbs_generate_queries/databases/mongo/devops-naive.go @@ -44,7 +44,7 @@ func (d *NaiveDevops) GenerateEmptyQuery() query.Query { // AND time >= '$HOUR_START' AND time < '$HOUR_END' // GROUP BY minute ORDER BY minute ASC func (d *NaiveDevops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) { - interval := d.Interval.RandWindow(timeRange) + interval := d.Interval.MustRandWindow(timeRange) hostnames := d.GetRandomHosts(nHosts) metrics := devops.GetCPUMetricsSlice(numMetrics) @@ -105,7 +105,7 @@ func (d *NaiveDevops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRa // WHERE time >= '$HOUR_START' AND time < '$HOUR_END' // GROUP BY hour, hostname ORDER BY hour, hostname func (d *NaiveDevops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { - interval := d.Interval.RandWindow(devops.DoubleGroupByDuration) + interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration) metrics := devops.GetCPUMetricsSlice(numMetrics) bucketNano := 
time.Hour.Nanoseconds() diff --git a/cmd/tsbs_generate_queries/databases/mongo/devops.go b/cmd/tsbs_generate_queries/databases/mongo/devops.go index 919588ab8..27a2d1fc0 100644 --- a/cmd/tsbs_generate_queries/databases/mongo/devops.go +++ b/cmd/tsbs_generate_queries/databases/mongo/devops.go @@ -7,7 +7,7 @@ import ( "github.com/globalsign/mgo/bson" "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" - "github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils" + "github.com/timescale/tsbs/internal/utils" "github.com/timescale/tsbs/query" ) @@ -35,7 +35,7 @@ func (d *Devops) GenerateEmptyQuery() query.Query { return query.NewMongo() } -func getTimeFilterPipeline(interval utils.TimeInterval) []bson.M { +func getTimeFilterPipeline(interval *utils.TimeInterval) []bson.M { return []bson.M{ {"$unwind": "$events"}, { @@ -72,16 +72,16 @@ func getTimeFilterPipeline(interval utils.TimeInterval) []bson.M { const aggDateFmt = "20060102" // see Go docs for how we arrive at this time format -func getTimeFilterDocs(interval utils.TimeInterval) []interface{} { +func getTimeFilterDocs(interval *utils.TimeInterval) []interface{} { docs := []interface{}{} - startDay := interval.Start.Format(aggDateFmt) - startHr := interval.Start.Hour() + startDay := interval.Start().Format(aggDateFmt) + startHr := interval.Start().Hour() lenHrs := int(interval.Duration()/time.Hour) + 1 for i := 0; i < lenHrs; i++ { hr := int(startHr) + i if hr > 23 { days := int64(hr / 24) - day := interval.Start.Add(time.Duration(days * 24 * 60 * 60 * 1e9)) + day := interval.Start().Add(time.Duration(days * 24 * 60 * 60 * 1e9)) docs = append(docs, fmt.Sprintf("%s_%02d", day.Format(aggDateFmt), hr%24)) } else { docs = append(docs, fmt.Sprintf("%s_%02d", startDay, hr)) @@ -101,7 +101,8 @@ func getTimeFilterDocs(interval utils.TimeInterval) []interface{} { // AND time >= '$HOUR_START' AND time < '$HOUR_END' // GROUP BY minute ORDER BY minute ASC func (d *Devops) GroupByTime(qi query.Query, nHosts, 
numMetrics int, timeRange time.Duration) { - interval := d.Interval.RandWindow(timeRange) + interval := d.Interval.MustRandWindow(timeRange) + hostnames := d.GetRandomHosts(nHosts) metrics := devops.GetCPUMetricsSlice(numMetrics) docs := getTimeFilterDocs(interval) @@ -169,7 +170,7 @@ func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange t // AND time >= '$HOUR_START' AND time < '$HOUR_END' // GROUP BY hour ORDER BY hour func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) { - interval := d.Interval.RandWindow(devops.MaxAllDuration) + interval := d.Interval.MustRandWindow(devops.MaxAllDuration) hostnames := d.GetRandomHosts(nHosts) docs := getTimeFilterDocs(interval) bucketNano := time.Hour.Nanoseconds() @@ -237,7 +238,8 @@ func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) { // WHERE time >= '$HOUR_START' AND time < '$HOUR_END' // GROUP BY hour, hostname ORDER BY hour, hostname func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { - interval := d.Interval.RandWindow(devops.DoubleGroupByDuration) + interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration) + metrics := devops.GetCPUMetricsSlice(numMetrics) docs := getTimeFilterDocs(interval) bucketNano := time.Hour.Nanoseconds() @@ -317,7 +319,8 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { // AND time >= '$TIME_START' AND time < '$TIME_END' // AND (hostname = '$HOST' OR hostname = '$HOST2'...) 
func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) { - interval := d.Interval.RandWindow(devops.HighCPUDuration) + interval := d.Interval.MustRandWindow(devops.HighCPUDuration) + hostnames := d.GetRandomHosts(nHosts) docs := getTimeFilterDocs(interval) @@ -448,8 +451,11 @@ func (d *Devops) LastPointPerHost(qi query.Query) { // GROUP BY t ORDER BY t DESC // LIMIT $LIMIT func (d *Devops) GroupByOrderByLimit(qi query.Query) { - interval := d.Interval.RandWindow(time.Hour) - interval = utils.NewTimeInterval(d.Interval.Start, interval.End) + interval := d.Interval.MustRandWindow(time.Hour) + interval, err := utils.NewTimeInterval(d.Interval.Start(), interval.End()) + if err != nil { + panic(err.Error()) + } docs := getTimeFilterDocs(interval) bucketNano := time.Minute.Nanoseconds() diff --git a/cmd/tsbs_generate_queries/databases/siridb/devops.go b/cmd/tsbs_generate_queries/databases/siridb/devops.go index e6332a6db..7c68e0ebc 100644 --- a/cmd/tsbs_generate_queries/databases/siridb/devops.go +++ b/cmd/tsbs_generate_queries/databases/siridb/devops.go @@ -63,7 +63,7 @@ const goTimeFmt = "2006-01-02 15:04:05Z" // // select max(1m) from (`groupHost1` | ...) & (`groupMetric1` | ...) 
between 'time1' and 'time2' func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) { - interval := d.Interval.RandWindow(timeRange) + interval := d.Interval.MustRandWindow(timeRange) metrics := devops.GetCPUMetricsSlice(numMetrics) whereMetrics := d.getMetricWhereString(metrics) whereHosts := d.getHostWhereString(nHosts) @@ -84,8 +84,8 @@ func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange t // // select max(1m) from `usage_user` between time - 5m and 'roundedTime' merge as 'max usage user of the last 5 aggregate readings' using max(1) func (d *Devops) GroupByOrderByLimit(qi query.Query) { - interval := d.Interval.RandWindow(time.Hour) - timeStr := interval.End.Format(goTimeFmt) + interval := d.Interval.MustRandWindow(time.Hour) + timeStr := interval.End().Format(goTimeFmt) timestrRounded := timeStr[:len(timeStr)-4] + ":00Z" where := fmt.Sprintf("between '%s' - 5m and '%s'", timeStr, timestrRounded) @@ -101,7 +101,7 @@ func (d *Devops) GroupByOrderByLimit(qi query.Query) { // // select mean(1h) from (`groupMetric1` | ...) between 'time1' and 'time2' func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { - interval := d.Interval.RandWindow(devops.DoubleGroupByDuration) + interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration) metrics := devops.GetCPUMetricsSlice(numMetrics) whereMetrics := d.getMetricWhereString(metrics) @@ -120,7 +120,7 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { // // select max(1h) from (`groupHost1` | ...) 
& `cpu` between 'time1' and 'time2' func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) { - interval := d.Interval.RandWindow(devops.MaxAllDuration) + interval := d.Interval.MustRandWindow(devops.MaxAllDuration) whereMetrics := "`cpu`" whereHosts := d.getHostWhereString(nHosts) @@ -159,7 +159,7 @@ func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) { } else { whereHosts = "& " + d.getHostWhereString(nHosts) } - interval := d.Interval.RandWindow(devops.HighCPUDuration) + interval := d.Interval.MustRandWindow(devops.HighCPUDuration) humanLabel := devops.GetHighCPULabel("SiriDB", nHosts) humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) diff --git a/cmd/tsbs_generate_queries/databases/timescaledb/devops.go b/cmd/tsbs_generate_queries/databases/timescaledb/devops.go index 57fe84594..710410fec 100644 --- a/cmd/tsbs_generate_queries/databases/timescaledb/devops.go +++ b/cmd/tsbs_generate_queries/databases/timescaledb/devops.go @@ -93,7 +93,7 @@ const goTimeFmt = "2006-01-02 15:04:05.999999 -0700" // AND time >= '$HOUR_START' AND time < '$HOUR_END' // GROUP BY minute ORDER BY minute ASC func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) { - interval := d.Interval.RandWindow(timeRange) + interval := d.Interval.MustRandWindow(timeRange) metrics := devops.GetCPUMetricsSlice(numMetrics) selectClauses := d.getSelectClausesAggMetrics("max", metrics) if len(selectClauses) < 1 { @@ -108,8 +108,8 @@ func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange t d.getTimeBucket(oneMinute), strings.Join(selectClauses, ", "), d.getHostWhereString(nHosts), - interval.Start.Format(goTimeFmt), - interval.End.Format(goTimeFmt)) + interval.Start().Format(goTimeFmt), + interval.End().Format(goTimeFmt)) humanLabel := fmt.Sprintf("TimescaleDB %d cpu metric(s), random %4d hosts, random %s by 1m", numMetrics, nHosts, timeRange) humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) @@ 
-122,7 +122,7 @@ func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange t // GROUP BY t ORDER BY t DESC // LIMIT $LIMIT func (d *Devops) GroupByOrderByLimit(qi query.Query) { - interval := d.Interval.RandWindow(time.Hour) + interval := d.Interval.MustRandWindow(time.Hour) sql := fmt.Sprintf(`SELECT %s AS minute, max(usage_user) FROM cpu WHERE time < '%s' @@ -130,7 +130,7 @@ func (d *Devops) GroupByOrderByLimit(qi query.Query) { ORDER BY minute DESC LIMIT 5`, d.getTimeBucket(oneMinute), - interval.End.Format(goTimeFmt)) + interval.End().Format(goTimeFmt)) humanLabel := "TimescaleDB max cpu over last 5 min-intervals (random end)" humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.EndString()) @@ -146,7 +146,7 @@ func (d *Devops) GroupByOrderByLimit(qi query.Query) { // GROUP BY hour, hostname ORDER BY hour func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { metrics := devops.GetCPUMetricsSlice(numMetrics) - interval := d.Interval.RandWindow(devops.DoubleGroupByDuration) + interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration) selectClauses := make([]string, numMetrics) meanClauses := make([]string, numMetrics) @@ -180,7 +180,8 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { ORDER BY hour, %s`, d.getTimeBucket(oneHour), strings.Join(selectClauses, ", "), - interval.Start.Format(goTimeFmt), interval.End.Format(goTimeFmt), + interval.Start().Format(goTimeFmt), + interval.End().Format(goTimeFmt), hostnameField, strings.Join(meanClauses, ", "), joinStr, hostnameField) humanLabel := devops.GetDoubleGroupByLabel("TimescaleDB", numMetrics) @@ -196,7 +197,8 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { // AND time >= '$HOUR_START' AND time < '$HOUR_END' // GROUP BY hour ORDER BY hour func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) { - interval := d.Interval.RandWindow(devops.MaxAllDuration) + interval := 
d.Interval.MustRandWindow(devops.MaxAllDuration) + metrics := devops.GetAllCPUMetrics() selectClauses := d.getSelectClausesAggMetrics("max", metrics) @@ -208,7 +210,8 @@ func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) { d.getTimeBucket(oneHour), strings.Join(selectClauses, ", "), d.getHostWhereString(nHosts), - interval.Start.Format(goTimeFmt), interval.End.Format(goTimeFmt)) + interval.Start().Format(goTimeFmt), + interval.End().Format(goTimeFmt)) humanLabel := devops.GetMaxAllLabel("TimescaleDB", nHosts) humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) @@ -246,10 +249,10 @@ func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) { } else { hostWhereClause = fmt.Sprintf("AND %s", d.getHostWhereString(nHosts)) } - interval := d.Interval.RandWindow(devops.HighCPUDuration) + interval := d.Interval.MustRandWindow(devops.HighCPUDuration) sql := fmt.Sprintf(`SELECT * FROM cpu WHERE usage_user > 90.0 and time >= '%s' AND time < '%s' %s`, - interval.Start.Format(goTimeFmt), interval.End.Format(goTimeFmt), hostWhereClause) + interval.Start().Format(goTimeFmt), interval.End().Format(goTimeFmt), hostWhereClause) humanLabel := devops.GetHighCPULabel("TimescaleDB", nHosts) humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) diff --git a/cmd/tsbs_generate_queries/uses/devops/common.go b/cmd/tsbs_generate_queries/uses/devops/common.go index a7f6f6170..085300bf4 100644 --- a/cmd/tsbs_generate_queries/uses/devops/common.go +++ b/cmd/tsbs_generate_queries/uses/devops/common.go @@ -8,6 +8,7 @@ import ( "time" "github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils" + internalutils "github.com/timescale/tsbs/internal/utils" "github.com/timescale/tsbs/query" ) @@ -46,7 +47,7 @@ var fatal = log.Fatalf // Core is the common component of all generators for all systems type Core struct { // Interval is the entire time range of the dataset - Interval utils.TimeInterval + Interval *internalutils.TimeInterval // Scale is the cardinality 
of the dataset in terms of devices/hosts Scale int @@ -54,15 +55,13 @@ type Core struct { // NewCore returns a new Core for the given time range and cardinality func NewCore(start, end time.Time, scale int) *Core { - if !start.Before(end) { - fatal(errBadTimeOrder) + ti, err := internalutils.NewTimeInterval(start, end) + if err != nil { + fatal(err.Error()) return nil } - return &Core{ - utils.NewTimeInterval(start, end), - scale, - } + return &Core{Interval: ti, Scale: scale} } // GetRandomHosts returns a random set of nHosts from a given Core diff --git a/cmd/tsbs_generate_queries/uses/devops/common_test.go b/cmd/tsbs_generate_queries/uses/devops/common_test.go index 157a7e83c..0ec2bad5f 100644 --- a/cmd/tsbs_generate_queries/uses/devops/common_test.go +++ b/cmd/tsbs_generate_queries/uses/devops/common_test.go @@ -7,16 +7,18 @@ import ( "strings" "testing" "time" + + "github.com/timescale/tsbs/internal/utils" ) func TestNewCore(t *testing.T) { s := time.Now() e := time.Now() c := NewCore(s, e, 10) - if got := c.Interval.Start.UnixNano(); got != s.UnixNano() { + if got := c.Interval.StartUnixNano(); got != s.UnixNano() { t.Errorf("NewCore does not have right start time: got %d want %d", got, s.UnixNano()) } - if got := c.Interval.End.UnixNano(); got != e.UnixNano() { + if got := c.Interval.EndUnixNano(); got != e.UnixNano() { t.Errorf("NewCore does not have right end time: got %d want %d", got, e.UnixNano()) } if got := c.Scale; got != 10 { @@ -32,7 +34,7 @@ func TestNewCoreEndBeforeStart(t *testing.T) { errMsg = fmt.Sprintf(format, args...) 
} _ = NewCore(s, e, 10) - if errMsg != errBadTimeOrder { + if errMsg != utils.ErrEndBeforeStart { t.Errorf("NewCore did not error correctly") } } diff --git a/cmd/tsbs_generate_queries/utils/time_interval.go b/cmd/tsbs_generate_queries/utils/time_interval.go deleted file mode 100644 index 9687a29f0..000000000 --- a/cmd/tsbs_generate_queries/utils/time_interval.go +++ /dev/null @@ -1,66 +0,0 @@ -package utils - -import ( - "math/rand" - "time" -) - -// TimeInterval represents an interval of time. -type TimeInterval struct { - Start time.Time - End time.Time -} - -// NewTimeInterval constructs a TimeInterval. -func NewTimeInterval(start, end time.Time) TimeInterval { - return TimeInterval{ - Start: start, - End: end, - } -} - -// Duration converts a TimeInterval to a time.Duration. -func (ti *TimeInterval) Duration() time.Duration { - return ti.End.UTC().Sub(ti.Start.UTC()) -} - -// RandWindow creates a TimeInterval of duration `window` at a uniformly-random -// start time within this time interval. -func (ti *TimeInterval) RandWindow(window time.Duration) TimeInterval { - lower := ti.Start.UnixNano() - upper := ti.End.Add(-window).UnixNano() - - if upper <= lower { - panic("logic error: bad time bounds") - } - - start := lower + rand.Int63n(upper-lower) - end := start + window.Nanoseconds() - - x := NewTimeInterval(time.Unix(0, start).UTC(), time.Unix(0, end).UTC()) - if x.Duration() != window { - panic("logic error: generated interval does not equal window") - } - - return x -} - -// StartString formats the start of the time interval. -func (ti *TimeInterval) StartString() string { - return ti.Start.UTC().Format(time.RFC3339) -} - -// EndString formats the end of the time interval. -func (ti *TimeInterval) EndString() string { - return ti.End.UTC().Format(time.RFC3339) -} - -// StartUnixNano returns the start time as nanoseconds. 
-func (ti *TimeInterval) StartUnixNano() int64 { - return ti.Start.UTC().UnixNano() -} - -// EndUnixNano returns the end time as nanoseconds. -func (ti *TimeInterval) EndUnixNano() int64 { - return ti.End.UTC().UnixNano() -} diff --git a/cmd/tsbs_run_queries_cassandra/client_side_index.go b/cmd/tsbs_run_queries_cassandra/client_side_index.go index f9c9d41aa..cff8e866c 100644 --- a/cmd/tsbs_run_queries_cassandra/client_side_index.go +++ b/cmd/tsbs_run_queries_cassandra/client_side_index.go @@ -7,13 +7,14 @@ import ( "time" "github.com/gocql/gocql" + "github.com/timescale/tsbs/internal/utils" ) // A ClientSideIndex wraps runtime data used to translate an HLQuery into // Cassandra CQL queries. After initialization, objects of this type are // read-only. type ClientSideIndex struct { - timeIntervalMapping map[TimeInterval]map[*Series]struct{} + timeIntervalMapping map[*utils.TimeInterval]map[*Series]struct{} tagMapping map[string]map[*Series]struct{} nameMapping map[[2]string][]Series @@ -29,7 +30,7 @@ func NewClientSideIndex(seriesCollection []Series) *ClientSideIndex { } // build the "time interval -> series" index: - bm := map[TimeInterval]map[*Series]struct{}{} + bm := map[*utils.TimeInterval]map[*Series]struct{}{} for _, s := range seriesCollection { if _, ok := bm[s.TimeInterval]; !ok { @@ -44,7 +45,7 @@ func NewClientSideIndex(seriesCollection []Series) *ClientSideIndex { tm := map[string]map[*Series]struct{}{} for _, s := range seriesCollection { - for tag, _ := range s.Tags { + for tag := range s.Tags { if _, ok := tm[tag]; !ok { tm[tag] = map[*Series]struct{}{} } @@ -106,7 +107,7 @@ type Series struct { Measurement string // e.g. "cpu" Tags map[string]struct{} // e.g. {"hostname": "host_3"} Field string // e.g. "usage_idle" - TimeInterval TimeInterval // (UTC) e.g. "2016-01-01" + TimeInterval *utils.TimeInterval // (UTC) e.g. "2016-01-01" } // NewSeries parses a new Series from the given Cassandra data. 
@@ -152,12 +153,16 @@ func (s *Series) parse() { log.Fatal("bad time bucket parse in pre-existing database series") } end := start.Add(BucketDuration) - s.TimeInterval = NewTimeInterval(start, end) + ti, err := utils.NewTimeInterval(start, end) + if err != nil { + log.Fatalf("could not create time interval: %v", err) + } + s.TimeInterval = ti } // MatchesTimeInterval determines whether this Series time overlaps with the // provided TimeInterval. -func (s *Series) MatchesTimeInterval(ti *TimeInterval) bool { +func (s *Series) MatchesTimeInterval(ti *utils.TimeInterval) bool { return s.TimeInterval.Overlap(ti) } diff --git a/cmd/tsbs_run_queries_cassandra/query.go b/cmd/tsbs_run_queries_cassandra/query.go index 0f80b207a..f4956ed73 100644 --- a/cmd/tsbs_run_queries_cassandra/query.go +++ b/cmd/tsbs_run_queries_cassandra/query.go @@ -5,6 +5,7 @@ import ( "strconv" "strings" + "github.com/timescale/tsbs/internal/utils" "github.com/timescale/tsbs/query" ) @@ -38,7 +39,7 @@ func (q *HLQuery) ToQueryPlanWithServerAggregation(csi *ClientSideIndex) (qp *Qu // It is important to populate these even if they end up being empty, // so that we get correct results for empty 'time buckets'. 
tis := bucketTimeIntervals(q.TimeStart, q.TimeEnd, q.GroupByDuration) - bucketedSeries := map[TimeInterval][]Series{} + bucketedSeries := map[*utils.TimeInterval][]Series{} for _, ti := range tis { bucketedSeries[ti] = []Series{} } @@ -59,7 +60,7 @@ func (q *HLQuery) ToQueryPlanWithServerAggregation(csi *ClientSideIndex) (qp *Qu // check each group-by interval to see if it applies: for _, ti := range tis { - if !s.MatchesTimeInterval(&ti) { + if !s.MatchesTimeInterval(ti) { continue } bucketedSeries[ti] = append(bucketedSeries[ti], s) @@ -67,12 +68,12 @@ func (q *HLQuery) ToQueryPlanWithServerAggregation(csi *ClientSideIndex) (qp *Qu } // For each group-by time bucket, convert its series into CQLQueries: - cqlBuckets := make(map[TimeInterval][]CQLQuery, len(bucketedSeries)) + cqlBuckets := make(map[*utils.TimeInterval][]CQLQuery, len(bucketedSeries)) for ti, seriesSlice := range bucketedSeries { cqlQueries := make([]CQLQuery, len(seriesSlice)) for i, ser := range seriesSlice { - start := ti.Start - end := ti.End + start := ti.Start() + end := ti.End() // the following two special cases ensure equivalency with rounded time boundaries as seen in influxdb: // https://docs.influxdata.com/influxdb/v0.13/query_language/data_exploration/#rounded-group-by-time-boundaries @@ -106,7 +107,10 @@ func (csi *ClientSideIndex) getSeriesChoicesForFieldsAndMeasurement(fields []str // // It executes at most one CQLQuery per series. 
func (q *HLQuery) ToQueryPlanWithoutServerAggregation(csi *ClientSideIndex) (qp *QueryPlanWithoutServerAggregation, err error) { - hlQueryInterval := NewTimeInterval(q.TimeStart, q.TimeEnd) + hlQueryInterval, err := utils.NewTimeInterval(q.TimeStart, q.TimeEnd) + if err != nil { + return nil, err + } fields := strings.Split(string(q.FieldName), ",") seriesChoices := csi.getSeriesChoicesForFieldsAndMeasurement(fields, string(q.MeasurementName)) orderBy := string(q.OrderBy) @@ -152,7 +156,7 @@ outer: if !s.MatchesTagSets(q.TagSets) { continue } - if !s.MatchesTimeInterval(&hlQueryInterval) { + if !s.MatchesTimeInterval(hlQueryInterval) { continue } @@ -172,7 +176,10 @@ outer: // ToQueryPlanNoAggregation combines an HLQuery with a // ClientSideIndex to make a QueryPlanNoAggregation. func (q *HLQuery) ToQueryPlanNoAggregation(csi *ClientSideIndex) (*QueryPlanNoAggregation, error) { - hlQueryInterval := NewTimeInterval(q.TimeStart, q.TimeEnd) + hlQueryInterval, err := utils.NewTimeInterval(q.TimeStart, q.TimeEnd) + if err != nil { + return nil, err + } fields := strings.Split(string(q.FieldName), ",") seriesChoices := csi.getSeriesChoicesForFieldsAndMeasurement(fields, string(q.MeasurementName)) @@ -187,7 +194,7 @@ func (q *HLQuery) ToQueryPlanNoAggregation(csi *ClientSideIndex) (*QueryPlanNoAg } } - if !s.MatchesTimeInterval(&hlQueryInterval) { + if !s.MatchesTimeInterval(hlQueryInterval) { continue } @@ -214,7 +221,10 @@ func (q *HLQuery) ToQueryPlanForEvery(csi *ClientSideIndex) (*QueryPlanForEvery, panic("unparseable ForEveryN field: " + string(q.ForEveryN)) } - hlQueryInterval := NewTimeInterval(q.TimeStart, q.TimeEnd) + hlQueryInterval, err := utils.NewTimeInterval(q.TimeStart, q.TimeEnd) + if err != nil { + return nil, err + } fields := strings.Split(string(q.FieldName), ",") seriesChoices := csi.getSeriesChoicesForFieldsAndMeasurement(fields, string(q.MeasurementName)) @@ -230,7 +240,7 @@ func (q *HLQuery) ToQueryPlanForEvery(csi *ClientSideIndex) 
(*QueryPlanForEvery, } } - if !s.MatchesTimeInterval(&hlQueryInterval) { + if !s.MatchesTimeInterval(hlQueryInterval) { continue } @@ -277,6 +287,6 @@ func NewCQLQuery(aggrLabel, tableName, rowName, orderBy string, timeStartNanos, // CQLResult holds a result from a set of CQL aggregation queries. // Used for debug printing. type CQLResult struct { - TimeInterval + *utils.TimeInterval Values []float64 } diff --git a/cmd/tsbs_run_queries_cassandra/query_executor.go b/cmd/tsbs_run_queries_cassandra/query_executor.go index 37243e3c4..392fd67d2 100644 --- a/cmd/tsbs_run_queries_cassandra/query_executor.go +++ b/cmd/tsbs_run_queries_cassandra/query_executor.go @@ -90,7 +90,7 @@ func (qe *HLQueryExecutor) Do(q *HLQuery, opts HLQueryExecutorDoOptions) (qpLagM // optionally, print reponses for query validation: if opts.PrettyPrintResponses { for _, r := range results { - fmt.Fprintf(os.Stderr, "ID %d: [%s, %s] -> %v\n", q.GetID(), r.TimeInterval.Start, r.TimeInterval.End, r.Values) + fmt.Fprintf(os.Stderr, "ID %d: [%s, %s] -> %v\n", q.GetID(), r.TimeInterval.Start(), r.TimeInterval.End(), r.Values) } } return diff --git a/cmd/tsbs_run_queries_cassandra/query_plan.go b/cmd/tsbs_run_queries_cassandra/query_plan.go index ee9e9016d..081beb74f 100644 --- a/cmd/tsbs_run_queries_cassandra/query_plan.go +++ b/cmd/tsbs_run_queries_cassandra/query_plan.go @@ -9,6 +9,7 @@ import ( "time" "github.com/gocql/gocql" + "github.com/timescale/tsbs/internal/utils" ) // A QueryPlan is a strategy used to fulfill an HLQuery. @@ -26,12 +27,12 @@ type QueryPlan interface { // relevant to each bucket. type QueryPlanWithServerAggregation struct { AggregatorLabel string - BucketedCQLQueries map[TimeInterval][]CQLQuery + BucketedCQLQueries map[*utils.TimeInterval][]CQLQuery } // NewQueryPlanWithServerAggregation builds a QueryPlanWithServerAggregation. // It is typically called via (*HLQuery).ToQueryPlanWithServerAggregation. 
-func NewQueryPlanWithServerAggregation(aggrLabel string, bucketedCQLQueries map[TimeInterval][]CQLQuery) (*QueryPlanWithServerAggregation, error) { +func NewQueryPlanWithServerAggregation(aggrLabel string, bucketedCQLQueries map[*utils.TimeInterval][]CQLQuery) (*QueryPlanWithServerAggregation, error) { qp := &QueryPlanWithServerAggregation{ AggregatorLabel: aggrLabel, BucketedCQLQueries: bucketedCQLQueries, @@ -44,7 +45,7 @@ func NewQueryPlanWithServerAggregation(aggrLabel string, bucketedCQLQueries map[ // TODO(rw): support parallel execution. func (qp *QueryPlanWithServerAggregation) Execute(session *gocql.Session) ([]CQLResult, error) { // sort the time interval buckets we'll use: - sortedKeys := make([]TimeInterval, 0, len(qp.BucketedCQLQueries)) + sortedKeys := make([]*utils.TimeInterval, 0, len(qp.BucketedCQLQueries)) for k := range qp.BucketedCQLQueries { sortedKeys = append(sortedKeys, k) } @@ -93,7 +94,7 @@ func (qp *QueryPlanWithServerAggregation) DebugQueries(level int) { if level >= 2 { for k, qq := range qp.BucketedCQLQueries { for i, q := range qq { - fmt.Printf("[qpsa] CQL: %s, %d, %s\n", k, i, q) + fmt.Printf("[qpsa] CQL: %v, %d, %s\n", k, i, q) } } } @@ -109,18 +110,18 @@ func (qp *QueryPlanWithServerAggregation) DebugQueries(level int) { // store final aggregated items, and 4) a set of CQLQueries used to fulfill // this plan. type QueryPlanWithoutServerAggregation struct { - Aggregators map[TimeInterval]map[string]Aggregator + Aggregators map[*utils.TimeInterval]map[string]Aggregator GroupByDuration time.Duration Fields []string - TimeBuckets []TimeInterval + TimeBuckets []*utils.TimeInterval limit int CQLQueries []CQLQuery } // NewQueryPlanWithoutServerAggregation builds a QueryPlanWithoutServerAggregation. // It is typically called via (*HLQuery).ToQueryPlanWithoutServerAggregation. 
-func NewQueryPlanWithoutServerAggregation(aggrLabel string, groupByDuration time.Duration, fields []string, timeBuckets []TimeInterval, limit int, cqlQueries []CQLQuery) (*QueryPlanWithoutServerAggregation, error) { - aggrs := make(map[TimeInterval]map[string]Aggregator, len(timeBuckets)) +func NewQueryPlanWithoutServerAggregation(aggrLabel string, groupByDuration time.Duration, fields []string, timeBuckets []*utils.TimeInterval, limit int, cqlQueries []CQLQuery) (*QueryPlanWithoutServerAggregation, error) { + aggrs := make(map[*utils.TimeInterval]map[string]Aggregator, len(timeBuckets)) for _, ti := range timeBuckets { if len(aggrs) > 0 && len(aggrs) == limit { break @@ -174,9 +175,9 @@ func (qp *QueryPlanWithoutServerAggregation) Execute(session *gocql.Session) ([] for iter.Scan(×tampNs, &value) { ts := time.Unix(0, timestampNs).UTC() tsTruncated := ts.Truncate(qp.GroupByDuration) - bucketKey := TimeInterval{ - Start: tsTruncated, - End: tsTruncated.Add(qp.GroupByDuration), + bucketKey, err := utils.NewTimeInterval(tsTruncated, tsTruncated.Add(qp.GroupByDuration)) + if err != nil { + return nil, err } // Due to limits, bucket is not needed, skip @@ -331,7 +332,14 @@ func (qp *QueryPlanNoAggregation) Execute(session *gocql.Session) ([]CQLResult, for _, ts := range keys { tst := time.Unix(0, ts) for _, vals := range res[ts] { - temp := CQLResult{TimeInterval: NewTimeInterval(tst, tst), Values: vals} + ti, err := utils.NewTimeInterval(tst, tst) + if err != nil { + return nil, err + } + temp := CQLResult{ + TimeInterval: ti, + Values: vals, + } results = append(results, temp) } } @@ -449,7 +457,14 @@ func (qp *QueryPlanForEvery) Execute(session *gocql.Session) ([]CQLResult, error for _, map2 := range res { for ts, vals := range map2 { tst := time.Unix(0, ts) - temp := CQLResult{TimeInterval: NewTimeInterval(tst, tst), Values: vals} + ti, err := utils.NewTimeInterval(tst, tst) + if err != nil { + return nil, err + } + temp := CQLResult{ + TimeInterval: ti, + Values: 
vals, + } results = append(results, temp) } } diff --git a/cmd/tsbs_run_queries_cassandra/time_util.go b/cmd/tsbs_run_queries_cassandra/time_util.go index 2baf32b07..92d262c16 100644 --- a/cmd/tsbs_run_queries_cassandra/time_util.go +++ b/cmd/tsbs_run_queries_cassandra/time_util.go @@ -1,90 +1,35 @@ package main -import "time" +import ( + "fmt" + "time" -// A TimeInterval represents a span of time. The start is inclusive, the end -// is exclusive. -type TimeInterval struct { - Start, End time.Time -} - -// NewTimeInterval constructs a TimeInterval value after checking for logic -// errors. -func NewTimeInterval(start, end time.Time) TimeInterval { - if end.Before(start) { - panic("logic error in NewTimeInterval: bad input times") - } - // force UTC to help with pretty-printing: - start = start.UTC() - end = end.UTC() - - return TimeInterval{Start: start, End: end} -} + "github.com/timescale/tsbs/internal/utils" +) -// Overlap detects whether this TimeInterval overlaps with another -// TimeInterval. 
-func (ti *TimeInterval) Overlap(other *TimeInterval) bool { - S := ti.Start.UnixNano() - E := ti.End.UnixNano() - - s := other.Start.UnixNano() - e := other.End.UnixNano() - - // special case 1 of 2: when boundaries match exactly, maintain the - // property that end is exclusive but start is inclusive: - if E == s { - return false - } - - // special case 2 of 2: when boundaries match exactly, maintain the - // property that start is inclusive but end is exclusive: - if e == S { - return false - } - - // *{--[--]--}* (surrounds other) - if S <= s && e <= E { - return true - } - - // *{--[--}*--] (overlaps other start) - if S <= s && s <= E { - return true - } - - // [--*{--]--}* (overlaps other end) - if S <= e && e <= E { - return true - } - - // *[--{--}--]* (contained within other) - if s <= S && E <= e { - return true - } - - return false -} - -type TimeIntervals []TimeInterval +type TimeIntervals []*utils.TimeInterval // implement sort.Interface func (x TimeIntervals) Len() int { return len(x) } func (x TimeIntervals) Swap(i, j int) { x[i], x[j] = x[j], x[i] } func (x TimeIntervals) Less(i, j int) bool { - return x[i].Start.Before(x[j].Start) + return x[i].Start().Before(x[j].Start()) } // bucketTimeIntervals is a helper that creates a slice of TimeInterval // over the given span of time, in chunks of duration `window`. 
-func bucketTimeIntervals(start, end time.Time, window time.Duration) []TimeInterval { +func bucketTimeIntervals(start, end time.Time, window time.Duration) []*utils.TimeInterval { if end.Before(start) { panic("logic error in bucketTimeIntervals: bad input times") } - ret := []TimeInterval{} + ret := []*utils.TimeInterval{} start = start.Truncate(window) for start.Before(end) { - ti := NewTimeInterval(start, start.Add(window)) + ti, err := utils.NewTimeInterval(start, start.Add(window)) + if err != nil { + panic(fmt.Sprintf("unexpected error: %v", err)) + } ret = append(ret, ti) start = start.Add(window) } diff --git a/internal/utils/time_interval.go b/internal/utils/time_interval.go new file mode 100644 index 000000000..6b7c4629a --- /dev/null +++ b/internal/utils/time_interval.go @@ -0,0 +1,134 @@ +package utils + +import ( + "fmt" + "math/rand" + "time" +) + +const ( + // ErrEndBeforeStart is the error message for when a TimeInterval's end time + // would be before its start. + ErrEndBeforeStart = "end time before start time" + + errWindowTooLargeFmt = "random window equal to or larger than TimeInterval: window %v, interval %v" +) + +// TimeInterval represents an interval of time in UTC. That is, regardless of +// what timezone(s) are used for the beginning and end times, they will be +// converted to UTC and methods will return them as such. +type TimeInterval struct { + start time.Time + end time.Time +} + +// NewTimeInterval creates a new TimeInterval for a given start and end. If end +// is a time.Time before start, then an error is returned. +func NewTimeInterval(start, end time.Time) (*TimeInterval, error) { + if end.Before(start) { + return nil, fmt.Errorf(ErrEndBeforeStart) + } + return &TimeInterval{start.UTC(), end.UTC()}, nil +} + +// Duration returns the time.Duration of the TimeInterval. 
+func (ti *TimeInterval) Duration() time.Duration { + return ti.end.Sub(ti.start) +} + +// Overlap detects whether the given TimeInterval overlaps with this +// TimeInterval, assuming an inclusive start boundary and exclusive end +// boundary. +func (ti *TimeInterval) Overlap(other *TimeInterval) bool { + s1 := ti.Start() + e1 := ti.End() + + s2 := other.Start() + e2 := other.End() + + // If the two TimeIntervals share opposite boundaries, then they do not + // overlap since the end is exclusive + if e1 == s2 || e2 == s1 { + return false + } + + // If the start and end of the first are both before the start of the + // second, they do not overlap. + if s1.Before(s2) && e1.Before(s2) { + return false + } + + // Same as the previous case, just reversed. + if s2.Before(s1) && e2.Before(s1) { + return false + } + + // Everything else must overlap + return true +} + +// RandWindow creates a TimeInterval of duration `window` at a uniformly-random +// start time within the time period represented by this TimeInterval. +func (ti *TimeInterval) RandWindow(window time.Duration) (*TimeInterval, error) { + lower := ti.start.UnixNano() + upper := ti.end.Add(-window).UnixNano() + + if upper <= lower { + return nil, fmt.Errorf(errWindowTooLargeFmt, window, ti.end.Sub(ti.start)) + + } + + start := lower + rand.Int63n(upper-lower) + end := start + window.Nanoseconds() + + x, err := NewTimeInterval(time.Unix(0, start), time.Unix(0, end)) + if err != nil { + return nil, err + } else if x.Duration() != window { + // Unless the logic above this changes, this should not happen, so + // we panic in that case. + panic("generated TimeInterval's duration does not equal window") + } + + return x, nil +} + +// MustRandWindow is the form of RandWindow that cannot error; if it does error, +// it causes a panic. 
+func (ti *TimeInterval) MustRandWindow(window time.Duration) *TimeInterval { + res, err := ti.RandWindow(window) + if err != nil { + panic(err.Error()) + } + return res +} + +// Start returns the starting time in UTC. +func (ti *TimeInterval) Start() time.Time { + return ti.start +} + +// StartUnixNano returns the start time as nanoseconds. +func (ti *TimeInterval) StartUnixNano() int64 { + return ti.start.UnixNano() +} + +// StartString formats the start of the TimeInterval according to RFC3339. +func (ti *TimeInterval) StartString() string { + return ti.start.Format(time.RFC3339) +} + +// End returns the ending time in UTC. +func (ti *TimeInterval) End() time.Time { + return ti.end +} + +// EndUnixNano returns the end time as nanoseconds. +func (ti *TimeInterval) EndUnixNano() int64 { + return ti.end.UnixNano() +} + +// EndString formats the end of the TimeInterval according to RFC3339. +func (ti *TimeInterval) EndString() string { + return ti.end.Format(time.RFC3339) +} diff --git a/internal/utils/time_interval_test.go b/internal/utils/time_interval_test.go new file mode 100644 index 000000000..d49d7df63 --- /dev/null +++ b/internal/utils/time_interval_test.go @@ -0,0 +1,312 @@ +package utils + +import ( + "fmt" + "testing" + "time" +) + +var ( + // From godoc example for time: + // China doesn't have daylight saving. It uses a fixed 8 hour offset from UTC.
+ secondsEastOfUTC = int((8 * time.Hour).Seconds()) + beijing = time.FixedZone("Beijing Time", secondsEastOfUTC) +) + +func TestNewTimeInterval(t *testing.T) { + cases := []struct { + desc string + start time.Time + end time.Time + errMsg string + }{ + { + desc: "error on end before start", + start: time.Date(2016, time.January, 1, 1, 30, 15, 0, time.UTC), + end: time.Date(2016, time.January, 1, 1, 0, 0, 0, time.UTC), + errMsg: ErrEndBeforeStart, + }, + { + desc: "both in UTC", + start: time.Date(2016, time.January, 1, 1, 30, 15, 0, time.UTC), + end: time.Date(2016, time.January, 2, 1, 30, 15, 0, time.UTC), + }, + { + desc: "start not in UTC", + start: time.Date(2016, time.January, 1, 1, 30, 15, 0, beijing), + end: time.Date(2016, time.January, 10, 1, 30, 15, 0, time.UTC), + }, + { + desc: "end not in UTC", + start: time.Date(2016, time.January, 1, 1, 30, 15, 0, time.UTC), + end: time.Date(2016, time.January, 10, 1, 30, 15, 0, beijing), + }, + + { + desc: "both not in UTC", + start: time.Date(2016, time.January, 1, 1, 30, 15, 0, beijing), + end: time.Date(2016, time.January, 10, 1, 30, 15, 0, beijing), + }, + } + + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + ti, err := NewTimeInterval(c.start, c.end) + if c.errMsg == "" { + if err != nil { + t.Errorf("unexpected error: got %v", err) + } else { + wantStart := c.start.UTC() + wantEnd := c.end.UTC() + wantDuration := c.end.Sub(c.start) + if got := ti.Start(); got != wantStart { + t.Errorf("incorrect start: got %v want %v", got, wantStart) + } + if got := ti.End(); got != wantEnd { + t.Errorf("incorrect end: got %v want %v", got, wantEnd) + } + if got := ti.Duration(); got != wantDuration { + t.Errorf("incorrect duration: got %v want %v", got, wantDuration) + } + } + } else if c.errMsg != "" { + if err == nil { + t.Errorf("unexpected lack of error") + } else if got := err.Error(); got != c.errMsg { + t.Errorf("unexpected error:\ngot\n%v\nwant\n%v", got, c.errMsg) + } + } + }) + } +} + +func 
TestTimeIntervalStartAndEndFuncs(t *testing.T) { + start := time.Date(2016, time.January, 1, 12, 30, 45, 100, beijing) + end := time.Date(2016, time.February, 1, 12, 30, 45, 100, beijing) + ti, err := NewTimeInterval(start, end) + if err != nil { + t.Fatalf("unexpected error creating TimeInterval: got %v", err) + } + + startUTC := start.UTC() + endUTC := end.UTC() + if got := ti.StartUnixNano(); got != startUTC.UnixNano() { + t.Errorf("incorrect start unix nano: got %v want %v", got, startUTC.UnixNano()) + } + if got := ti.EndUnixNano(); got != endUTC.UnixNano() { + t.Errorf("incorrect end unix nano: got %v want %v", got, endUTC.UnixNano()) + } + + if got := ti.StartString(); got != startUTC.Format(time.RFC3339) { + t.Errorf("incorrect start string: got %s want %s", got, startUTC.Format(time.RFC3339)) + } + if got := ti.EndString(); got != endUTC.Format(time.RFC3339) { + t.Errorf("incorrect end string: got %s want %s", got, endUTC.Format(time.RFC3339)) + } +} + +func TestTimeIntervalOverlap(t *testing.T) { + cases := []struct { + desc string + start1 string + end1 string + start2 string + end2 string + wantOverlap bool + }{ + { + desc: "completely disjoint", + start1: "2016-01-01", + end1: "2016-02-01", + start2: "2016-03-01", + end2: "2016-04-01", + wantOverlap: false, + }, + { + desc: "disjoint because of exclusive end", + start1: "2016-01-01", + end1: "2016-02-01", + start2: "2016-02-01", + end2: "2016-03-01", + wantOverlap: false, + }, + { + desc: "disjoint because of exclusive end #2", + start1: "2016-02-01", + end1: "2016-03-01", + start2: "2016-01-01", + end2: "2016-02-01", + wantOverlap: false, + }, + { + desc: "complete overlap", + start1: "2016-01-01", + end1: "2016-02-01", + start2: "2016-01-01", + end2: "2016-02-01", + wantOverlap: true, + }, + { + desc: "1 inside of 2", + start1: "2016-02-01", + end1: "2016-03-01", + start2: "2016-01-01", + end2: "2016-04-01", + wantOverlap: true, + }, + { + desc: "2 inside of 1", + start1: "2016-01-01", + end1:
"2016-06-01", + start2: "2016-04-01", + end2: "2016-05-01", + wantOverlap: true, + }, + { + desc: "1 starts first, 2 ends later", + start1: "2016-01-01", + end1: "2016-03-01", + start2: "2016-02-01", + end2: "2016-04-01", + wantOverlap: true, + }, + { + desc: "1 starts later, 2 ends early", + start1: "2016-02-01", + end1: "2016-04-01", + start2: "2016-01-01", + end2: "2016-03-01", + wantOverlap: true, + }, + } + layout := "2006-01-02" + parse := func(s string) time.Time { + x, err := time.Parse(layout, s) + if err != nil { + t.Fatalf("could not parse %v into time", s) + } + return x + } + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + ti1, err := NewTimeInterval(parse(c.start1), parse(c.end1)) + if err != nil { + t.Errorf("could not create ti1: got %v", err) + } + ti2, err := NewTimeInterval(parse(c.start2), parse(c.end2)) + if err != nil { + t.Errorf("could not create ti2: got %v", err) + } + if got := ti1.Overlap(ti2); got != c.wantOverlap { + t.Errorf("incorrect overlap with ti1: got %v want %v", got, c.wantOverlap) + } + if got := ti2.Overlap(ti1); got != c.wantOverlap { + t.Errorf("incorrect overlap with ti2: got %v want %v", got, c.wantOverlap) + } + }) + } +} + +type randWindowCase struct { + desc string + window time.Duration + errMsg string +} + +func (c randWindowCase) checkTimeInterval(t *testing.T, bigTI *TimeInterval, randTI *TimeInterval) { + if got := randTI.Duration(); got != c.window { + t.Errorf("incorrect duration: got %v want %v", got, c.window) + } + if randTI.Start().Before(bigTI.Start()) { + t.Errorf("window start too early: %v is before %v", randTI.Start(), bigTI.Start()) + } + if randTI.End().After(bigTI.End()) { + t.Errorf("window end too late: %v is after %v", randTI.End(), bigTI.End()) + } +} + +var rwCases = []randWindowCase{ + { + desc: "too large window", + window: 2 * time.Hour, + errMsg: fmt.Sprintf(errWindowTooLargeFmt, 2*time.Hour, 1*time.Hour), + }, + { + desc: "window is exact", + window: 1 * time.Hour, + 
errMsg: fmt.Sprintf(errWindowTooLargeFmt, 1*time.Hour, 1*time.Hour), + }, + { + desc: "window is just under", + window: time.Hour - time.Second, + }, + { + desc: "window is small", + window: time.Second, + }, + { + desc: "window is zero", + window: 0, + }, + { + desc: "window is negative", + window: -1 * time.Second, + errMsg: fmt.Sprintf(ErrEndBeforeStart), + }, +} + +func TestTimeIntervalRandWindow(t *testing.T) { + start := time.Date(2016, time.January, 1, 0, 0, 0, 0, time.UTC) + end := time.Date(2016, time.January, 1, 1, 0, 0, 0, time.UTC) + ti, err := NewTimeInterval(start, end) // 1 hour duration + if err != nil { + t.Fatalf("unexpected error creating TimeInterval: got %v", err) + } + + for _, c := range rwCases { + t.Run(c.desc, func(t *testing.T) { + x, err := ti.RandWindow(c.window) + if c.errMsg == "" { + if err != nil { + t.Errorf("unexpected error: got %v", err) + } else { + c.checkTimeInterval(t, ti, x) + } + } else { + if err == nil { + t.Errorf("unexpected lack of error") + } else if got := err.Error(); got != c.errMsg { + t.Errorf("unexpected error:\ngot\n%v\nwant\n%v", got, c.errMsg) + } + } + }) + } +} + +func TestTimeIntervalMustRandWindow(t *testing.T) { + start := time.Date(2016, time.January, 1, 0, 0, 0, 0, time.UTC) + end := time.Date(2016, time.January, 1, 1, 0, 0, 0, time.UTC) + ti, err := NewTimeInterval(start, end) // 1 hour duration + if err != nil { + t.Fatalf("unexpected error creating TimeInterval: got %v", err) + } + + for _, c := range rwCases { + t.Run(c.desc, func(t *testing.T) { + if c.errMsg != "" { + defer func() { + r := recover() + if r == nil { + t.Errorf("unexpected lack of panic") + } else if got := r.(string); got != c.errMsg { + t.Errorf("unexpected panic:\ngot\n%v\nwant\n%v", got, c.errMsg) + } + }() + } + x := ti.MustRandWindow(c.window) + if c.errMsg == "" { + c.checkTimeInterval(t, ti, x) + } + }) + } +}