Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Akumuli database support #73

Merged
merged 39 commits into from
Dec 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
6695176
Add akumuli data generator
Lazin Apr 14, 2019
bae2b53
Update akumuli serializer
Lazin Apr 15, 2019
35a1be2
Add cueues for data loader
Lazin Apr 16, 2019
3c0e354
Remove SerializerCloser interface
Lazin Apr 16, 2019
e99f2f4
Use encoding/binary package instead of unsafe
Lazin Apr 17, 2019
d8900b1
Add tsbs_load_akumuli
Lazin Apr 18, 2019
f5cd2c9
Add akumuli_writer component
Lazin Apr 19, 2019
6d7dafb
Implement processor
Lazin Apr 19, 2019
827a211
Add load script
Lazin Apr 19, 2019
5b6ff8b
Fix load generator for akumuli
Lazin Apr 21, 2019
d291b76
Improve akumuli loader script
Lazin Apr 21, 2019
b1ec9a4
Fix concurrency issue
Lazin Apr 22, 2019
b7edd9e
Simplify loader
Lazin Apr 22, 2019
ddc0d1c
Add Akumuli load generator
Lazin Apr 24, 2019
8819ac8
Add query generator for Akumuli
Lazin Apr 24, 2019
f3a1bcd
Use json package to generate queries
Lazin Apr 28, 2019
c3df4dd
Add lastpoint query support
Lazin May 4, 2019
4086451
Add double-groupby query generator
Lazin May 11, 2019
f3ac7e5
Add query runner for Akumuli
Lazin May 17, 2019
bdbee20
Add serializer unit-test
Lazin May 18, 2019
72407ba
Add akumuli.md and clean up flags
Lazin May 18, 2019
ca91dc6
Add more details to docs/akumuli.md
Lazin May 18, 2019
6bab456
Update akumuli.md
Lazin May 18, 2019
e96d10a
Remove unused parameter
Lazin Jun 17, 2019
8e5bcfc
Merge branch 'master' of github.com:Lazin/tsbs
Lazin Jun 17, 2019
f876756
Merge branch 'master' of github.com:timescale/tsbs
Lazin Jun 18, 2019
4561616
Update akumuli query gen
Lazin Jun 18, 2019
b01fbba
Fix format string
Lazin Jun 19, 2019
7d07611
Merge branch 'master' of github.com:timescale/tsbs
Lazin Sep 9, 2019
0a166e8
Use utils.QueryGenerator interface
Lazin Sep 9, 2019
bfee78c
Fix akumuli unit-test
Lazin Sep 18, 2019
3b5d592
Merge branch 'master' of github.com:timescale/tsbs
Lazin Sep 18, 2019
c118f47
Fix review issues
Lazin Oct 9, 2019
d4c1091
Update tsbs_load_akumuli
Lazin Oct 17, 2019
b8a9639
Update tsbs_run_queries_akumuli
Lazin Oct 17, 2019
ec2f47c
Merge branch 'master' of github.com:timescale/tsbs
Lazin Oct 21, 2019
d246c46
Update akumuli.md
Lazin Oct 21, 2019
e4ace97
Merge branch 'master' into master
Lazin Nov 1, 2019
3429e43
Fix #73 code-review issues
Lazin Nov 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Current databases supported:
+ ClickHouse [(supplemental docs)](docs/clickhouse.md)
+ CrateDB [(supplemental docs)](docs/cratedb.md)
+ SiriDB [(supplemental docs)](docs/siridb.md)
+ Akumuli [(supplemental docs)](docs/akumuli.md)

## Overview

Expand Down
141 changes: 141 additions & 0 deletions cmd/tsbs_generate_data/serialize/akumuli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package serialize

import (
"encoding/binary"
"errors"
"fmt"
"io"
)

const (
placeholderText = "AAAAFFEE"
)

// AkumuliSerializer writes a series of Point elements into RESP encoded
// buffer.
type AkumuliSerializer struct {
book map[string]uint32
bookClosed bool
deferred []byte
index uint32
}

// NewAkumuliSerializer initializes AkumuliSerializer instance.
func NewAkumuliSerializer() *AkumuliSerializer {
s := &AkumuliSerializer{}
s.book = make(map[string]uint32)
s.deferred = make([]byte, 0, 4096)
s.bookClosed = false
return s
}

// Serialize writes Point data to the given writer, conforming to the
// AKUMULI RESP protocol. Serializer adds extra data to guide data loader.
// This function writes output that contains binary and text data in RESP format.
func (s *AkumuliSerializer) Serialize(p *Point, w io.Writer) (err error) {
deferPoint := false

buf := make([]byte, 0, 1024)
// Add cue
const HeaderLength = 8
buf = append(buf, placeholderText...)
buf = append(buf, "+"...)

// Series name
for i := 0; i < len(p.fieldKeys); i++ {
buf = append(buf, p.measurementName...)
buf = append(buf, '.')
buf = append(buf, p.fieldKeys[i]...)
if i+1 < len(p.fieldKeys) {
buf = append(buf, '|')
} else {
buf = append(buf, ' ')
}
}

for i := 0; i < len(p.tagKeys); i++ {
buf = append(buf, ' ')
buf = append(buf, p.tagKeys[i]...)
buf = append(buf, '=')
buf = append(buf, p.tagValues[i].(string)...)
}

series := string(buf[HeaderLength:])
if !s.bookClosed {
// Save point for later
if id, ok := s.book[series]; ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this if/else is a bit long, maybe you can extract parts of it to small functions

s.bookClosed = true
_, err = w.Write(s.deferred)
if err != nil {
return err
}
buf = buf[:HeaderLength]
buf = append(buf, fmt.Sprintf(":%d", id)...)
binary.LittleEndian.PutUint32(buf[:4], id)
} else {
// Shortcut
s.index++
tmp := make([]byte, 0, 1024)
tmp = append(tmp, placeholderText...)
tmp = append(tmp, "*2\n"...)
tmp = append(tmp, buf[HeaderLength:]...)
tmp = append(tmp, '\n')
tmp = append(tmp, fmt.Sprintf(":%d\n", s.index)...)
s.book[series] = s.index
// Update cue
binary.LittleEndian.PutUint16(tmp[4:6], uint16(len(tmp)))
binary.LittleEndian.PutUint16(tmp[6:HeaderLength], uint16(0))
binary.LittleEndian.PutUint32(tmp[:4], s.index)
binary.LittleEndian.PutUint32(buf[:4], s.index)
_, err = w.Write(tmp)
if err != nil {
return err
}
deferPoint = true
buf = buf[:HeaderLength]
buf = append(buf, fmt.Sprintf(":%d", s.index)...)
}
} else {
// Replace the series name with the value from the book
if id, ok := s.book[series]; ok {
buf = buf[:HeaderLength]
buf = append(buf, fmt.Sprintf(":%d", id)...)
binary.LittleEndian.PutUint16(buf[4:6], uint16(len(buf)))
binary.LittleEndian.PutUint16(buf[6:HeaderLength], uint16(0))
binary.LittleEndian.PutUint32(buf[:4], id)
} else {
return errors.New("unexpected series name")
}
}

