inline tag storage; correct mongo storage
rw committed Aug 23, 2016
1 parent 198c02f commit 3d7b8fc
Showing 5 changed files with 192 additions and 69 deletions.
42 changes: 21 additions & 21 deletions bulk_data_gen/point.go
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"encoding/binary"
"fmt"
"io"
@@ -230,44 +231,44 @@ func (p *Point) SerializeMongo(w io.Writer) (err error) {
// Prepare the series id prefix, which is the set of tags associated
// with this point. The series id prefix is the base of each value's
// particular collection name:
seriesId := bufPool.Get().([]byte)
seriesId = append(seriesId, p.MeasurementName...)
lenBuf := bufPool8.Get().([]byte)
inlineTags := inlineTagsPool.Get().(*bytes.Buffer)
for i := 0; i < len(p.TagKeys); i++ {
seriesId = append(seriesId, charComma)
seriesId = append(seriesId, p.TagKeys[i]...)
seriesId = append(seriesId, charEquals)
seriesId = append(seriesId, p.TagValues[i]...)
sz := len(p.TagKeys[i]) + 1 + len(p.TagValues[i])
binary.LittleEndian.PutUint64(lenBuf, uint64(sz))
inlineTags.Write(lenBuf)
inlineTags.Write(p.TagKeys[i])
inlineTags.WriteByte(charEquals)
inlineTags.Write(p.TagValues[i])
}
seriesIdPrefixLen := len(seriesId)

// Prepare the timestamp, which is the same for each value in this
// Point:
timestampNanos := p.Timestamp.UTC().UnixNano()

// Fetch a flatbuffers builder from a pool:
lenBuf := bufPool8.Get().([]byte)
builder := fbBuilderPool.Get().(*flatbuffers.Builder)

// For each field in this Point, serialize its:
// collection name (series id prefix + the name of the value)
// timestamp in nanos (int64)
// numeric value (int, int64, or float64 -- determined by reflection)
for fieldId := 0; fieldId < len(p.FieldKeys); fieldId++ {
builder.Reset()

fieldName := p.FieldKeys[fieldId]
genericValue := p.FieldValues[fieldId]

// Make the collection name for this value, taking care to
// reuse the seriesId slice:
seriesId = seriesId[:seriesIdPrefixLen]
seriesId = append(seriesId, '#')
seriesId = append(seriesId, fieldName...)

// build the flatbuffer representing this point:
builder.Reset()
seriesIdOffset := builder.CreateByteVector(seriesId)
measurementNameOffset := builder.CreateByteVector(p.MeasurementName)
fieldNameOffset := builder.CreateByteVector(fieldName)
inlineTagsOffset := builder.CreateByteVector(inlineTags.Bytes())

mongo_serialization.ItemStart(builder)
mongo_serialization.ItemAddSeriesId(builder, seriesIdOffset)
mongo_serialization.ItemAddTimestampNanos(builder, timestampNanos)
mongo_serialization.ItemAddMeasurementName(builder, measurementNameOffset)
mongo_serialization.ItemAddInlineTags(builder, inlineTagsOffset)
mongo_serialization.ItemAddFieldName(builder, fieldNameOffset)

switch v := genericValue.(type) {
// (We can't switch on groups of types (e.g. int,int64) because
@@ -286,7 +287,6 @@ func (p *Point) SerializeMongo(w io.Writer) (err error) {
default:
panic(fmt.Sprintf("logic error in mongo serialization, %s", reflect.TypeOf(v)))
}
mongo_serialization.ItemAddSeriesId(builder, seriesIdOffset)
rootTable := mongo_serialization.ItemEnd(builder)
builder.Finish(rootTable)

@@ -311,9 +311,9 @@ func (p *Point) SerializeMongo(w io.Writer) (err error) {
builder.Reset()
fbBuilderPool.Put(builder)

// Give the series id byte slice back to a pool:
seriesId = seriesId[:0]
bufPool.Put(seriesId)
// Give the inline tags byte slice back to a pool:
inlineTags.Reset()
inlineTagsPool.Put(inlineTags)

// Give the 8-byte buf back to a pool:
bufPool8.Put(lenBuf)
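
The inline tag encoding introduced above frames each tag as an 8-byte little-endian length prefix followed by the key=value bytes. Below is a minimal, self-contained sketch of that round trip; encodeInlineTags mirrors the loop in SerializeMongo, decodeInlineTags mirrors extractInlineTags in bulk_load_mongo/main.go, and the sample tag names are illustrative only.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// encodeInlineTags frames each tag as an 8-byte little-endian length,
// then the "key=value" bytes, as SerializeMongo does above.
func encodeInlineTags(keys, values [][]byte) []byte {
	var buf bytes.Buffer
	lenBuf := make([]byte, 8)
	for i := range keys {
		sz := len(keys[i]) + 1 + len(values[i])
		binary.LittleEndian.PutUint64(lenBuf, uint64(sz))
		buf.Write(lenBuf)
		buf.Write(keys[i])
		buf.WriteByte('=')
		buf.Write(values[i])
	}
	return buf.Bytes()
}

// decodeInlineTags walks the framed buffer back into tag slices.
func decodeInlineTags(buf []byte) [][]byte {
	var tags [][]byte
	for i := 0; i < len(buf); {
		l := int(binary.LittleEndian.Uint64(buf[i : i+8]))
		i += 8
		tags = append(tags, buf[i:i+l])
		i += l
	}
	return tags
}

func main() {
	keys := [][]byte{[]byte("hostname"), []byte("region")}
	values := [][]byte{[]byte("host_0"), []byte("us-west-1")}
	for _, tag := range decodeInlineTags(encodeInlineTags(keys, values)) {
		fmt.Printf("%s\n", tag) // "hostname=host_0", then "region=us-west-1"
	}
}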
7 changes: 6 additions & 1 deletion bulk_data_gen/pools.go
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"sync"

flatbuffers "github.com/google/flatbuffers/go"
@@ -16,9 +17,13 @@ var bufPool = &sync.Pool{
return []byte{}
},
}
var inlineTagsPool = &sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
var bufPool8 = &sync.Pool{
New: func() interface{} {
return make([]byte, 8)
},
}
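
The new inlineTagsPool follows the same checkout discipline the other pools use in SerializeMongo: Get, write, Reset, Put, so the buffer's backing array is reused across points. A minimal sketch of that cycle (the pool declaration is repeated here only to keep the example self-contained; the tag bytes are illustrative):

package main

import (
	"bytes"
	"fmt"
	"sync"
)

var inlineTagsPool = &sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

func main() {
	inlineTags := inlineTagsPool.Get().(*bytes.Buffer) // type-assert the pooled value
	inlineTags.Write([]byte("hostname=host_0"))        // stage the tag payload
	fmt.Println(inlineTags.Len(), "bytes staged")

	inlineTags.Reset()             // drop the contents, keep the backing array
	inlineTagsPool.Put(inlineTags) // return it for reuse by the next point
}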

89 changes: 55 additions & 34 deletions bulk_load_mongo/main.go
@@ -16,9 +16,9 @@ import (
"time"

flatbuffers "github.com/google/flatbuffers/go"
"github.com/pkg/profile"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"github.com/pkg/profile"

"github.com/influxdata/influxdb-comparisons/mongo_serialization"
)
@@ -28,7 +28,7 @@ var (
daemonUrl string
workers int
batchSize int
limit int64
limit int64
doLoad bool
writeTimeout time.Duration
)
@@ -184,7 +184,7 @@ func scan(session *mgo.Session, itemsPerBatch int) int64 {
batch = batchPool.Get().(*Batch)
}

_ = start
_ = start
//if itemsRead > 0 && itemsRead%100000 == 0 {
// _ = start
// //took := (time.Now().UnixNano() - start.UnixNano())
@@ -204,26 +204,27 @@
}

// processBatches reads byte buffers from batchChan, interprets them and writes
// them to the target server. Note that mgo incurs a lot of overhead.
// them to the target server. Note that mgo forcibly incurs serialization
// overhead (it always encodes to BSON).
func processBatches(session *mgo.Session) {
db := session.DB(dbName)

type pointLong struct {
id struct {
internalSeriesId []byte
timestamp int64
} `bson:"_id" json:"id"`
value int64 `bson:"v" json:"v"`
type PointLong struct {
MeasurementName []byte
FieldName []byte
Timestamp int64
Value int64 `bson:"v"` // json:"v"`
Tags [][]byte
}
type pointDouble struct {
id struct {
internalSeriesId []byte
timestamp int64
} `bson:"_id" json:"id"`
value float64 `bson:"v" json:"v"`
type PointDouble struct {
MeasurementName []byte
FieldName []byte
Timestamp int64
Value float64 `bson:"v"` // json:"v"`
Tags [][]byte
}
plPool := &sync.Pool{New: func() interface{} { return &pointLong{} }}
pdPool := &sync.Pool{New: func() interface{} { return &pointDouble{} }}
plPool := &sync.Pool{New: func() interface{} { return &PointLong{} }}
pdPool := &sync.Pool{New: func() interface{} { return &PointDouble{} }}
pvs := []interface{}{}

item := &mongo_serialization.Item{}
@@ -243,16 +244,20 @@ func processBatches(session *mgo.Session) {

switch item.ValueType() {
case mongo_serialization.ValueTypeLong:
x := plPool.Get().(*pointLong)
x.id.internalSeriesId = item.SeriesIdBytes()
x.id.timestamp = item.TimestampNanos()
x.value = item.LongValue()
x := plPool.Get().(*PointLong)
x.MeasurementName = item.MeasurementNameBytes()
x.FieldName = item.FieldNameBytes()
x.Timestamp = item.TimestampNanos()
x.Value = item.LongValue()
extractInlineTags(item.InlineTagsBytes(), &x.Tags)
pvs[i] = x
case mongo_serialization.ValueTypeDouble:
x := pdPool.Get().(*pointDouble)
x.id.internalSeriesId = item.SeriesIdBytes()
x.id.timestamp = item.TimestampNanos()
x.value = item.DoubleValue()
x := pdPool.Get().(*PointDouble)
x.MeasurementName = item.MeasurementNameBytes()
x.FieldName = item.FieldNameBytes()
x.Timestamp = item.TimestampNanos()
x.Value = item.DoubleValue()
extractInlineTags(item.InlineTagsBytes(), &x.Tags)
pvs[i] = x
default:
panic("logic error")
@@ -263,6 +268,12 @@ func processBatches(session *mgo.Session) {
bulk.Insert(pvs...)

if doLoad {
//for i := range pvs {
// err := collection.Insert(pvs[i])
// if err != nil {
// log.Fatalf("Insert err: %s", err.Error())
// }
//}
_, err := bulk.Run()
if err != nil {
log.Fatalf("Bulk err: %s\n", err.Error())
@@ -273,15 +284,15 @@ func processBatches(session *mgo.Session) {
// cleanup pvs
for _, x := range pvs {
switch x2 := x.(type) {
case *pointLong:
x2.id.internalSeriesId = nil
x2.id.timestamp = 0
x2.value = 0
case *PointLong:
x2.Timestamp = 0
x2.Value = 0
x2.Tags = x2.Tags[:0]
plPool.Put(x2)
case *pointDouble:
x2.id.internalSeriesId = nil
x2.id.timestamp = 0
x2.value = 0
case *PointDouble:
x2.Timestamp = 0
x2.Value = 0
x2.Tags = x2.Tags[:0]
pdPool.Put(x2)
default:
panic("logic error")
@@ -331,6 +342,16 @@ func mustCreateCollections(daemonUrl string) {
}
}

func extractInlineTags(buf []byte, dst *[][]byte) {
for i := 0; i < len(buf); {
l := int(binary.LittleEndian.Uint64(buf[i : i+8]))
i += 8
b := buf[i : i+l]
*dst = append(*dst, b)
i += l
}
}

//func ensureCollectionExists(session *mgo.Session, name []byte) {
// globalCollectionMappingMutex.RLock()
// _, ok := globalCollectionMapping[unsafeBytesToString(name)]
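
On the load side, processBatches hands PointLong/PointDouble values to an mgo bulk writer, and mgo marshals each struct to BSON on insert (the overhead noted in the comment above). Renaming the fields to exported names matters here: the bson encoder silently skips unexported fields, so the old pointLong/pointDouble documents would have stored no data, which is presumably the "correct mongo storage" half of this commit. A minimal sketch of the bulk write path under mgo v2 follows; the dial address, database and collection names, and sample documents are placeholders.

package main

import (
	"log"

	mgo "gopkg.in/mgo.v2"
)

// PointLong mirrors the struct in processBatches; exported fields are
// marshalled to BSON, and the `bson:"v"` tag stores Value under the key "v".
type PointLong struct {
	MeasurementName []byte
	FieldName       []byte
	Timestamp       int64
	Value           int64 `bson:"v"`
	Tags            [][]byte
}

func main() {
	session, err := mgo.Dial("localhost:27017") // placeholder address
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	coll := session.DB("benchmark_db").C("point_data") // placeholder names
	bulk := coll.Bulk()
	bulk.Insert(
		&PointLong{MeasurementName: []byte("cpu"), FieldName: []byte("usage_user"), Timestamp: 1471910400000000000, Value: 42},
		&PointLong{MeasurementName: []byte("cpu"), FieldName: []byte("usage_user"), Timestamp: 1471910410000000000, Value: 43},
	)
	if _, err := bulk.Run(); err != nil {
		log.Fatalf("Bulk err: %s\n", err.Error())
	}
}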
4 changes: 4 additions & 0 deletions mongo.flatbuffers.idl
@@ -3,6 +3,10 @@ enum ValueType:byte { Long = 0, Double = 1 }
table Item {
seriesId:[ubyte];

measurement_name:[ubyte];
inline_tags:[ubyte];
field_name:[ubyte];

timestamp_nanos:long;

value_type:ValueType;
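
The three new vectors sit alongside the existing seriesId field, and flatc generates the builder setters and byte-vector accessors used above (ItemAddMeasurementName, MeasurementNameBytes, and so on) from these names. A minimal sketch of building and reading an Item with the new fields, assuming the standard flatc-generated Go entry point GetRootAsItem; the measurement, field, and tag payload literals are illustrative:

package main

import (
	"fmt"

	flatbuffers "github.com/google/flatbuffers/go"

	"github.com/influxdata/influxdb-comparisons/mongo_serialization"
)

func main() {
	builder := flatbuffers.NewBuilder(0)

	// Byte vectors must be created before ItemStart:
	nameOff := builder.CreateByteVector([]byte("cpu"))
	fieldOff := builder.CreateByteVector([]byte("usage_user"))
	// One inline tag, framed as an 8-byte little-endian length (15)
	// followed by "hostname=host_0":
	tagsOff := builder.CreateByteVector([]byte("\x0f\x00\x00\x00\x00\x00\x00\x00hostname=host_0"))

	mongo_serialization.ItemStart(builder)
	mongo_serialization.ItemAddTimestampNanos(builder, 1471910400000000000)
	mongo_serialization.ItemAddMeasurementName(builder, nameOff)
	mongo_serialization.ItemAddInlineTags(builder, tagsOff)
	mongo_serialization.ItemAddFieldName(builder, fieldOff)
	builder.Finish(mongo_serialization.ItemEnd(builder))

	// Read back through the generated root accessor:
	item := mongo_serialization.GetRootAsItem(builder.FinishedBytes(), 0)
	fmt.Printf("%s.%s @ %d\n", item.MeasurementNameBytes(),
		item.FieldNameBytes(), item.TimestampNanos())
}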