diff --git a/README.md b/README.md index 93f7bef56..a96697226 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ Current databases supported: + ClickHouse [(supplemental docs)](docs/clickhouse.md) + CrateDB [(supplemental docs)](docs/cratedb.md) + SiriDB [(supplemental docs)](docs/siridb.md) ++ Akumuli [(supplemental docs)](docs/akumuli.md) ## Overview diff --git a/cmd/tsbs_generate_data/serialize/akumuli.go b/cmd/tsbs_generate_data/serialize/akumuli.go new file mode 100644 index 000000000..0cb44e41a --- /dev/null +++ b/cmd/tsbs_generate_data/serialize/akumuli.go @@ -0,0 +1,141 @@ +package serialize + +import ( + "encoding/binary" + "errors" + "fmt" + "io" +) + +const ( + placeholderText = "AAAAFFEE" +) + +// AkumuliSerializer writes a series of Point elements into RESP encoded +// buffer. +type AkumuliSerializer struct { + book map[string]uint32 + bookClosed bool + deferred []byte + index uint32 +} + +// NewAkumuliSerializer initializes AkumuliSerializer instance. +func NewAkumuliSerializer() *AkumuliSerializer { + s := &AkumuliSerializer{} + s.book = make(map[string]uint32) + s.deferred = make([]byte, 0, 4096) + s.bookClosed = false + return s +} + +// Serialize writes Point data to the given writer, conforming to the +// AKUMULI RESP protocol. Serializer adds extra data to guide data loader. +// This function writes output that contains binary and text data in RESP format. +func (s *AkumuliSerializer) Serialize(p *Point, w io.Writer) (err error) { + deferPoint := false + + buf := make([]byte, 0, 1024) + // Add cue + const HeaderLength = 8 + buf = append(buf, placeholderText...) + buf = append(buf, "+"...) + + // Series name + for i := 0; i < len(p.fieldKeys); i++ { + buf = append(buf, p.measurementName...) + buf = append(buf, '.') + buf = append(buf, p.fieldKeys[i]...) + if i+1 < len(p.fieldKeys) { + buf = append(buf, '|') + } else { + buf = append(buf, ' ') + } + } + + for i := 0; i < len(p.tagKeys); i++ { + buf = append(buf, ' ') + buf = append(buf, p.tagKeys[i]...) + buf = append(buf, '=') + buf = append(buf, p.tagValues[i].(string)...) + } + + series := string(buf[HeaderLength:]) + if !s.bookClosed { + // Save point for later + if id, ok := s.book[series]; ok { + s.bookClosed = true + _, err = w.Write(s.deferred) + if err != nil { + return err + } + buf = buf[:HeaderLength] + buf = append(buf, fmt.Sprintf(":%d", id)...) + binary.LittleEndian.PutUint32(buf[:4], id) + } else { + // Shortcut + s.index++ + tmp := make([]byte, 0, 1024) + tmp = append(tmp, placeholderText...) + tmp = append(tmp, "*2\n"...) + tmp = append(tmp, buf[HeaderLength:]...) + tmp = append(tmp, '\n') + tmp = append(tmp, fmt.Sprintf(":%d\n", s.index)...) + s.book[series] = s.index + // Update cue + binary.LittleEndian.PutUint16(tmp[4:6], uint16(len(tmp))) + binary.LittleEndian.PutUint16(tmp[6:HeaderLength], uint16(0)) + binary.LittleEndian.PutUint32(tmp[:4], s.index) + binary.LittleEndian.PutUint32(buf[:4], s.index) + _, err = w.Write(tmp) + if err != nil { + return err + } + deferPoint = true + buf = buf[:HeaderLength] + buf = append(buf, fmt.Sprintf(":%d", s.index)...) + } + } else { + // Replace the series name with the value from the book + if id, ok := s.book[series]; ok { + buf = buf[:HeaderLength] + buf = append(buf, fmt.Sprintf(":%d", id)...) 
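+			// Cue header layout (8 bytes): series id in bytes 0-3; the record
+			// length (bytes 4-5) and field count (bytes 6-7) written just below
+			// are placeholders that get finalized at the end of Serialize.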
+ binary.LittleEndian.PutUint16(buf[4:6], uint16(len(buf))) + binary.LittleEndian.PutUint16(buf[6:HeaderLength], uint16(0)) + binary.LittleEndian.PutUint32(buf[:4], id) + } else { + return errors.New("unexpected series name") + } + } + + buf = append(buf, '\n') + + // Timestamp + buf = append(buf, ':') + buf = fastFormatAppend(p.timestamp.UTC().UnixNano(), buf) + buf = append(buf, '\n') + + // Values + buf = append(buf, fmt.Sprintf("*%d\n", len(p.fieldValues))...) + for i := 0; i < len(p.fieldValues); i++ { + v := p.fieldValues[i] + switch v.(type) { + case int, int64: + buf = append(buf, ':') + case float64: + buf = append(buf, '+') + } + buf = fastFormatAppend(v, buf) + buf = append(buf, '\n') + } + + // Update cue + binary.LittleEndian.PutUint16(buf[4:6], uint16(len(buf))) + binary.LittleEndian.PutUint16(buf[6:HeaderLength], uint16(len(p.fieldValues))) + if deferPoint { + s.deferred = append(s.deferred, buf...) + return nil + } + _, err = w.Write(buf) + return err +} diff --git a/cmd/tsbs_generate_data/serialize/akumuli_test.go b/cmd/tsbs_generate_data/serialize/akumuli_test.go new file mode 100644 index 000000000..1849c5efc --- /dev/null +++ b/cmd/tsbs_generate_data/serialize/akumuli_test.go @@ -0,0 +1,78 @@ +package serialize + +import ( + "bytes" + "strings" + "testing" +) + +func TestAkumuliSerializerSerialize(t *testing.T) { + + serializer := NewAkumuliSerializer() + + points := []*Point{ + testPointDefault, + testPointInt, + testPointMultiField, + testPointDefault, + testPointInt, + testPointMultiField, + } + + type testCase struct { + expCount int + expValue string + name string + } + + cases := []testCase{ + { + expCount: 1, + expValue: "+cpu.usage_guest_nice hostname=host_0 region=eu-west-1 datacenter=eu-west-1b", + name: "series name default", + }, + { + expCount: 1, + expValue: "+cpu.usage_guest hostname=host_0 region=eu-west-1 datacenter=eu-west-1b", + name: "series name int", + }, + { + expCount: 1, + expValue: "+cpu.big_usage_guest|cpu.usage_guest|cpu.usage_guest_nice hostname=host_0 region=eu-west-1 datacenter=eu-west-1b", + name: "series name multi-field", + }, + { + expCount: 2, + expValue: "*1\n+38.24311829", + name: "value default", + }, + { + expCount: 2, + expValue: "*1\n:38", + name: "value int", + }, + { + expCount: 2, + expValue: "*3\n:5000000000\n:38\n+38.24311829", + name: "value multi-field", + }, + { + expCount: 6, + expValue: ":1451606400000000000", + name: "timestamp", + }, + } + buf := new(bytes.Buffer) + for _, point := range points { + serializer.Serialize(point, buf) + } + + got := buf.String() + + for _, c := range cases { + actualCnt := strings.Count(got, c.expValue) + if actualCnt != c.expCount { + t.Errorf("Output incorrect: %s expected %d times got %d times", c.name, c.expCount, actualCnt) + } + } +} diff --git a/cmd/tsbs_generate_queries/databases/akumuli/common.go b/cmd/tsbs_generate_queries/databases/akumuli/common.go new file mode 100644 index 000000000..cd0c8ff26 --- /dev/null +++ b/cmd/tsbs_generate_queries/databases/akumuli/common.go @@ -0,0 +1,45 @@ +package akumuli + +import ( + "time" + + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils" + "github.com/timescale/tsbs/query" +) + +// BaseGenerator contains settings specific for Akumuli database. +type BaseGenerator struct { +} + +// GenerateEmptyQuery returns an empty query.HTTP +func (d *Devops) GenerateEmptyQuery() query.Query { + return query.NewHTTP() +} + +// fillInQuery fills the query struct with data. 
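+// The body is a JSON query document that is POSTed to the /api/query endpoint;
+// begin and end carry the query time range as Unix nanoseconds.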
+func (g *BaseGenerator) fillInQuery(qi query.Query, humanLabel, humanDesc, body string, begin, end int64) {
+	q := qi.(*query.HTTP)
+	q.HumanLabel = []byte(humanLabel)
+	q.HumanDescription = []byte(humanDesc)
+	q.Method = []byte("POST")
+	q.Path = []byte("/api/query")
+	q.Body = []byte(body)
+	q.StartTimestamp = begin
+	q.EndTimestamp = end
+}
+
+// NewDevops makes a Devops object ready to generate Queries.
+func (g *BaseGenerator) NewDevops(start, end time.Time, scale int) (utils.QueryGenerator, error) {
+	core, err := devops.NewCore(start, end, scale)
+	if err != nil {
+		return nil, err
+	}
+
+	devops := &Devops{
+		BaseGenerator: g,
+		Core:          core,
+	}
+
+	return devops, nil
+}
diff --git a/cmd/tsbs_generate_queries/databases/akumuli/devops.go b/cmd/tsbs_generate_queries/databases/akumuli/devops.go
new file mode 100644
index 000000000..f87f851e3
--- /dev/null
+++ b/cmd/tsbs_generate_queries/databases/akumuli/devops.go
@@ -0,0 +1,288 @@
+package akumuli
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"time"
+
+	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops"
+	"github.com/timescale/tsbs/query"
+)
+
+// Devops produces Akumuli-specific queries for all the devops query types.
+type Devops struct {
+	*BaseGenerator
+	*devops.Core
+}
+
+func panicIfErr(err error) {
+	if err != nil {
+		panic(err.Error())
+	}
+}
+
+type tsdbQueryRange struct {
+	From int64 `json:"from"`
+	To   int64 `json:"to"`
+}
+
+type tsdbGroupAggStmt struct {
+	Name []string `json:"metric"`
+	Func []string `json:"func"`
+	Step string   `json:"step"`
+}
+
+type tsdbGroupAggregateQuery struct {
+	GroupAggregate tsdbGroupAggStmt    `json:"group-aggregate"`
+	TimeRange      tsdbQueryRange      `json:"range"`
+	Where          map[string][]string `json:"where"`
+	Output         map[string]string   `json:"output"`
+	OrderBy        string              `json:"order-by"`
+}
+
+type tsdbGroupByTagGroupAggregateQuery struct {
+	GroupAggregate tsdbGroupAggStmt    `json:"group-aggregate"`
+	TimeRange      tsdbQueryRange      `json:"range"`
+	Where          map[string][]string `json:"where"`
+	Output         map[string]string   `json:"output"`
+	OrderBy        string              `json:"order-by"`
+	GroupBy        []string            `json:"group-by-tag"`
+}
+
+type tsdbSelectQuery struct {
+	Select    string                       `json:"select"`
+	TimeRange tsdbQueryRange               `json:"range"`
+	Where     map[string][]string          `json:"where"`
+	Output    map[string]string            `json:"output"`
+	Filter    map[string]map[string]string `json:"filter"`
+}
+
+type tsdbAggregateAllQuery struct {
+	Metrics map[string]string `json:"aggregate"`
+	Output  map[string]string `json:"output"`
+}
+
+// GroupByTime selects the MAX for numMetrics metrics under 'cpu',
+// per minute for nhosts hosts,
+// e.g. in pseudo-SQL:
+//
+// SELECT minute, max(metric1), ..., max(metricN)
+// FROM cpu
+// WHERE
+// (hostname = '$HOSTNAME_1' OR ...
OR hostname = '$HOSTNAME_N') +// AND time >= '$HOUR_START' +// AND time < '$HOUR_END' +// GROUP BY minute +// ORDER BY minute ASC +// +// Resultsets: +// single-groupby-1-1-12 +// single-groupby-1-1-1 +// single-groupby-1-8-1 +// single-groupby-5-1-12 +// single-groupby-5-1-1 +// single-groupby-5-8-1 +func (d *Devops) GroupByTime(qi query.Query, nhosts, numMetrics int, timeRange time.Duration) { + interval := d.Interval.MustRandWindow(timeRange) + hostnames, err := d.GetRandomHosts(nhosts) + if err != nil { + panic(err) + } + startTimestamp := interval.StartUnixNano() + endTimestamp := interval.EndUnixNano() + + var query tsdbGroupAggregateQuery + query.GroupAggregate.Func = append(query.GroupAggregate.Func, "max") + query.GroupAggregate.Step = "1m" + metricSlice, err := devops.GetCPUMetricsSlice(numMetrics) + panicIfErr(err) + for _, name := range metricSlice { + query.GroupAggregate.Name = append(query.GroupAggregate.Name, "cpu."+name) + } + + query.Where = make(map[string][]string) + query.Where["hostname"] = hostnames + query.TimeRange.From = startTimestamp + query.TimeRange.To = endTimestamp + query.Output = make(map[string]string) + query.Output["format"] = "csv" + query.OrderBy = "time" + + bodyWriter := new(bytes.Buffer) + body, err := json.Marshal(query) + if err != nil { + panic(err) + } + bodyWriter.Write(body) + + humanLabel := fmt.Sprintf("Akumuli max cpu, rand %4d hosts, rand %s by 1m", nhosts, timeRange) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQuery(qi, humanLabel, humanDesc, string(bodyWriter.Bytes()), interval.StartUnixNano(), interval.EndUnixNano()) +} + +// HighCPUForHosts populates a query that gets CPU metrics when the CPU has high +// usage between a time period for a number of hosts (if 0, it will search all hosts), +// e.g. in pseudo-SQL: +// +// SELECT * FROM cpu +// WHERE usage_user > 90.0 +// AND time >= '$TIME_START' AND time < '$TIME_END' +// AND (hostname = '$HOST' OR hostname = '$HOST2'...) +// +// Resultsets: +// high-cpu-1 +// high-cpu-all +func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) { + interval := d.Interval.MustRandWindow(devops.HighCPUDuration) + var hostnames []string + if nHosts > 0 { + var err error + hostnames, err = d.GetRandomHosts(nHosts) + panicIfErr(err) + } + startTimestamp := interval.StartUnixNano() + endTimestamp := interval.EndUnixNano() + var query tsdbSelectQuery + query.Select = "cpu.usage_user" + query.Where = make(map[string][]string) + if nHosts > 0 { + query.Where["hostname"] = hostnames + } + query.TimeRange.From = startTimestamp + query.TimeRange.To = endTimestamp + query.Output = make(map[string]string) + query.Output["format"] = "csv" + query.Filter = make(map[string]map[string]string) + query.Filter["cpu"] = make(map[string]string) + query.Filter["cpu"]["gt"] = "90.0" + + bodyWriter := new(bytes.Buffer) + body, err := json.Marshal(query) + if err != nil { + panic(err) + } + bodyWriter.Write(body) + + humanLabel, err := devops.GetHighCPULabel("Akumuli", nHosts) + panicIfErr(err) + humanDesc := fmt.Sprintf("%s: %s - %s", humanLabel, interval.StartString(), interval.EndString()) + d.fillInQuery(qi, humanLabel, humanDesc, string(bodyWriter.Bytes()), interval.StartUnixNano(), interval.EndUnixNano()) +} + +// MaxAllCPU selects the MAX of all metrics under 'cpu' per hour for nhosts hosts, +// e.g. in pseudo-SQL: +// +// SELECT MAX(metric1), ..., MAX(metricN) +// FROM cpu +// WHERE +// (hostname = '$HOSTNAME_1' OR ... 
OR hostname = '$HOSTNAME_N') +// AND time >= '$HOUR_START' +// AND time < '$HOUR_END' +// GROUP BY hour +// ORDER BY hour +// +// Resultsets: +// cpu-max-all-1 +// cpu-max-all-8 +func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) { + interval := d.Interval.MustRandWindow(devops.MaxAllDuration) + hostnames, err := d.GetRandomHosts(nHosts) + panicIfErr(err) + startTimestamp := interval.StartUnixNano() + endTimestamp := interval.EndUnixNano() + + var query tsdbGroupAggregateQuery + query.GroupAggregate.Func = append(query.GroupAggregate.Func, "max") + query.GroupAggregate.Step = "1h" + for _, name := range devops.GetAllCPUMetrics() { + query.GroupAggregate.Name = append(query.GroupAggregate.Name, "cpu."+name) + } + + query.Where = make(map[string][]string) + query.Where["hostname"] = hostnames + query.TimeRange.From = startTimestamp + query.TimeRange.To = endTimestamp + query.Output = make(map[string]string) + query.Output["format"] = "csv" + query.OrderBy = "time" + + bodyWriter := new(bytes.Buffer) + body, err := json.Marshal(query) + if err != nil { + panic(err) + } + bodyWriter.Write(body) + + humanLabel := devops.GetMaxAllLabel("Akumuli", nHosts) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQuery(qi, humanLabel, humanDesc, string(bodyWriter.Bytes()), interval.StartUnixNano(), interval.EndUnixNano()) +} + +// LastPointPerHost finds the last row for every host in the dataset +func (d *Devops) LastPointPerHost(qi query.Query) { + + var query tsdbAggregateAllQuery + query.Metrics = make(map[string]string) + for _, name := range devops.GetAllCPUMetrics() { + query.Metrics["cpu."+name] = "last" + } + query.Output = make(map[string]string) + query.Output["format"] = "csv" + + bodyWriter := new(bytes.Buffer) + body, err := json.Marshal(query) + if err != nil { + panic(err) + } + bodyWriter.Write(body) + + humanLabel := "Akumuli last row per host" + humanDesc := humanLabel + ": cpu" + d.fillInQuery(qi, humanLabel, humanDesc, string(bodyWriter.Bytes()), 0, 0) +} + +// GroupByTimeAndPrimaryTag selects the AVG of numMetrics metrics under 'cpu' per device per hour for a day, +// e.g. 
in pseudo-SQL:
+//
+// SELECT AVG(metric1), ..., AVG(metricN)
+// FROM cpu
+// WHERE time >= '$HOUR_START' AND time < '$HOUR_END'
+// GROUP BY hour, hostname ORDER BY hour, hostname
+//
+// Resultsets:
+// double-groupby-1
+// double-groupby-5
+// double-groupby-all
+func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
+	metrics, err := devops.GetCPUMetricsSlice(numMetrics)
+	panicIfErr(err)
+	interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration)
+	startTimestamp := interval.StartUnixNano()
+	endTimestamp := interval.EndUnixNano()
+
+	var query tsdbGroupByTagGroupAggregateQuery
+	query.GroupAggregate.Func = append(query.GroupAggregate.Func, "mean")
+	query.GroupAggregate.Step = "1h"
+	query.GroupBy = append(query.GroupBy, "hostname")
+	for _, name := range metrics {
+		query.GroupAggregate.Name = append(query.GroupAggregate.Name, "cpu."+name)
+	}
+
+	query.TimeRange.From = startTimestamp
+	query.TimeRange.To = endTimestamp
+	query.Output = make(map[string]string)
+	query.Output["format"] = "csv"
+	query.OrderBy = "time"
+
+	bodyWriter := new(bytes.Buffer)
+	body, err := json.Marshal(query)
+	if err != nil {
+		panic(err)
+	}
+	bodyWriter.Write(body)
+
+	humanLabel := devops.GetDoubleGroupByLabel("Akumuli", numMetrics)
+	humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString())
+	d.fillInQuery(qi, humanLabel, humanDesc, string(bodyWriter.Bytes()), interval.StartUnixNano(), interval.EndUnixNano())
+}
diff --git a/cmd/tsbs_load_akumuli/creator.go b/cmd/tsbs_load_akumuli/creator.go
new file mode 100644
index 000000000..e1ea9e3ff
--- /dev/null
+++ b/cmd/tsbs_load_akumuli/creator.go
@@ -0,0 +1,32 @@
+package main
+
+import (
+	"bufio"
+)
+
+// loader.DBCreator interface implementation
+type dbCreator struct {
+}
+
+// loader.DBCreator interface implementation
+func (d *dbCreator) Init() {
+}
+
+// loader.DBCreator interface implementation
+func (d *dbCreator) readDataHeader(br *bufio.Reader) {
+}
+
+// loader.DBCreator interface implementation
+func (d *dbCreator) DBExists(dbName string) bool {
+	return false
+}
+
+// loader.DBCreator interface implementation
+func (d *dbCreator) RemoveOldDB(dbName string) error {
+	return nil
+}
+
+// loader.DBCreator interface implementation
+func (d *dbCreator) CreateDB(dbName string) error {
+	return nil
+}
diff --git a/cmd/tsbs_load_akumuli/main.go b/cmd/tsbs_load_akumuli/main.go
new file mode 100644
index 000000000..1d713fe6c
--- /dev/null
+++ b/cmd/tsbs_load_akumuli/main.go
@@ -0,0 +1,85 @@
+// tsbs_load_akumuli loads an akumulid daemon with data from stdin.
+//
+// The caller is responsible for ensuring that the database is empty before
+// bulk load.
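+//
+// Each record in the input stream begins with an 8-byte binary cue written by
+// the Akumuli serializer (series id, record length, field count). The loader
+// uses the cue to route points to workers and to frame writes, and strips it
+// before sending the RESP payload to akumulid.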
+package main + +import ( + "bufio" + "bytes" + "fmt" + "log" + "sync" + + "github.com/spf13/pflag" + "github.com/spf13/viper" + "github.com/timescale/tsbs/internal/utils" + "github.com/timescale/tsbs/load" +) + +// Program option vars: +var ( + endpoint string +) + +// Global vars +var ( + loader *load.BenchmarkRunner + bufPool sync.Pool +) + +// allows for testing +var fatal = log.Fatalf + +// Parse args: +func init() { + var config load.BenchmarkRunnerConfig + config.AddToFlagSet(pflag.CommandLine) + + pflag.StringVar(&endpoint, "endpoint", "http://localhost:8282", "Akumuli RESP endpoint IP address.") + pflag.Parse() + + err := utils.SetupConfigFile() + + if err != nil { + panic(fmt.Errorf("fatal error config file: %s", err)) + } + + if err := viper.Unmarshal(&config); err != nil { + panic(fmt.Errorf("unable to decode config: %s", err)) + } + + endpoint = viper.GetString("endpoint") + loader = load.GetBenchmarkRunner(config) +} + +type benchmark struct{} + +func (b *benchmark) GetPointDecoder(br *bufio.Reader) load.PointDecoder { + return &decoder{reader: br} +} + +func (b *benchmark) GetBatchFactory() load.BatchFactory { + return &factory{} +} + +func (b *benchmark) GetPointIndexer(n uint) load.PointIndexer { + return &pointIndexer{nchan: n} +} + +func (b *benchmark) GetProcessor() load.Processor { + return &processor{endpoint: endpoint} +} + +func (b *benchmark) GetDBCreator() load.DBCreator { + return &dbCreator{} +} + +func main() { + bufPool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, 4*1024*1024)) + }, + } + loader.RunBenchmark(&benchmark{}, load.WorkerPerQueue) +} diff --git a/cmd/tsbs_load_akumuli/process.go b/cmd/tsbs_load_akumuli/process.go new file mode 100644 index 000000000..9e3bb01de --- /dev/null +++ b/cmd/tsbs_load_akumuli/process.go @@ -0,0 +1,52 @@ +package main + +import ( + "encoding/binary" + "log" + "net" + + "github.com/timescale/tsbs/load" +) + +type processor struct { + endpoint string + conn net.Conn + worker int +} + +func (p *processor) Init(numWorker int, _ bool) { + p.worker = numWorker + c, err := net.Dial("tcp", p.endpoint) + if err == nil { + p.conn = c + log.Println("Connection with", p.endpoint, "successful") + } else { + log.Println("Can't establish connection with", p.endpoint) + panic("Connection error") + } +} + +func (p *processor) Close(doLoad bool) { + if doLoad { + p.conn.Close() + } +} + +func (p *processor) ProcessBatch(b load.Batch, doLoad bool) (uint64, uint64) { + batch := b.(*batch) + var nmetrics uint64 + if doLoad { + head := batch.buf.Bytes() + for len(head) != 0 { + nbytes := binary.LittleEndian.Uint16(head[4:6]) + nfields := binary.LittleEndian.Uint16(head[6:8]) + payload := head[8:nbytes] + p.conn.Write(payload) + nmetrics += uint64(nfields) + head = head[nbytes:] + } + } + batch.buf.Reset() + bufPool.Put(batch.buf) + return nmetrics, batch.rows +} diff --git a/cmd/tsbs_load_akumuli/scan.go b/cmd/tsbs_load_akumuli/scan.go new file mode 100644 index 000000000..a3d6d478f --- /dev/null +++ b/cmd/tsbs_load_akumuli/scan.go @@ -0,0 +1,59 @@ +package main + +import ( + "bufio" + "bytes" + "encoding/binary" + "io" + + "github.com/timescale/tsbs/load" +) + +type decoder struct { + reader *bufio.Reader +} + +func (d *decoder) Decode(_ *bufio.Reader) *load.Point { + hdr, err := d.reader.Peek(6) + if err == io.EOF { + return nil + } + nbytes := binary.LittleEndian.Uint16(hdr[4:6]) + body := make([]byte, nbytes) + _, err = io.ReadFull(d.reader, body) + if err == io.EOF { + return nil + } + return 
load.NewPoint(body)
+}
+
+type pointIndexer struct {
+	nchan uint
+}
+
+func (i *pointIndexer) GetIndex(p *load.Point) int {
+	hdr := p.Data.([]byte)
+	id := binary.LittleEndian.Uint32(hdr[0:4])
+	return int(id % uint32(i.nchan))
+}
+
+type batch struct {
+	buf  *bytes.Buffer
+	rows uint64
+}
+
+func (b *batch) Len() int {
+	return int(b.rows)
+}
+
+func (b *batch) Append(item *load.Point) {
+	payload := item.Data.([]byte)
+	b.buf.Write(payload)
+	b.rows++
+}
+
+type factory struct{}
+
+func (f *factory) New() load.Batch {
+	return &batch{buf: bufPool.Get().(*bytes.Buffer)}
+}
diff --git a/cmd/tsbs_run_queries_akumuli/http_client.go b/cmd/tsbs_run_queries_akumuli/http_client.go
new file mode 100644
index 000000000..03fffff05
--- /dev/null
+++ b/cmd/tsbs_run_queries_akumuli/http_client.go
@@ -0,0 +1,105 @@
+package main
+
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"time"
+
+	"github.com/timescale/tsbs/query"
+)
+
+// HTTPClient is a reusable HTTP Client.
+type HTTPClient struct {
+	client     http.Client
+	Host       []byte
+	HostString string
+	uri        []byte
+}
+
+// HTTPClientDoOptions wraps options used when calling `Do`.
+type HTTPClientDoOptions struct {
+	Debug          int
+	PrintResponses bool
+}
+
+// NewHTTPClient creates a new HTTPClient.
+func NewHTTPClient(host string) *HTTPClient {
+	return &HTTPClient{
+		client:     http.Client{},
+		Host:       []byte(host),
+		HostString: host,
+		uri:        []byte{}, // heap optimization
+	}
+}
+
+// Do performs the action specified by the given Query. It uses net/http and
+// tries to minimize heap allocations.
+func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64, err error) {
+	// populate uri from the reusable byte slice:
+	w.uri = w.uri[:0]
+	w.uri = append(w.uri, w.Host...)
+	w.uri = append(w.uri, q.Path...)
+ + // populate a request with data from the Query: + req, err := http.NewRequest(string(q.Method), string(w.uri), bytes.NewReader(q.Body)) + if err != nil { + panic(err) + } + + // Perform the request while tracking latency: + start := time.Now() + resp, err := w.client.Do(req) + if err != nil { + panic(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + panic("http request did not return status 200 OK") + } + + reader := bufio.NewReader(resp.Body) + buf := make([]byte, 8192) + for { + _, err = reader.Read(buf) + if err == io.EOF { + err = nil + break + } else if err != nil { + panic(err) + } + } + lag = float64(time.Since(start).Nanoseconds()) / 1e6 // milliseconds + + if opts != nil { + // Print debug messages, if applicable: + switch opts.Debug { + case 1: + fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms\n", q.HumanLabel, lag) + case 2: + fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms -- %s\n", q.HumanLabel, lag, q.HumanDescription) + case 3: + fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms -- %s\n", q.HumanLabel, lag, q.HumanDescription) + fmt.Fprintf(os.Stderr, "debug: request: %s\n", string(q.String())) + case 4: + fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms -- %s\n", q.HumanLabel, lag, q.HumanDescription) + fmt.Fprintf(os.Stderr, "debug: request: %s\n", string(q.String())) + //fmt.Fprintf(os.Stderr, "debug: response: %s\n", string(resp.Body())) + default: + } + + // Pretty print JSON responses, if applicable: + if opts.PrintResponses { + _, err = io.Copy(os.Stderr, resp.Body) + if err != nil { + return + } + } + } + + return lag, err +} diff --git a/cmd/tsbs_run_queries_akumuli/main.go b/cmd/tsbs_run_queries_akumuli/main.go new file mode 100644 index 000000000..842dcbee1 --- /dev/null +++ b/cmd/tsbs_run_queries_akumuli/main.go @@ -0,0 +1,80 @@ +// tsbs_run_queries_akumuli speed tests Akumuli using requests from stdin. +// +// It reads encoded Query objects from stdin, and makes concurrent requests +// to the provided HTTP endpoint. This program has no knowledge of the +// internals of the endpoint. 
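+//
+// Each query.HTTP object carries an HTTP method, a path, and a JSON body
+// (typically a POST to /api/query) filled in by tsbs_generate_queries.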
+package main
+
+import (
+	"fmt"
+
+	"github.com/spf13/pflag"
+	"github.com/spf13/viper"
+	"github.com/timescale/tsbs/internal/utils"
+	"github.com/timescale/tsbs/query"
+)
+
+// Program option vars:
+var (
+	endpoint string
+)
+
+// Global vars:
+var (
+	runner *query.BenchmarkRunner
+)
+
+// Parse args:
+func init() {
+	var config query.BenchmarkRunnerConfig
+	config.AddToFlagSet(pflag.CommandLine)
+
+	pflag.StringVar(&endpoint, "endpoint", "http://localhost:8181", "Akumuli API endpoint IP address.")
+
+	pflag.Parse()
+
+	err := utils.SetupConfigFile()
+
+	if err != nil {
+		panic(fmt.Errorf("fatal error config file: %s", err))
+	}
+
+	if err := viper.Unmarshal(&config); err != nil {
+		panic(fmt.Errorf("unable to decode config: %s", err))
+	}
+
+	endpoint = viper.GetString("endpoint")
+
+	runner = query.NewBenchmarkRunner(config)
+}
+
+func main() {
+	runner.Run(&query.HTTPPool, newProcessor)
+}
+
+type processor struct {
+	w    *HTTPClient
+	opts *HTTPClientDoOptions
+}
+
+func newProcessor() query.Processor { return &processor{} }
+
+func (p *processor) Init(workerNumber int) {
+	p.opts = &HTTPClientDoOptions{
+		Debug:          runner.DebugLevel(),
+		PrintResponses: runner.DoPrintResponses(),
+	}
+	url := endpoint
+	p.w = NewHTTPClient(url)
+}
+
+func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) {
+	hq := q.(*query.HTTP)
+	lag, err := p.w.Do(hq, p.opts)
+	if err != nil {
+		return nil, err
+	}
+	stat := query.GetStat()
+	stat.Init(q.HumanLabelName(), lag)
+	return []*query.Stat{stat}, nil
+}
diff --git a/docs/akumuli.md b/docs/akumuli.md
new file mode 100644
index 000000000..8a7b11420
--- /dev/null
+++ b/docs/akumuli.md
@@ -0,0 +1,65 @@
+# TSBS Supplemental Guide: Akumuli
+
+Akumuli is an open-source time-series database written in C++ with
+performance in mind. This guide explains how to use TSBS to generate
+test data and run the tests.
+
+## Data format
+
+Data generated by `tsbs_generate_data` for Akumuli is serialized in a
+dictionary-based RESP format. The header of the file contains dictionary entries;
+each entry maps a unique series name to an id. The header is followed by
+data points that reference these ids instead of full series names.
+The text data in the input file is interleaved with binary cues for the
+`tsbs_load_akumuli` tool, which uses them to emulate real clients that send
+data independently, without actually parsing the messages.
+
+---
+
+## `tsbs_load_akumuli` Additional Flags
+
+#### `--endpoint` (type: `string`, default: `http://localhost:8282`)
+
+TCP endpoint to connect to for inserting data. Workers will create individual connections.
+
+---
+
+## `tsbs_run_queries_akumuli` Additional Flags
+
+#### `--endpoint` (type: `string`, default: `http://localhost:8181`)
+
+HTTP API endpoint used to run queries.
+
+---
+
+## Getting started
+
+You can install Akumuli from this [packagecloud repository](https://packagecloud.io/Lazin/Akumuli).
+It contains pre-built amd64 and arm64 packages for Debian, Ubuntu, and CentOS.
+After installing, you can run `akumulid --init` to create the configuration file (~/.akumulid).
+You can set the database size there. You can also remove or comment out the 'WAL' section (Akumuli is
+marginally slower with WAL enabled).
+The next step is to create the database using `akumulid --create`. This will create the database files.
+After that, you can start the database by running `akumulid` without parameters.
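+
+For reference, the binary cue described in the "Data format" section frames each
+record as follows. This is a minimal, illustrative Go sketch (the `countRecords`
+helper and the standalone program around it are hypothetical, not part of TSBS);
+the real consumer of this framing is `cmd/tsbs_load_akumuli`.
+
+```go
+package main
+
+import (
+	"bufio"
+	"encoding/binary"
+	"fmt"
+	"io"
+	"log"
+	"os"
+)
+
+// countRecords walks a generated Akumuli input stream record by record.
+// Each record starts with an 8-byte cue: series id (bytes 0-3), total record
+// length including the cue (bytes 4-5), and field count (bytes 6-7). The RESP
+// payload after the cue is what the loader sends to akumulid.
+func countRecords(r *bufio.Reader) (records, fields int, err error) {
+	for {
+		hdr, err := r.Peek(8)
+		if err == io.EOF {
+			return records, fields, nil
+		} else if err != nil {
+			return records, fields, err
+		}
+		nbytes := binary.LittleEndian.Uint16(hdr[4:6])
+		nfields := binary.LittleEndian.Uint16(hdr[6:8])
+		record := make([]byte, nbytes)
+		if _, err := io.ReadFull(r, record); err != nil {
+			return records, fields, err
+		}
+		_ = record[8:] // RESP payload without the cue
+		records++
+		fields += int(nfields)
+	}
+}
+
+func main() {
+	records, fields, err := countRecords(bufio.NewReader(os.Stdin))
+	if err != nil {
+		log.Fatal(err)
+	}
+	fmt.Printf("records=%d field values=%d\n", records, fields)
+}
+```
+
+Piping a generated file through it (e.g. `gunzip -c /tmp/akumuli-data.gz | go run cue_count.go`,
+where the file name is just an example) prints how many records and field values it contains.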
+
+To start over, stop the database by sending SIGINT to the akumulid process, then run
+`akumulid --delete; akumulid --create`.
+
+#### Generating the data
+
+Run `tsbs_generate_data` with `--format=akumuli` to generate the input.
+Here is a sample command:
+`./cmd/tsbs_generate_data/tsbs_generate_data --use-case="cpu-only" --seed=123 --scale=1000 --timestamp-start="2016-01-01T00:00:00Z" --timestamp-end="2016-01-02T00:00:00Z" --log-interval="10s" --format="akumuli" | gzip > /tmp/akumuli-data.gz`
+
+The easiest way to generate queries is to use the `scripts/generate_queries.sh` script.
+`FORMATS="akumuli" SCALE=1000 SEED=123 TS_START="2016-01-01T00:00:00Z" TS_END="2016-01-02T00:00:01Z" QUERY_TYPES="cpu-max-all-1 cpu-max-all-8 double-groupby-1 double-groupby-5 double-groupby-all high-cpu-1 high-cpu-all lastpoint single-groupby-1-1-1 single-groupby-1-1-12 single-groupby-1-8-1 single-groupby-5-1-1 single-groupby-5-1-12" QUERIES=1000 BULK_DATA_DIR="/tmp/bulk_queries" scripts/generate_queries.sh`
+
+#### Loading the data
+
+This can be done using `scripts/load_akumuli.sh`. Note that the script expects a certain file name format.
+`NUM_WORKERS=2 BATCH_SIZE=10000 BULK_DATA_DIR=/tmp scripts/load_akumuli.sh`
+
+#### Running the queries
+
+This can be done the same way the main TSBS documentation suggests.
+`cat /tmp/bulk_queries/akumuli-high-cpu-1-queries.gz | gunzip | tsbs_run_queries_akumuli`
diff --git a/internal/inputs/generator_data.go b/internal/inputs/generator_data.go
index de008cbe0..cc8214f39 100644
--- a/internal/inputs/generator_data.go
+++ b/internal/inputs/generator_data.go
@@ -235,6 +235,8 @@ func (g *DataGenerator) getSerializer(sim common.Simulator, format string) (seri
 		ret = &serialize.MongoSerializer{}
 	case FormatSiriDB:
 		ret = &serialize.SiriDBSerializer{}
+	case FormatAkumuli:
+		ret = serialize.NewAkumuliSerializer()
 	case FormatCrateDB:
 		g.writeHeader(sim)
 		ret = &serialize.CrateDBSerializer{}
diff --git a/internal/inputs/generator_queries.go b/internal/inputs/generator_queries.go
index e63e3e26a..e36ffdc26 100644
--- a/internal/inputs/generator_queries.go
+++ b/internal/inputs/generator_queries.go
@@ -10,6 +10,7 @@ import (
 	"sort"
 	"time"
 
+	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases/akumuli"
 	"github.com/spf13/pflag"
 	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases/cassandra"
 	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases/clickhouse"
@@ -246,7 +247,8 @@ func (g *QueryGenerator) initFactories() error {
 		return err
 	}
 
-	return nil
+	akumuli := &akumuli.BaseGenerator{}
+	return g.addFactory(FormatAkumuli, akumuli)
 }
 
 func (g *QueryGenerator) addFactory(database string, factory interface{}) error {
diff --git a/internal/inputs/utils.go b/internal/inputs/utils.go
index fabe99a62..9b3b7222d 100644
--- a/internal/inputs/utils.go
+++ b/internal/inputs/utils.go
@@ -16,7 +16,8 @@ const (
 	FormatMongo       = "mongo"
 	FormatSiriDB      = "siridb"
 	FormatTimescaleDB = "timescaledb"
-	FormatCrateDB     = "cratedb"
+	FormatAkumuli     = "akumuli"
+	FormatCrateDB     = "cratedb"
 )
 
 const (
@@ -33,6 +34,7 @@ var formats = []string{
 	FormatMongo,
 	FormatSiriDB,
 	FormatTimescaleDB,
+	FormatAkumuli,
 	FormatCrateDB,
 }
diff --git a/scripts/load_akumuli.sh b/scripts/load_akumuli.sh
new file mode 100755
index 000000000..d64c4859b
--- /dev/null
+++ b/scripts/load_akumuli.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+
+# Ensure loader is available
+EXE_FILE_NAME=${EXE_FILE_NAME:-$(which tsbs_load_akumuli)}
+if [[ -z "$EXE_FILE_NAME" ]]; then
+    echo "tsbs_load_akumuli not available.
It is not specified explicitly and not found in \$PATH" + exit 1 +fi + +# Load parameters - common +DATA_FILE_NAME=${DATA_FILE_NAME:-akumuli-data.gz} +INGESTION_PORT=${INGESTION_PORT:-8282} +QUERY_PORT=${QUERY_PORT:-8181} + +EXE_DIR=${EXE_DIR:-$(dirname $0)} +source ${EXE_DIR}/load_common.sh + +until curl http://${DATABASE_HOST}:${QUERY_PORT}/api/stats 2>/dev/null; do + echo "Waiting for akumulid" + sleep 1 +done + +# Load new data +cat ${DATA_FILE} | gunzip | $EXE_FILE_NAME \ + --workers=${NUM_WORKERS} \ + --batch-size=${BATCH_SIZE} \ + --endpoint=${DATABASE_HOST}:${INGESTION_PORT} diff --git a/scripts/run_queries_akumuli.sh b/scripts/run_queries_akumuli.sh new file mode 100755 index 000000000..42d5a98aa --- /dev/null +++ b/scripts/run_queries_akumuli.sh @@ -0,0 +1,62 @@ +#!/bin/bash + +# Ensure runner is available +EXE_FILE_NAME=${EXE_FILE_NAME:-$(which tsbs_run_queries_akumuli)} +if [[ -z "$EXE_FILE_NAME" ]]; then + echo "tsbs_run_queries_akumuli not available. It is not specified explicitly and not found in \$PATH" + exit 1 +fi + +# Default queries folder +BULK_DATA_DIR=${BULK_DATA_DIR:-"/tmp/bulk_queries"} +MAX_QUERIES=${MAX_QUERIES:-"0"} +# How many concurrent worker would run queries - match num of cores, or default to 4 +NUM_WORKERS=${NUM_WORKERS:-$(grep -c ^processor /proc/cpuinfo 2> /dev/null || echo 4)} + +# +# Run test for one file +# +function run_file() +{ + # $FULL_DATA_FILE_NAME: /full/path/to/file_with.ext + # $DATA_FILE_NAME: file_with.ext + # $DIR: /full/path/to + # $EXTENSION: ext + # NO_EXT_DATA_FILE_NAME: file_with + FULL_DATA_FILE_NAME=$1 + DATA_FILE_NAME=$(basename -- "${FULL_DATA_FILE_NAME}") + DIR=$(dirname "${FULL_DATA_FILE_NAME}") + EXTENSION="${DATA_FILE_NAME##*.}" + NO_EXT_DATA_FILE_NAME="${DATA_FILE_NAME%.*}" + + # Several options on how to name results file + #OUT_FULL_FILE_NAME="${DIR}/result_${DATA_FILE_NAME}" + OUT_FULL_FILE_NAME="${DIR}/result_${NO_EXT_DATA_FILE_NAME}.out" + #OUT_FULL_FILE_NAME="${DIR}/${NO_EXT_DATA_FILE_NAME}.out" + + if [ "${EXTENSION}" == "gz" ]; then + GUNZIP="gunzip" + else + GUNZIP="cat" + fi + + echo "Running ${DATA_FILE_NAME}" + cat $FULL_DATA_FILE_NAME \ + | $GUNZIP \ + | $EXE_FILE_NAME \ + -max-queries $MAX_QUERIES \ + -workers $NUM_WORKERS \ + | tee $OUT_FULL_FILE_NAME +} + +if [ "$#" -gt 0 ]; then + echo "Have $# files specified as params" + for FULL_DATA_FILE_NAME in "$@"; do + run_file $FULL_DATA_FILE_NAME + done +else + echo "Do not have any files specified - run from default queries folder as ${BULK_DATA_DIR}/queries_akumuli*" + for FULL_DATA_FILE_NAME in "${BULK_DATA_DIR}/queries_akumuli"*; do + run_file $FULL_DATA_FILE_NAME + done +fi