From 3f5664db50fffb839185787fb4e57c7f8a2ced0e Mon Sep 17 00:00:00 2001 From: Lee Hampton Date: Wed, 24 Apr 2019 14:41:25 -0400 Subject: [PATCH] Fix parsing columns when --do-create-db=false This fixes a bug where the PostCreateDB function would exit early when the user set --do-create-db=false and/or --create-metrics-table=False. This early exit caused TSBS to skip the updating of some global caches, which broke assumptions in other parts of the codebase. This commit also refactors the PostCreateDB function to split the parsing of columns and the potential creation of tables and indexes into separate functions. This makes it easier to test the functions in isolation and cleaner to create the conditional create-table logic that is at the heart of this bug. While this does add tests to the parsing function, the create tables/index function remains untested. This is left for a later PR that will hopefully clean up global state and provide a more comprehensive framework for testing IO. --- cmd/tsbs_load_timescaledb/creator.go | 167 +++++++++++++--------- cmd/tsbs_load_timescaledb/creator_test.go | 73 ++++++++++ 2 files changed, 170 insertions(+), 70 deletions(-) diff --git a/cmd/tsbs_load_timescaledb/creator.go b/cmd/tsbs_load_timescaledb/creator.go index d7d14f99d..71208e2db 100644 --- a/cmd/tsbs_load_timescaledb/creator.go +++ b/cmd/tsbs_load_timescaledb/creator.go @@ -124,93 +124,105 @@ func (d *dbCreator) CreateDB(dbName string) error { } func (d *dbCreator) PostCreateDB(dbName string) error { - if !createMetricsTable { - return nil - } - dbBench := MustConnect(driver, getConnectString()) defer dbBench.Close() - parts := strings.Split(strings.TrimSpace(d.tags), ",") - if parts[0] != tagsKey { - return fmt.Errorf("input header in wrong format. got '%s', expected 'tags'", parts[0]) + tags := strings.Split(strings.TrimSpace(d.tags), ",") + if tags[0] != tagsKey { + return fmt.Errorf("input header in wrong format. got '%s', expected 'tags'", tags[0]) + } + if createMetricsTable { + createTagsTable(dbBench, tags[1:]) } - createTagsTable(dbBench, parts[1:]) - tableCols[tagsKey] = parts[1:] + // tableCols is a global map. Globally cache the available tags + tableCols[tagsKey] = tags[1:] - for _, cols := range d.cols { - parts = strings.Split(strings.TrimSpace(cols), ",") - hypertable := parts[0] - partitioningField := tableCols[tagsKey][0] - tableCols[hypertable] = parts[1:] + // Each table is defined in the dbCreator 'cols' list. The definition consists of a + // comma separated list of the table name followed by its columns. Iterate over each + // definition to update our global cache and create the requisite tables and indexes + for _, tableDef := range d.cols { + columns := strings.Split(strings.TrimSpace(tableDef), ",") + tableName := columns[0] + // tableCols is a global map. Globally cache the available columns for the given table + tableCols[tableName] = columns[1:] - pseudoCols := []string{} - if inTableTag { - pseudoCols = append(pseudoCols, partitioningField) + fieldDefs, indexDefs := d.getFieldAndIndexDefinitions(columns) + if createMetricsTable { + d.createTableAndIndexes(dbBench, tableName, fieldDefs, indexDefs) } + } + return nil +} - fieldDef := []string{} - indexes := []string{} - pseudoCols = append(pseudoCols, parts[1:]...) - extraCols := 0 // set to 1 when hostname is kept in-table - for idx, field := range pseudoCols { - if len(field) == 0 { - continue - } - fieldType := "DOUBLE PRECISION" - idxType := fieldIndex - if inTableTag && idx == 0 { - fieldType = "TEXT" - idxType = "" - extraCols = 1 - } +// getFieldAndIndexDefinitions iterates over a list of table columns, populating lists of +// definitions for each desired field and index. Returns separate lists of fieldDefs and indexDefs +func (d *dbCreator) getFieldAndIndexDefinitions(columns []string) ([]string, []string) { + var fieldDefs []string + var indexDefs []string + var allCols []string - fieldDef = append(fieldDef, fmt.Sprintf("%s %s", field, fieldType)) - if fieldIndexCount == -1 || idx < (fieldIndexCount+extraCols) { - indexes = append(indexes, d.getCreateIndexOnFieldCmds(hypertable, field, idxType)...) - } - } - MustExec(dbBench, fmt.Sprintf("DROP TABLE IF EXISTS %s", hypertable)) - MustExec(dbBench, fmt.Sprintf("CREATE TABLE %s (time timestamptz, tags_id integer, %s, additional_tags JSONB DEFAULT NULL)", hypertable, strings.Join(fieldDef, ","))) - if partitionIndex { - MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(tags_id, \"time\" DESC)", hypertable)) - } + partitioningField := tableCols[tagsKey][0] + tableName := columns[0] + // If the user has specified that we should partition on the primary tags key, we + // add that to the list of columns to create + if inTableTag { + allCols = append(allCols, partitioningField) + } - // Only allow one or the other, it's probably never right to have both. - // Experimentation suggests (so far) that for 100k devices it is better to - // use --time-partition-index for reduced index lock contention. - if timePartitionIndex { - MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC, tags_id)", hypertable)) - } else if timeIndex { - MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC)", hypertable)) + allCols = append(allCols, columns[1:]...) + extraCols := 0 // set to 1 when hostname is kept in-table + for idx, field := range allCols { + if len(field) == 0 { + continue } - - for _, idxDef := range indexes { - MustExec(dbBench, idxDef) + fieldType := "DOUBLE PRECISION" + idxType := fieldIndex + // This condition handles the case where we keep the primary tag key in the table + // and partition on it. Since under the current implementation this tag is always + // hostname, we set it to a TEXT field instead of DOUBLE PRECISION + if inTableTag && idx == 0 { + fieldType = "TEXT" + idxType = "" + extraCols = 1 } - if useHypertable { - MustExec(dbBench, "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE") - MustExec(dbBench, - fmt.Sprintf("SELECT create_hypertable('%s'::regclass, 'time'::name, partitioning_column => '%s'::name, number_partitions => %v::smallint, chunk_time_interval => %d, create_default_indexes=>FALSE)", - hypertable, "tags_id", numberPartitions, chunkTime.Nanoseconds()/1000)) + fieldDefs = append(fieldDefs, fmt.Sprintf("%s %s", field, fieldType)) + // If the user specifies indexes on additional fields, add them to + // our index definitions until we've reached the desired number of indexes + if fieldIndexCount == -1 || idx < (fieldIndexCount+extraCols) { + indexDefs = append(indexDefs, d.getCreateIndexOnFieldCmds(tableName, field, idxType)...) } } - return nil + return fieldDefs, indexDefs } -func createTagsTable(db *sql.DB, tags []string) { - MustExec(db, "DROP TABLE IF EXISTS tags") - if useJSON { - MustExec(db, "CREATE TABLE tags(id SERIAL PRIMARY KEY, tagset JSONB)") - MustExec(db, "CREATE UNIQUE INDEX uniq1 ON tags(tagset)") - MustExec(db, "CREATE INDEX idxginp ON tags USING gin (tagset jsonb_path_ops);") - } else { - cols := strings.Join(tags, " TEXT, ") - cols += " TEXT" - MustExec(db, fmt.Sprintf("CREATE TABLE tags(id SERIAL PRIMARY KEY, %s)", cols)) - MustExec(db, fmt.Sprintf("CREATE UNIQUE INDEX uniq1 ON tags(%s)", strings.Join(tags, ","))) - MustExec(db, fmt.Sprintf("CREATE INDEX ON tags(%s)", tags[0])) +// createTableAndIndexes takes a list of field and index definitions for a given tableName and constructs +// the necessary table, index, and potential hypertable based on the user's settings +func (d *dbCreator) createTableAndIndexes(dbBench *sql.DB, tableName string, fieldDefs []string, indexDefs []string) { + MustExec(dbBench, fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName)) + MustExec(dbBench, fmt.Sprintf("CREATE TABLE %s (time timestamptz, tags_id integer, %s, additional_tags JSONB DEFAULT NULL)", tableName, strings.Join(fieldDefs, ","))) + if partitionIndex { + MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(tags_id, \"time\" DESC)", tableName)) + } + + // Only allow one or the other, it's probably never right to have both. + // Experimentation suggests (so far) that for 100k devices it is better to + // use --time-partition-index for reduced index lock contention. + if timePartitionIndex { + MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC, tags_id)", tableName)) + } else if timeIndex { + MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC)", tableName)) + } + + for _, indexDef := range indexDefs { + MustExec(dbBench, indexDef) + } + + if useHypertable { + MustExec(dbBench, "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE") + MustExec(dbBench, + fmt.Sprintf("SELECT create_hypertable('%s'::regclass, 'time'::name, partitioning_column => '%s'::name, number_partitions => %v::smallint, chunk_time_interval => %d, create_default_indexes=>FALSE)", + tableName, "tags_id", numberPartitions, chunkTime.Nanoseconds()/1000)) } } @@ -234,3 +246,18 @@ func (d *dbCreator) getCreateIndexOnFieldCmds(hypertable, field, idxType string) } return ret } + +func createTagsTable(db *sql.DB, tags []string) { + MustExec(db, "DROP TABLE IF EXISTS tags") + if useJSON { + MustExec(db, "CREATE TABLE tags(id SERIAL PRIMARY KEY, tagset JSONB)") + MustExec(db, "CREATE UNIQUE INDEX uniq1 ON tags(tagset)") + MustExec(db, "CREATE INDEX idxginp ON tags USING gin (tagset jsonb_path_ops);") + } else { + cols := strings.Join(tags, " TEXT, ") + cols += " TEXT" + MustExec(db, fmt.Sprintf("CREATE TABLE tags(id SERIAL PRIMARY KEY, %s)", cols)) + MustExec(db, fmt.Sprintf("CREATE UNIQUE INDEX uniq1 ON tags(%s)", strings.Join(tags, ","))) + MustExec(db, fmt.Sprintf("CREATE INDEX ON tags(%s)", tags[0])) + } +} diff --git a/cmd/tsbs_load_timescaledb/creator_test.go b/cmd/tsbs_load_timescaledb/creator_test.go index f71a98620..c05375acf 100644 --- a/cmd/tsbs_load_timescaledb/creator_test.go +++ b/cmd/tsbs_load_timescaledb/creator_test.go @@ -204,3 +204,76 @@ func TestDBCreatorGetCreateIndexOnFieldSQL(t *testing.T) { } } } + +func TestDBCreatorGetFieldAndIndexDefinitions(t *testing.T) { + cases := []struct { + desc string + columns []string + fieldIndexCount int + inTableTag bool + wantFieldDefs []string + wantIndexDefs []string + }{ + { + desc: "all field indexes", + columns: []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"}, + fieldIndexCount: -1, + inTableTag: false, + wantFieldDefs: []string{"usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"}, + wantIndexDefs: []string{"CREATE INDEX ON cpu (usage_user, time DESC)", "CREATE INDEX ON cpu (usage_system, time DESC)", "CREATE INDEX ON cpu (usage_idle, time DESC)", "CREATE INDEX ON cpu (usage_nice, time DESC)"}, + }, + { + desc: "no field indexes", + columns: []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"}, + fieldIndexCount: 0, + inTableTag: false, + wantFieldDefs: []string{"usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"}, + wantIndexDefs: []string{}, + }, + { + desc: "no field indexes, in table tag", + columns: []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"}, + fieldIndexCount: 0, + inTableTag: true, + wantFieldDefs: []string{"hostname TEXT", "usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"}, + wantIndexDefs: []string{}, + }, + { + desc: "one field index", + columns: []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"}, + fieldIndexCount: 1, + inTableTag: false, + wantFieldDefs: []string{"usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"}, + wantIndexDefs: []string{"CREATE INDEX ON cpu (usage_user, time DESC)"}, + }, + { + desc: "two field indexes", + columns: []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"}, + fieldIndexCount: 2, + inTableTag: false, + wantFieldDefs: []string{"usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"}, + wantIndexDefs: []string{"CREATE INDEX ON cpu (usage_user, time DESC)", "CREATE INDEX ON cpu (usage_system, time DESC)"}, + }, + } + + for _, c := range cases { + // Set the global in-table-tag flag based on the test case + inTableTag = c.inTableTag + // Initialize global cache + tableCols[tagsKey] = []string{} + tableCols[tagsKey] = append(tableCols[tagsKey], "hostname") + dbc := &dbCreator{} + fieldIndexCount = c.fieldIndexCount + fieldDefs, indexDefs := dbc.getFieldAndIndexDefinitions(c.columns) + for i, fieldDef := range fieldDefs { + if fieldDef != c.wantFieldDefs[i] { + t.Errorf("%s: incorrect fieldDef at idx %d: got %s want %s", c.desc, i, fieldDef, c.wantFieldDefs[i]) + } + } + for i, indexDef := range indexDefs { + if indexDef != c.wantIndexDefs[i] { + t.Errorf("%s: incorrect indexDef at idx %d: got %s want %s", c.desc, i, indexDef, c.wantIndexDefs[i]) + } + } + } +}