Skip to content

Commit

Permalink
Merged in sven/copy (pull request timescale#85)
Browse files Browse the repository at this point in the history
Use copy instead of insert when storing metrics

Approved-by: RobAtticus NA <rob.kiefer@gmail.com>
Approved-by: Lee Hampton <leejhampton@gmail.com>
  • Loading branch information
svenklemm committed Aug 7, 2018
2 parents f6721e9 + 7bcb12d commit de13239
Showing 1 changed file with 40 additions and 23 deletions.
63 changes: 40 additions & 23 deletions cmd/tsbs_load_timescaledb/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"bitbucket.org/440-labs/tsbs/load"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
)

const insertCSI = `INSERT INTO %s(time,tags_id,%s%s,additional_tags) VALUES %s`
Expand Down Expand Up @@ -96,17 +97,16 @@ func insertTags(db *sqlx.DB, tagRows [][]string, returnResults bool) map[string]
}

func (p *processor) processCSI(hypertable string, rows []*insertData) uint64 {
partitionKey := ""
if inTableTag {
partitionKey = tableCols["tags"][0] + ","
}

hypertableCols := strings.Join(tableCols[hypertable], ",")

tagRows := make([][]string, 0, len(rows))
dataRows := make([]string, 0, len(rows))
dataRows := make([][]interface{}, 0, len(rows))
ret := uint64(0)
commonTagsLen := len(tableCols["tags"])

colLen := len(tableCols[hypertable]) + 2
if inTableTag {
colLen++
}

for _, data := range rows {
// Split the tags into individual common tags and an extra bit leftover
// for non-common tags that need to be added separately. For each of
Expand All @@ -116,7 +116,7 @@ func (p *processor) processCSI(hypertable string, rows []*insertData) uint64 {
for i := 0; i < commonTagsLen; i++ {
tags[i] = strings.Split(tags[i], "=")[1]
}
json := "NULL"
var json interface{} = nil
if len(tags) > commonTagsLen {
json = subsystemTagsToJSON(strings.Split(tags[commonTagsLen], ","))
}
Expand All @@ -130,16 +130,14 @@ func (p *processor) processCSI(hypertable string, rows []*insertData) uint64 {
}
ts := time.Unix(0, timeInt).Format("2006-01-02 15:04:05.999999 -0700")

// The last arguments in these sprintf's may seem a bit confusing at first, but
// it does work. We want each value to be surrounded by single quotes (' '), and
// to be separated by a comma. That means we strings.Join them with "', '", which
// leaves the first value without a preceding ' and the last with out a trailing ',
// therefore we put the %s returned by the Join inside of '' to solve the problem
var r string
// use nil at 2nd position as placeholder for tagKey
r := make([]interface{}, 0, colLen)
r = append(r, ts, nil, json)
if inTableTag {
r = fmt.Sprintf("('%s','[REPLACE_CSI]', '%s', '%s', %s)", ts, tags[0], strings.Join(metrics[1:], "', '"), json)
} else {
r = fmt.Sprintf("('%s', '[REPLACE_CSI]', '%s', %s)", ts, strings.Join(metrics[1:], "', '"), json)
r = append(r, tags[0])
}
for _, v := range metrics[1:] {
r = append(r, v)
}

dataRows = append(dataRows, r)
Expand All @@ -165,16 +163,35 @@ func (p *processor) processCSI(hypertable string, rows []*insertData) uint64 {
}

p.csi.mutex.RLock()
for i, r := range dataRows {
// TODO -- support more than 10 common tags
for i := range dataRows {
tagKey := tagRows[i][0]
dataRows[i] = strings.Replace(r, "[REPLACE_CSI]", strconv.FormatInt(p.csi.m[tagKey], 10), 1)
dataRows[i][1] = p.csi.m[tagKey]
}
p.csi.mutex.RUnlock()
tx := p.db.MustBegin()
_ = tx.MustExec(fmt.Sprintf(insertCSI, hypertable, partitionKey, hypertableCols, strings.Join(dataRows, ",")))

err := tx.Commit()
cols := make([]string, 0, colLen)
cols = append(cols, "time", "tags_id", "additional_tags")
if inTableTag {
cols = append(cols, tableCols["tags"][0])
}
cols = append(cols, tableCols[hypertable]...)
stmt, err := tx.Prepare(pq.CopyIn(hypertable, cols...))
for _, r := range dataRows {
stmt.Exec(r...)
}

_, err = stmt.Exec()
if err != nil {
panic(err)
}

err = stmt.Close()
if err != nil {
panic(err)
}

err = tx.Commit()
if err != nil {
panic(err)
}
Expand Down

0 comments on commit de13239

Please sign in to comment.