Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix parsing columns when --do-create-db=false #70

Merged
merged 1 commit into from
Apr 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 97 additions & 70 deletions cmd/tsbs_load_timescaledb/creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,93 +124,105 @@ func (d *dbCreator) CreateDB(dbName string) error {
}

func (d *dbCreator) PostCreateDB(dbName string) error {
if !createMetricsTable {
return nil
}

dbBench := MustConnect(driver, getConnectString())
defer dbBench.Close()

parts := strings.Split(strings.TrimSpace(d.tags), ",")
if parts[0] != tagsKey {
return fmt.Errorf("input header in wrong format. got '%s', expected 'tags'", parts[0])
tags := strings.Split(strings.TrimSpace(d.tags), ",")
if tags[0] != tagsKey {
return fmt.Errorf("input header in wrong format. got '%s', expected 'tags'", tags[0])
}
if createMetricsTable {
createTagsTable(dbBench, tags[1:])
}
createTagsTable(dbBench, parts[1:])
tableCols[tagsKey] = parts[1:]
// tableCols is a global map. Globally cache the available tags
tableCols[tagsKey] = tags[1:]

for _, cols := range d.cols {
parts = strings.Split(strings.TrimSpace(cols), ",")
hypertable := parts[0]
partitioningField := tableCols[tagsKey][0]
tableCols[hypertable] = parts[1:]
// Each table is defined in the dbCreator 'cols' list. The definition consists of a
// comma separated list of the table name followed by its columns. Iterate over each
// definition to update our global cache and create the requisite tables and indexes
for _, tableDef := range d.cols {
columns := strings.Split(strings.TrimSpace(tableDef), ",")
tableName := columns[0]
// tableCols is a global map. Globally cache the available columns for the given table
tableCols[tableName] = columns[1:]

pseudoCols := []string{}
if inTableTag {
pseudoCols = append(pseudoCols, partitioningField)
fieldDefs, indexDefs := d.getFieldAndIndexDefinitions(columns)
if createMetricsTable {
d.createTableAndIndexes(dbBench, tableName, fieldDefs, indexDefs)
}
}
return nil
}

fieldDef := []string{}
indexes := []string{}
pseudoCols = append(pseudoCols, parts[1:]...)
extraCols := 0 // set to 1 when hostname is kept in-table
for idx, field := range pseudoCols {
if len(field) == 0 {
continue
}
fieldType := "DOUBLE PRECISION"
idxType := fieldIndex
if inTableTag && idx == 0 {
fieldType = "TEXT"
idxType = ""
extraCols = 1
}
// getFieldAndIndexDefinitions iterates over a list of table columns, populating lists of
// definitions for each desired field and index. Returns separate lists of fieldDefs and indexDefs
func (d *dbCreator) getFieldAndIndexDefinitions(columns []string) ([]string, []string) {
var fieldDefs []string
var indexDefs []string
var allCols []string

fieldDef = append(fieldDef, fmt.Sprintf("%s %s", field, fieldType))
if fieldIndexCount == -1 || idx < (fieldIndexCount+extraCols) {
indexes = append(indexes, d.getCreateIndexOnFieldCmds(hypertable, field, idxType)...)
}
}
MustExec(dbBench, fmt.Sprintf("DROP TABLE IF EXISTS %s", hypertable))
MustExec(dbBench, fmt.Sprintf("CREATE TABLE %s (time timestamptz, tags_id integer, %s, additional_tags JSONB DEFAULT NULL)", hypertable, strings.Join(fieldDef, ",")))
if partitionIndex {
MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(tags_id, \"time\" DESC)", hypertable))
}
partitioningField := tableCols[tagsKey][0]
tableName := columns[0]
// If the user has specified that we should partition on the primary tags key, we
// add that to the list of columns to create
if inTableTag {
allCols = append(allCols, partitioningField)
}

// Only allow one or the other, it's probably never right to have both.
// Experimentation suggests (so far) that for 100k devices it is better to
// use --time-partition-index for reduced index lock contention.
if timePartitionIndex {
MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC, tags_id)", hypertable))
} else if timeIndex {
MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC)", hypertable))
allCols = append(allCols, columns[1:]...)
extraCols := 0 // set to 1 when hostname is kept in-table
for idx, field := range allCols {
if len(field) == 0 {
continue
}

for _, idxDef := range indexes {
MustExec(dbBench, idxDef)
fieldType := "DOUBLE PRECISION"
idxType := fieldIndex
// This condition handles the case where we keep the primary tag key in the table
// and partition on it. Since under the current implementation this tag is always
// hostname, we set it to a TEXT field instead of DOUBLE PRECISION
if inTableTag && idx == 0 {
fieldType = "TEXT"
idxType = ""
extraCols = 1
}

if useHypertable {
MustExec(dbBench, "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE")
MustExec(dbBench,
fmt.Sprintf("SELECT create_hypertable('%s'::regclass, 'time'::name, partitioning_column => '%s'::name, number_partitions => %v::smallint, chunk_time_interval => %d, create_default_indexes=>FALSE)",
hypertable, "tags_id", numberPartitions, chunkTime.Nanoseconds()/1000))
fieldDefs = append(fieldDefs, fmt.Sprintf("%s %s", field, fieldType))
// If the user specifies indexes on additional fields, add them to
// our index definitions until we've reached the desired number of indexes
if fieldIndexCount == -1 || idx < (fieldIndexCount+extraCols) {
indexDefs = append(indexDefs, d.getCreateIndexOnFieldCmds(tableName, field, idxType)...)
}
}
return nil
return fieldDefs, indexDefs
}

func createTagsTable(db *sql.DB, tags []string) {
MustExec(db, "DROP TABLE IF EXISTS tags")
if useJSON {
MustExec(db, "CREATE TABLE tags(id SERIAL PRIMARY KEY, tagset JSONB)")
MustExec(db, "CREATE UNIQUE INDEX uniq1 ON tags(tagset)")
MustExec(db, "CREATE INDEX idxginp ON tags USING gin (tagset jsonb_path_ops);")
} else {
cols := strings.Join(tags, " TEXT, ")
cols += " TEXT"
MustExec(db, fmt.Sprintf("CREATE TABLE tags(id SERIAL PRIMARY KEY, %s)", cols))
MustExec(db, fmt.Sprintf("CREATE UNIQUE INDEX uniq1 ON tags(%s)", strings.Join(tags, ",")))
MustExec(db, fmt.Sprintf("CREATE INDEX ON tags(%s)", tags[0]))
// createTableAndIndexes takes a list of field and index definitions for a given tableName and constructs
// the necessary table, index, and potential hypertable based on the user's settings
func (d *dbCreator) createTableAndIndexes(dbBench *sql.DB, tableName string, fieldDefs []string, indexDefs []string) {
MustExec(dbBench, fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName))
MustExec(dbBench, fmt.Sprintf("CREATE TABLE %s (time timestamptz, tags_id integer, %s, additional_tags JSONB DEFAULT NULL)", tableName, strings.Join(fieldDefs, ",")))
if partitionIndex {
MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(tags_id, \"time\" DESC)", tableName))
}

// Only allow one or the other, it's probably never right to have both.
// Experimentation suggests (so far) that for 100k devices it is better to
// use --time-partition-index for reduced index lock contention.
if timePartitionIndex {
MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC, tags_id)", tableName))
} else if timeIndex {
MustExec(dbBench, fmt.Sprintf("CREATE INDEX ON %s(\"time\" DESC)", tableName))
}

for _, indexDef := range indexDefs {
MustExec(dbBench, indexDef)
}

if useHypertable {
MustExec(dbBench, "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE")
MustExec(dbBench,
fmt.Sprintf("SELECT create_hypertable('%s'::regclass, 'time'::name, partitioning_column => '%s'::name, number_partitions => %v::smallint, chunk_time_interval => %d, create_default_indexes=>FALSE)",
tableName, "tags_id", numberPartitions, chunkTime.Nanoseconds()/1000))
}
}

Expand All @@ -234,3 +246,18 @@ func (d *dbCreator) getCreateIndexOnFieldCmds(hypertable, field, idxType string)
}
return ret
}

