Skip to content

Commit

Permalink
Fix parsing columns when --do-create-db=false
Browse files Browse the repository at this point in the history
This fixes a bug where the PostCreateDB function would exit early when
the user set --do-create-db=false and/or --create-metrics-table=false.
This early exit caused TSBS to skip updating some global caches,
which broke assumptions in other parts of the codebase.

This commit also refactors the PostCreateDB function to split the
parsing of columns and the potential creation of tables and indexes into
separate functions. This makes it easier to test the functions in
isolation and cleaner to create the conditional create-table logic that is
at the heart of this bug.

While this commit adds tests for the column-parsing function, the function
that creates tables and indexes remains untested. This is left for a later PR
that will hopefully clean up global state and provide a more comprehensive
framework for testing IO.
  • Loading branch information
LeeHampton committed Apr 25, 2019
1 parent addc791 commit 19932e7
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 70 deletions.
167 changes: 97 additions & 70 deletions cmd/tsbs_load_timescaledb/creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,93 +124,105 @@ func (d *dbCreator) CreateDB(dbName string) error {
}

func (d *dbCreator) PostCreateDB(dbName string) error {
if !createMetricsTable {
return nil
}

dbBench := MustConnect(driver, getConnectString())
defer dbBench.Close()

parts := strings.Split(strings.TrimSpace(d.tags), ",")
if parts[0] != tagsKey {
return fmt.Errorf("input header in wrong format. got '%s', expected 'tags'", parts[0])
tags := strings.Split(strings.TrimSpace(d.tags), ",")
if tags[0] != tagsKey {
return fmt.Errorf("input header in wrong format. got '%s', expected 'tags'", tags[0])
}
if createMetricsTable {
createTagsTable(dbBench, tags[1:])
}
createTagsTable(dbBench, parts[1:])
tableCols[tagsKey] = parts[1:]
// tableCols is a global map. Globally cache the available tags
tableCols[tagsKey] = tags[1:]

for _, cols := range d.cols {
parts = strings.Split(strings.TrimSpace(cols), ",")
hypertable := parts[0]
partitioningField := tableCols[tagsKey][0]
tableCols[hypertable] = parts[1:]
// Each table is defined in the dbCreator 'cols' list. The definition consists of a
// comma separated list of the table name followed by its columns. Iterate over each
// definition to update our global cache and create the requisite tables and indexes
for _, tableDef := range d.cols {
columns := strings.Split(strings.TrimSpace(tableDef), ",")
tableName := columns[0]
// tableCols is a global map. Globally cache the available columns for the given table
tableCols[tableName] = columns[1:]

pseudoCols := []string{}
if inTableTag {
pseudoCols = append(pseudoCols, partitioningField)
fieldDefs, indexDefs := d.getFieldAndIndexDefinitions(columns)
if createMetricsTable {
d.createTableAndIndexes(dbBench, tableName, fieldDefs, indexDefs)
}
}
return nil
}

fieldDef := []string{}
indexes := []string{}
pseudoCols = append(pseudoCols, parts[1:]...)
extraCols := 0 // set to 1 when hostname is kept in-table
for idx, field := range pseudoCols {
if len(field) == 0 {
continue
}
fieldType := "DOUBLE PRECISION"
idxType := fieldIndex
if inTableTag && idx == 0 {
fieldType = "TEXT"
idxType = ""
extraCols = 1
}
// getFieldAndIndexDefinitions iterates over a list of table columns, populating lists of
// definitions for each desired field and index. Returns separate lists of fieldDefs and indexDefs
func (d *dbCreator) getFieldAndIndexDefinitions(columns []string) ([]string, []string) {
var fieldDefs []string
var indexDefs []string
var allCols []string

fieldDef = append(fieldDef, fmt.Sprintf("%s %s", field, fieldType))
if fieldIndexCount == -1 || idx < (fieldIndexCount+extraCols) {
indexes = append(indexes, d.getCreateIndexOnFieldCmds(hypertable, field, idxType)...)
}
}
MustExec(dbBench, fmt.Sprintf("DROP TABLE IF EXISTS %s", hypertable))
MustExec(dbBench, fmt.Sprintf("CREATE TABLE %s (time timestamptz, tags_id integer, %s, additional_tags JSONB DEFAULT NULL)", hypertable, strings.Join(fieldDef, ",")))
if partitionIndex {
MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(tags_id, \"time\" DESC)", hypertable))
}
partitioningField := tableCols[tagsKey][0]
tableName := columns[0]
// If the user has specified that we should partition on the primary tags key, we
// add that to the list of columns to create
if inTableTag {
allCols = append(allCols, partitioningField)
}

// Only allow one or the other, it's probably never right to have both.
// Experimentation suggests (so far) that for 100k devices it is better to
// use --time-partition-index for reduced index lock contention.
if timePartitionIndex {
MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC, tags_id)", hypertable))
} else if timeIndex {
MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC)", hypertable))
allCols = append(allCols, columns[1:]...)
extraCols := 0 // set to 1 when hostname is kept in-table
for idx, field := range allCols {
if len(field) == 0 {
continue
}

for _, idxDef := range indexes {
MustExec(dbBench, idxDef)
fieldType := "DOUBLE PRECISION"
idxType := fieldIndex
// This condition handles the case where we keep the primary tag key in the table
// and partition on it. Since under the current implementation this tag is always
// hostname, we set it to a TEXT field instead of DOUBLE PRECISION
if inTableTag && idx == 0 {
fieldType = "TEXT"
idxType = ""
extraCols = 1
}

if useHypertable {
MustExec(dbBench, "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE")
MustExec(dbBench,
fmt.Sprintf("SELECT create_hypertable('%s'::regclass, 'time'::name, partitioning_column => '%s'::name, number_partitions => %v::smallint, chunk_time_interval => %d, create_default_indexes=>FALSE)",
hypertable, "tags_id", numberPartitions, chunkTime.Nanoseconds()/1000))
fieldDefs = append(fieldDefs, fmt.Sprintf("%s %s", field, fieldType))
// If the user specifies indexes on additional fields, add them to
// our index definitions until we've reached the desired number of indexes
if fieldIndexCount == -1 || idx < (fieldIndexCount+extraCols) {
indexDefs = append(indexDefs, d.getCreateIndexOnFieldCmds(tableName, field, idxType)...)
}
}
return nil
return fieldDefs, indexDefs
}

func createTagsTable(db *sql.DB, tags []string) {
MustExec(db, "DROP TABLE IF EXISTS tags")
if useJSON {
MustExec(db, "CREATE TABLE tags(id SERIAL PRIMARY KEY, tagset JSONB)")
MustExec(db, "CREATE UNIQUE INDEX uniq1 ON tags(tagset)")
MustExec(db, "CREATE INDEX idxginp ON tags USING gin (tagset jsonb_path_ops);")
} else {
cols := strings.Join(tags, " TEXT, ")
cols += " TEXT"
MustExec(db, fmt.Sprintf("CREATE TABLE tags(id SERIAL PRIMARY KEY, %s)", cols))
MustExec(db, fmt.Sprintf("CREATE UNIQUE INDEX uniq1 ON tags(%s)", strings.Join(tags, ",")))
MustExec(db, fmt.Sprintf("CREATE INDEX ON tags(%s)", tags[0]))
// createTableAndIndexes drops and recreates tableName with the supplied field
// definitions, then issues the index statements selected by the package-level
// settings, followed by every statement in indexDefs. When useHypertable is set
// it finally installs the timescaledb extension and calls create_hypertable on
// the new table, using tags_id as the partitioning column.
//
// All statements are sent through MustExec on dbBench, in the order listed above.
func (d *dbCreator) createTableAndIndexes(dbBench *sql.DB, tableName string, fieldDefs []string, indexDefs []string) {
	dropStmt := fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName)
	createStmt := fmt.Sprintf(
		"CREATE TABLE %s (time timestamptz, tags_id integer, %s, additional_tags JSONB DEFAULT NULL)",
		tableName, strings.Join(fieldDefs, ","))
	MustExec(dbBench, dropStmt)
	MustExec(dbBench, createStmt)

	if partitionIndex {
		MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(tags_id, \"time\" DESC)", tableName))
	}

	// Only allow one or the other, it's probably never right to have both.
	// Experimentation suggests (so far) that for 100k devices it is better to
	// use --time-partition-index for reduced index lock contention.
	switch {
	case timePartitionIndex:
		MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC, tags_id)", tableName))
	case timeIndex:
		MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC)", tableName))
	}

	// Caller-supplied per-field index statements run after the built-in ones.
	for i := range indexDefs {
		MustExec(dbBench, indexDefs[i])
	}

	if !useHypertable {
		return
	}

	MustExec(dbBench, "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE")
	// chunk_time_interval is passed as chunkTime's nanosecond count divided by 1000.
	chunkInterval := chunkTime.Nanoseconds() / 1000
	MustExec(dbBench,
		fmt.Sprintf("SELECT create_hypertable('%s'::regclass, 'time'::name, partitioning_column => '%s'::name, number_partitions => %v::smallint, chunk_time_interval => %d, create_default_indexes=>FALSE)",
			tableName, "tags_id", numberPartitions, chunkInterval))
}

Expand All @@ -234,3 +246,18 @@ func (d *dbCreator) getCreateIndexOnFieldCmds(hypertable, field, idxType string)
}
return ret
}

