Skip to content

Commit

Permalink
Merged in rrk/refactor-devops (pull request timescale#77)
Browse files Browse the repository at this point in the history
Refactor the devops pkg in tsbs_generate_data

Approved-by: Lee Hampton <leejhampton@gmail.com>
  • Loading branch information
RobAtticus committed Aug 1, 2018
2 parents 9c059f4 + 6f92279 commit 4ee1731
Show file tree
Hide file tree
Showing 12 changed files with 274 additions and 337 deletions.
57 changes: 22 additions & 35 deletions cmd/tsbs_generate_data/devops/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,56 +11,43 @@ import (

var (
CPUByteString = []byte("cpu") // heap optimization
CPUFieldKeys = [][]byte{
[]byte("usage_user"),
[]byte("usage_system"),
[]byte("usage_idle"),
[]byte("usage_nice"),
[]byte("usage_iowait"),
[]byte("usage_irq"),
[]byte("usage_softirq"),
[]byte("usage_steal"),
[]byte("usage_guest"),
[]byte("usage_guest_nice"),
CPUFields = []labeledDistributionMaker{
{[]byte("usage_user"), func() common.Distribution { return common.CWD(cpuND, 0.0, 100.0, rand.Float64()*100.0) }},
{[]byte("usage_system"), func() common.Distribution { return common.CWD(cpuND, 0.0, 100.0, rand.Float64()*100.0) }},
{[]byte("usage_idle"), func() common.Distribution { return common.CWD(cpuND, 0.0, 100.0, rand.Float64()*100.0) }},
{[]byte("usage_nice"), func() common.Distribution { return common.CWD(cpuND, 0.0, 100.0, rand.Float64()*100.0) }},
{[]byte("usage_iowait"), func() common.Distribution { return common.CWD(cpuND, 0.0, 100.0, rand.Float64()*100.0) }},
{[]byte("usage_irq"), func() common.Distribution { return common.CWD(cpuND, 0.0, 100.0, rand.Float64()*100.0) }},
{[]byte("usage_softirq"), func() common.Distribution { return common.CWD(cpuND, 0.0, 100.0, rand.Float64()*100.0) }},
{[]byte("usage_steal"), func() common.Distribution { return common.CWD(cpuND, 0.0, 100.0, rand.Float64()*100.0) }},
{[]byte("usage_guest"), func() common.Distribution { return common.CWD(cpuND, 0.0, 100.0, rand.Float64()*100.0) }},
{[]byte("usage_guest_nice"), func() common.Distribution { return common.CWD(cpuND, 0.0, 100.0, rand.Float64()*100.0) }},
}
)

var cpuND = &common.NormalDistribution{Mean: 0.0, StdDev: 1.0}
// Reuse NormalDistributions as arguments to other distributions. This is
// safe to do because the higher-level distribution advances the ND and
// immediately uses its value and saves the state
var cpuND = common.ND(0.0, 1.0)

type CPUMeasurement struct {
timestamp time.Time
distributions []common.Distribution
*subsystemMeasurement
}

func NewCPUMeasurement(start time.Time) *CPUMeasurement {
return newCPUMeasurementNumDistributions(start, len(CPUFieldKeys))
return newCPUMeasurementNumDistributions(start, len(CPUFields))
}

func newSingleCPUMeasurement(start time.Time) *CPUMeasurement {
return newCPUMeasurementNumDistributions(start, 1)
}

func newCPUMeasurementNumDistributions(start time.Time, numDistributions int) *CPUMeasurement {
distributions := make([]common.Distribution, numDistributions)
for i := range distributions {
distributions[i] = &common.ClampedRandomWalkDistribution{
State: rand.Float64() * 100.0,
Min: 0.0,
Max: 100.0,
Step: cpuND,
}
}
return &CPUMeasurement{
timestamp: start,
distributions: distributions,
}
}

func (m *CPUMeasurement) Tick(d time.Duration) {
m.timestamp = m.timestamp.Add(d)
for i := range m.distributions {
m.distributions[i].Advance()
sub := newSubsystemMeasurement(start, numDistributions)
for i := 0; i < numDistributions; i++ {
sub.distributions[i] = CPUFields[i].distributionMaker()
}
return &CPUMeasurement{sub}
}

func (m *CPUMeasurement) ToPoint(p *serialize.Point) {
Expand All @@ -71,6 +58,6 @@ func (m *CPUMeasurement) ToPoint(p *serialize.Point) {
// Use ints for CPU metrics.
// The full float64 precision in the distributions list is bad for compression on some systems (e.g., ZFS).
// Anything above int precision is also not that common or useful for a devops CPU monitoring use case.
p.AppendField(CPUFieldKeys[i], math.Round(m.distributions[i].Get()))
p.AppendField(CPUFields[i].label, math.Round(m.distributions[i].Get()))
}
}
38 changes: 17 additions & 21 deletions cmd/tsbs_generate_data/devops/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
"bitbucket.org/440-labs/tsbs/cmd/tsbs_generate_data/serialize"
)

const OneTerabyte = 1 << 40
const (
OneTerabyte = 1 << 40
inodeSize = 4096
)

var (
DiskByteString = []byte("disk") // heap optimization
Expand All @@ -33,31 +36,25 @@ var (
)

type DiskMeasurement struct {
timestamp time.Time
*subsystemMeasurement

path, fsType []byte
uptime time.Duration
freeBytesDist common.Distribution
path, fsType []byte
uptime time.Duration
}

func NewDiskMeasurement(start time.Time) *DiskMeasurement {
path := []byte(fmt.Sprintf("/dev/sda%d", rand.Intn(10)))
fsType := DiskFSTypeChoices[rand.Intn(len(DiskFSTypeChoices))]
return &DiskMeasurement{
path: path,
fsType: fsType,
sub := newSubsystemMeasurement(start, 1)
sub.distributions[0] = common.CWD(common.ND(50, 1), 0, OneTerabyte, OneTerabyte/2)

timestamp: start,
freeBytesDist: common.CWD(common.ND(50, 1), 0, OneTerabyte, OneTerabyte/2),
return &DiskMeasurement{
subsystemMeasurement: sub,
path: path,
fsType: fsType,
}
}

func (m *DiskMeasurement) Tick(d time.Duration) {
m.timestamp = m.timestamp.Add(d)

m.freeBytesDist.Advance()
}

func (m *DiskMeasurement) ToPoint(p *serialize.Point) {
p.SetMeasurementName(DiskByteString)
p.SetTimestamp(&m.timestamp)
Expand All @@ -66,16 +63,15 @@ func (m *DiskMeasurement) ToPoint(p *serialize.Point) {
p.AppendTag(DiskTags[1], m.fsType)

// the only thing that actually changes is the free byte count:
free := int64(m.freeBytesDist.Get())
free := int64(m.distributions[0].Get())

total := int64(OneTerabyte)
used := total - free
usedPercent := int64(100.0 * (float64(used) / float64(total)))

// inodes are 4096b in size:
inodesTotal := total / 4096
inodesFree := free / 4096
inodesUsed := used / 4096
inodesTotal := total / inodeSize
inodesFree := free / inodeSize
inodesUsed := used / inodeSize

p.AppendField(TotalByteString, total)
p.AppendField(FreeByteString, free)
Expand Down
49 changes: 22 additions & 27 deletions cmd/tsbs_generate_data/devops/diskio.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,44 +13,39 @@ var (
DiskIOByteString = []byte("diskio") // heap optimization
SerialByteString = []byte("serial")

DiskIOFields = []LabeledDistributionMaker{
{[]byte("reads"), func() common.Distribution { return common.MWD(common.ND(50, 1), 0) }},
{[]byte("writes"), func() common.Distribution { return common.MWD(common.ND(50, 1), 0) }},
{[]byte("read_bytes"), func() common.Distribution { return common.MWD(common.ND(100, 1), 0) }},
{[]byte("write_bytes"), func() common.Distribution { return common.MWD(common.ND(100, 1), 0) }},
{[]byte("read_time"), func() common.Distribution { return common.MWD(common.ND(5, 1), 0) }},
{[]byte("write_time"), func() common.Distribution { return common.MWD(common.ND(5, 1), 0) }},
{[]byte("io_time"), func() common.Distribution { return common.MWD(common.ND(5, 1), 0) }},
// Reuse NormalDistributions as arguments to other distributions. This is
// safe to do because the higher-level distribution advances the ND and
// immediately uses its value and saves the state
opsND = common.ND(50, 1)
bytesND = common.ND(100, 1)
timeND = common.ND(5, 1)

DiskIOFields = []labeledDistributionMaker{
{[]byte("reads"), func() common.Distribution { return common.MWD(opsND, 0) }},
{[]byte("writes"), func() common.Distribution { return common.MWD(opsND, 0) }},
{[]byte("read_bytes"), func() common.Distribution { return common.MWD(bytesND, 0) }},
{[]byte("write_bytes"), func() common.Distribution { return common.MWD(bytesND, 0) }},
{[]byte("read_time"), func() common.Distribution { return common.MWD(timeND, 0) }},
{[]byte("write_time"), func() common.Distribution { return common.MWD(timeND, 0) }},
{[]byte("io_time"), func() common.Distribution { return common.MWD(timeND, 0) }},
}
)

type DiskIOMeasurement struct {
timestamp time.Time

serial []byte
distributions []common.Distribution
*subsystemMeasurement
serial []byte
}

func NewDiskIOMeasurement(start time.Time) *DiskIOMeasurement {
distributions := make([]common.Distribution, len(DiskIOFields))
sub := newSubsystemMeasurement(start, len(DiskIOFields))
for i := range DiskIOFields {
distributions[i] = DiskIOFields[i].DistributionMaker()
sub.distributions[i] = DiskIOFields[i].distributionMaker()
}

serial := []byte(fmt.Sprintf("%03d-%03d-%03d", rand.Intn(1000), rand.Intn(1000), rand.Intn(1000)))
return &DiskIOMeasurement{
serial: serial,

timestamp: start,
distributions: distributions,
}
}

func (m *DiskIOMeasurement) Tick(d time.Duration) {
m.timestamp = m.timestamp.Add(d)

for i := range m.distributions {
m.distributions[i].Advance()
subsystemMeasurement: sub,
serial: serial,
}
}

Expand All @@ -61,6 +56,6 @@ func (m *DiskIOMeasurement) ToPoint(p *serialize.Point) {
p.AppendTag(SerialByteString, m.serial)

for i := range m.distributions {
p.AppendField(DiskIOFields[i].Label, int64(m.distributions[i].Get()))
p.AppendField(DiskIOFields[i].label, int64(m.distributions[i].Get()))
}
}
45 changes: 19 additions & 26 deletions cmd/tsbs_generate_data/devops/kernel.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,36 @@ import (
var (
KernelByteString = []byte("kernel") // heap optimization
BootTimeByteString = []byte("boot_time")
KernelFields = []LabeledDistributionMaker{
{[]byte("interrupts"), func() common.Distribution { return common.MWD(common.ND(5, 1), 0) }},
{[]byte("context_switches"), func() common.Distribution { return common.MWD(common.ND(5, 1), 0) }},
{[]byte("processes_forked"), func() common.Distribution { return common.MWD(common.ND(5, 1), 0) }},
{[]byte("disk_pages_in"), func() common.Distribution { return common.MWD(common.ND(5, 1), 0) }},
{[]byte("disk_pages_out"), func() common.Distribution { return common.MWD(common.ND(5, 1), 0) }},

// Reuse NormalDistributions as arguments to other distributions. This is
// safe to do because the higher-level distribution advances the ND and
// immediately uses its value and saves the state
kernelND = common.ND(5, 1)

KernelFields = []labeledDistributionMaker{
{[]byte("interrupts"), func() common.Distribution { return common.MWD(kernelND, 0) }},
{[]byte("context_switches"), func() common.Distribution { return common.MWD(kernelND, 0) }},
{[]byte("processes_forked"), func() common.Distribution { return common.MWD(kernelND, 0) }},
{[]byte("disk_pages_in"), func() common.Distribution { return common.MWD(kernelND, 0) }},
{[]byte("disk_pages_out"), func() common.Distribution { return common.MWD(kernelND, 0) }},
}
)

type KernelMeasurement struct {
timestamp time.Time

bootTime int64
uptime time.Duration
distributions []common.Distribution
*subsystemMeasurement
bootTime int64
}

func NewKernelMeasurement(start time.Time) *KernelMeasurement {
distributions := make([]common.Distribution, len(KernelFields))
sub := newSubsystemMeasurement(start, len(KernelFields))
for i := range KernelFields {
distributions[i] = KernelFields[i].DistributionMaker()
sub.distributions[i] = KernelFields[i].distributionMaker()
}

bootTime := rand.Int63n(240)
return &KernelMeasurement{
bootTime: bootTime,

timestamp: start,
distributions: distributions,
}
}

func (m *KernelMeasurement) Tick(d time.Duration) {
m.timestamp = m.timestamp.Add(d)

for i := range m.distributions {
m.distributions[i].Advance()
subsystemMeasurement: sub,
bootTime: bootTime,
}
}

Expand All @@ -57,6 +50,6 @@ func (m *KernelMeasurement) ToPoint(p *serialize.Point) {

p.AppendField(BootTimeByteString, m.bootTime)
for i := range m.distributions {
p.AppendField(KernelFields[i].Label, int64(m.distributions[i].Get()))
p.AppendField(KernelFields[i].label, int64(m.distributions[i].Get()))
}
}
31 changes: 31 additions & 0 deletions cmd/tsbs_generate_data/devops/measurement.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package devops

import (
"time"

"bitbucket.org/440-labs/tsbs/cmd/tsbs_generate_data/common"
)

type subsystemMeasurement struct {
timestamp time.Time
distributions []common.Distribution
}

func newSubsystemMeasurement(start time.Time, numDistributions int) *subsystemMeasurement {
return &subsystemMeasurement{
timestamp: start,
distributions: make([]common.Distribution, numDistributions),
}
}

func (m *subsystemMeasurement) Tick(d time.Duration) {
m.timestamp = m.timestamp.Add(d)
for i := range m.distributions {
m.distributions[i].Advance()
}
}

type labeledDistributionMaker struct {
label []byte
distributionMaker func() common.Distribution
}
Loading

0 comments on commit 4ee1731

Please sign in to comment.