Skip to content

Commit

Permalink
Combine both TimeInterval types into single type
Browse files Browse the repository at this point in the history
Query generation and Cassandra's query running both used a type
called TimeInterval that did roughly the same thing. This change
combines the two into one type that can be used from the utils
package in internal/. This improves code reuse and keeps the two
representations in sync, and also increases the testability of
the code.
  • Loading branch information
RobAtticus committed May 21, 2019
1 parent 3cfaae0 commit 3ce1a1f
Show file tree
Hide file tree
Showing 17 changed files with 620 additions and 244 deletions.
28 changes: 18 additions & 10 deletions cmd/tsbs_generate_queries/databases/cassandra/devops.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"

"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops"
"github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils"
"github.com/timescale/tsbs/internal/utils"
"github.com/timescale/tsbs/query"
)

Expand Down Expand Up @@ -50,7 +50,8 @@ func (d *Devops) getHostWhere(nHosts int) []string {
// AND time >= '$HOUR_START' AND time < '$HOUR_END'
// GROUP BY minute ORDER BY minute ASC
func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) {
interval := d.Interval.RandWindow(timeRange)
interval := d.Interval.MustRandWindow(timeRange)

metrics := devops.GetCPUMetricsSlice(numMetrics)
tagSet := d.getHostWhere(nHosts)

Expand All @@ -70,8 +71,12 @@ func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange t
// GROUP BY t ORDER BY t DESC
// LIMIT $LIMIT
func (d *Devops) GroupByOrderByLimit(qi query.Query) {
interval := d.Interval.RandWindow(time.Hour)
interval.Start = d.Interval.Start
interval := d.Interval.MustRandWindow(time.Hour)

interval, err := utils.NewTimeInterval(d.Interval.Start(), interval.End())
if err != nil {
panic(err.Error())
}

humanLabel := "Cassandra max cpu over last 5 min-intervals (random end)"
humanDesc := fmt.Sprintf("%s: %s", humanLabel, d.Interval.StartString())
Expand All @@ -90,7 +95,8 @@ func (d *Devops) GroupByOrderByLimit(qi query.Query) {
// WHERE time >= '$HOUR_START' AND time < '$HOUR_END'
// GROUP BY hour, hostname ORDER BY hour
func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
interval := d.Interval.RandWindow(devops.DoubleGroupByDuration)
interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration)

metrics := devops.GetCPUMetricsSlice(numMetrics)

humanLabel := devops.GetDoubleGroupByLabel("Cassandra", numMetrics)
Expand All @@ -108,7 +114,8 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
// AND time >= '$HOUR_START' AND time < '$HOUR_END'
// GROUP BY hour ORDER BY hour
func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) {
interval := d.Interval.RandWindow(devops.MaxAllDuration)
interval := d.Interval.MustRandWindow(devops.MaxAllDuration)

tagSet := d.getHostWhere(nHosts)

tagSets := [][]string{}
Expand Down Expand Up @@ -140,7 +147,8 @@ func (d *Devops) LastPointPerHost(qi query.Query) {
// AND time >= '$TIME_START' AND time < '$TIME_END'
// AND (hostname = '$HOST' OR hostname = '$HOST2'...)
func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) {
interval := d.Interval.RandWindow(devops.HighCPUDuration)
interval := d.Interval.MustRandWindow(devops.HighCPUDuration)

tagSet := d.getHostWhere(nHosts)

tagSets := [][]string{}
Expand All @@ -156,7 +164,7 @@ func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) {
q.WhereClause = []byte("usage_user,>,90.0")
}

func (d *Devops) fillInQuery(qi query.Query, humanLabel, humanDesc, aggType string, fields []string, interval utils.TimeInterval, tagSets [][]string) {
func (d *Devops) fillInQuery(qi query.Query, humanLabel, humanDesc, aggType string, fields []string, interval *utils.TimeInterval, tagSets [][]string) {
q := qi.(*query.Cassandra)
q.HumanLabel = []byte(humanLabel)
q.HumanDescription = []byte(humanDesc)
Expand All @@ -165,8 +173,8 @@ func (d *Devops) fillInQuery(qi query.Query, humanLabel, humanDesc, aggType stri
q.MeasurementName = []byte("cpu")
q.FieldName = []byte(strings.Join(fields, ","))

q.TimeStart = interval.Start
q.TimeEnd = interval.End
q.TimeStart = interval.Start()
q.TimeEnd = interval.End()

q.TagSets = tagSets
}
44 changes: 23 additions & 21 deletions cmd/tsbs_generate_queries/databases/clickhouse/devops.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ const clickhouseTimeStringFormat = "2006-01-02 15:04:05"
// cpu-max-all-1
// cpu-max-all-8
func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) {
interval := d.Interval.RandWindow(devops.MaxAllDuration)
interval := d.Interval.MustRandWindow(devops.MaxAllDuration)
metrics := devops.GetAllCPUMetrics()
selectClauses := d.getSelectClausesAggMetrics("max", metrics)

Expand All @@ -106,8 +106,8 @@ func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) {
`,
strings.Join(selectClauses, ", "),
d.getHostWhereString(nHosts),
interval.Start.Format(clickhouseTimeStringFormat),
interval.End.Format(clickhouseTimeStringFormat))
interval.Start().Format(clickhouseTimeStringFormat),
interval.End().Format(clickhouseTimeStringFormat))

humanLabel := devops.GetMaxAllLabel("ClickHouse", nHosts)
humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString())
Expand All @@ -129,7 +129,7 @@ func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) {
// double-groupby-all
func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
metrics := devops.GetCPUMetricsSlice(numMetrics)
interval := d.Interval.RandWindow(devops.DoubleGroupByDuration)
interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration)

selectClauses := make([]string, numMetrics)
meanClauses := make([]string, numMetrics)
Expand All @@ -149,7 +149,7 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
hour,
%s,
%s
FROM
FROM
(
SELECT
toStartOfHour(created_at) AS hour,
Expand All @@ -161,16 +161,16 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
hour,
id
) AS cpu_avg
%s
%s
ORDER BY
hour ASC,
%s
`,
hostnameField, // main SELECT %s,
strings.Join(meanClauses, ", "), // main SELECT %s
strings.Join(selectClauses, ", "), // cpu_avg SELECT %s
interval.Start.Format(clickhouseTimeStringFormat), // cpu_avg time >= '%s'
interval.End.Format(clickhouseTimeStringFormat), // cpu_avg time < '%s'
hostnameField, // main SELECT %s,
strings.Join(meanClauses, ", "), // main SELECT %s
strings.Join(selectClauses, ", "), // cpu_avg SELECT %s
interval.Start().Format(clickhouseTimeStringFormat), // cpu_avg time >= '%s'
interval.End().Format(clickhouseTimeStringFormat), // cpu_avg time < '%s'
joinClause, // JOIN clause
hostnameField) // ORDER BY %s

Expand All @@ -190,7 +190,8 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
// Resultsets:
// groupby-orderby-limit
func (d *Devops) GroupByOrderByLimit(qi query.Query) {
interval := d.Interval.RandWindow(time.Hour)
interval := d.Interval.MustRandWindow(time.Hour)

sql := fmt.Sprintf(`
SELECT
toStartOfMinute(created_at) AS minute,
Expand All @@ -201,7 +202,7 @@ func (d *Devops) GroupByOrderByLimit(qi query.Query) {
ORDER BY minute DESC
LIMIT 5
`,
interval.End.Format(clickhouseTimeStringFormat))
interval.End().Format(clickhouseTimeStringFormat))

humanLabel := "ClickHouse max cpu over last 5 min-intervals (random end)"
humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.EndString())
Expand All @@ -227,15 +228,15 @@ func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) {
} else {
hostWhereClause = fmt.Sprintf("AND (%s)", d.getHostWhereString(nHosts))
}
interval := d.Interval.RandWindow(devops.HighCPUDuration)
interval := d.Interval.MustRandWindow(devops.HighCPUDuration)

sql := fmt.Sprintf(`
SELECT *
FROM cpu
PREWHERE (usage_user > 90.0) AND (created_at >= '%s') AND (created_at < '%s') %s
`,
interval.Start.Format(clickhouseTimeStringFormat),
interval.End.Format(clickhouseTimeStringFormat),
interval.Start().Format(clickhouseTimeStringFormat),
interval.End().Format(clickhouseTimeStringFormat),
hostWhereClause)

humanLabel := devops.GetHighCPULabel("ClickHouse", nHosts)
Expand All @@ -255,8 +256,8 @@ func (d *Devops) LastPointPerHost(qi query.Query) {
FROM
(
SELECT *
FROM cpu
WHERE (tags_id, created_at) IN
FROM cpu
WHERE (tags_id, created_at) IN
(
SELECT
tags_id,
Expand Down Expand Up @@ -306,7 +307,8 @@ func (d *Devops) LastPointPerHost(qi query.Query) {
// single-groupby-5-1-1
// single-groupby-5-8-1
func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) {
interval := d.Interval.RandWindow(timeRange)
interval := d.Interval.MustRandWindow(timeRange)

metrics := devops.GetCPUMetricsSlice(numMetrics)
selectClauses := d.getSelectClausesAggMetrics("max", metrics)

Expand All @@ -321,8 +323,8 @@ func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange t
`,
strings.Join(selectClauses, ", "),
d.getHostWhereString(nHosts),
interval.Start.Format(clickhouseTimeStringFormat),
interval.End.Format(clickhouseTimeStringFormat))
interval.Start().Format(clickhouseTimeStringFormat),
interval.End().Format(clickhouseTimeStringFormat))

humanLabel := fmt.Sprintf("ClickHouse %d cpu metric(s), random %4d hosts, random %s by 1m", numMetrics, nHosts, timeRange)
humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString())
Expand Down
13 changes: 7 additions & 6 deletions cmd/tsbs_generate_queries/databases/influx/devops.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (d *Devops) getSelectClausesAggMetrics(agg string, metrics []string) []stri
// AND time >= '$HOUR_START' AND time < '$HOUR_END'
// GROUP BY minute ORDER BY minute ASC
func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) {
interval := d.Interval.RandWindow(timeRange)
interval := d.Interval.MustRandWindow(timeRange)
metrics := devops.GetCPUMetricsSlice(numMetrics)
selectClauses := d.getSelectClausesAggMetrics("max", metrics)
whereHosts := d.getHostWhereString(nHosts)
Expand All @@ -76,7 +76,7 @@ func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange t
// GROUP BY t ORDER BY t DESC
// LIMIT $LIMIT
func (d *Devops) GroupByOrderByLimit(qi query.Query) {
interval := d.Interval.RandWindow(time.Hour)
interval := d.Interval.MustRandWindow(time.Hour)
where := fmt.Sprintf("WHERE time < '%s'", interval.EndString())

humanLabel := "Influx max cpu over last 5 min-intervals (random end)"
Expand All @@ -94,7 +94,7 @@ func (d *Devops) GroupByOrderByLimit(qi query.Query) {
// GROUP BY hour, hostname ORDER BY hour, hostname
func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
metrics := devops.GetCPUMetricsSlice(numMetrics)
interval := d.Interval.RandWindow(devops.DoubleGroupByDuration)
interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration)
selectClauses := d.getSelectClausesAggMetrics("mean", metrics)

humanLabel := devops.GetDoubleGroupByLabel("Influx", numMetrics)
Expand All @@ -111,7 +111,7 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
// AND time >= '$HOUR_START' AND time < '$HOUR_END'
// GROUP BY hour ORDER BY hour
func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) {
interval := d.Interval.RandWindow(devops.MaxAllDuration)
interval := d.Interval.MustRandWindow(devops.MaxAllDuration)
whereHosts := d.getHostWhereString(nHosts)
selectClauses := d.getSelectClausesAggMetrics("max", devops.GetAllCPUMetrics())

Expand All @@ -138,7 +138,8 @@ func (d *Devops) LastPointPerHost(qi query.Query) {
// AND time >= '$TIME_START' AND time < '$TIME_END'
// AND (hostname = '$HOST' OR hostname = '$HOST2'...)
func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) {
interval := d.Interval.RandWindow(devops.HighCPUDuration)
interval := d.Interval.MustRandWindow(devops.HighCPUDuration)

var hostWhereClause string
if nHosts == 0 {
hostWhereClause = ""
Expand All @@ -147,7 +148,7 @@ func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) {
}

humanLabel := devops.GetHighCPULabel("Influx", nHosts)
humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval)
humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString())
influxql := fmt.Sprintf("SELECT * from cpu where usage_user > 90.0 %s and time >= '%s' and time < '%s'", hostWhereClause, interval.StartString(), interval.EndString())
d.fillInQuery(qi, humanLabel, humanDesc, influxql)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/tsbs_generate_queries/databases/mongo/devops-naive.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (d *NaiveDevops) GenerateEmptyQuery() query.Query {
// AND time >= '$HOUR_START' AND time < '$HOUR_END'
// GROUP BY minute ORDER BY minute ASC
func (d *NaiveDevops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) {
interval := d.Interval.RandWindow(timeRange)
interval := d.Interval.MustRandWindow(timeRange)
hostnames := d.GetRandomHosts(nHosts)
metrics := devops.GetCPUMetricsSlice(numMetrics)

Expand Down Expand Up @@ -105,7 +105,7 @@ func (d *NaiveDevops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRa
// WHERE time >= '$HOUR_START' AND time < '$HOUR_END'
// GROUP BY hour, hostname ORDER BY hour, hostname
func (d *NaiveDevops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
interval := d.Interval.RandWindow(devops.DoubleGroupByDuration)
interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration)
metrics := devops.GetCPUMetricsSlice(numMetrics)
bucketNano := time.Hour.Nanoseconds()

Expand Down
30 changes: 18 additions & 12 deletions cmd/tsbs_generate_queries/databases/mongo/devops.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/globalsign/mgo/bson"
"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops"
"github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils"
"github.com/timescale/tsbs/internal/utils"
"github.com/timescale/tsbs/query"
)

Expand Down Expand Up @@ -35,7 +35,7 @@ func (d *Devops) GenerateEmptyQuery() query.Query {
return query.NewMongo()
}

func getTimeFilterPipeline(interval utils.TimeInterval) []bson.M {
func getTimeFilterPipeline(interval *utils.TimeInterval) []bson.M {
return []bson.M{
{"$unwind": "$events"},
{
Expand Down Expand Up @@ -72,16 +72,16 @@ func getTimeFilterPipeline(interval utils.TimeInterval) []bson.M {

const aggDateFmt = "20060102" // see Go docs for how we arrive at this time format

func getTimeFilterDocs(interval utils.TimeInterval) []interface{} {
func getTimeFilterDocs(interval *utils.TimeInterval) []interface{} {
docs := []interface{}{}
startDay := interval.Start.Format(aggDateFmt)
startHr := interval.Start.Hour()
startDay := interval.Start().Format(aggDateFmt)
startHr := interval.Start().Hour()
lenHrs := int(interval.Duration()/time.Hour) + 1
for i := 0; i < lenHrs; i++ {
hr := int(startHr) + i
if hr > 23 {
days := int64(hr / 24)
day := interval.Start.Add(time.Duration(days * 24 * 60 * 60 * 1e9))
day := interval.Start().Add(time.Duration(days * 24 * 60 * 60 * 1e9))
docs = append(docs, fmt.Sprintf("%s_%02d", day.Format(aggDateFmt), hr%24))
} else {
docs = append(docs, fmt.Sprintf("%s_%02d", startDay, hr))
Expand All @@ -101,7 +101,8 @@ func getTimeFilterDocs(interval utils.TimeInterval) []interface{} {
// AND time >= '$HOUR_START' AND time < '$HOUR_END'
// GROUP BY minute ORDER BY minute ASC
func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) {
interval := d.Interval.RandWindow(timeRange)
interval := d.Interval.MustRandWindow(timeRange)

hostnames := d.GetRandomHosts(nHosts)
metrics := devops.GetCPUMetricsSlice(numMetrics)
docs := getTimeFilterDocs(interval)
Expand Down Expand Up @@ -169,7 +170,7 @@ func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange t
// AND time >= '$HOUR_START' AND time < '$HOUR_END'
// GROUP BY hour ORDER BY hour
func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) {
interval := d.Interval.RandWindow(devops.MaxAllDuration)
interval := d.Interval.MustRandWindow(devops.MaxAllDuration)
hostnames := d.GetRandomHosts(nHosts)
docs := getTimeFilterDocs(interval)
bucketNano := time.Hour.Nanoseconds()
Expand Down Expand Up @@ -237,7 +238,8 @@ func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) {
// WHERE time >= '$HOUR_START' AND time < '$HOUR_END'
// GROUP BY hour, hostname ORDER BY hour, hostname
func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
interval := d.Interval.RandWindow(devops.DoubleGroupByDuration)
interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration)

metrics := devops.GetCPUMetricsSlice(numMetrics)
docs := getTimeFilterDocs(interval)
bucketNano := time.Hour.Nanoseconds()
Expand Down Expand Up @@ -317,7 +319,8 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
// AND time >= '$TIME_START' AND time < '$TIME_END'
// AND (hostname = '$HOST' OR hostname = '$HOST2'...)
func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) {
interval := d.Interval.RandWindow(devops.HighCPUDuration)
interval := d.Interval.MustRandWindow(devops.HighCPUDuration)

hostnames := d.GetRandomHosts(nHosts)
docs := getTimeFilterDocs(interval)

Expand Down Expand Up @@ -448,8 +451,11 @@ func (d *Devops) LastPointPerHost(qi query.Query) {
// GROUP BY t ORDER BY t DESC
// LIMIT $LIMIT
func (d *Devops) GroupByOrderByLimit(qi query.Query) {
interval := d.Interval.RandWindow(time.Hour)
interval = utils.NewTimeInterval(d.Interval.Start, interval.End)
interval := d.Interval.MustRandWindow(time.Hour)
interval, err := utils.NewTimeInterval(d.Interval.Start(), interval.End())
if err != nil {
panic(err.Error())
}
docs := getTimeFilterDocs(interval)
bucketNano := time.Minute.Nanoseconds()

Expand Down
Loading

0 comments on commit 3ce1a1f

Please sign in to comment.