func createTagsTable(db *sql.DB, tags []string) {
MustExec(db, "DROP TABLE IF EXISTS tags")
if useJSON {
MustExec(db, "CREATE TABLE tags(id SERIAL PRIMARY KEY, tagset JSONB)")
MustExec(db, "CREATE UNIQUE INDEX uniq1 ON tags(tagset)")
MustExec(db, "CREATE INDEX idxginp ON tags USING gin (tagset jsonb_path_ops);")
} else {
cols := strings.Join(tags, " TEXT, ")
cols += " TEXT"
MustExec(db, fmt.Sprintf("CREATE TABLE tags(id SERIAL PRIMARY KEY, %s)", cols))
MustExec(db, fmt.Sprintf("CREATE UNIQUE INDEX uniq1 ON tags(%s)", strings.Join(tags, ",")))
MustExec(db, fmt.Sprintf("CREATE INDEX ON tags(%s)", tags[0]))
}
}
73 changes: 73 additions & 0 deletions cmd/tsbs_load_timescaledb/creator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,76 @@ func TestDBCreatorGetCreateIndexOnFieldSQL(t *testing.T) {
}
}
}

func TestDBCreatorGetFieldAndIndexDefinitions(t *testing.T) {
cases := []struct {
desc string
columns []string
fieldIndexCount int
inTableTag bool
wantFieldDefs []string
wantIndexDefs []string
}{
{
desc: "all field indexes",
columns: []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"},
fieldIndexCount: -1,
inTableTag: false,
wantFieldDefs: []string{"usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"},
wantIndexDefs: []string{"CREATE INDEX ON cpu (usage_user, time DESC)", "CREATE INDEX ON cpu (usage_system, time DESC)", "CREATE INDEX ON cpu (usage_idle, time DESC)", "CREATE INDEX ON cpu (usage_nice, time DESC)"},
},
{
desc: "no field indexes",
columns: []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"},
fieldIndexCount: 0,
inTableTag: false,
wantFieldDefs: []string{"usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"},
wantIndexDefs: []string{},
},
{
desc: "no field indexes, in table tag",
columns: []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"},
fieldIndexCount: 0,
inTableTag: true,
wantFieldDefs: []string{"hostname TEXT", "usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"},
wantIndexDefs: []string{},
},
{
desc: "one field index",
columns: []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"},
fieldIndexCount: 1,
inTableTag: false,
wantFieldDefs: []string{"usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"},
wantIndexDefs: []string{"CREATE INDEX ON cpu (usage_user, time DESC)"},
},
{
desc: "two field indexes",
columns: []string{"cpu", "usage_user", "usage_system", "usage_idle", "usage_nice"},
fieldIndexCount: 2,
inTableTag: false,
wantFieldDefs: []string{"usage_user DOUBLE PRECISION", "usage_system DOUBLE PRECISION", "usage_idle DOUBLE PRECISION", "usage_nice DOUBLE PRECISION"},
wantIndexDefs: []string{"CREATE INDEX ON cpu (usage_user, time DESC)", "CREATE INDEX ON cpu (usage_system, time DESC)"},
},
}

for _, c := range cases {
// Set the global in-table-tag flag based on the test case
inTableTag = c.inTableTag
// Initialize global cache
tableCols[tagsKey] = []string{}
tableCols[tagsKey] = append(tableCols[tagsKey], "hostname")
dbc := &dbCreator{}
fieldIndexCount = c.fieldIndexCount
fieldDefs, indexDefs := dbc.getFieldAndIndexDefinitions(c.columns)
for i, fieldDef := range fieldDefs {
if fieldDef != c.wantFieldDefs[i] {
t.Errorf("%s: incorrect fieldDef at idx %d: got %s want %s", c.desc, i, fieldDef, c.wantFieldDefs[i])
}
}
for i, indexDef := range indexDefs {
if indexDef != c.wantIndexDefs[i] {
t.Errorf("%s: incorrect indexDef at idx %d: got %s want %s", c.desc, i, indexDef, c.wantIndexDefs[i])
}
}
}
}