Skip to content

Commit

Permalink
Fix serialization for Mongo when using ints; add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
RobAtticus committed Aug 2, 2018
1 parent c2cf2e2 commit 735fe04
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 29 deletions.
2 changes: 1 addition & 1 deletion cmd/tsbs_generate_data/serialize/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestCassandraSerializerSerialize(t *testing.T) {
{
desc: "a regular Point using int as value",
inputPoint: testPointInt,
output: "series_bigint,cpu,hostname=host_0,region=eu-west-1,datacenter=eu-west-1b,usage_guest_nice,2016-01-01,1451606400000000000,38\n",
output: "series_bigint,cpu,hostname=host_0,region=eu-west-1,datacenter=eu-west-1b,usage_guest,2016-01-01,1451606400000000000,38\n",
},
{
desc: "a Point with no tags",
Expand Down
4 changes: 2 additions & 2 deletions cmd/tsbs_generate_data/serialize/influx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ func TestInfluxSerializerSerialize(t *testing.T) {
{
desc: "a regular Point using int as value",
inputPoint: testPointInt,
output: "cpu,hostname=host_0,region=eu-west-1,datacenter=eu-west-1b usage_guest_nice=38i 1451606400000000000\n",
output: "cpu,hostname=host_0,region=eu-west-1,datacenter=eu-west-1b usage_guest=38i 1451606400000000000\n",
},
{
desc: "a regular Point with multiple fields",
inputPoint: testPointMultiField,
output: "cpu,hostname=host_0,region=eu-west-1,datacenter=eu-west-1b usage_guest=38i,usage_guest_nice=38.24311829 1451606400000000000\n",
output: "cpu,hostname=host_0,region=eu-west-1,datacenter=eu-west-1b big_usage_guest=5000000000i,usage_guest=38i,usage_guest_nice=38.24311829 1451606400000000000\n",
},
{
desc: "a Point with no tags",
Expand Down
26 changes: 21 additions & 5 deletions cmd/tsbs_generate_data/serialize/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package serialize

import (
"encoding/binary"
"fmt"
"io"

flatbuffers "github.com/google/flatbuffers/go"
Expand All @@ -27,9 +28,12 @@ func (s *MongoSerializer) Serialize(p *Point, w io.Writer) (err error) {
}

tags := []flatbuffers.UOffsetT{}
for k, v := range tagsMap {
// In order to keep the ordering the same on deserialization, we need
// to go in reverse order since we are prepending rather than appending.
for i := len(p.tagKeys); i > 0; i-- {
k := string(p.tagKeys[i-1])
key := b.CreateString(k)
val := b.CreateString(v)
val := b.CreateString(tagsMap[k])
MongoTagStart(b)
MongoTagAddKey(b, key)
MongoTagAddValue(b, val)
Expand All @@ -42,12 +46,24 @@ func (s *MongoSerializer) Serialize(p *Point, w io.Writer) (err error) {
tagsArr := b.EndVector(len(tags))

fields := []flatbuffers.UOffsetT{}
for k, v := range fieldsMap {
// In order to keep the ordering the same on deserialization, we need
// to go in reverse order since we are prepending rather than appending.
for i := len(p.fieldKeys); i > 0; i-- {
k := string(p.fieldKeys[i-1])
key := b.CreateString(k)
val := v.(float64)
MongoReadingStart(b)
MongoReadingAddKey(b, key)
MongoReadingAddValue(b, val)
v := fieldsMap[k]
switch val := v.(type) {
case float64:
MongoReadingAddValue(b, val)
case int:
MongoReadingAddValue(b, float64(val))
case int64:
MongoReadingAddValue(b, float64(val))
default:
panic(fmt.Sprintf("cannot covert %T to float64", val))
}
fields = append(fields, MongoReadingEnd(b))
}
MongoPointStartFieldsVector(b, len(fields))
Expand Down
178 changes: 178 additions & 0 deletions cmd/tsbs_generate_data/serialize/mongo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package serialize

import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
"log"
"testing"

flatbuffers "github.com/google/flatbuffers/go"
)

func TestMongoSerializerSerialize(t *testing.T) {
type output struct {
name string
ts int64
tagKeys [][]byte
tagVals [][]byte
readingKeys [][]byte
readingVals []interface{}
}
cases := []struct {
desc string
inputPoint *Point
want output
}{
{
desc: "a regular Point",
inputPoint: testPointDefault,
want: output{
name: string(testMeasurement),
ts: testNow.UnixNano(),
tagKeys: testTagKeys,
tagVals: testTagVals,
readingKeys: testPointDefault.fieldKeys,
readingVals: testPointDefault.fieldValues,
},
},
{
desc: "a regular Point using int as value",
inputPoint: testPointInt,
want: output{
name: string(testMeasurement),
ts: testNow.UnixNano(),
tagKeys: testTagKeys,
tagVals: testTagVals,
readingKeys: testPointInt.fieldKeys,
readingVals: testPointInt.fieldValues,
},
},
{
desc: "a regular Point with multiple fields",
inputPoint: testPointMultiField,
want: output{
name: string(testMeasurement),
ts: testNow.UnixNano(),
tagKeys: testTagKeys,
tagVals: testTagVals,
readingKeys: testPointMultiField.fieldKeys,
readingVals: testPointMultiField.fieldValues,
},
},
{
desc: "a Point with no tags",
inputPoint: testPointNoTags,
want: output{
name: string(testMeasurement),
ts: testNow.UnixNano(),
tagKeys: [][]byte{},
tagVals: [][]byte{},
readingKeys: testPointNoTags.fieldKeys,
readingVals: testPointNoTags.fieldValues,
},
},
}

ps := &MongoSerializer{}
for _, c := range cases {
b := new(bytes.Buffer)
ps.Serialize(c.inputPoint, b)
br := bufio.NewReader(bytes.NewReader(b.Bytes()))
mp := deserializeMongo(br)

if got := string(mp.MeasurementName()); got != c.want.name {
t.Errorf("%s: incorrect measreuement name: got %s want %s", c.desc, got, c.want.name)
}
if got := mp.Timestamp(); got != c.want.ts {
t.Errorf("%s: incorrect timestamp: got %d want %d", c.desc, got, c.want.ts)
}

// Verify tags
if got := mp.TagsLength(); got != len(c.want.tagKeys) {
t.Errorf("%s: incorrect tag keys length: got %d want %d", c.desc, got, len(c.want.tagKeys))
}
if got := mp.TagsLength(); got != len(c.want.tagVals) {
t.Errorf("%s: incorrect tag vals length: got %d want %d", c.desc, got, len(c.want.tagVals))
}
tag := &MongoTag{}
for i := 0; i < mp.TagsLength(); i++ {
mp.Tags(tag, i)
want := string(c.want.tagKeys[i])
if got := string(tag.Key()); got != want {
t.Errorf("%s: incorrect tag key %d: got %s want %s", c.desc, i, got, want)
}
want = string(c.want.tagVals[i])
if got := string(tag.Value()); got != want {
t.Errorf("%s: incorrect tag val %d: got %s want %s", c.desc, i, got, want)
}
}

// Verify fields
if got := mp.FieldsLength(); got != len(c.want.readingKeys) {
t.Errorf("%s: incorrect reading keys length: got %d want %d", c.desc, got, len(c.want.readingKeys))
}
if got := mp.FieldsLength(); got != len(c.want.readingVals) {
t.Errorf("%s: incorrect reading vals length: got %d want %d", c.desc, got, len(c.want.readingVals))
}

reading := &MongoReading{}
for i := 0; i < mp.FieldsLength(); i++ {
mp.Fields(reading, i)
want := string(c.want.readingKeys[i])
if got := string(reading.Key()); got != want {
t.Errorf("%s: incorrect reading key %d: got %s want %s", c.desc, i, got, want)
}

var wantVal float64
switch x := c.want.readingVals[i].(type) {
case int:
wantVal = float64(x)
case int64:
wantVal = float64(x)
case float64:
wantVal = x
}
if got := reading.Value(); got != wantVal {
t.Errorf("%s: incorrect reading val %d: got %v want %v", c.desc, i, got, wantVal)
}
}
}
}

func deserializeMongo(r *bufio.Reader) *MongoPoint {
item := &MongoPoint{}
lenBuf := make([]byte, 8)

_, err := r.Read(lenBuf)
if err == io.EOF {
return nil
}
if err != nil {
log.Fatal(err.Error())
}

// ensure correct len of receiving buffer
l := int(binary.LittleEndian.Uint64(lenBuf))
itemBuf := make([]byte, l)

// read the bytes and init the flatbuffer object
totRead := 0
for totRead < l {
m, err := r.Read(itemBuf[totRead:])
// (EOF is also fatal)
if err != nil {
log.Fatal(err.Error())
}
totRead += m
}
if totRead != len(itemBuf) {
panic(fmt.Sprintf("reader/writer logic error, %d != %d", totRead, len(itemBuf)))
}
n := flatbuffers.GetUOffsetT(itemBuf)
item.Init(itemBuf, n)

return item
}
52 changes: 33 additions & 19 deletions cmd/tsbs_generate_data/serialize/point_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,56 @@ import (
"time"
)

var testNow = time.Unix(1451606400, 0)
var (
testNow = time.Unix(1451606400, 0)
testMeasurement = []byte("cpu")
testTagKeys = [][]byte{[]byte("hostname"), []byte("region"), []byte("datacenter")}
testTagVals = [][]byte{[]byte("host_0"), []byte("eu-west-1"), []byte("eu-west-1b")}
testColFloat = []byte("usage_guest_nice")
testColInt = []byte("usage_guest")
testColInt64 = []byte("big_usage_guest")
)

const (
testFloat = float64(38.24311829)
testInt = 38
testInt64 = int64(5000000000)
)

var testPointDefault = &Point{
measurementName: []byte("cpu"),
tagKeys: [][]byte{[]byte("hostname"), []byte("region"), []byte("datacenter")},
tagValues: [][]byte{[]byte("host_0"), []byte("eu-west-1"), []byte("eu-west-1b")},
measurementName: testMeasurement,
tagKeys: testTagKeys,
tagValues: testTagVals,
timestamp: &testNow,
fieldKeys: [][]byte{[]byte("usage_guest_nice")},
fieldValues: []interface{}{float64(38.24311829)},
fieldKeys: [][]byte{testColFloat},
fieldValues: []interface{}{testFloat},
}

var testPointMultiField = &Point{
measurementName: []byte("cpu"),
tagKeys: [][]byte{[]byte("hostname"), []byte("region"), []byte("datacenter")},
tagValues: [][]byte{[]byte("host_0"), []byte("eu-west-1"), []byte("eu-west-1b")},
measurementName: testMeasurement,
tagKeys: testTagKeys,
tagValues: testTagVals,
timestamp: &testNow,
fieldKeys: [][]byte{[]byte("usage_guest"), []byte("usage_guest_nice")},
fieldValues: []interface{}{38, float64(38.24311829)},
fieldKeys: [][]byte{testColInt64, testColInt, testColFloat},
fieldValues: []interface{}{testInt64, testInt, testFloat},
}

var testPointInt = &Point{
measurementName: []byte("cpu"),
tagKeys: [][]byte{[]byte("hostname"), []byte("region"), []byte("datacenter")},
tagValues: [][]byte{[]byte("host_0"), []byte("eu-west-1"), []byte("eu-west-1b")},
measurementName: testMeasurement,
tagKeys: testTagKeys,
tagValues: testTagVals,
timestamp: &testNow,
fieldKeys: [][]byte{[]byte("usage_guest_nice")},
fieldValues: []interface{}{38},
fieldKeys: [][]byte{testColInt},
fieldValues: []interface{}{testInt},
}

var testPointNoTags = &Point{
measurementName: []byte("cpu"),
measurementName: testMeasurement,
tagKeys: [][]byte{},
tagValues: [][]byte{},
timestamp: &testNow,
fieldKeys: [][]byte{[]byte("usage_guest_nice")},
fieldValues: []interface{}{float64(38.24311829)},
fieldKeys: [][]byte{testColFloat},
fieldValues: []interface{}{testFloat},
}

type serializeCase struct {
Expand Down
2 changes: 1 addition & 1 deletion cmd/tsbs_generate_data/serialize/timescaledb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestTimescaleDBSerializerSerialize(t *testing.T) {
{
desc: "a regular Point with multiple fields",
inputPoint: testPointMultiField,
output: "tags,host_0,eu-west-1,eu-west-1b\ncpu,1451606400000000000,38,38.24311829\n",
output: "tags,host_0,eu-west-1,eu-west-1b\ncpu,1451606400000000000,5000000000,38,38.24311829\n",
},
{
desc: "a Point with no tags",
Expand Down
2 changes: 1 addition & 1 deletion cmd/tsbs_load_mongo/aggregate_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (p *aggProcessor) ProcessBatch(b load.Batch, doLoad bool) (uint64, uint64)
// Determine which document this event belongs too
ts := event.Timestamp()
dateKey := time.Unix(0, ts).UTC().Format(aggDateFmt)
docKey := fmt.Sprintf("day_%s_%s", tagsMap["hostname"], dateKey)
docKey := fmt.Sprintf("day_%s_%s_%s", tagsMap["hostname"], dateKey, string(event.MeasurementName()))

// Check that it has been created using a cached map, if not, add
// to creation queue
Expand Down

0 comments on commit 735fe04

Please sign in to comment.