Skip to content

Commit

Permalink
Merged in rrk/dbcreator-interface (pull request timescale#73)
Browse files Browse the repository at this point in the history
Unify the database setup in loaders to use DBCreator interface

Approved-by: Lee Hampton <leejhampton@gmail.com>
  • Loading branch information
RobAtticus committed Jul 26, 2018
2 parents 93f883d + 53baad7 commit f018743
Show file tree
Hide file tree
Showing 18 changed files with 741 additions and 571 deletions.
86 changes: 86 additions & 0 deletions cmd/tsbs_load_cassandra/creator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package main

import (
"fmt"
"log"
"strings"
"time"

"github.com/gocql/gocql"
)

type dbCreator struct {
globalSession *gocql.Session
clientSession *gocql.Session
}

func (d *dbCreator) Init() {
cluster := gocql.NewCluster(strings.Split(hosts, ",")...)
cluster.Consistency = consistencyMapping[consistencyLevel]
cluster.ProtoVersion = 4
cluster.Timeout = 10 * time.Second
session, err := cluster.CreateSession()
if err != nil {
log.Fatal(err)
}
d.globalSession = session
}

func (d *dbCreator) DBExists(dbName string) bool {
iter := d.globalSession.Query(fmt.Sprintf("SELECT keyspace_name FROM system_schema.keyspaces;")).Iter()
defer iter.Close()
row := ""
for iter.Scan(&row) {
if row == dbName {
return true
}
}
return false
}

func (d *dbCreator) RemoveOldDB(dbName string) error {
if err := d.globalSession.Query(fmt.Sprintf("drop keyspace if exists %s;", dbName)).Exec(); err != nil {
return err
}
return nil
}

func (d *dbCreator) CreateDB(dbName string) error {
defer d.globalSession.Close()
replicationConfiguration := fmt.Sprintf("{ 'class': 'SimpleStrategy', 'replication_factor': %d }", replicationFactor)
if err := d.globalSession.Query(fmt.Sprintf("create keyspace %s with replication = %s;", dbName, replicationConfiguration)).Exec(); err != nil {
return err
}
for _, cassandraTypename := range []string{"bigint", "float", "double", "boolean", "blob"} {
q := fmt.Sprintf(`CREATE TABLE %s.series_%s (
series_id text,
timestamp_ns bigint,
value %s,
PRIMARY KEY (series_id, timestamp_ns)
)
WITH COMPACT STORAGE;`,
dbName, cassandraTypename, cassandraTypename)
if err := d.globalSession.Query(q).Exec(); err != nil {
return err
}
}
return nil
}

func (d *dbCreator) PostCreateDB(dbName string) error {
cluster := gocql.NewCluster(strings.Split(hosts, ",")...)
cluster.Keyspace = dbName
cluster.Timeout = writeTimeout
cluster.Consistency = consistencyMapping[consistencyLevel]
cluster.ProtoVersion = 4
session, err := cluster.CreateSession()
if err != nil {
return err
}
d.clientSession = session
return nil
}

func (d *dbCreator) Close() {
d.clientSession.Close()
}
73 changes: 10 additions & 63 deletions cmd/tsbs_load_cassandra/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"fmt"
"log"
"os"
"strings"
"time"

"bitbucket.org/440-labs/tsbs/load"
Expand Down Expand Up @@ -60,7 +59,7 @@ func init() {
}

type benchmark struct {
session *gocql.Session
dbc *dbCreator
}

func (b *benchmark) GetPointDecoder(br *bufio.Reader) load.PointDecoder {
Expand All @@ -76,34 +75,19 @@ func (b *benchmark) GetPointIndexer(_ uint) load.PointIndexer {
}

func (b *benchmark) GetProcessor() load.Processor {
return &processor{b.session}
return &processor{b.dbc}
}

func main() {
var session *gocql.Session
if loader.DoLoad() {
if loader.DoInit() {
createKeyspace(hosts)
}

cluster := gocql.NewCluster(strings.Split(hosts, ",")...)
cluster.Keyspace = loader.DatabaseName()
cluster.Timeout = writeTimeout
cluster.Consistency = consistencyMapping[consistencyLevel]
cluster.ProtoVersion = 4
var err error
session, err = cluster.CreateSession()
if err != nil {
log.Fatal(err)
}
defer session.Close()
}
func (b *benchmark) GetDBCreator() load.DBCreator {
return b.dbc
}

loader.RunBenchmark(&benchmark{session: session}, load.SingleQueue)
func main() {
loader.RunBenchmark(&benchmark{dbc: &dbCreator{}}, load.SingleQueue)
}

type processor struct {
session *gocql.Session
dbc *dbCreator
}

func (p *processor) Init(_ int, _ bool) {}
Expand All @@ -114,12 +98,12 @@ func (p *processor) ProcessBatch(b load.Batch, doLoad bool) (uint64, uint64) {
events := b.(*eventsBatch)

if doLoad {
batch := p.session.NewBatch(gocql.LoggedBatch)
batch := p.dbc.clientSession.NewBatch(gocql.LoggedBatch)
for _, event := range events.rows {
batch.Query(singleMetricToInsertStatement(event))
}

err := p.session.ExecuteBatch(batch)
err := p.dbc.clientSession.ExecuteBatch(batch)
if err != nil {
log.Fatalf("Error writing: %s\n", err.Error())
}
Expand All @@ -129,40 +113,3 @@ func (p *processor) ProcessBatch(b load.Batch, doLoad bool) (uint64, uint64) {
ePool.Put(events)
return metricCnt, 0
}

func createKeyspace(hosts string) {
cluster := gocql.NewCluster(strings.Split(hosts, ",")...)
cluster.Consistency = consistencyMapping[consistencyLevel]
cluster.ProtoVersion = 4
cluster.Timeout = 10 * time.Second
session, err := cluster.CreateSession()
if err != nil {
log.Fatal(err)
}
defer session.Close()

// Drop the keyspace to avoid errors about existing keyspaces
if err := session.Query(fmt.Sprintf("drop keyspace if exists %s;", loader.DatabaseName())).Exec(); err != nil {
log.Fatal(err)
}

replicationConfiguration := fmt.Sprintf("{ 'class': 'SimpleStrategy', 'replication_factor': %d }", replicationFactor)
if err := session.Query(fmt.Sprintf("create keyspace %s with replication = %s;", loader.DatabaseName(), replicationConfiguration)).Exec(); err != nil {
log.Print("if you know what you are doing, drop the keyspace with a command line:")
log.Print(fmt.Sprintf("echo 'drop keyspace %s;' | cqlsh <host>", loader.DatabaseName()))
log.Fatal(err)
}
for _, cassandraTypename := range []string{"bigint", "float", "double", "boolean", "blob"} {
q := fmt.Sprintf(`CREATE TABLE %s.series_%s (
series_id text,
timestamp_ns bigint,
value %s,
PRIMARY KEY (series_id, timestamp_ns)
)
WITH COMPACT STORAGE;`,
loader.DatabaseName(), cassandraTypename, cassandraTypename)
if err := session.Query(q).Exec(); err != nil {
log.Fatal(err)
}
}
}
120 changes: 120 additions & 0 deletions cmd/tsbs_load_influx/creator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package main

import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"time"
)

type dbCreator struct {
daemonURL string
}

func (d *dbCreator) Init() {
d.daemonURL = daemonURLs[0] // pick first one since it always exists
}

func (d *dbCreator) DBExists(dbName string) bool {
dbs, err := d.listDatabases()
if err != nil {
log.Fatal(err)
}

for _, db := range dbs {
if db == loader.DatabaseName() {
return true
}
}
return false
}

func (d *dbCreator) listDatabases() ([]string, error) {
u := fmt.Sprintf("%s/query?q=show%%20databases", d.daemonURL)
resp, err := http.Get(u)
if err != nil {
return nil, fmt.Errorf("listDatabases error: %s", err.Error())
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}

// Do ad-hoc parsing to find existing database names:
// {"results":[{"series":[{"name":"databases","columns":["name"],"values":[["_internal"],["benchmark_db"]]}]}]}%
type listingType struct {
Results []struct {
Series []struct {
Values [][]string
}
}
}
var listing listingType
err = json.Unmarshal(body, &listing)
if err != nil {
return nil, err
}

ret := []string{}
for _, nestedName := range listing.Results[0].Series[0].Values {
name := nestedName[0]
// the _internal database is skipped:
if name == "_internal" {
continue
}
ret = append(ret, name)
}
return ret, nil
}

func (d *dbCreator) RemoveOldDB(dbName string) error {
u := fmt.Sprintf("%s/query?q=drop+database+%s", d.daemonURL, dbName)
resp, err := http.Post(u, "text/plain", nil)
if err != nil {
return fmt.Errorf("drop db error: %s", err.Error())
}
if resp.StatusCode != 200 {
return fmt.Errorf("drop db returned non-200 code: %d", resp.StatusCode)
}
time.Sleep(time.Second)
return nil
}

func (d *dbCreator) CreateDB(dbName string) error {
u, err := url.Parse(d.daemonURL)
if err != nil {
return err
}

// serialize params the right way:
u.Path = "query"
v := u.Query()
v.Set("consistency", "all")
v.Set("q", fmt.Sprintf("CREATE DATABASE %s WITH REPLICATION %d", dbName, replicationFactor))
u.RawQuery = v.Encode()

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return err
}

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
// does the body need to be read into the void?

if resp.StatusCode != 200 {
return fmt.Errorf("bad db create")
}

time.Sleep(time.Second)
return nil
}
Loading

0 comments on commit f018743

Please sign in to comment.