Skip to content

Commit

Permalink
Refactor devops generators to share common code
Browse files Browse the repository at this point in the history
  • Loading branch information
RobAtticus committed Jul 31, 2018
1 parent ae62992 commit 5c2254c
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 159 deletions.
84 changes: 84 additions & 0 deletions cmd/tsbs_generate_data/devops/common_generate_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package devops

import (
"time"

"bitbucket.org/440-labs/tsbs/cmd/tsbs_generate_data/common"
"bitbucket.org/440-labs/tsbs/cmd/tsbs_generate_data/serialize"
)

type commonDevopsSimulator struct {
madePoints uint64
maxPoints uint64

hostIndex uint64
hosts []Host

epoch uint64
epochs uint64
epochHosts uint64
initHosts uint64

timestampStart time.Time
timestampEnd time.Time
interval time.Duration
}

// Finished tells whether we have simulated all the necessary points
func (s *commonDevopsSimulator) Finished() bool {
return s.madePoints >= s.maxPoints
}

func (s *commonDevopsSimulator) Fields() map[string][][]byte {
return s.fields(s.hosts[0].SimulatedMeasurements)
}

func (s *commonDevopsSimulator) fields(measurements []common.SimulatedMeasurement) map[string][][]byte {
data := make(map[string][][]byte)
for _, sm := range measurements {
point := serialize.NewPoint()
sm.ToPoint(point)
data[string(point.MeasurementName())] = point.FieldKeys()
}

return data
}

func (s *commonDevopsSimulator) populatePoint(p *serialize.Point, measureIdx int) bool {
host := &s.hosts[s.hostIndex]

// Populate host-specific tags:
p.AppendTag(MachineTagKeys[0], host.Name)
p.AppendTag(MachineTagKeys[1], host.Region)
p.AppendTag(MachineTagKeys[2], host.Datacenter)
p.AppendTag(MachineTagKeys[3], host.Rack)
p.AppendTag(MachineTagKeys[4], host.OS)
p.AppendTag(MachineTagKeys[5], host.Arch)
p.AppendTag(MachineTagKeys[6], host.Team)
p.AppendTag(MachineTagKeys[7], host.Service)
p.AppendTag(MachineTagKeys[8], host.ServiceVersion)
p.AppendTag(MachineTagKeys[9], host.ServiceEnvironment)

// Populate measurement-specific tags and fields:
host.SimulatedMeasurements[measureIdx].ToPoint(p)

ret := s.hostIndex < s.epochHosts
s.madePoints++
s.hostIndex++
return ret
}