buf = append(buf, '\n')

// Timestamp
buf = append(buf, ':')
buf = fastFormatAppend(p.timestamp.UTC().UnixNano(), buf)
buf = append(buf, '\n')

// Values
buf = append(buf, fmt.Sprintf("*%d\n", len(p.fieldValues))...)
for i := 0; i < len(p.fieldValues); i++ {
v := p.fieldValues[i]
switch v.(type) {
case int, int64:
buf = append(buf, ':')
case float64:
buf = append(buf, '+')
}
buf = fastFormatAppend(v, buf)
buf = append(buf, '\n')
}

// Update cue
binary.LittleEndian.PutUint16(buf[4:6], uint16(len(buf)))
binary.LittleEndian.PutUint16(buf[6:HeaderLength], uint16(len(p.fieldValues)))
if deferPoint {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is probably more go-like written as:

if deferPoint {
  s.deferred = append(s.deferred, buf...)
  return nil
}
_, err = w.Write(buf)
return err

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

s.deferred = append(s.deferred, buf...)
return nil
}
_, err = w.Write(buf)
return err
}
78 changes: 78 additions & 0 deletions cmd/tsbs_generate_data/serialize/akumuli_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package serialize

import (
"bytes"
"strings"
"testing"
)

func TestAkumuliSerializerSerialize(t *testing.T) {

serializer := NewAkumuliSerializer()

points := []*Point{
testPointDefault,
testPointInt,
testPointMultiField,
testPointDefault,
testPointInt,
testPointMultiField,
}

type testCase struct {
expCount int
expValue string
name string
}

cases := []testCase{
{
expCount: 1,
expValue: "+cpu.usage_guest_nice hostname=host_0 region=eu-west-1 datacenter=eu-west-1b",
name: "series name default",
},
{
expCount: 1,
expValue: "+cpu.usage_guest hostname=host_0 region=eu-west-1 datacenter=eu-west-1b",
name: "series name int",
},
{
expCount: 1,
expValue: "+cpu.big_usage_guest|cpu.usage_guest|cpu.usage_guest_nice hostname=host_0 region=eu-west-1 datacenter=eu-west-1b",
name: "series name multi-field",
},
{
expCount: 2,
expValue: "*1\n+38.24311829",
name: "value default",
},
{
expCount: 2,
expValue: "*1\n:38",
name: "value int",
},
{
expCount: 2,
expValue: "*3\n:5000000000\n:38\n+38.24311829",
name: "value multi-field",
},
{
expCount: 6,
expValue: ":1451606400000000000",
name: "timestamp",
},
}
buf := new(bytes.Buffer)
for _, point := range points {
serializer.Serialize(point, buf)
}

got := buf.String()

for _, c := range cases {
actualCnt := strings.Count(got, c.expValue)
if actualCnt != c.expCount {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the serialized content is not tested/asserted

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's checked on line 73. If one of the test samples is not present in the output required number of times it will fail.

t.Errorf("Output incorrect: %s expected %d times got %d times", c.name, c.expCount, actualCnt)
}
}
}
45 changes: 45 additions & 0 deletions cmd/tsbs_generate_queries/databases/akumuli/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package akumuli

import (
"time"

"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops"
"github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils"
"github.com/timescale/tsbs/query"
)

// BaseGenerator contains settings specific for Akumuli database.
type BaseGenerator struct {
}

// GenerateEmptyQuery returns an empty query.HTTP
func (d *Devops) GenerateEmptyQuery() query.Query {
return query.NewHTTP()
}

// fillInQuery fills the query struct with data.
func (g *BaseGenerator) fillInQuery(qi query.Query, humanLabel, humanDesc, body string, begin, end int64) {
q := qi.(*query.HTTP)
q.HumanLabel = []byte(humanLabel)
q.HumanDescription = []byte(humanDesc)
q.Method = []byte("POST")
q.Path = []byte("/api/query")
q.Body = []byte(body)
q.StartTimestamp = begin
q.EndTimestamp = end
}

// NewDevops makes an Devops object ready to generate Queries.
func (g *BaseGenerator) NewDevops(start, end time.Time, scale int) (utils.QueryGenerator, error) {
core, err := devops.NewCore(start, end, scale)
if err != nil {
return nil, err
}

devops := &Devops{
BaseGenerator: g,
Core: core,
}

return devops, nil
}
Loading