Skip to content

Commit

Permalink
Add the ability to load a dynamic number of tags for Cassandra use cases
Browse files Browse the repository at this point in the history
  • Loading branch information
LeeHampton committed Aug 2, 2018
1 parent 39fda35 commit 282c99e
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
21 changes: 8 additions & 13 deletions cmd/tsbs_load_cassandra/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,17 @@ func (d *decoder) Decode(_ *bufio.Reader) *load.Point {
// We currently only support a 1-line:1-metric mapping for Cassandra. Implement
// other functions here to support other formats.
// singleMetricToInsertStatement converts one CSV line into a CQL INSERT
// statement for Cassandra. The line layout is:
//
//	<table>,<tag_1>,...,<tag_n>,<measurement>,<day_bucket>,<timestamp_ns>,<value>
//
// The number of tags is dynamic: everything between the table name and the
// final four fields is treated as the tag list.
func singleMetricToInsertStatement(text string) string {
	const insertStatement = "INSERT INTO %s(series_id, timestamp_ns, value) VALUES('%s#%s#%s', %s, %s)"

	parts := strings.Split(text, ",")

	// Each line must consist of a table name, at least one tag, the
	// measurement name, a day bucket, a timestamp, and a value.
	if len(parts) < 6 {
		log.Fatalf("Format error: Invalid number of values on CSV line")
	}

	tagsBeginIndex := 1               // list of tags begins after the table name
	tagsEndIndex := len(parts) - 5    // list of tags ends right before the last 4 parts of the line

	table := parts[0]
	tags := strings.Join(parts[tagsBeginIndex:tagsEndIndex+1], ",")
	measurementName := parts[tagsEndIndex+1]
	dayBucket := parts[tagsEndIndex+2]
	timestampNS := parts[tagsEndIndex+3]
	value := parts[tagsEndIndex+4]

	return fmt.Sprintf(insertStatement, table, tags, measurementName, dayBucket, timestampNS, value)
}
Expand Down
5 changes: 5 additions & 0 deletions cmd/tsbs_load_cassandra/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ func TestSingleMetricToInsertStatement(t *testing.T) {
inputCSV: "series_double,cpu,hostname=host_0,region=eu-west-1,datacenter=eu-west-1b,rack=67,os=Ubuntu16.10,arch=x86,team=NYC,service=7,service_version=0,service_environment=production,usage_guest_nice,2016-01-01,1451606400000000000,38.2431182911542820",
outputInsertStatement: "INSERT INTO series_double(series_id, timestamp_ns, value) VALUES('cpu,hostname=host_0,region=eu-west-1,datacenter=eu-west-1b,rack=67,os=Ubuntu16.10,arch=x86,team=NYC,service=7,service_version=0,service_environment=production#usage_guest_nice#2016-01-01', 1451606400000000000, 38.2431182911542820)",
},
{
desc: "A properly formatted CSV line with an arbitrary number of tags should result in a properly formatted CQL INSERT statement",
inputCSV: "series_bigint,redis,hostname=host_0,region=eu-west-1,datacenter=eu-west-1b,rack=67,os=Ubuntu16.10,arch=x86,team=NYC,service=7,service_version=0,service_environment=production,port=6379,server=redis_1,used_cpu_user,2016-01-01,1451606400000000000,388",
outputInsertStatement: "INSERT INTO series_bigint(series_id, timestamp_ns, value) VALUES('redis,hostname=host_0,region=eu-west-1,datacenter=eu-west-1b,rack=67,os=Ubuntu16.10,arch=x86,team=NYC,service=7,service_version=0,service_environment=production,port=6379,server=redis_1#used_cpu_user#2016-01-01', 1451606400000000000, 388)",
},
}

for _, c := range cases {
Expand Down

0 comments on commit 282c99e

Please sign in to comment.