// createTagsTable drops any existing "tags" table and recreates it on db.
//
// When useJSON is set, the table has a single JSONB "tagset" column with a
// unique index and a GIN index on it. Otherwise it gets one TEXT column per
// entry in tags, a unique index across all of those columns, and an additional
// index on the first tag column.
func createTagsTable(db *sql.DB, tags []string) {
	MustExec(db, "DROP TABLE IF EXISTS tags")

	if useJSON {
		jsonStmts := []string{
			"CREATE TABLE tags(id SERIAL PRIMARY KEY, tagset JSONB)",
			"CREATE UNIQUE INDEX uniq1 ON tags(tagset)",
			"CREATE INDEX idxginp ON tags USING gin (tagset jsonb_path_ops);",
		}
		for _, stmt := range jsonStmts {
			MustExec(db, stmt)
		}
		return
	}

	// Every tag becomes its own TEXT column: "a TEXT, b TEXT, ...".
	columnDefs := strings.Join(tags, " TEXT, ") + " TEXT"
	uniqueCols := strings.Join(tags, ",")

	MustExec(db, fmt.Sprintf("CREATE TABLE tags(id SERIAL PRIMARY KEY, %s)", columnDefs))
	MustExec(db, fmt.Sprintf("CREATE UNIQUE INDEX uniq1 ON tags(%s)", uniqueCols))
	MustExec(db, fmt.Sprintf("CREATE INDEX ON tags(%s)", tags[0]))
}
73 changes: 73 additions & 0 deletions cmd/tsbs_load_timescaledb/creator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,76 @@ func TestDBCreatorGetCreateIndexOnFieldSQL(t *testing.T) {
}
}
}

