From 385e049db8bce036c4bfce676b6d2d974847dfd3 Mon Sep 17 00:00:00 2001 From: Rob Kiefer Date: Wed, 3 Apr 2019 16:18:36 -0400 Subject: [PATCH] Add internal/inputs package and rewrite tsbs_generate_data For a long time, our two generation binaries -- tsbs_generate_data and tsbs_generate_queries -- have shared (roughly) a fair bit of code, especially when it comes to flags and validation. However, they were never truly in sync and combining them has been a long wanted to-do. Similarly, to enable better tooling around TSBS, it would be beneficial if more of its functionality was exposed as a library instead of a CLI that needs to be called. To those ends, this PR is a first step in addressing both of them. It introduces the internal/inputs package, which can eventually be moved to pkg/inputs when we are ready for other things to consume its API. This package will contain the structs, interfaces, and functions for generating 'input' to other TSBS tools. For now, that only includes generating data files (for tsbs_load_* binaries) and query files (for tsbs_run_queries_* binaries). This PR starts by introducing these building blocks and converting tsbs_generate_data to use it. The idea is that each type of input (e.g., data, queries) is handled by a Generator, which is customized by a GeneratorConfig. The config contains fields such as the PRNG seed, number of items to generate, etc, which are used by the Generator to control the output. These GeneratorConfigs come with a means of easily adding their fields to a flag.FlagSet, making them work well with CLIs while also not restricting their use to only CLIs. Once configured, this GeneratorConfig is passed to a Generator, which then produces the output. This design has a few other nice features to help cleanup TSBS. One, it uses an approach of bubbling up errors and passing them back to the caller, allowing for more graceful error handling. CLIs can output them to the console, while other programs using the library can pass them to another error handling system if they desire. Two, Generators should be designed with an Out field that allows the caller to point to any io.Writer it wants -- not just the console or a file. The next step will be to convert tsbs_generate_queries to use this as well, which will be done in a follow up PR. --- cmd/tsbs_generate_data/main.go | 318 +--------------- cmd/tsbs_generate_data/main_test.go | 415 --------------------- internal/inputs/generator.go | 84 +++++ internal/inputs/generator_data.go | 270 ++++++++++++++ internal/inputs/generator_data_test.go | 494 +++++++++++++++++++++++++ internal/inputs/generator_test.go | 86 +++++ internal/inputs/utils.go | 83 +++++ internal/inputs/utils_test.go | 57 +++ 8 files changed, 1084 insertions(+), 723 deletions(-) delete mode 100644 cmd/tsbs_generate_data/main_test.go create mode 100644 internal/inputs/generator.go create mode 100644 internal/inputs/generator_data.go create mode 100644 internal/inputs/generator_data_test.go create mode 100644 internal/inputs/generator_test.go create mode 100644 internal/inputs/utils.go create mode 100644 internal/inputs/utils_test.go diff --git a/cmd/tsbs_generate_data/main.go b/cmd/tsbs_generate_data/main.go index 8eba8f3dd..803b707a5 100644 --- a/cmd/tsbs_generate_data/main.go +++ b/cmd/tsbs_generate_data/main.go @@ -14,342 +14,44 @@ package main import ( - "bufio" "flag" "fmt" - "io" "log" - "math/rand" "os" "os/signal" "runtime/pprof" - "sort" - "strings" - "time" - "github.com/timescale/tsbs/cmd/tsbs_generate_data/common" - "github.com/timescale/tsbs/cmd/tsbs_generate_data/devops" - "github.com/timescale/tsbs/cmd/tsbs_generate_data/serialize" + "github.com/timescale/tsbs/internal/inputs" ) -const ( - // Output data format choices (alphabetical order) - formatCassandra = "cassandra" - formatClickhouse = "clickhouse" - formatInflux = "influx" - formatMongo = "mongo" - formatSiriDB = "siridb" - formatTimescaleDB = "timescaledb" - - // Use case choices (make sure to update TestGetConfig if adding a new one) - useCaseCPUOnly = "cpu-only" - useCaseCPUSingle = "cpu-single" - useCaseDevops = "devops" - - errTotalGroupsZero = "incorrect interleaved groups configuration: total groups = 0" - errInvalidGroupsFmt = "incorrect interleaved groups configuration: id %d >= total groups %d" - errInvalidFormatFmt = "invalid format specifier: %v (valid choices: %v)" - - defaultWriteSize = 4 << 20 // 4 MB -) - -// semi-constants var ( - formatChoices = []string{ - formatCassandra, - formatClickhouse, - formatInflux, - formatMongo, - formatSiriDB, - formatTimescaleDB, - } - useCaseChoices = []string{ - useCaseCPUOnly, - useCaseCPUSingle, - useCaseDevops, - } - // allows for testing - fatal = log.Fatalf -) - -// parseableFlagVars are flag values that need sanitization or re-parsing after -// being set, e.g., to convert from string to time.Time or re-setting the value -// based on a special '0' value -type parseableFlagVars struct { - timestampStartStr string - timestampEndStr string - seed int64 - initialScale uint64 -} - -// Program option vars: -var ( - format string - useCase string profileFile string - - initialScale uint64 - scale uint64 - seed int64 - debug int - - timestampStart time.Time - timestampEnd time.Time - - interleavedGenerationGroupID uint - interleavedGenerationGroupsNum uint - - logInterval time.Duration - maxDataPoints uint64 - fileName string + dg = &inputs.DataGenerator{} + config = &inputs.DataGeneratorConfig{} ) -// parseTimeFromString parses string-represented time of the format 2006-01-02T15:04:05Z07:00 -func parseTimeFromString(s string) time.Time { - t, err := time.Parse(time.RFC3339, s) - if err != nil { - fatal("can not parse time from string '%s': %v", s, err) - return time.Time{} - } - return t.UTC() -} - -// validateGroups checks validity of combination groupID and totalGroups -func validateGroups(groupID, totalGroupsNum uint) (bool, error) { - if totalGroupsNum == 0 { - // Need at least one group - return false, fmt.Errorf(errTotalGroupsZero) - } - if groupID >= totalGroupsNum { - // Need reasonable groupID - return false, fmt.Errorf(errInvalidGroupsFmt, groupID, totalGroupsNum) - } - return true, nil -} - -// validateFormat checks whether format is valid (i.e., one of formatChoices) -func validateFormat(format string) bool { - for _, s := range formatChoices { - if s == format { - return true - } - } - return false -} - -// validateUseCase checks whether use-case is valid (i.e., one of useCaseChoices) -func validateUseCase(useCase string) bool { - for _, s := range useCaseChoices { - if s == useCase { - return true - } - } - return false -} - -// postFlagParse assigns parseable flags -func postFlagParse(flags parseableFlagVars) { - if flags.initialScale == 0 { - initialScale = scale - } else { - initialScale = flags.initialScale - } - - // the default seed is the current timestamp: - if flags.seed == 0 { - seed = int64(time.Now().Nanosecond()) - } else { - seed = flags.seed - } - fmt.Fprintf(os.Stderr, "using random seed %d\n", seed) - - // Parse timestamps - timestampStart = parseTimeFromString(flags.timestampStartStr) - timestampEnd = parseTimeFromString(flags.timestampEndStr) -} - -// GetBufferedWriter returns the buffered Writer that should be used for generated output -func GetBufferedWriter(fileName string) *bufio.Writer { - // Prepare output file/STDOUT - if len(fileName) > 0 { - // Write output to file - file, err := os.Create(fileName) - if err != nil { - fatal("cannot open file for write %s: %v", fileName, err) - } - return bufio.NewWriterSize(file, defaultWriteSize) - } - - // Write output to STDOUT - return bufio.NewWriterSize(os.Stdout, defaultWriteSize) -} - // Parse args: func init() { - pfv := parseableFlagVars{} - - flag.StringVar(&format, "format", "", fmt.Sprintf("Format to emit. (choices: %s)", strings.Join(formatChoices, ", "))) - - flag.StringVar(&useCase, "use-case", "", fmt.Sprintf("Use case to model. (choices: %s)", strings.Join(useCaseChoices, ", "))) - - flag.Uint64Var(&pfv.initialScale, "initial-scale", 0, "Initial scaling variable specific to the use case (e.g., devices in 'devops'). 0 means to use -scale value") - flag.Uint64Var(&scale, "scale", 1, "Scaling value specific to the use case (e.g., devices in 'devops').") - - flag.StringVar(&pfv.timestampStartStr, "timestamp-start", "2016-01-01T00:00:00Z", "Beginning timestamp (RFC3339).") - flag.StringVar(&pfv.timestampEndStr, "timestamp-end", "2016-01-02T06:00:00Z", "Ending timestamp (RFC3339).") + config.AddToFlagSet(flag.CommandLine) - flag.Int64Var(&pfv.seed, "seed", 0, "PRNG seed (0 uses the current timestamp). (default 0)") - - flag.IntVar(&debug, "debug", 0, "Debug printing (choices: 0, 1, 2). (default 0)") - - flag.UintVar(&interleavedGenerationGroupID, "interleaved-generation-group-id", 0, - "Group (0-indexed) to perform round-robin serialization within. Use this to scale up data generation to multiple processes.") - flag.UintVar(&interleavedGenerationGroupsNum, "interleaved-generation-groups", 1, - "The number of round-robin serialization groups. Use this to scale up data generation to multiple processes.") + flag.StringVar(&config.TimeStart, "timestamp-start", "2016-01-01T00:00:00Z", "Beginning timestamp (RFC3339).") + flag.StringVar(&config.TimeEnd, "timestamp-end", "2016-01-02T06:00:00Z", "Ending timestamp (RFC3339).") flag.StringVar(&profileFile, "profile-file", "", "File to which to write go profiling data") - flag.DurationVar(&logInterval, "log-interval", 10*time.Second, "Duration between host data points") - flag.Uint64Var(&maxDataPoints, "max-data-points", 0, "Limit the number of data points to generate, 0 = no limit") - flag.StringVar(&fileName, "file", "", "File name to write generated data to") + flag.Uint64Var(&config.Limit, "max-data-points", 0, "Limit the number of data points to generate, 0 = no limit") flag.Parse() - - postFlagParse(pfv) } func main() { - if ok, err := validateGroups(interleavedGenerationGroupID, interleavedGenerationGroupsNum); !ok { - fatal("incorrect interleaved groups specification: %v", err) - } - if ok := validateFormat(format); !ok { - fatal("invalid format specified: %v (valid choices: %v)", format, formatChoices) - } - if ok := validateUseCase(useCase); !ok { - fatal("invalid use-case specified: %v (valid choices: %v)", useCase, useCaseChoices) - } - if len(profileFile) > 0 { defer startMemoryProfile(profileFile)() } - rand.Seed(seed) - - // Get output writer - out := GetBufferedWriter(fileName) - defer func() { - err := out.Flush() - if err != nil { - fatal(err.Error()) - } - }() - - cfg := getConfig(useCase) - sim := cfg.NewSimulator(logInterval, maxDataPoints) - serializer := getSerializer(sim, format, out) - - runSimulator(sim, serializer, out, interleavedGenerationGroupID, interleavedGenerationGroupsNum) -} - -func runSimulator(sim common.Simulator, serializer serialize.PointSerializer, out io.Writer, groupID, totalGroups uint) { - currGroupID := uint(0) - point := serialize.NewPoint() - for !sim.Finished() { - write := sim.Next(point) - if !write { - point.Reset() - continue - } - - // in the default case this is always true - if currGroupID == groupID { - err := serializer.Serialize(point, out) - if err != nil { - fatal("can not serialize point: %s", err) - return - } - } - point.Reset() - - currGroupID = (currGroupID + 1) % totalGroups - } -} - -func getConfig(useCase string) common.SimulatorConfig { - switch useCase { - case useCaseDevops: - return &devops.DevopsSimulatorConfig{ - Start: timestampStart, - End: timestampEnd, - - InitHostCount: initialScale, - HostCount: scale, - HostConstructor: devops.NewHost, - } - case useCaseCPUOnly: - return &devops.CPUOnlySimulatorConfig{ - Start: timestampStart, - End: timestampEnd, - - InitHostCount: initialScale, - HostCount: scale, - HostConstructor: devops.NewHostCPUOnly, - } - case useCaseCPUSingle: - return &devops.CPUOnlySimulatorConfig{ - Start: timestampStart, - End: timestampEnd, - - InitHostCount: initialScale, - HostCount: scale, - HostConstructor: devops.NewHostCPUSingle, - } - default: - fatal("unknown use case: '%s'", useCase) - return nil - } -} - -func getSerializer(sim common.Simulator, format string, out *bufio.Writer) serialize.PointSerializer { - switch format { - case formatCassandra: - return &serialize.CassandraSerializer{} - case formatInflux: - return &serialize.InfluxSerializer{} - case formatMongo: - return &serialize.MongoSerializer{} - case formatSiriDB: - return &serialize.SiriDBSerializer{} - case formatClickhouse: - fallthrough - case formatTimescaleDB: - out.WriteString("tags") - for _, key := range sim.TagKeys() { - out.WriteString(",") - out.Write(key) - } - out.WriteString("\n") - // sort the keys so the header is deterministic - keys := make([]string, 0) - fields := sim.Fields() - for k := range fields { - keys = append(keys, k) - } - sort.Strings(keys) - for _, measurementName := range keys { - out.WriteString(measurementName) - for _, field := range fields[measurementName] { - out.WriteString(",") - out.Write(field) - } - out.WriteString("\n") - } - out.WriteString("\n") - - return &serialize.TimescaleDBSerializer{} - default: - fatal("unknown format: '%s'", format) - return nil + err := dg.Generate(config) + if err != nil { + fmt.Printf("error: %v\n", err) } } diff --git a/cmd/tsbs_generate_data/main_test.go b/cmd/tsbs_generate_data/main_test.go deleted file mode 100644 index bd5440d8c..000000000 --- a/cmd/tsbs_generate_data/main_test.go +++ /dev/null @@ -1,415 +0,0 @@ -package main - -import ( - "bufio" - "bytes" - "fmt" - "io" - "testing" - "time" - - "github.com/timescale/tsbs/cmd/tsbs_generate_data/devops" - "github.com/timescale/tsbs/cmd/tsbs_generate_data/serialize" -) - -const ( - correctTimeStr = "2016-01-01T00:00:00Z" - incorrectTimeStr = "2017-01-01" -) - -var correctTime = time.Date(2016, time.January, 1, 0, 0, 0, 0, time.UTC) - -func TestParseTimeFromStrong(t *testing.T) { - parsedTime := parseTimeFromString(correctTimeStr) - if parsedTime != correctTime { - t.Errorf("did not get correct time back: got %v want %v", parsedTime, correctTime) - } - - oldFatal := fatal - fatalCalled := false - fatal = func(format string, args ...interface{}) { - fatalCalled = true - } - _ = parseTimeFromString(incorrectTimeStr) - if !fatalCalled { - t.Errorf("fatal not called when it should have been") - } - fatal = oldFatal -} - -func TestValidateGroups(t *testing.T) { - cases := []struct { - desc string - groupID uint - totalGroups uint - shouldErr bool - errFmt string - }{ - { - desc: "id < total, no err", - groupID: 0, - totalGroups: 1, - shouldErr: false, - }, - { - desc: "id = total, should err", - groupID: 1, - totalGroups: 1, - shouldErr: true, - errFmt: errInvalidGroupsFmt, - }, - { - desc: "total = 0, should err", - groupID: 0, - totalGroups: 0, - shouldErr: true, - errFmt: errTotalGroupsZero, - }, - } - for _, c := range cases { - ok, err := validateGroups(c.groupID, c.totalGroups) - if ok != c.shouldErr { - magic := 46 // first 45 chars are the same for both error messages, so check up to 46 to make sure its different - if c.shouldErr && err.Error()[:magic] != c.errFmt[:magic] { - t.Errorf("%s: did not get correct error: got\n%v\nwant\n%v\n", c.desc, err, c.errFmt) - } - if !c.shouldErr && err != nil { - t.Errorf("%s: got unexpected error: %v", c.desc, err) - } - } - } -} - -func TestValidateFormat(t *testing.T) { - for _, f := range formatChoices { - if !validateFormat(f) { - t.Errorf("format '%s' did not return true when it should", f) - } - } - if validateFormat("incorrect format!") { - t.Errorf("validateFormat returned true for invalid format") - } -} - -func TestValidateUseCase(t *testing.T) { - for _, f := range useCaseChoices { - if !validateUseCase(f) { - t.Errorf("use-case '%s' did not return true when it should", f) - } - } - if validateUseCase("incorrect use-case!") { - t.Errorf("validateUseCase returned true for invalid use-case") - } -} - -func TestPostFlagsParse(t *testing.T) { - scale = 100 - timestampStart = time.Time{} - timestampEnd = time.Time{} - boringPFV := parseableFlagVars{ - initialScale: 1, - seed: 123, - timestampStartStr: correctTimeStr, - timestampEndStr: correctTimeStr, - } - postFlagParse(boringPFV) - if initialScale != boringPFV.initialScale { - t.Errorf("specified initScale not set correctly: got %d", initialScale) - } - if seed != boringPFV.seed { - t.Errorf("specified seed not set correctly: got %d", seed) - } - if timestampStart != correctTime { - t.Errorf("start time not parsed correctly: got %v", timestampStart) - } - if timestampEnd != correctTime { - t.Errorf("end time not parsed correctly: got %v", timestampEnd) - } - - // initScale should set to the same as scale - testPFV := parseableFlagVars{ - initialScale: 0, - seed: boringPFV.seed, - timestampStartStr: boringPFV.timestampStartStr, - timestampEndStr: boringPFV.timestampEndStr, - } - postFlagParse(testPFV) - if initialScale != scale { - t.Errorf("initScale = 0 not parsed correctly: got %d", initialScale) - } - - // seed should set to current time - testPFV = parseableFlagVars{ - initialScale: boringPFV.initialScale, - seed: 0, - timestampStartStr: boringPFV.timestampStartStr, - timestampEndStr: boringPFV.timestampEndStr, - } - postFlagParse(testPFV) - if seed == boringPFV.seed || seed == 0 { - t.Errorf("seed = 0 not parsed correctly: got %d", seed) - } - - // check that incorrect times fail - oldFatal := fatal - fatalCalled := false - fatal = func(format string, args ...interface{}) { - fatalCalled = true - } - testPFV = parseableFlagVars{ - initialScale: boringPFV.initialScale, - seed: boringPFV.seed, - timestampStartStr: incorrectTimeStr, - timestampEndStr: boringPFV.timestampEndStr, - } - postFlagParse(testPFV) - if !fatalCalled { - t.Errorf("fatal not called when it should have been") - } - - testPFV = parseableFlagVars{ - initialScale: boringPFV.initialScale, - seed: boringPFV.seed, - timestampStartStr: boringPFV.timestampStartStr, - timestampEndStr: incorrectTimeStr, - } - postFlagParse(testPFV) - if !fatalCalled { - t.Errorf("fatal not called when it should have been") - } - fatal = oldFatal -} - -var keyIteration = []byte("iteration") - -type testSimulator struct { - limit uint64 - shouldWriteLimit uint64 - iteration uint64 -} - -func (s *testSimulator) Finished() bool { - return s.iteration >= s.limit -} - -func (s *testSimulator) Next(p *serialize.Point) bool { - p.AppendField(keyIteration, s.iteration) - ret := s.iteration < s.shouldWriteLimit - s.iteration++ - return ret -} - -func (s *testSimulator) Fields() map[string][][]byte { - return nil -} - -func (s *testSimulator) TagKeys() [][]byte { - return nil -} - -type testSerializer struct { - shouldError bool -} - -func (s *testSerializer) Serialize(p *serialize.Point, w io.Writer) error { - if s.shouldError { - return fmt.Errorf("erroring") - } - w.Write(keyIteration) - w.Write([]byte("=")) - str := fmt.Sprintf("%d", p.GetFieldValue(keyIteration).(uint64)) - w.Write([]byte(str)) - w.Write([]byte("\n")) - return nil -} - -func TestRunSimulator(t *testing.T) { - cases := []struct { - desc string - limit uint64 - shouldWriteLimit uint64 - groupID uint - totalGroups uint - shouldError bool - wantPoints uint - }{ - { - desc: "shouldWriteLimit = limit", - limit: 10, - shouldWriteLimit: 10, - totalGroups: 1, - wantPoints: 10, - }, - { - desc: "shouldWriteLimit < limit", - limit: 10, - shouldWriteLimit: 5, - totalGroups: 1, - wantPoints: 5, - }, - { - desc: "shouldWriteLimit > limit", - limit: 10, - shouldWriteLimit: 15, - totalGroups: 1, - wantPoints: 10, - }, - { - desc: "shouldWriteLimit = limit, totalGroups=2", - limit: 10, - shouldWriteLimit: 10, - totalGroups: 2, - wantPoints: 5, - }, - { - desc: "shouldWriteLimit < limit, totalGroups=2", - limit: 10, - shouldWriteLimit: 6, - totalGroups: 2, - wantPoints: 3, - }, - { - desc: "shouldWriteLimit < limit, totalGroups=2, other half", - limit: 10, - shouldWriteLimit: 6, - groupID: 1, - totalGroups: 2, - wantPoints: 3, - }, - { - desc: "should error in serializer", - shouldError: true, - limit: 10, - totalGroups: 1, - shouldWriteLimit: 10, - }, - } - oldFatal := fatal - for _, c := range cases { - fatalCalled := false - if c.shouldError { - fatal = func(format string, args ...interface{}) { - fatalCalled = true - } - } - var buf bytes.Buffer - sim := &testSimulator{ - limit: c.limit, - shouldWriteLimit: c.shouldWriteLimit, - } - serializer := &testSerializer{shouldError: c.shouldError} - - runSimulator(sim, serializer, &buf, c.groupID, c.totalGroups) - if c.shouldError && !fatalCalled { - t.Errorf("%s: did not fatal when should", c.desc) - } else if !c.shouldError { - scanner := bufio.NewScanner(bytes.NewReader(buf.Bytes())) - lines := uint(0) - for { - ok := scanner.Scan() - if !ok && scanner.Err() != nil { - t.Fatal(scanner.Err()) - } else if !ok { - break - } - line := scanner.Text() - want := fmt.Sprintf("iteration=%d", (lines*c.totalGroups)+c.groupID) - if line != want { - t.Errorf("%s: incorrect line: got\n%s\nwant\n%s\n", c.desc, line, want) - } - lines++ - } - if lines != c.wantPoints { - t.Errorf("%s: incorrect number of points: got %d want %d", c.desc, lines, c.wantPoints) - } - } - } - fatal = oldFatal -} - -func TestGetConfig(t *testing.T) { - cfg := getConfig(useCaseDevops) - switch got := cfg.(type) { - case *devops.DevopsSimulatorConfig: - default: - t.Errorf("use case '%s' does not run the right type: got %T", useCaseDevops, got) - } - - cfg = getConfig(useCaseCPUOnly) - switch got := cfg.(type) { - case *devops.CPUOnlySimulatorConfig: - default: - t.Errorf("use case '%s' does not run the right type: got %T", useCaseDevops, got) - } - - cfg = getConfig(useCaseCPUSingle) - switch got := cfg.(type) { - case *devops.CPUOnlySimulatorConfig: - default: - t.Errorf("use case '%s' does not run the right type: got %T", useCaseDevops, got) - } - - oldFatal := fatal - fatalCalled := false - fatal = func(f string, args ...interface{}) { - fatalCalled = true - } - cfg = getConfig("bogus config") - if !fatalCalled { - t.Errorf("fatal not called on bogus use case") - } - if cfg != nil { - t.Errorf("got a non-nil config for bogus use case: got %T", cfg) - } - fatal = oldFatal -} - -func TestGetSerializer(t *testing.T) { - cfg := getConfig(useCaseCPUOnly) - sim := cfg.NewSimulator(logInterval, 0) - buf := bytes.NewBuffer(make([]byte, 1024)) - out := bufio.NewWriter(buf) - defer out.Flush() - - s := getSerializer(sim, formatCassandra, out) - switch got := s.(type) { - case *serialize.CassandraSerializer: - default: - t.Errorf("format '%s' does not run the right serializer: got %T", formatCassandra, got) - } - - s = getSerializer(sim, formatInflux, out) - switch got := s.(type) { - case *serialize.InfluxSerializer: - default: - t.Errorf("format '%s' does not run the right serializer: got %T", formatInflux, got) - } - - s = getSerializer(sim, formatMongo, out) - switch got := s.(type) { - case *serialize.MongoSerializer: - default: - t.Errorf("format '%s' does not run the right serializer: got %T", formatMongo, got) - } - - s = getSerializer(sim, formatTimescaleDB, out) - switch got := s.(type) { - case *serialize.TimescaleDBSerializer: - default: - t.Errorf("format '%s' does not run the right serializer: got %T", formatTimescaleDB, got) - } - - oldFatal := fatal - fatalCalled := false - fatal = func(f string, args ...interface{}) { - fatalCalled = true - } - s = getSerializer(sim, "bogus format", out) - if !fatalCalled { - t.Errorf("fatal not called on bogus format") - } - if s != nil { - t.Errorf("got a non-nil config for bogus format: got %T", cfg) - } - fatal = oldFatal -} diff --git a/internal/inputs/generator.go b/internal/inputs/generator.go new file mode 100644 index 000000000..6afa8a058 --- /dev/null +++ b/internal/inputs/generator.go @@ -0,0 +1,84 @@ +package inputs + +import ( + "flag" + "fmt" + "strings" + "time" +) + +// Error messages when using a GeneratorConfig +const ( + ErrScaleIsZero = "scale cannot be 0" + + errBadFormatFmt = "invalid format specified: '%v'" + errBadUseFmt = "invalid use case specified: '%v'" +) + +// GeneratorConfig is an interface that defines a configuration that is used +// by Generators to govern their behavior. The interface methods provide a way +// to use the GeneratorConfig with the command-line via flag.FlagSet and +// a method to validate the config is actually valid. +type GeneratorConfig interface { + // AddToFlagSet adds all the config options to a FlagSet, for easy use with CLIs + AddToFlagSet(fs *flag.FlagSet) + // Validate checks that configuration is valid and ready to be consumed by a Generator + Validate() error +} + +// BaseConfig is a data struct that includes the common flags or configuration +// options shared across different types of Generators. These include things like +// the data format (i.e., which database system is this for), a PRNG seed, etc. +type BaseConfig struct { + Format string + Use string + + Scale uint64 + Limit uint64 + + TimeStart string + TimeEnd string + + Seed int64 + Verbose bool + File string +} + +func (c *BaseConfig) AddToFlagSet(fs *flag.FlagSet) { + fs.StringVar(&c.Format, "format", "", fmt.Sprintf("Format to generate. (choices: %s)", strings.Join(ValidFormats(), ", "))) + fs.StringVar(&c.Use, "use-case", "", fmt.Sprintf("Use case to generate.")) + fs.StringVar(&c.File, "file", "", "Write the output to this path") + + fs.Uint64Var(&c.Scale, "scale", 1, "Scaling value specific to use case (e.g., devices in 'devops').") + fs.Int64Var(&c.Seed, "seed", 0, "PRNG seed (default: 0, which uses the current timestamp)") + + fs.BoolVar(&c.Verbose, "verbose", false, "Show verbose output") +} + +func (c *BaseConfig) Validate() error { + if c.Scale == 0 { + return fmt.Errorf(ErrScaleIsZero) + } + + if c.Seed == 0 { + c.Seed = int64(time.Now().Nanosecond()) + } + + if !isIn(c.Format, formats) { + return fmt.Errorf(errBadFormatFmt, c.Format) + } + + if !isIn(c.Use, useCaseChoices) { + return fmt.Errorf(errBadUseFmt, c.Use) + } + + return nil +} + +// Generator is an interface that defines a type that generates inputs to other +// TSBS tools. Examples include DataGenerator which creates database data that +// gets inserted and stored, or QueryGenerator which creates queries that are +// used to test with. +type Generator interface { + Generate(GeneratorConfig) error +} diff --git a/internal/inputs/generator_data.go b/internal/inputs/generator_data.go new file mode 100644 index 000000000..0111f308b --- /dev/null +++ b/internal/inputs/generator_data.go @@ -0,0 +1,270 @@ +package inputs + +import ( + "bufio" + "flag" + "fmt" + "io" + "math/rand" + "os" + "sort" + "time" + + "github.com/timescale/tsbs/cmd/tsbs_generate_data/common" + "github.com/timescale/tsbs/cmd/tsbs_generate_data/devops" + "github.com/timescale/tsbs/cmd/tsbs_generate_data/serialize" +) + +// Error messages when using a DataGenerator +const ( + ErrNoConfig = "no GeneratorConfig provided" + ErrInvalidDataConfig = "invalid config: DataGenerator needs a DataGeneratorConfig" + + errLogIntervalZero = "cannot have log interval of 0" + errTotalGroupsZero = "incorrect interleaved groups configuration: total groups = 0" + errInvalidGroupsFmt = "incorrect interleaved groups configuration: id %d >= total groups %d" + errCannotParseTimeFmt = "cannot parse time from string '%s': %v" +) + +const defaultLogInterval = 10 * time.Second + +// validateGroups checks validity of combination groupID and totalGroups +func validateGroups(groupID, totalGroupsNum uint) error { + if totalGroupsNum == 0 { + // Need at least one group + return fmt.Errorf(errTotalGroupsZero) + } + if groupID >= totalGroupsNum { + // Need reasonable groupID + return fmt.Errorf(errInvalidGroupsFmt, groupID, totalGroupsNum) + } + return nil +} + +// DataGeneratorConfig is the GeneratorConfig that should be used with a +// DataGenerator. It includes all the fields from a BaseConfig, as well as some +// options that are specific to generating the data for database write operations, +// such as the initial scale and how spaced apart data points should be in time. +type DataGeneratorConfig struct { + BaseConfig + InitialScale uint64 + LogInterval time.Duration + InterleavedGroupID uint + InterleavedNumGroups uint +} + +func (c *DataGeneratorConfig) Validate() error { + err := c.BaseConfig.Validate() + if err != nil { + return err + } + + if c.InitialScale == 0 { + c.InitialScale = c.BaseConfig.Scale + } + + if c.LogInterval == 0 { + return fmt.Errorf(errLogIntervalZero) + } + + err = validateGroups(c.InterleavedGroupID, c.InterleavedNumGroups) + return err +} + +func (c *DataGeneratorConfig) AddToFlagSet(fs *flag.FlagSet) { + c.BaseConfig.AddToFlagSet(fs) + flag.Uint64Var(&c.InitialScale, "initial-scale", 0, "Initial scaling variable specific to the use case (e.g., devices in 'devops'). 0 means to use -scale value") + flag.DurationVar(&c.LogInterval, "log-interval", defaultLogInterval, "Duration between host data points") + + flag.UintVar(&c.InterleavedGroupID, "interleaved-generation-group-id", 0, + "Group (0-indexed) to perform round-robin serialization within. Use this to scale up data generation to multiple processes.") + flag.UintVar(&c.InterleavedNumGroups, "interleaved-generation-groups", 1, + "The number of round-robin serialization groups. Use this to scale up data generation to multiple processes.") + +} + +// DataGenerator is a type of Generator for creating data that will be consumed +// by a database's write/insert operations. The output is specific to the type +// of database, but is consumed by TSBS loaders like tsbs_load_timescaledb. +type DataGenerator struct { + // Out is the writer where data should be written. If nil, it will be + // os.Stdout unless File is specified in the GeneratorConfig passed to + // Generate. + Out io.Writer + + config *DataGeneratorConfig + tsStart time.Time + tsEnd time.Time + + // bufOut represents the buffered writer that should actually be passed to + // any operations that write out data. + bufOut *bufio.Writer +} + +func (g *DataGenerator) init(config GeneratorConfig) error { + if config == nil { + return fmt.Errorf(ErrNoConfig) + } + switch config.(type) { + case *DataGeneratorConfig: + default: + return fmt.Errorf(ErrInvalidDataConfig) + } + g.config = config.(*DataGeneratorConfig) + + err := g.config.Validate() + if err != nil { + return err + } + + g.tsStart, err = ParseUTCTime(g.config.TimeStart) + if err != nil { + return fmt.Errorf(errCannotParseTimeFmt, g.config.TimeStart, err) + } + g.tsEnd, err = ParseUTCTime(g.config.TimeEnd) + if err != nil { + return fmt.Errorf(errCannotParseTimeFmt, g.config.TimeEnd, err) + } + + if g.Out == nil { + g.Out = os.Stdout + } + g.bufOut, err = getBufferedWriter(g.config.File, g.Out) + if err != nil { + return err + } + + return nil +} + +func (g *DataGenerator) Generate(config GeneratorConfig) error { + err := g.init(config) + if err != nil { + return err + } + + rand.Seed(g.config.Seed) + + scfg, err := g.getSimulatorConfig(g.config) + if err != nil { + return err + } + + sim := scfg.NewSimulator(g.config.LogInterval, g.config.Limit) + serializer, err := g.getSerializer(sim, g.config.Format) + if err != nil { + return err + } + + return g.runSimulator(sim, serializer, g.config) +} + +func (g *DataGenerator) runSimulator(sim common.Simulator, serializer serialize.PointSerializer, dgc *DataGeneratorConfig) error { + defer g.bufOut.Flush() + + currGroupID := uint(0) + point := serialize.NewPoint() + for !sim.Finished() { + write := sim.Next(point) + if !write { + point.Reset() + continue + } + + // in the default case this is always true + if currGroupID == dgc.InterleavedGroupID { + err := serializer.Serialize(point, g.bufOut) + if err != nil { + return fmt.Errorf("can not serialize point: %s", err) + } + } + point.Reset() + + currGroupID = (currGroupID + 1) % dgc.InterleavedNumGroups + } + return nil +} + +func (g *DataGenerator) getSimulatorConfig(dgc *DataGeneratorConfig) (common.SimulatorConfig, error) { + var ret common.SimulatorConfig + var err error + switch dgc.Use { + case useCaseDevops: + ret = &devops.DevopsSimulatorConfig{ + Start: g.tsStart, + End: g.tsEnd, + + InitHostCount: dgc.InitialScale, + HostCount: dgc.Scale, + HostConstructor: devops.NewHost, + } + case useCaseCPUOnly: + ret = &devops.CPUOnlySimulatorConfig{ + Start: g.tsStart, + End: g.tsEnd, + + InitHostCount: dgc.InitialScale, + HostCount: dgc.Scale, + HostConstructor: devops.NewHostCPUOnly, + } + case useCaseCPUSingle: + ret = &devops.CPUOnlySimulatorConfig{ + Start: g.tsStart, + End: g.tsEnd, + + InitHostCount: dgc.InitialScale, + HostCount: dgc.Scale, + HostConstructor: devops.NewHostCPUSingle, + } + default: + err = fmt.Errorf("unknown use case: '%s'", dgc.Use) + } + return ret, err +} + +func (g *DataGenerator) getSerializer(sim common.Simulator, format string) (serialize.PointSerializer, error) { + var ret serialize.PointSerializer + var err error + + switch format { + case FormatCassandra: + ret = &serialize.CassandraSerializer{} + case FormatInflux: + ret = &serialize.InfluxSerializer{} + case FormatMongo: + ret = &serialize.MongoSerializer{} + case FormatSiriDB: + ret = &serialize.SiriDBSerializer{} + case FormatClickhouse: + fallthrough + case FormatTimescaleDB: + g.bufOut.WriteString("tags") + for _, key := range sim.TagKeys() { + g.bufOut.WriteString(",") + g.bufOut.Write(key) + } + g.bufOut.WriteString("\n") + // sort the keys so the header is deterministic + keys := make([]string, 0) + fields := sim.Fields() + for k := range fields { + keys = append(keys, k) + } + sort.Strings(keys) + for _, measurementName := range keys { + g.bufOut.WriteString(measurementName) + for _, field := range fields[measurementName] { + g.bufOut.WriteString(",") + g.bufOut.Write(field) + } + g.bufOut.WriteString("\n") + } + g.bufOut.WriteString("\n") + + ret = &serialize.TimescaleDBSerializer{} + default: + err = fmt.Errorf("unknown format: '%s'", format) + } + + return ret, err +} diff --git a/internal/inputs/generator_data_test.go b/internal/inputs/generator_data_test.go new file mode 100644 index 000000000..06a9cc845 --- /dev/null +++ b/internal/inputs/generator_data_test.go @@ -0,0 +1,494 @@ +package inputs + +import ( + "bufio" + "bytes" + "fmt" + "io" + "os" + "reflect" + "strings" + "testing" + "time" + + "github.com/timescale/tsbs/cmd/tsbs_generate_data/common" + "github.com/timescale/tsbs/cmd/tsbs_generate_data/devops" + "github.com/timescale/tsbs/cmd/tsbs_generate_data/serialize" +) + +func TestValidateGroups(t *testing.T) { + cases := []struct { + desc string + groupID uint + totalGroups uint + errMsg string + }{ + { + desc: "id < total, no err", + groupID: 0, + totalGroups: 1, + }, + { + desc: "id = total, should err", + groupID: 1, + totalGroups: 1, + errMsg: fmt.Sprintf(errInvalidGroupsFmt, 1, 1), + }, + { + desc: "id > total, should err", + groupID: 2, + totalGroups: 1, + errMsg: fmt.Sprintf(errInvalidGroupsFmt, 2, 1), + }, + { + desc: "total = 0, should err", + groupID: 0, + totalGroups: 0, + errMsg: errTotalGroupsZero, + }, + } + for _, c := range cases { + err := validateGroups(c.groupID, c.totalGroups) + if c.errMsg == "" && err != nil { + t.Errorf("%s: unexpected error: %v", c.desc, err) + } else if c.errMsg != "" && err == nil { + t.Errorf("%s: unexpected lack of error", c.desc) + } else if err != nil && err.Error() != c.errMsg { + t.Errorf("%s: incorrect error: got %s want %s", c.desc, err.Error(), c.errMsg) + } + } +} + +func TestDataGeneratorConfigValidate(t *testing.T) { + c := &DataGeneratorConfig{ + BaseConfig: BaseConfig{ + Seed: 123, + Format: FormatTimescaleDB, + Use: useCaseDevops, + Scale: 10, + }, + LogInterval: time.Second, + InitialScale: 0, + InterleavedGroupID: 0, + InterleavedNumGroups: 1, + } + + // Test base validation + err := c.Validate() + if err != nil { + t.Errorf("unexpected error for correct config: %v", err) + } + + c.Format = "bad format" + err = c.Validate() + if err == nil { + t.Errorf("unexpected lack of error for bad format") + } + c.Format = FormatTimescaleDB + + // Test InitialScale validation + c.InitialScale = 0 + err = c.Validate() + if err != nil { + t.Errorf("unexpected error for InitialScale of 0: %v", err) + } + if c.InitialScale != c.Scale { + t.Errorf("InitialScale not set correctly for 0: got %d want %d", c.InitialScale, c.Scale) + } + + c.InitialScale = 5 + err = c.Validate() + if err != nil { + t.Errorf("unexpected error for InitialScale of 5: %v", err) + } + if c.InitialScale != 5 { + t.Errorf("InitialScale not set correctly for 0: got %d want %d", c.InitialScale, 5) + } + + // Test LogInterval validation + c.LogInterval = 0 + err = c.Validate() + if err == nil { + t.Errorf("unexpected lack of error for 0 log interval") + } else if got := err.Error(); got != errLogIntervalZero { + t.Errorf("incorrect error for 0 log interval: got\n%s\nwant\n%s", got, errLogIntervalZero) + } + c.LogInterval = time.Second + + // Test groups validation + c.InterleavedNumGroups = 0 + err = c.Validate() + if err == nil { + t.Errorf("unexpected lack of error for 0 groups") + } else if got := err.Error(); got != errTotalGroupsZero { + t.Errorf("incorrect error for 0 groups: got\n%s\nwant\n%s", got, errTotalGroupsZero) + } + c.InterleavedNumGroups = 1 + + c.InterleavedGroupID = 2 + err = c.Validate() + if err == nil { + t.Errorf("unexpected lack of error for group id > num groups") + } else { + want := fmt.Sprintf(errInvalidGroupsFmt, 2, 1) + if got := err.Error(); got != want { + t.Errorf("incorrect error for group id > num groups: got\n%s\nwant\n%s", got, want) + } + } +} + +func TestDataGeneratorInit(t *testing.T) { + // Test that empty config fails + dg := &DataGenerator{} + err := dg.init(nil) + if err == nil { + t.Errorf("unexpected lack of error with empty config") + } else if got := err.Error(); got != ErrNoConfig { + t.Errorf("incorrect error: got\n%s\nwant\n%s", got, ErrNoConfig) + } + + // Test that wrong type of config fails + err = dg.init(&BaseConfig{}) + if err == nil { + t.Errorf("unexpected lack of error with invalid config") + } else if got := err.Error(); got != ErrInvalidDataConfig { + t.Errorf("incorrect error: got\n%s\nwant\n%s", got, ErrInvalidDataConfig) + } + + // Test that empty, invalid config fails + err = dg.init(&DataGeneratorConfig{}) + if err == nil { + t.Errorf("unexpected lack of error with empty DataGeneratorConfig") + } + + c := &DataGeneratorConfig{ + BaseConfig: BaseConfig{ + Format: FormatTimescaleDB, + Use: useCaseDevops, + Scale: 1, + }, + LogInterval: time.Second, + InterleavedNumGroups: 1, + } + const errTimePrefix = "cannot parse time from string" + + // Test incorrect time format for start + c.TimeStart = "2006 Jan 2" + err = dg.init(c) + if err == nil { + t.Errorf("unexpected lack of error with bad start date") + } else if got := err.Error(); !strings.HasPrefix(got, errTimePrefix) { + t.Errorf("unexpected error for bad start date: got\n%s", got) + } + c.TimeStart = defaultTimeStart + + // Test incorrect time format for end + c.TimeEnd = "Jan 3rd 2016" + err = dg.init(c) + if err == nil { + t.Errorf("unexpected lack of error with bad end date") + } else if got := err.Error(); !strings.HasPrefix(got, errTimePrefix) { + t.Errorf("unexpected error for bad end date: got\n%s", got) + } + c.TimeEnd = defaultTimeEnd + + // Test that Out is set to os.Stdout if unset + err = dg.init(c) + if err != nil { + t.Errorf("unexpected error when checking Out: got %v", err) + } else if dg.Out != os.Stdout { + t.Errorf("Out not set to Stdout") + } + + // Test that Out is same if set + var buf bytes.Buffer + dg.Out = &buf + err = dg.init(c) + if err != nil { + t.Errorf("unexpected error when checking Out: got %v", err) + } else if dg.Out != &buf { + t.Errorf("Out not set to explicit io.Writer") + } +} + +const correctData = `tags,hostname,region,datacenter,rack,os,arch,team,service,service_version,service_environment +cpu,usage_user,usage_system,usage_idle,usage_nice,usage_iowait,usage_irq,usage_softirq,usage_steal,usage_guest,usage_guest_nice + +tags,hostname=host_0,region=eu-central-1,datacenter=eu-central-1a,rack=6,os=Ubuntu15.10,arch=x86,team=SF,service=19,service_version=1,service_environment=test +cpu,1451606400000000000,58,2,24,61,22,63,6,44,80,38 +tags,hostname=host_0,region=eu-central-1,datacenter=eu-central-1a,rack=6,os=Ubuntu15.10,arch=x86,team=SF,service=19,service_version=1,service_environment=test +cpu,1451606401000000000,57,3,23,60,23,64,5,44,76,36 +tags,hostname=host_0,region=eu-central-1,datacenter=eu-central-1a,rack=6,os=Ubuntu15.10,arch=x86,team=SF,service=19,service_version=1,service_environment=test +cpu,1451606402000000000,58,2,25,62,23,65,5,45,78,36 +` + +func TestDataGeneratorGenerate(t *testing.T) { + dg := &DataGenerator{} + + // Test that an invalid config fails + c := &DataGeneratorConfig{} + err := dg.Generate(c) + if err == nil { + t.Errorf("unexpected lack of error with empty DataGeneratorConfig") + } + + c = &DataGeneratorConfig{ + BaseConfig: BaseConfig{ + Seed: 123, + Limit: 3, + Format: FormatTimescaleDB, + Use: useCaseCPUOnly, + Scale: 1, + TimeStart: defaultTimeStart, + TimeEnd: defaultTimeEnd, + }, + InitialScale: 1, + LogInterval: time.Second, + InterleavedNumGroups: 1, + } + var buf bytes.Buffer + dg.Out = &buf + err = dg.Generate(c) + if err != nil { + t.Errorf("unexpected error when generating: got %v", err) + } else if got := string(buf.Bytes()); got != correctData { + t.Errorf("incorrect data written:\ngot\n%s\nwant\n%s", got, correctData) + } + +} + +var keyIteration = []byte("iteration") + +type testSimulator struct { + limit uint64 + shouldWriteLimit uint64 + iteration uint64 +} + +func (s *testSimulator) Finished() bool { + return s.iteration >= s.limit +} + +func (s *testSimulator) Next(p *serialize.Point) bool { + p.AppendField(keyIteration, s.iteration) + ret := s.iteration < s.shouldWriteLimit + s.iteration++ + return ret +} + +func (s *testSimulator) Fields() map[string][][]byte { + return nil +} + +func (s *testSimulator) TagKeys() [][]byte { + return nil +} + +type testSerializer struct { + shouldError bool +} + +func (s *testSerializer) Serialize(p *serialize.Point, w io.Writer) error { + if s.shouldError { + return fmt.Errorf("erroring") + } + + w.Write(keyIteration) + w.Write([]byte("=")) + str := fmt.Sprintf("%d", p.GetFieldValue(keyIteration).(uint64)) + w.Write([]byte(str)) + w.Write([]byte("\n")) + return nil +} + +func TestRunSimulator(t *testing.T) { + cases := []struct { + desc string + limit uint64 + shouldWriteLimit uint64 + groupID uint + totalGroups uint + shouldError bool + wantPoints uint + }{ + { + desc: "shouldWriteLimit = limit", + limit: 10, + shouldWriteLimit: 10, + totalGroups: 1, + wantPoints: 10, + }, + { + desc: "shouldWriteLimit < limit", + limit: 10, + shouldWriteLimit: 5, + totalGroups: 1, + wantPoints: 5, + }, + { + desc: "shouldWriteLimit > limit", + limit: 10, + shouldWriteLimit: 15, + totalGroups: 1, + wantPoints: 10, + }, + { + desc: "shouldWriteLimit = limit, totalGroups=2", + limit: 10, + shouldWriteLimit: 10, + totalGroups: 2, + wantPoints: 5, + }, + { + desc: "shouldWriteLimit < limit, totalGroups=2", + limit: 10, + shouldWriteLimit: 6, + totalGroups: 2, + wantPoints: 3, + }, + { + desc: "shouldWriteLimit < limit, totalGroups=2, other half", + limit: 10, + shouldWriteLimit: 6, + groupID: 1, + totalGroups: 2, + wantPoints: 3, + }, + { + desc: "should error in serializer", + shouldError: true, + limit: 10, + totalGroups: 1, + shouldWriteLimit: 10, + }, + } + for _, c := range cases { + var buf bytes.Buffer + dgc := &DataGeneratorConfig{ + BaseConfig: BaseConfig{ + Scale: 1, + Limit: c.limit, + }, + InitialScale: 1, + LogInterval: defaultLogInterval, + InterleavedGroupID: c.groupID, + InterleavedNumGroups: c.totalGroups, + } + g := &DataGenerator{ + config: dgc, + bufOut: bufio.NewWriter(&buf), + } + sim := &testSimulator{ + limit: c.limit, + shouldWriteLimit: c.shouldWriteLimit, + } + serializer := &testSerializer{shouldError: c.shouldError} + + err := g.runSimulator(sim, serializer, dgc) + if c.shouldError && err == nil { + t.Errorf("%s: unexpected lack of error", c.desc) + } else if !c.shouldError && err != nil { + t.Errorf("%s: unexpected error: got %v", c.desc, err) + } else if !c.shouldError { + scanner := bufio.NewScanner(bytes.NewReader(buf.Bytes())) + lines := uint(0) + for { + ok := scanner.Scan() + if !ok && scanner.Err() != nil { + t.Fatal(scanner.Err()) + } else if !ok { + break + } + line := scanner.Text() + want := fmt.Sprintf("iteration=%d", (lines*c.totalGroups)+c.groupID) + if line != want { + t.Errorf("%s: incorrect line: got\n%s\nwant\n%s\n", c.desc, line, want) + } + lines++ + } + if lines != c.wantPoints { + t.Errorf("%s: incorrect number of points: got %d want %d", c.desc, lines, c.wantPoints) + } + } + } +} + +func TestGetSimulatorConfig(t *testing.T) { + dgc := &DataGeneratorConfig{ + BaseConfig: BaseConfig{ + Scale: 1, + }, + InitialScale: 1, + LogInterval: defaultLogInterval, + } + g := &DataGenerator{config: dgc} + + checkType := func(use string, want common.SimulatorConfig) { + wantType := reflect.TypeOf(want) + dgc.Use = use + scfg, err := g.getSimulatorConfig(dgc) + if err != nil { + t.Errorf("unexpected error with use case %s: %v", use, err) + } + if got := reflect.TypeOf(scfg); got != wantType { + t.Errorf("use '%s' does not give right scfg: got %v want %v", use, got, wantType) + } + } + + checkType(useCaseDevops, &devops.DevopsSimulatorConfig{}) + checkType(useCaseCPUOnly, &devops.CPUOnlySimulatorConfig{}) + checkType(useCaseCPUSingle, &devops.CPUOnlySimulatorConfig{}) + + dgc.Use = "bogus use case" + _, err := g.getSimulatorConfig(dgc) + if err == nil { + t.Errorf("unexpected lack of error for bogus use case") + } +} + +func TestGetSerializer(t *testing.T) { + dgc := &DataGeneratorConfig{ + BaseConfig: BaseConfig{ + Use: useCaseCPUOnly, + Scale: 1, + }, + InitialScale: 1, + LogInterval: defaultLogInterval, + } + g := &DataGenerator{ + config: dgc, + } + + scfg, err := g.getSimulatorConfig(dgc) + if err != nil { + t.Errorf("unexpected error creating scfg: %v", err) + } + + sim := scfg.NewSimulator(dgc.LogInterval, 0) + var buf bytes.Buffer + g.bufOut = bufio.NewWriter(&buf) + defer g.bufOut.Flush() + + checkType := func(format string, want serialize.PointSerializer) { + wantType := reflect.TypeOf(want) + s, err := g.getSerializer(sim, format) + if err != nil { + t.Errorf("unexpected error making serializer: %v", err) + } + if got := reflect.TypeOf(s); got != wantType { + t.Errorf("format '%s' does not run the right serializer: got %v want %v", format, got, wantType) + } + } + + checkType(FormatCassandra, &serialize.CassandraSerializer{}) + checkType(FormatClickhouse, &serialize.TimescaleDBSerializer{}) + checkType(FormatInflux, &serialize.InfluxSerializer{}) + checkType(FormatMongo, &serialize.MongoSerializer{}) + checkType(FormatSiriDB, &serialize.SiriDBSerializer{}) + checkType(FormatClickhouse, &serialize.TimescaleDBSerializer{}) + + _, err = g.getSerializer(sim, "bogus format") + if err == nil { + t.Errorf("unexpected lack of error creating bogus serializer") + } +} diff --git a/internal/inputs/generator_test.go b/internal/inputs/generator_test.go new file mode 100644 index 000000000..d96ddeec9 --- /dev/null +++ b/internal/inputs/generator_test.go @@ -0,0 +1,86 @@ +package inputs + +import ( + "fmt" + "testing" +) + +func TestBaseConfigValidate(t *testing.T) { + c := &BaseConfig{ + Scale: 1, + Seed: 123, + Format: FormatTimescaleDB, + Use: useCaseDevops, + } + + // Test Scale validation + err := c.Validate() + if err != nil { + t.Errorf("unexpected error with scale 1: %v", err) + } + + c.Scale = 0 + err = c.Validate() + if err == nil { + t.Errorf("unexpected lack of error for scale of 0") + } else if got := err.Error(); got != ErrScaleIsZero { + t.Errorf("incorrect error for scale of 0: got\n%s\nwant\n%s", got, ErrScaleIsZero) + } + c.Scale = 1 + + // Test Seed validation + err = c.Validate() + if err != nil { + t.Errorf("unexpected error with seed 123: %v", err) + } + if c.Seed != 123 { + t.Errorf("seed was not 123 after validation") + } + + c.Seed = 0 + err = c.Validate() + if err != nil { + t.Errorf("unexpected error with seed 0: %v", err) + } + if c.Seed == 0 { + t.Errorf("seed was not set to nanosecond when 0") + } + + // Test Format validation + c.Format = FormatCassandra + err = c.Validate() + if err != nil { + t.Errorf("unexpected error with Format '%s': %v", FormatCassandra, err) + } + + c.Format = "unknown type" + err = c.Validate() + if err == nil { + t.Errorf("unexpected lack of error for incorrect format") + } else { + want := fmt.Sprintf(errBadFormatFmt, "unknown type") + if got := err.Error(); got != want { + t.Errorf("incorrect error for incorrect format: got\n%v\nwant\n%v", got, want) + } + } + c.Format = FormatTimescaleDB + + // Test Use validation + c.Use = useCaseCPUOnly + err = c.Validate() + if err != nil { + t.Errorf("unexpected error with Use '%s': %v", useCaseCPUOnly, err) + } + + c.Use = "bad use" + err = c.Validate() + if err == nil { + t.Errorf("unexpected lack of error for incorrect use") + } else { + want := fmt.Sprintf(errBadUseFmt, "bad use") + if got := err.Error(); got != want { + t.Errorf("incorrect error for incorrect format: got\n%v\nwant\n%v", got, want) + } + } + c.Use = useCaseDevops +} diff --git a/internal/inputs/utils.go b/internal/inputs/utils.go new file mode 100644 index 000000000..3f79098fe --- /dev/null +++ b/internal/inputs/utils.go @@ -0,0 +1,83 @@ +package inputs + +import ( + "bufio" + "fmt" + "io" + "os" + "time" +) + +// Formats supported for generation +const ( + FormatCassandra = "cassandra" + FormatClickhouse = "clickhouse" + FormatInflux = "influx" + FormatMongo = "mongo" + FormatSiriDB = "siridb" + FormatTimescaleDB = "timescaledb" +) + +const ( + defaultTimeStart = "2016-01-01T00:00:00Z" + defaultTimeEnd = "2016-01-02T00:00:00Z" +) + +var formats = []string{ + FormatCassandra, + FormatClickhouse, + FormatInflux, + FormatMongo, + FormatSiriDB, + FormatTimescaleDB, +} + +func ValidFormats() []string { + return append([]string{}, formats...) +} + +func isIn(s string, arr []string) bool { + for _, x := range arr { + if s == x { + return true + } + } + return false +} + +const ( + // Use case choices (make sure to update TestGetConfig if adding a new one) + useCaseCPUOnly = "cpu-only" + useCaseCPUSingle = "cpu-single" + useCaseDevops = "devops" +) + +var useCaseChoices = []string{ + useCaseCPUOnly, + useCaseCPUSingle, + useCaseDevops, +} + +// ParseUTCTime parses a string-represented time of the format 2006-01-02T15:04:05Z07:00 +func ParseUTCTime(s string) (time.Time, error) { + t, err := time.Parse(time.RFC3339, s) + if err != nil { + return time.Time{}, err + } + return t.UTC(), nil +} + +const defaultWriteSize = 4 << 20 // 4 MB + +func getBufferedWriter(filename string, fallback io.Writer) (*bufio.Writer, error) { + // If filename is given, output should go to a file + if len(filename) > 0 { + file, err := os.Create(filename) + if err != nil { + return nil, fmt.Errorf("cannot open file for write %s: %v", filename, err) + } + return bufio.NewWriterSize(file, defaultWriteSize), nil + } + + return bufio.NewWriterSize(fallback, defaultWriteSize), nil +} diff --git a/internal/inputs/utils_test.go b/internal/inputs/utils_test.go new file mode 100644 index 000000000..68cb01f0d --- /dev/null +++ b/internal/inputs/utils_test.go @@ -0,0 +1,57 @@ +package inputs + +import ( + "testing" + "time" +) + +func TestIsIn(t *testing.T) { + arr := []string{"foo", "bar", "baz"} + arr2 := []string{"oof", "foo ", "nada", "123"} + + // Test positive cases + for _, s := range arr { + if !isIn(s, arr) { + t.Errorf("%s not found in %v incorrectly", s, arr) + } + } + for _, s := range arr2 { + if !isIn(s, arr2) { + t.Errorf("%s not found in %v incorrectly", s, arr) + } + } + + // Test negative cases + for _, s := range arr { + if isIn(s, arr2) { + t.Errorf("%s found in %v incorrectly", s, arr) + } + } + for _, s := range arr2 { + if isIn(s, arr) { + t.Errorf("%s found in %v incorrectly", s, arr) + } + } + +} + +const ( + correctTimeStr = "2016-01-01T00:00:00Z" + incorrectTimeStr = "2017-01-01" +) + +var correctTime = time.Date(2016, time.January, 1, 0, 0, 0, 0, time.UTC) + +func TestParseUTCTime(t *testing.T) { + parsedTime, err := ParseUTCTime(correctTimeStr) + if err != nil { + t.Errorf("unexpected error: got %v", err) + } else if parsedTime != correctTime { + t.Errorf("did not get correct time back: got %v want %v", parsedTime, correctTime) + } + + _, err = ParseUTCTime(incorrectTimeStr) + if err == nil { + t.Errorf("unexpected lack of error") + } +}