Skip to content

Commit

Permalink
Use copy instead of insert when storing metrics
Browse files Browse the repository at this point in the history
This patch changes processCSI to use COPY to store
metrics instead of multi-value INSERTs.
  • Loading branch information
svenklemm committed Aug 7, 2018
1 parent f6721e9 commit 7bcb12d
Showing 1 changed file with 40 additions and 23 deletions.
63 changes: 40 additions & 23 deletions cmd/tsbs_load_timescaledb/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"bitbucket.org/440-labs/tsbs/load"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
)

const insertCSI = `INSERT INTO %s(time,tags_id,%s%s,additional_tags) VALUES %s`
Expand Down Expand Up @@ -96,17 +97,16 @@ func insertTags(db *sqlx.DB, tagRows [][]string, returnResults bool) map[string]
}

func (p *processor) processCSI(hypertable string, rows []*insertData) uint64 {
partitionKey := ""
if inTableTag {
partitionKey = tableCols["tags"][0] + ","
}

hypertableCols := strings.Join(tableCols[hypertable], ",")

tagRows := make([][]string, 0, len(rows))
dataRows := make([]string, 0, len(rows))
dataRows := make([][]interface{}, 0, len(rows))
ret := uint64(0)
commonTagsLen := len(tableCols["tags"])

colLen := len(tableCols[hypertable]) + 2
if inTableTag {
colLen++
}

for _, data := range rows {
// Split the tags into individual common tags and an extra bit leftover
// for non-common tags that need to be added separately. For each of
Expand All @@ -116,7 +116,7 @@ func (p *processor) processCSI(hypertable string, rows []*insertData) uint64 {
for i := 0; i < commonTagsLen; i++ {
tags[i] = strings.Split(tags[i], "=")[1]
}
json := "NULL"
var json interface{} = nil
if len(tags) > commonTagsLen {
json = subsystemTagsToJSON(strings.Split(tags[commonTagsLen], ","))
}
Expand All @@ -130,16 +130,14 @@ func (p *processor) processCSI(hypertable string, rows []*insertData) uint64 {
}
ts := time.Unix(0, timeInt).Format("2006-01-02 15:04:05.999999 -0700")

// The last arguments in these sprintf's may seem a bit confusing at first, but
// it does work. We want each value to be surrounded by single quotes (' '), and
// to be separated by a comma. That means we strings.Join them with "', '", which
// leaves the first value without a preceding ' and the last with out a trailing ',
// therefore we put the %s returned by the Join inside of '' to solve the problem
var r string
// use nil at 2nd position as placeholder for tagKey
r := make([]interface{}, 0, colLen)
r = append(r, ts, nil, json)
if inTableTag {
r = fmt.Sprintf("('%s','[REPLACE_CSI]', '%s', '%s', %s)", ts, tags[0], strings.Join(metrics[1:], "', '"), json)
} else {
r = fmt.Sprintf("('%s', '[REPLACE_CSI]', '%s', %s)", ts, strings.Join(metrics[1:], "', '"), json)
r = append(r, tags[0])
}
for _, v := range metrics[1:] {
r = append(r, v)
}

dataRows = append(dataRows, r)
Expand All @@ -165,16 +163,35 @@ func (p *processor) processCSI(hypertable string, rows []*insertData) uint64 {
}

p.csi.mutex.RLock()
for i, r := range dataRows {
// TODO -- support more than 10 common tags
for i := range dataRows {
tagKey := tagRows[i][0]
dataRows[i] = strings.Replace(r, "[REPLACE_CSI]", strconv.FormatInt(p.csi.m[tagKey], 10), 1)
dataRows[i][1] = p.csi.m[tagKey]
}
p.csi.mutex.RUnlock()
tx := p.db.MustBegin()
_ = tx.MustExec(fmt.Sprintf(insertCSI, hypertable, partitionKey, hypertableCols, strings.Join(dataRows, ",")))

err := tx.Commit()
cols := make([]string, 0, colLen)
cols = append(cols, "time", "tags_id", "additional_tags")
if inTableTag {
cols = append(cols, tableCols["tags"][0])
}
cols = append(cols, tableCols[hypertable]...)
stmt, err := tx.Prepare(pq.CopyIn(hypertable, cols...))
for _, r := range dataRows {
stmt.Exec(r...)
}

_, err = stmt.Exec()
if err != nil {
panic(err)
}

err = stmt.Close()
if err != nil {
panic(err)
}

err = tx.Commit()
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 7bcb12d

Please sign in to comment.