diff --git a/cmd/tsbs_load_timescaledb/creator.go b/cmd/tsbs_load_timescaledb/creator.go index d7d14f99d..71208e2db 100644 --- a/cmd/tsbs_load_timescaledb/creator.go +++ b/cmd/tsbs_load_timescaledb/creator.go @@ -124,93 +124,105 @@ func (d *dbCreator) CreateDB(dbName string) error { } func (d *dbCreator) PostCreateDB(dbName string) error { - if !createMetricsTable { - return nil - } - dbBench := MustConnect(driver, getConnectString()) defer dbBench.Close() - parts := strings.Split(strings.TrimSpace(d.tags), ",") - if parts[0] != tagsKey { - return fmt.Errorf("input header in wrong format. got '%s', expected 'tags'", parts[0]) + tags := strings.Split(strings.TrimSpace(d.tags), ",") + if tags[0] != tagsKey { + return fmt.Errorf("input header in wrong format. got '%s', expected 'tags'", tags[0]) + } + if createMetricsTable { + createTagsTable(dbBench, tags[1:]) } - createTagsTable(dbBench, parts[1:]) - tableCols[tagsKey] = parts[1:] + // tableCols is a global map. Globally cache the available tags + tableCols[tagsKey] = tags[1:] - for _, cols := range d.cols { - parts = strings.Split(strings.TrimSpace(cols), ",") - hypertable := parts[0] - partitioningField := tableCols[tagsKey][0] - tableCols[hypertable] = parts[1:] + // Each table is defined in the dbCreator 'cols' list. The definition consists of a + // comma separated list of the table name followed by its columns. Iterate over each + // definition to update our global cache and create the requisite tables and indexes + for _, tableDef := range d.cols { + columns := strings.Split(strings.TrimSpace(tableDef), ",") + tableName := columns[0] + // tableCols is a global map. Globally cache the available columns for the given table + tableCols[tableName] = columns[1:] - pseudoCols := []string{} - if inTableTag { - pseudoCols = append(pseudoCols, partitioningField) + fieldDefs, indexDefs := d.getFieldAndIndexDefinitions(columns) + if createMetricsTable { + d.createTableAndIndexes(dbBench, tableName, fieldDefs, indexDefs) } + } + return nil +} - fieldDef := []string{} - indexes := []string{} - pseudoCols = append(pseudoCols, parts[1:]...) - extraCols := 0 // set to 1 when hostname is kept in-table - for idx, field := range pseudoCols { - if len(field) == 0 { - continue - } - fieldType := "DOUBLE PRECISION" - idxType := fieldIndex - if inTableTag && idx == 0 { - fieldType = "TEXT" - idxType = "" - extraCols = 1 - } +// getFieldAndIndexDefinitions iterates over a list of table columns, populating lists of +// definitions for each desired field and index. Returns separate lists of fieldDefs and indexDefs +func (d *dbCreator) getFieldAndIndexDefinitions(columns []string) ([]string, []string) { + var fieldDefs []string + var indexDefs []string + var allCols []string - fieldDef = append(fieldDef, fmt.Sprintf("%s %s", field, fieldType)) - if fieldIndexCount == -1 || idx < (fieldIndexCount+extraCols) { - indexes = append(indexes, d.getCreateIndexOnFieldCmds(hypertable, field, idxType)...) - } - } - MustExec(dbBench, fmt.Sprintf("DROP TABLE IF EXISTS %s", hypertable)) - MustExec(dbBench, fmt.Sprintf("CREATE TABLE %s (time timestamptz, tags_id integer, %s, additional_tags JSONB DEFAULT NULL)", hypertable, strings.Join(fieldDef, ","))) - if partitionIndex { - MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(tags_id, \"time\" DESC)", hypertable)) - } + partitioningField := tableCols[tagsKey][0] + tableName := columns[0] + // If the user has specified that we should partition on the primary tags key, we + // add that to the list of columns to create + if inTableTag { + allCols = append(allCols, partitioningField) + } - // Only allow one or the other, it's probably never right to have both. - // Experimentation suggests (so far) that for 100k devices it is better to - // use --time-partition-index for reduced index lock contention. - if timePartitionIndex { - MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC, tags_id)", hypertable)) - } else if timeIndex { - MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC)", hypertable)) + allCols = append(allCols, columns[1:]...) + extraCols := 0 // set to 1 when hostname is kept in-table + for idx, field := range allCols { + if len(field) == 0 { + continue } - - for _, idxDef := range indexes { - MustExec(dbBench, idxDef) + fieldType := "DOUBLE PRECISION" + idxType := fieldIndex + // This condition handles the case where we keep the primary tag key in the table + // and partition on it. Since under the current implementation this tag is always + // hostname, we set it to a TEXT field instead of DOUBLE PRECISION + if inTableTag && idx == 0 { + fieldType = "TEXT" + idxType = "" + extraCols = 1 } - if useHypertable { - MustExec(dbBench, "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE") - MustExec(dbBench, - fmt.Sprintf("SELECT create_hypertable('%s'::regclass, 'time'::name, partitioning_column => '%s'::name, number_partitions => %v::smallint, chunk_time_interval => %d, create_default_indexes=>FALSE)", - hypertable, "tags_id", numberPartitions, chunkTime.Nanoseconds()/1000)) + fieldDefs = append(fieldDefs, fmt.Sprintf("%s %s", field, fieldType)) + // If the user specifies indexes on additional fields, add them to + // our index definitions until we've reached the desired number of indexes + if fieldIndexCount == -1 || idx < (fieldIndexCount+extraCols) { + indexDefs = append(indexDefs, d.getCreateIndexOnFieldCmds(tableName, field, idxType)...) } } - return nil + return fieldDefs, indexDefs } -func createTagsTable(db *sql.DB, tags []string) { - MustExec(db, "DROP TABLE IF EXISTS tags") - if useJSON { - MustExec(db, "CREATE TABLE tags(id SERIAL PRIMARY KEY, tagset JSONB)") - MustExec(db, "CREATE UNIQUE INDEX uniq1 ON tags(tagset)") - MustExec(db, "CREATE INDEX idxginp ON tags USING gin (tagset jsonb_path_ops);") - } else { - cols := strings.Join(tags, " TEXT, ") - cols += " TEXT" - MustExec(db, fmt.Sprintf("CREATE TABLE tags(id SERIAL PRIMARY KEY, %s)", cols)) - MustExec(db, fmt.Sprintf("CREATE UNIQUE INDEX uniq1 ON tags(%s)", strings.Join(tags, ","))) - MustExec(db, fmt.Sprintf("CREATE INDEX ON tags(%s)", tags[0])) +// createTableAndIndexes takes a list of field and index definitions for a given tableName and constructs +// the necessary table, index, and potential hypertable based on the user's settings +func (d *dbCreator) createTableAndIndexes(dbBench *sql.DB, tableName string, fieldDefs []string, indexDefs []string) { + MustExec(dbBench, fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName)) + MustExec(dbBench, fmt.Sprintf("CREATE TABLE %s (time timestamptz, tags_id integer, %s, additional_tags JSONB DEFAULT NULL)", tableName, strings.Join(fieldDefs, ","))) + if partitionIndex { + MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(tags_id, \"time\" DESC)", tableName)) + } + + // Only allow one or the other, it's probably never right to have both. + // Experimentation suggests (so far) that for 100k devices it is better to + // use --time-partition-index for reduced index lock contention. + if timePartitionIndex { + MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC, tags_id)", tableName)) + } else if timeIndex { + MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC)", tableName)) + } + + for _, indexDef := range indexDefs { + MustExec(dbBench, indexDef) + } + + if useHypertable { + MustExec(dbBench, "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE") + MustExec(dbBench, + fmt.Sprintf("SELECT create_hypertable('%s'::regclass, 'time'::name, partitioning_column => '%s'::name, number_partitions => %v::smallint, chunk_time_interval => %d, create_default_indexes=>FALSE)", + tableName, "tags_id", numberPartitions, chunkTime.Nanoseconds()/1000)) } } @@ -234,3 +246,18 @@ func (d *dbCreator) getCreateIndexOnFieldCmds(hypertable, field, idxType string) } return ret } + +func createTagsTable(db *sql.DB, tags []string) { + MustExec(db, "DROP TABLE IF EXISTS tags") + if useJSON { + MustExec(db, "CREATE TABLE tags(id SERIAL PRIMARY KEY, tagset JSONB)") + MustExec(db, "CREATE UNIQUE INDEX uniq1 ON tags(tagset)") + MustExec(db, "CREATE INDEX idxginp ON tags USING gin (tagset jsonb_path_ops);") + } else { + cols := strings.Join(tags, " TEXT, ") + cols += " TEXT" + MustExec(db, fmt.Sprintf("CREATE TABLE tags(id SERIAL PRIMARY KEY, %s)", cols)) + MustExec(db, fmt.Sprintf("CREATE UNIQUE INDEX uniq1 ON tags(%s)", strings.Join(tags, ","))) + MustExec(db, fmt.Sprintf("CREATE INDEX ON tags(%s)", tags[0])) + } +} diff --git a/cmd/tsbs_load_timescaledb/creator_test.go b/cmd/tsbs_load_timescaledb/creator_test.go index f71a98620..c05375acf 100644 --- a/cmd/tsbs_load_timescaledb/creator_test.go +++ b/cmd/tsbs_load_timescaledb/creator_test.go @@ -204,3 +204,76 @@ func TestDBCreatorGetCreateIndexOnFieldSQL(t *testing.T) { } } } + +func TestDBCreatorGetFieldAndIndexDefinitions(t *testing.T) { + cases := []struct { + desc string + columns []string + fieldIndexCount int + inTableTag bool + wantFieldDefs []string + wantIndexDefs []string + }{ + { + desc: "all field indexes", + columns: []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"}, + fieldIndexCount: -1, + inTableTag: false, + wantFieldDefs: []string{"usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"}, + wantIndexDefs: []string{"CREATE INDEX ON cpu (usage_user, time DESC)", "CREATE INDEX ON cpu (usage_system, time DESC)", "CREATE INDEX ON cpu (usage_idle, time DESC)", "CREATE INDEX ON cpu (usage_nice, time DESC)"}, + }, + { + desc: "no field indexes", + columns: []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"}, + fieldIndexCount: 0, + inTableTag: false, + wantFieldDefs: []string{"usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"}, + wantIndexDefs: []string{}, + }, + { + desc: "no field indexes, in table tag", + columns: []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"}, + fieldIndexCount: 0, + inTableTag: true, + wantFieldDefs: []string{"hostname TEXT", "usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"}, + wantIndexDefs: []string{}, + }, + { + desc: "one field index", + columns: []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"}, + fieldIndexCount: 1, + inTableTag: false, + wantFieldDefs: []string{"usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"}, + wantIndexDefs: []string{"CREATE INDEX ON cpu (usage_user, time DESC)"}, + }, + { + desc: "two field indexes", + columns: []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"}, + fieldIndexCount: 2, + inTableTag: false, + wantFieldDefs: []string{"usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"}, + wantIndexDefs: []string{"CREATE INDEX ON cpu (usage_user, time DESC)", "CREATE INDEX ON cpu (usage_system, time DESC)"}, + }, + } + + for _, c := range cases { + // Set the global in-table-tag flag based on the test case + inTableTag = c.inTableTag + // Initialize global cache + tableCols[tagsKey] = []string{} + tableCols[tagsKey] = append(tableCols[tagsKey], "hostname") + dbc := &dbCreator{} + fieldIndexCount = c.fieldIndexCount + fieldDefs, indexDefs := dbc.getFieldAndIndexDefinitions(c.columns) + for i, fieldDef := range fieldDefs { + if fieldDef != c.wantFieldDefs[i] { + t.Errorf("%s: incorrect fieldDef at idx %d: got %s want %s", c.desc, i, fieldDef, c.wantFieldDefs[i]) + } + } + for i, indexDef := range indexDefs { + if indexDef != c.wantIndexDefs[i] { + t.Errorf("%s: incorrect indexDef at idx %d: got %s want %s", c.desc, i, indexDef, c.wantIndexDefs[i]) + } + } + } +}