Skip to content

Commit

Permalink
Merged in rrk/fix-load-ts-header (pull request timescale#78)
Browse files Browse the repository at this point in the history
Fix incorrect header length assumption in TimescaleDB loader

Approved-by: Lee Hampton <leejhampton@gmail.com>
  • Loading branch information
RobAtticus committed Aug 2, 2018
2 parents 4ee1731 + 0ac2918 commit 39fda35
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 72 deletions.
129 changes: 67 additions & 62 deletions cmd/tsbs_load_timescaledb/creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

type dbCreator struct {
tags string
cols string
cols []string
connStr string
}

Expand All @@ -25,28 +25,31 @@ func (d *dbCreator) Init() {
}

func (d *dbCreator) readDataHeader(br *bufio.Reader) {
// First three lines are header, with the first line containing the tags
// and their names, the second line containing the column names, and
// third line being blank to separate from the data
for i := 0; i < 3; i++ {
// First N lines are header, with the first line containing the tags
// and their names, the second through N-1 line containing the column
// names, and last line being blank to separate from the data
i := 0
for {
var err error
var empty string
var line string
if i == 0 {
d.tags, err = br.ReadString('\n')
if err != nil {
fatal("input has wrong header format: %v", err)
}
d.tags = strings.TrimSpace(d.tags)
} else if i == 1 {
d.cols, err = br.ReadString('\n')
d.cols = strings.TrimSpace(d.cols)
} else {
empty, err = br.ReadString('\n')
empty = strings.TrimSpace(empty)
if len(empty) > 0 {
fatal("input has wrong header format: third line is not blank")
line, err = br.ReadString('\n')
if err != nil {
fatal("input has wrong header format: %v", err)
}
line = strings.TrimSpace(line)
if len(line) == 0 {
break
}
d.cols = append(d.cols, line)
}
if err != nil {
fatal("input has wrong header format: %v", err)
}
i++
}
}

Expand Down Expand Up @@ -80,60 +83,62 @@ func (d *dbCreator) CreateDB(dbName string) error {
createTagsTable(dbBench, parts[1:])
tableCols["tags"] = parts[1:]

parts = strings.Split(strings.TrimSpace(d.cols), ",")
hypertable := parts[0]
partitioningField := tableCols["tags"][0]
tableCols[hypertable] = parts[1:]
for _, cols := range d.cols {
parts = strings.Split(strings.TrimSpace(cols), ",")
hypertable := parts[0]
partitioningField := tableCols["tags"][0]
tableCols[hypertable] = parts[1:]

psuedoCols := []string{}
if inTableTag {
psuedoCols = append(psuedoCols, partitioningField)
}
psuedoCols := []string{}
if inTableTag {
psuedoCols = append(psuedoCols, partitioningField)
}

fieldDef := []string{}
indexes := []string{}
psuedoCols = append(psuedoCols, parts[1:]...)
extraCols := 0 // set to 1 when hostname is kept in-table
for idx, field := range psuedoCols {
if len(field) == 0 {
continue
fieldDef := []string{}
indexes := []string{}
psuedoCols = append(psuedoCols, parts[1:]...)
extraCols := 0 // set to 1 when hostname is kept in-table
for idx, field := range psuedoCols {
if len(field) == 0 {
continue
}
fieldType := "DOUBLE PRECISION"
idxType := fieldIndex
if inTableTag && idx == 0 {
fieldType = "TEXT"
idxType = ""
extraCols = 1
}

fieldDef = append(fieldDef, fmt.Sprintf("%s %s", field, fieldType))
if fieldIndexCount == -1 || idx < (fieldIndexCount+extraCols) {
indexes = append(indexes, d.getCreateIndexOnFieldCmds(hypertable, field, idxType)...)
}
}
fieldType := "DOUBLE PRECISION"
idxType := fieldIndex
if inTableTag && idx == 0 {
fieldType = "TEXT"
idxType = ""
extraCols = 1
dbBench.MustExec(fmt.Sprintf("CREATE TABLE %s (time timestamptz, tags_id integer, %s)", hypertable, strings.Join(fieldDef, ",")))
if partitionIndex {
dbBench.MustExec(fmt.Sprintf("CREATE INDEX ON %s(tags_id, \"time\" DESC)", hypertable))
}

fieldDef = append(fieldDef, fmt.Sprintf("%s %s", field, fieldType))
if fieldIndexCount == -1 || idx < (fieldIndexCount+extraCols) {
indexes = append(indexes, d.getCreateIndexOnFieldCmds(hypertable, field, idxType)...)
// Only allow one or the other, it's probably never right to have both.
// Experimentation suggests (so far) that for 100k devices it is better to
// use --time-partition-index for reduced index lock contention.
if timePartitionIndex {
dbBench.MustExec(fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC, tags_id)", hypertable))
} else if timeIndex {
dbBench.MustExec(fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC)", hypertable))
}
}
dbBench.MustExec(fmt.Sprintf("CREATE TABLE %s (time timestamptz, tags_id integer, %s)", hypertable, strings.Join(fieldDef, ",")))
if partitionIndex {
dbBench.MustExec(fmt.Sprintf("CREATE INDEX ON %s(tags_id, \"time\" DESC)", hypertable))
}

// Only allow one or the other, it's probably never right to have both.
// Experimentation suggests (so far) that for 100k devices it is better to
// use --time-partition-index for reduced index lock contention.
if timePartitionIndex {
dbBench.MustExec(fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC, tags_id)", hypertable))
} else if timeIndex {
dbBench.MustExec(fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC)", hypertable))
}

for _, idxDef := range indexes {
dbBench.MustExec(idxDef)
}
for _, idxDef := range indexes {
dbBench.MustExec(idxDef)
}

if useHypertable {
dbBench.MustExec("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE")
dbBench.MustExec(
fmt.Sprintf("SELECT create_hypertable('%s'::regclass, 'time'::name, partitioning_column => '%s'::name, number_partitions => %v::smallint, chunk_time_interval => %d, create_default_indexes=>FALSE)",
hypertable, "tags_id", numberPartitions, chunkTime.Nanoseconds()/1000))
if useHypertable {
dbBench.MustExec("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE")
dbBench.MustExec(
fmt.Sprintf("SELECT create_hypertable('%s'::regclass, 'time'::name, partitioning_column => '%s'::name, number_partitions => %v::smallint, chunk_time_interval => %d, create_default_indexes=>FALSE)",
hypertable, "tags_id", numberPartitions, chunkTime.Nanoseconds()/1000))
}
}

return nil
Expand Down
34 changes: 24 additions & 10 deletions cmd/tsbs_load_timescaledb/creator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,37 @@ func TestDBCreatorReadDataHeader(t *testing.T) {
desc string
input string
wantTags string
wantCols string
wantCols []string
wantBuffered int
shouldFatal bool
}{
{
desc: "exactly three lines",
desc: "min case: exactly three lines",
input: "tags,tag1,tag2\ncols,col1,col2\n\n",
wantTags: "tags,tag1,tag2",
wantCols: "cols,col1,col2",
wantCols: []string{"cols,col1,col2"},
wantBuffered: 0,
},
{
desc: "more than the header 3 lines",
desc: "min case: more than the header 3 lines",
input: "tags,tag1,tag2\ncols,col1,col2\n\nrow1\nrow2\n",
wantTags: "tags,tag1,tag2",
wantCols: "cols,col1,col2",
wantCols: []string{"cols,col1,col2"},
wantBuffered: len([]byte("row1\nrow2\n")),
},
{
desc: "non-empty 3rd line",
input: "tags\ncols\nfoo\n",
shouldFatal: true,
desc: "multiple tables: more than 3 lines for header",
input: "tags,tag1,tag2\ncols,col1,col2\ncols2,col21,col22\n\n",
wantTags: "tags,tag1,tag2",
wantCols: []string{"cols,col1,col2", "cols2,col21,col22"},
wantBuffered: 0,
},
{
desc: "multiple tables: more than 3 lines for header w/ extra",
input: "tags,tag1,tag2\ncols,col1,col2\ncols2,col21,col22\n\nrow1\nrow2\n",
wantTags: "tags,tag1,tag2",
wantCols: []string{"cols,col1,col2", "cols2,col21,col22"},
wantBuffered: len([]byte("row1\nrow2\n")),
},
{
desc: "too few lines",
Expand Down Expand Up @@ -65,8 +74,13 @@ func TestDBCreatorReadDataHeader(t *testing.T) {
if dbc.tags != c.wantTags {
t.Errorf("%s: incorrect tags: got\n%s\nwant\n%s", c.desc, dbc.tags, c.wantTags)
}
if dbc.cols != c.wantCols {
t.Errorf("%s: incorrect cols: got\n%s\nwant\n%s", c.desc, dbc.cols, c.wantCols)
if len(dbc.cols) != len(c.wantCols) {
t.Errorf("%s: incorrect cols len: got %d want %d", c.desc, len(dbc.cols), len(c.wantCols))
}
for i := range dbc.cols {
if got := dbc.cols[i]; got != c.wantCols[i] {
t.Errorf("%s: cols row %d incorrect: got\n%s\nwant\n%s\n", c.desc, i, got, c.wantCols[i])
}
}
if br.Buffered() != c.wantBuffered {
t.Errorf("%s: incorrect amt buffered: got\n%d\nwant\n%d", c.desc, br.Buffered(), c.wantBuffered)
Expand Down

0 comments on commit 39fda35

Please sign in to comment.