Skip to content

Commit

Permalink
Refactor more functionality in devops pkg
Browse files Browse the repository at this point in the history
Most of the subsystems did roughly similar things with their
ToPoint methods. This code has been moved into the common
subsystemMeasurement type. Also added some tests for that struct.
  • Loading branch information
RobAtticus committed Aug 7, 2018
1 parent de13239 commit da63c7b
Show file tree
Hide file tree
Showing 13 changed files with 233 additions and 134 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ actual query results so that you can compare across databases that the
results are the same. Using the flag `-print-responses` will return
the results.

## Appendix I: Query types
## Appendix I: Query types <a name="appendix-i-query-types"></a>

### Devops / cpu-only
|Query type|Description|
Expand Down
20 changes: 4 additions & 16 deletions cmd/tsbs_generate_data/devops/cpu.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package devops

import (
"math"
"math/rand"
"time"

Expand All @@ -10,8 +9,8 @@ import (
)

var (
CPUByteString = []byte("cpu") // heap optimization
CPUFields = []labeledDistributionMaker{
labelCPU = []byte("cpu") // heap optimization
CPUFields = []labeledDistributionMaker{
{[]byte("usage_user"), func() common.Distribution { return common.CWD(cpuND, 0.0, 100.0, rand.Float64()*100.0) }},
{[]byte("usage_system"), func() common.Distribution { return common.CWD(cpuND, 0.0, 100.0, rand.Float64()*100.0) }},
{[]byte("usage_idle"), func() common.Distribution { return common.CWD(cpuND, 0.0, 100.0, rand.Float64()*100.0) }},
Expand Down Expand Up @@ -43,21 +42,10 @@ func newSingleCPUMeasurement(start time.Time) *CPUMeasurement {
}

func newCPUMeasurementNumDistributions(start time.Time, numDistributions int) *CPUMeasurement {
sub := newSubsystemMeasurement(start, numDistributions)
for i := 0; i < numDistributions; i++ {
sub.distributions[i] = CPUFields[i].distributionMaker()
}
sub := newSubsystemMeasurementWithDistributionMakers(start, CPUFields)
return &CPUMeasurement{sub}
}

func (m *CPUMeasurement) ToPoint(p *serialize.Point) {
p.SetMeasurementName(CPUByteString)
p.SetTimestamp(&m.timestamp)

for i := range m.distributions {
// Use ints for CPU metrics.
// The full float64 precision in the distributions list is bad for compression on some systems (e.g., ZFS).
// Anything above int precision is also not that common or useful for a devops CPU monitoring use case.
p.AppendField(CPUFields[i].label, math.Round(m.distributions[i].Get()))
}
m.toPointAllInt64(p, labelCPU, CPUFields)
}
4 changes: 2 additions & 2 deletions cmd/tsbs_generate_data/devops/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const (
)

var (
DiskByteString = []byte("disk") // heap optimization
labelDisk = []byte("disk") // heap optimization
TotalByteString = []byte("total")
FreeByteString = []byte("free")
UsedByteString = []byte("used")
Expand Down Expand Up @@ -56,7 +56,7 @@ func NewDiskMeasurement(start time.Time) *DiskMeasurement {
}

func (m *DiskMeasurement) ToPoint(p *serialize.Point) {
p.SetMeasurementName(DiskByteString)
p.SetMeasurementName(labelDisk)
p.SetTimestamp(&m.timestamp)

p.AppendTag(DiskTags[0], m.path)
Expand Down
16 changes: 3 additions & 13 deletions cmd/tsbs_generate_data/devops/diskio.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

var (
DiskIOByteString = []byte("diskio") // heap optimization
labelDiskIO = []byte("diskio") // heap optimization
SerialByteString = []byte("serial")

// Reuse NormalDistributions as arguments to other distributions. This is
Expand All @@ -37,11 +37,7 @@ type DiskIOMeasurement struct {
}

func NewDiskIOMeasurement(start time.Time) *DiskIOMeasurement {
sub := newSubsystemMeasurement(start, len(DiskIOFields))
for i := range DiskIOFields {
sub.distributions[i] = DiskIOFields[i].distributionMaker()
}

sub := newSubsystemMeasurementWithDistributionMakers(start, DiskIOFields)
serial := []byte(fmt.Sprintf("%03d-%03d-%03d", rand.Intn(1000), rand.Intn(1000), rand.Intn(1000)))
return &DiskIOMeasurement{
subsystemMeasurement: sub,
Expand All @@ -50,12 +46,6 @@ func NewDiskIOMeasurement(start time.Time) *DiskIOMeasurement {
}

func (m *DiskIOMeasurement) ToPoint(p *serialize.Point) {
p.SetMeasurementName(DiskIOByteString)
p.SetTimestamp(&m.timestamp)

m.toPointAllInt64(p, labelDiskIO, DiskIOFields)
p.AppendTag(SerialByteString, m.serial)

for i := range m.distributions {
p.AppendField(DiskIOFields[i].label, int64(m.distributions[i].Get()))
}
}
15 changes: 3 additions & 12 deletions cmd/tsbs_generate_data/devops/kernel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

var (
KernelByteString = []byte("kernel") // heap optimization
labelKernel = []byte("kernel") // heap optimization
BootTimeByteString = []byte("boot_time")

// Reuse NormalDistributions as arguments to other distributions. This is
Expand All @@ -32,11 +32,7 @@ type KernelMeasurement struct {
}

func NewKernelMeasurement(start time.Time) *KernelMeasurement {
sub := newSubsystemMeasurement(start, len(KernelFields))
for i := range KernelFields {
sub.distributions[i] = KernelFields[i].distributionMaker()
}

sub := newSubsystemMeasurementWithDistributionMakers(start, KernelFields)
bootTime := rand.Int63n(240)
return &KernelMeasurement{
subsystemMeasurement: sub,
Expand All @@ -45,11 +41,6 @@ func NewKernelMeasurement(start time.Time) *KernelMeasurement {
}

func (m *KernelMeasurement) ToPoint(p *serialize.Point) {
p.SetMeasurementName(KernelByteString)
p.SetTimestamp(&m.timestamp)

p.AppendField(BootTimeByteString, m.bootTime)
for i := range m.distributions {
p.AppendField(KernelFields[i].label, int64(m.distributions[i].Get()))
}
m.toPointAllInt64(p, labelKernel, KernelFields)
}
31 changes: 31 additions & 0 deletions cmd/tsbs_generate_data/devops/measurement.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"bitbucket.org/440-labs/tsbs/cmd/tsbs_generate_data/common"
"bitbucket.org/440-labs/tsbs/cmd/tsbs_generate_data/serialize"
)

type subsystemMeasurement struct {
Expand All @@ -18,13 +19,43 @@ func newSubsystemMeasurement(start time.Time, numDistributions int) *subsystemMe
}
}

func newSubsystemMeasurementWithDistributionMakers(start time.Time, makers []labeledDistributionMaker) *subsystemMeasurement {
m := newSubsystemMeasurement(start, len(makers))
for i := 0; i < len(makers); i++ {
m.distributions[i] = makers[i].distributionMaker()
}
return m
}

func (m *subsystemMeasurement) Tick(d time.Duration) {
m.timestamp = m.timestamp.Add(d)
for i := range m.distributions {
m.distributions[i].Advance()
}
}

func (m *subsystemMeasurement) toPoint(p *serialize.Point, measurementName []byte, labels []labeledDistributionMaker) {
p.SetMeasurementName(measurementName)
p.SetTimestamp(&m.timestamp)

for i, d := range m.distributions {
p.AppendField(labels[i].label, d.Get())
}
}

// toPointAllInt64 fills in a serialize.Point with a given measurementName and
// all vales from the distributions stored as int64. The labels for each field
// are given by the supplied []labeledDistributionMaker, assuming that the distributions
// are in the same order.
func (m *subsystemMeasurement) toPointAllInt64(p *serialize.Point, measurementName []byte, labels []labeledDistributionMaker) {
p.SetMeasurementName(measurementName)
p.SetTimestamp(&m.timestamp)

for i, d := range m.distributions {
p.AppendField(labels[i].label, int64(d.Get()))
}
}

type labeledDistributionMaker struct {
label []byte
distributionMaker func() common.Distribution
Expand Down
155 changes: 155 additions & 0 deletions cmd/tsbs_generate_data/devops/measurement_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package devops

import (
"bytes"
"math"
"strconv"
"strings"
"testing"
"time"

"bitbucket.org/440-labs/tsbs/cmd/tsbs_generate_data/common"
"bitbucket.org/440-labs/tsbs/cmd/tsbs_generate_data/serialize"
)

type monotonicDistribution struct {
state float64
}

func (d *monotonicDistribution) Advance() {
d.state++
}

func (d *monotonicDistribution) Get() float64 {
return d.state
}

func TestNewSubsystemMeasurement(t *testing.T) {
cases := []struct {
desc string
numDistros int
}{
{
desc: "no distros",
numDistros: 0,
},
{
desc: "one distro",
numDistros: 1,
},
{
desc: "three distros",
numDistros: 3,
},
}

for _, c := range cases {
now := time.Now()
m := newSubsystemMeasurement(now, c.numDistros)
if !m.timestamp.Equal(now) {
t.Errorf("%s: incorrect timestamp set: got %v want %v", c.desc, m.timestamp, now)
}
if got := len(m.distributions); got != c.numDistros {
t.Errorf("%s: incorrect number of distros: got %d want %d", c.desc, got, c.numDistros)
}
}
}

func TestNewSubsystemMeasurementWithDistributionMakers(t *testing.T) {
makers := []labeledDistributionMaker{
{[]byte("foo"), func() common.Distribution { return &monotonicDistribution{state: 0.0} }},
{[]byte("bar"), func() common.Distribution { return &monotonicDistribution{state: 1.0} }},
}
now := time.Now()
m := newSubsystemMeasurementWithDistributionMakers(now, makers)
if !m.timestamp.Equal(now) {
t.Errorf("incorrect timestamp set: got %v want %v", m.timestamp, now)
}

if got := len(m.distributions); got != len(makers) {
t.Errorf("incorrect number of distros: got %d want %d", got, len(makers))
}

for i := 0; i < 2; i++ {
md := m.distributions[i].(*monotonicDistribution)
if got := md.state; got != float64(i) {
t.Errorf("distribution %d has wrong state: got %f want %f", i, got, float64(i))
}
}
}

func TestSubsytemMeasurementTick(t *testing.T) {
now := time.Now()
numDistros := 3
m := newSubsystemMeasurement(now, numDistros)
for i := 0; i < numDistros; i++ {
m.distributions[i] = &monotonicDistribution{state: float64(i)}
}
m.Tick(time.Nanosecond)
if got := m.timestamp.UnixNano(); got != now.UnixNano()+1 {
t.Errorf("tick did not increase timestamp correct: got %d want %d", got, now.UnixNano()+1)
}
for i := 0; i < numDistros; i++ {
if got := m.distributions[i].Get(); got != float64(i+1) {
t.Errorf("tick did not advance distro %d: got %f want %f", i, got, float64(i+1))
}
}
}

const (
toPointState = 0.5
toPointLabel = "foo"
toPointFieldLabel = "foo1"
)

func TestToPoint(t *testing.T) {
now := time.Now()
m, makers := setupToPoint(now)
p := serialize.NewPoint()
m.toPoint(p, []byte(toPointLabel), makers)
testCommonToPoint(t, p, toPointState+1.0)
}

func TestToPointAllInt64(t *testing.T) {
now := time.Now()
m, makers := setupToPoint(now)
p := serialize.NewPoint()
m.toPointAllInt64(p, []byte(toPointLabel), makers)
testCommonToPoint(t, p, math.Floor(toPointState+1.0))
}

func setupToPoint(start time.Time) (*subsystemMeasurement, []labeledDistributionMaker) {
makers := []labeledDistributionMaker{
{[]byte(toPointFieldLabel), func() common.Distribution { return &monotonicDistribution{state: toPointState} }},
}
m := newSubsystemMeasurementWithDistributionMakers(start, makers)
m.Tick(time.Nanosecond)
return m, makers
}

func testCommonToPoint(t *testing.T, p *serialize.Point, fieldVal float64) {
// serialize the point to check output
b := new(bytes.Buffer)
serializer := &serialize.InfluxSerializer{}
serializer.Serialize(p, b)

if got := string(p.MeasurementName()); got != toPointLabel {
t.Errorf("measurement name incorrect: got %s want %s", got, toPointLabel)
}

output := b.String()

args := strings.Split(output, " ")
fieldArgs := strings.Split(args[1], "=")
fieldArgs[1] = strings.Replace(fieldArgs[1], "i", "", -1)
if got := fieldArgs[0]; got != toPointFieldLabel {
t.Errorf("incorrect field label: got %s want %s", got, toPointFieldLabel)
}
if got, err := strconv.ParseFloat(fieldArgs[1], 64); err != nil || got != fieldVal {
if err != nil {
t.Errorf("could not parse field value as float64: %v", err)
} else {
t.Errorf("incorrect field value: got %f want %f", got, fieldVal)
}
}
}
4 changes: 2 additions & 2 deletions cmd/tsbs_generate_data/devops/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

var (
MemoryByteString = []byte("mem") // heap optimization
labelMem = []byte("mem") // heap optimization

// Choices for modeling a host's memory capacity.
MemoryMaxBytesChoices = []int64{8 << 30, 12 << 30, 16 << 30}
Expand Down Expand Up @@ -56,7 +56,7 @@ func NewMemMeasurement(start time.Time) *MemMeasurement {
}

func (m *MemMeasurement) ToPoint(p *serialize.Point) {
p.SetMeasurementName(MemoryByteString)
p.SetMeasurementName(labelMem)
p.SetTimestamp(&m.timestamp)

total := m.bytesTotal
Expand Down
Loading

0 comments on commit da63c7b

Please sign in to comment.