// TestDBCreatorGetFieldAndIndexDefinitions checks the field and index
// definitions returned by getFieldAndIndexDefinitions for several combinations
// of the fieldIndexCount and inTableTag settings.
func TestDBCreatorGetFieldAndIndexDefinitions(t *testing.T) {
	cases := []struct {
		desc            string
		columns         []string
		fieldIndexCount int
		inTableTag      bool
		wantFieldDefs   []string
		wantIndexDefs   []string
	}{
		{
			desc:            "all field indexes",
			columns:         []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"},
			fieldIndexCount: -1,
			inTableTag:      false,
			wantFieldDefs:   []string{"usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"},
			wantIndexDefs:   []string{"CREATE INDEX ON cpu (usage_user, time DESC)", "CREATE INDEX ON cpu (usage_system, time DESC)", "CREATE INDEX ON cpu (usage_idle, time DESC)", "CREATE INDEX ON cpu (usage_nice, time DESC)"},
		},
		{
			desc:            "no field indexes",
			columns:         []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"},
			fieldIndexCount: 0,
			inTableTag:      false,
			wantFieldDefs:   []string{"usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"},
			wantIndexDefs:   []string{},
		},
		{
			desc:            "no field indexes, in table tag",
			columns:         []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"},
			fieldIndexCount: 0,
			inTableTag:      true,
			wantFieldDefs:   []string{"hostname TEXT", "usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"},
			wantIndexDefs:   []string{},
		},
		{
			desc:            "one field index",
			columns:         []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"},
			fieldIndexCount: 1,
			inTableTag:      false,
			wantFieldDefs:   []string{"usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"},
			wantIndexDefs:   []string{"CREATE INDEX ON cpu (usage_user, time DESC)"},
		},
		{
			desc:            "two field indexes",
			columns:         []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"},
			fieldIndexCount: 2,
			inTableTag:      false,
			wantFieldDefs:   []string{"usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"},
			wantIndexDefs:   []string{"CREATE INDEX ON cpu (usage_user, time DESC)", "CREATE INDEX ON cpu (usage_system, time DESC)"},
		},
	}

	// The loop below overwrites package-level settings and the global column
	// cache; put them back afterwards so later tests see the values they
	// started with.
	prevInTableTag, prevFieldIndexCount := inTableTag, fieldIndexCount
	prevTags, hadTags := tableCols[tagsKey]
	defer func() {
		inTableTag, fieldIndexCount = prevInTableTag, prevFieldIndexCount
		if hadTags {
			tableCols[tagsKey] = prevTags
		} else {
			delete(tableCols, tagsKey)
		}
	}()

	// check compares got against want element by element. The lengths are
	// compared first: ranging over got alone would silently pass when got is
	// too short and would panic with an out-of-range index when it is too long.
	check := func(desc, kind string, got, want []string) {
		if len(got) != len(want) {
			t.Errorf("%s: incorrect number of %ss: got %d (%v) want %d (%v)", desc, kind, len(got), got, len(want), want)
			return
		}
		for i := range got {
			if got[i] != want[i] {
				t.Errorf("%s: incorrect %s at idx %d: got %s want %s", desc, kind, i, got[i], want[i])
			}
		}
	}

	for _, c := range cases {
		// Set the global flags based on the test case
		inTableTag = c.inTableTag
		fieldIndexCount = c.fieldIndexCount
		// Initialize global cache with the single tag used as partitioning field
		tableCols[tagsKey] = []string{"hostname"}

		dbc := &dbCreator{}
		fieldDefs, indexDefs := dbc.getFieldAndIndexDefinitions(c.columns)

		check(c.desc, "fieldDef", fieldDefs, c.wantFieldDefs)
		check(c.desc, "indexDef", indexDefs, c.wantIndexDefs)
	}
}

0 comments on commit 19932e7

Please sign in to comment.