// TODO(rrk) - Can probably turn this logic into a separate interface and implement other
// types of scale up, e.g., exponential
//
// To "scale up" the number of reporting items, we need to know when
// which epoch we are currently in. Once we know that, we can take the "missing"
// amount of scale -- i.e., the max amount of scale less the initial amount
// -- and add it in proportion to the percentage of epochs that have passed. This
// way we simulate all items at each epoch, but at the end of the function
// we check whether the point should be recorded by the calling process.
func (s *commonDevopsSimulator) adjustNumHostsForEpoch() {
s.epoch++
missingScale := float64(uint64(len(s.hosts)) - s.initHosts)
s.epochHosts = s.initHosts + uint64(missingScale*float64(s.epoch)/float64(s.epochs-1))
}
96 changes: 21 additions & 75 deletions cmd/tsbs_generate_data/devops/cpu_only_generate_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,28 @@ import (
// A CPUOnlySimulator generates data similar to telemetry from Telegraf for only CPU metrics.
// It fulfills the Simulator interface.
type CPUOnlySimulator struct {
madePoints uint64
maxPoints uint64
*commonDevopsSimulator
}

hostIndex uint64
hosts []Host
// Fields returns a map of subsystems to metrics collected
func (d *CPUOnlySimulator) Fields() map[string][][]byte {
return d.fields(d.hosts[0].SimulatedMeasurements[:1])
}

epoch uint64
epochs uint64
epochHosts uint64
initHosts uint64
// Next advances a Point to the next state in the generator.
func (d *CPUOnlySimulator) Next(p *serialize.Point) bool {
// switch to the next metric if needed
if d.hostIndex == uint64(len(d.hosts)) {
d.hostIndex = 0

timestampStart time.Time
timestampEnd time.Time
interval time.Duration
}
for i := 0; i < len(d.hosts); i++ {
d.hosts[i].TickAll(d.interval)
}

// Finished tells whether we have simulated all the necessary points
func (d *CPUOnlySimulator) Finished() bool {
return d.madePoints >= d.maxPoints
d.adjustNumHostsForEpoch()
}

return d.populatePoint(p, 0)
}

// CPUOnlySimulatorConfig is used to create a CPUOnlySimulator.
Expand All @@ -43,6 +46,7 @@ type CPUOnlySimulatorConfig struct {
HostConstructor func(i int, start time.Time) Host
}

// ToSimulator produces a Simulator that conforms to the given SimulatorConfig over the specified interval
func (d *CPUOnlySimulatorConfig) ToSimulator(interval time.Duration) common.Simulator {
hostInfos := make([]Host, d.HostCount)
for i := 0; i < len(hostInfos); i++ {
Expand All @@ -51,7 +55,7 @@ func (d *CPUOnlySimulatorConfig) ToSimulator(interval time.Duration) common.Simu

epochs := uint64(d.End.Sub(d.Start).Nanoseconds() / interval.Nanoseconds())
maxPoints := epochs * d.HostCount
dg := &CPUOnlySimulator{
dg := &CPUOnlySimulator{&commonDevopsSimulator{
madePoints: 0,
maxPoints: maxPoints,

Expand All @@ -65,65 +69,7 @@ func (d *CPUOnlySimulatorConfig) ToSimulator(interval time.Duration) common.Simu
timestampStart: d.Start,
timestampEnd: d.End,
interval: interval,
}
}}

return dg
}

func (d *CPUOnlySimulator) Fields() map[string][][]byte {
data := make(map[string][][]byte)
point := serialize.NewPoint()
d.hosts[0].SimulatedMeasurements[0].ToPoint(point)
data[string(point.MeasurementName())] = point.FieldKeys()

return data
}

// Next advances a Point to the next state in the generator.
func (d *CPUOnlySimulator) Next(p *serialize.Point) bool {
// switch to the next metric if needed
if d.hostIndex == uint64(len(d.hosts)) {
d.hostIndex = 0

for i := 0; i < len(d.hosts); i++ {
d.hosts[i].TickAll(d.interval)
}

// TODO(rrk) - Can probably turn this logic into a separate interface and implement other
// types of scale up, e.g., exponential
//
// To "scale up" the number of reporting items, we need to know when
// which epoch we are currently in. Once we know that, we can take the "missing"
// amount of scale -- i.e., the max amount of scale less the initial amount
// -- and add it in proportion to the percentage of epochs that have passed. This
// way we simulate all items at each epoch, but at the end of the function
// we check whether the point should be recorded by the calling process.
d.epoch++
missingScale := float64(uint64(len(d.hosts)) - d.initHosts)
d.epochHosts = d.initHosts + uint64(missingScale*float64(d.epoch)/float64(d.epochs-1))

}

host := &d.hosts[d.hostIndex]

// Populate host-specific tags:
p.AppendTag(MachineTagKeys[0], host.Name)
p.AppendTag(MachineTagKeys[1], host.Region)
p.AppendTag(MachineTagKeys[2], host.Datacenter)
p.AppendTag(MachineTagKeys[3], host.Rack)
p.AppendTag(MachineTagKeys[4], host.OS)
p.AppendTag(MachineTagKeys[5], host.Arch)
p.AppendTag(MachineTagKeys[6], host.Team)
p.AppendTag(MachineTagKeys[7], host.Service)
p.AppendTag(MachineTagKeys[8], host.ServiceVersion)
p.AppendTag(MachineTagKeys[9], host.ServiceEnvironment)

// Populate measurement-specific tags and fields:
host.SimulatedMeasurements[0].ToPoint(p)

ret := d.hostIndex < d.epochHosts
d.madePoints++
d.hostIndex++

return ret
}
118 changes: 34 additions & 84 deletions cmd/tsbs_generate_data/devops/generate_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,29 @@ import (
// DevopsSimulator generates data similar to telemetry, with metrics from a variety of device systems.
// It fulfills the Simulator interface.
type DevopsSimulator struct {
madePoints uint64
maxPoints uint64

*commonDevopsSimulator
simulatedMeasurementIndex int
}

hostIndex uint64
hosts []Host
// Next advances a Point to the next state in the generator.
func (d *DevopsSimulator) Next(p *serialize.Point) bool {
// switch to the next metric if needed
if d.hostIndex == uint64(len(d.hosts)) {
d.hostIndex = 0
d.simulatedMeasurementIndex++
}

epoch uint64
epochs uint64
epochHosts uint64
initHosts uint64
if d.simulatedMeasurementIndex == len(d.hosts[0].SimulatedMeasurements) {
d.simulatedMeasurementIndex = 0

timestampStart time.Time
timestampEnd time.Time
interval time.Duration
}
for i := 0; i < len(d.hosts); i++ {
d.hosts[i].TickAll(d.interval)
}

// Finished tells whether we have simulated all the necessary points
func (d *DevopsSimulator) Finished() bool {
return d.madePoints >= d.maxPoints
d.adjustNumHostsForEpoch()
}

return d.populatePoint(p, d.simulatedMeasurementIndex)
}

// DevopsSimulatorConfig is used to create a DevopsSimulator.
Expand All @@ -43,6 +45,7 @@ type DevopsSimulatorConfig struct {
HostConstructor func(i int, start time.Time) Host
}

// ToSimulator produces a Simulator that conforms to the given SimulatorConfig over the specified interval
func (d *DevopsSimulatorConfig) ToSimulator(interval time.Duration) common.Simulator {
hostInfos := make([]Host, d.HostCount)
for i := 0; i < len(hostInfos); i++ {
Expand All @@ -52,76 +55,23 @@ func (d *DevopsSimulatorConfig) ToSimulator(interval time.Duration) common.Simul
epochs := uint64(d.End.Sub(d.Start).Nanoseconds() / interval.Nanoseconds())
maxPoints := epochs * d.HostCount * uint64(len(hostInfos[0].SimulatedMeasurements))
dg := &DevopsSimulator{
madePoints: 0,
maxPoints: maxPoints,

commonDevopsSimulator: &commonDevopsSimulator{
madePoints: 0,
maxPoints: maxPoints,

hostIndex: 0,
hosts: hostInfos,

epoch: 0,
epochs: epochs,
epochHosts: d.InitHostCount,
initHosts: d.InitHostCount,
timestampStart: d.Start,
timestampEnd: d.End,
interval: interval,
},
simulatedMeasurementIndex: 0,

hostIndex: 0,
hosts: hostInfos,

epoch: 0,
epochs: epochs,
epochHosts: d.InitHostCount,
initHosts: d.InitHostCount,
timestampStart: d.Start,
timestampEnd: d.End,
interval: interval,
}

return dg
}

func (d *DevopsSimulator) Fields() map[string][][]byte {
data := make(map[string][][]byte)
for _, sm := range d.hosts[0].SimulatedMeasurements {
point := serialize.NewPoint()
sm.ToPoint(point)
data[string(point.MeasurementName())] = point.FieldKeys()
}

return data
}

// Next advances a Point to the next state in the generator.
func (d *DevopsSimulator) Next(p *serialize.Point) bool {
// switch to the next metric if needed
if d.hostIndex == uint64(len(d.hosts)) {
d.hostIndex = 0
d.simulatedMeasurementIndex++
}

if d.simulatedMeasurementIndex == len(d.hosts[0].SimulatedMeasurements) {
d.simulatedMeasurementIndex = 0

for i := 0; i < len(d.hosts); i++ {
d.hosts[i].TickAll(d.interval)
}
d.epoch++
missingScale := float64(uint64(len(d.hosts)) - d.initHosts)
d.epochHosts = d.initHosts + uint64(missingScale*float64(d.epoch)/float64(d.epochs-1))
}

host := &d.hosts[d.hostIndex]

// Populate host-specific tags:
p.AppendTag(MachineTagKeys[0], host.Name)
p.AppendTag(MachineTagKeys[1], host.Region)
p.AppendTag(MachineTagKeys[2], host.Datacenter)
p.AppendTag(MachineTagKeys[3], host.Rack)
p.AppendTag(MachineTagKeys[4], host.OS)
p.AppendTag(MachineTagKeys[5], host.Arch)
p.AppendTag(MachineTagKeys[6], host.Team)
p.AppendTag(MachineTagKeys[7], host.Service)
p.AppendTag(MachineTagKeys[8], host.ServiceVersion)
p.AppendTag(MachineTagKeys[9], host.ServiceEnvironment)

// Populate measurement-specific tags and fields:
host.SimulatedMeasurements[d.simulatedMeasurementIndex].ToPoint(p)

ret := d.hostIndex < d.epochHosts
d.madePoints++
d.hostIndex++

return ret
}

0 comments on commit 5c2254c

Please sign in to comment.