Merged in rrk/load-handle-count (pull request timescale#52)
Add load statistics tracking exclusively to load pkg

Approved-by: Lee Hampton <leejhampton@gmail.com>
RobAtticus committed May 21, 2018
2 parents 6f4d763 + 81df4eb commit e1db088
Showing 8 changed files with 58 additions and 68 deletions.
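The pattern repeated in every file below: ProcessBatch now returns the metric and row counts for each batch, and the load package's BenchmarkRunner aggregates them itself, so RunBenchmark drops its counter-pointer arguments. The Processor interface definition is not part of this diff, so the following Go sketch reconstructs the implied contract from the call sites; the declarations are an inference, not the repo's verbatim source.

// Sketch of the contract implied by the call sites in this commit.
package load

// Batch is an opaque collection of decoded points handed to one worker;
// the real interface may carry more methods than shown here.
type Batch interface{}

// Processor is implemented once per target database.
type Processor interface {
	// Init prepares per-worker state such as connections or buffers.
	Init(workerNum int, doLoad bool)
	// ProcessBatch writes one batch and, after this change, returns how
	// many metrics and rows it inserted, instead of updating a shared
	// package-level counter with sync/atomic.
	ProcessBatch(b Batch, doLoad bool) (metricCount, rowCount uint64)
}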
12 changes: 5 additions & 7 deletions cmd/tsbs_load_cassandra/main.go
@@ -11,7 +11,6 @@ import (
 	"log"
 	"os"
 	"strings"
-	"sync/atomic"
 	"time"
 
 	"bitbucket.org/440-labs/influxdb-comparisons/load"
@@ -29,8 +28,7 @@ var (
 
 // Global vars
 var (
-	metricCount uint64
-	loader      *load.BenchmarkRunner
+	loader *load.BenchmarkRunner
 )
 
 // Map of user specified strings to gocql consistency settings
@@ -102,8 +100,7 @@ func main() {
 		defer session.Close()
 	}
 
-	b := &benchmark{session: session}
-	loader.RunBenchmark(b, load.SingleQueue, &metricCount, nil)
+	loader.RunBenchmark(&benchmark{session: session}, load.SingleQueue)
 }
 
 type processor struct {
@@ -114,7 +111,7 @@ func (p *processor) Init(_ int, _ bool) {}
 
 // ProcessBatch reads eventsBatches which contain rows of CQL strings and
 // creates a gocql.LoggedBatch to insert
-func (p *processor) ProcessBatch(b load.Batch, doLoad bool) {
+func (p *processor) ProcessBatch(b load.Batch, doLoad bool) (uint64, uint64) {
 	events := b.(*eventsBatch)
 	if doLoad {
 		batch := p.session.NewBatch(gocql.LoggedBatch)
@@ -127,9 +124,10 @@ func (p *processor) ProcessBatch(b load.Batch, doLoad bool) {
 			log.Fatalf("Error writing: %s\n", err.Error())
 		}
 	}
-	atomic.AddUint64(&metricCount, uint64(len(events.rows)))
+	metricCnt := uint64(len(events.rows))
 	events.rows = events.rows[:0]
	ePool.Put(events)
+	return metricCnt, 0
 }
 
 func createKeyspace(hosts string) {
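For context on the write path above: each eventsBatch row is a complete CQL statement, grouped into a single logged batch per ProcessBatch call. A minimal gocql sketch of that path, with an illustrative helper name (writeBatch is not from the repo):

package loadsketch

import (
	"log"

	"github.com/gocql/gocql"
)

// writeBatch mirrors the processor's write path: collect the batch's CQL
// rows into one logged batch, execute it, and report one metric per row,
// matching metricCnt := uint64(len(events.rows)) above.
func writeBatch(session *gocql.Session, rows []string) uint64 {
	batch := session.NewBatch(gocql.LoggedBatch)
	for _, row := range rows {
		batch.Query(row)
	}
	if err := session.ExecuteBatch(batch); err != nil {
		log.Fatalf("Error writing: %s\n", err.Error())
	}
	return uint64(len(rows))
}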
13 changes: 5 additions & 8 deletions cmd/tsbs_load_influx/main.go
@@ -16,7 +16,6 @@ import (
 	"net/url"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"bitbucket.org/440-labs/influxdb-comparisons/load"
@@ -41,9 +40,6 @@ var (
 var (
 	loader  *load.BenchmarkRunner
 	bufPool sync.Pool
-
-	rowCount    uint64
-	metricCount uint64
 )
 
 var consistencyChoices = map[string]struct{}{
@@ -131,7 +127,7 @@ func main() {
 		},
 	}
 
-	loader.RunBenchmark(&benchmark{}, load.SingleQueue, &metricCount, &rowCount)
+	loader.RunBenchmark(&benchmark{}, load.SingleQueue)
 }
 
 type processor struct {
@@ -160,7 +156,7 @@ func (p *processor) Close(_ bool) {
 	<-p.backingOffDone
 }
 
-func (p *processor) ProcessBatch(b load.Batch, doLoad bool) {
+func (p *processor) ProcessBatch(b load.Batch, doLoad bool) (uint64, uint64) {
 	batch := b.(*batch)
 
 	// Write the batch: try until backoff is not needed.
@@ -190,12 +186,13 @@ func (p *processor) ProcessBatch(b load.Batch, doLoad bool) {
 			log.Fatalf("Error writing: %s\n", err.Error())
 		}
 	}
-	atomic.AddUint64(&metricCount, batch.metrics)
-	atomic.AddUint64(&rowCount, batch.rows)
+	metricCnt := batch.metrics
+	rowCnt := batch.rows
 
 	// Return the batch buffer to the pool.
 	batch.buf.Reset()
 	bufPool.Put(batch.buf)
+	return metricCnt, rowCnt
 }
 
 func processBackoffMessages(workerID int, src chan bool, dst chan struct{}) {
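Note how ProcessBatch ends by recycling the batch buffer into bufPool, the package-level sync.Pool declared above. A self-contained sketch of that pattern, with an assumed New function and initial capacity (the repo's pool setup is not shown in this diff):

package loadsketch

import (
	"bytes"
	"sync"
)

// bufPool hands out reusable buffers; New runs only when the pool is empty.
var bufPool = sync.Pool{
	New: func() interface{} { return bytes.NewBuffer(make([]byte, 0, 4<<10)) },
}

// recycle resets a finished batch buffer and returns it to the pool, so a
// later batch can reuse the underlying array instead of allocating.
func recycle(buf *bytes.Buffer) {
	buf.Reset() // drops contents, keeps capacity
	bufPool.Put(buf)
}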
10 changes: 5 additions & 5 deletions cmd/tsbs_load_mongo/aggregate_loader.go
@@ -5,7 +5,6 @@ import (
 	"hash/fnv"
 	"log"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"bitbucket.org/440-labs/influxdb-comparisons/cmd/tsbs_generate_data/serialize"
@@ -39,6 +38,9 @@ type aggBenchmark struct {
 }
 
 func newAggBenchmark(l *load.BenchmarkRunner, session *mgo.Session) *aggBenchmark {
+	// Pre-create the needed empty subdoc for new aggregate docs
+	generateEmptyHourDoc()
+
 	return &aggBenchmark{mongoBenchmark{l, session}}
 }
 
@@ -112,7 +114,7 @@ func (p *aggProcessor) Init(workerNum int, doLoad bool) {
 // ]
 // ]
 // }
-func (p *aggProcessor) ProcessBatch(b load.Batch, doLoad bool) {
+func (p *aggProcessor) ProcessBatch(b load.Batch, doLoad bool) (uint64, uint64) {
 	docToEvents := make(map[string][]*point)
 	batch := b.(*batch)
 
@@ -202,9 +204,7 @@ func (p *aggProcessor) ProcessBatch(b load.Batch, doLoad bool) {
 			}
 		}
 	}
-	// Update count of metrics inserted
-	atomic.AddUint64(&metricCount, eventCnt)
-
+	return eventCnt, 0
 }
 
 // insertNewAggregateDocs handles creating new aggregated documents when new devices
9 changes: 5 additions & 4 deletions cmd/tsbs_load_mongo/document_per_loader.go
@@ -3,7 +3,6 @@ package main
 import (
 	"log"
 	"sync"
-	"sync/atomic"
 
 	"bitbucket.org/440-labs/influxdb-comparisons/cmd/tsbs_generate_data/serialize"
 	"bitbucket.org/440-labs/influxdb-comparisons/load"
@@ -56,13 +55,13 @@ func (p *naiveProcessor) Init(workerNUm int, doLoad bool) {
 // ProcessBatch creates a new document for each incoming event for a simpler
 // approach to storing the data. This is _NOT_ the default since the aggregation method
 // is recommended by Mongo and other blogs
-func (p *naiveProcessor) ProcessBatch(b load.Batch, doLoad bool) {
+func (p *naiveProcessor) ProcessBatch(b load.Batch, doLoad bool) (uint64, uint64) {
 	batch := b.(*batch).arr
 	if cap(p.pvs) < len(batch) {
 		p.pvs = make([]interface{}, len(batch))
 	}
 	p.pvs = p.pvs[:len(batch)]
-
+	var metricCnt uint64
 	for i, event := range batch {
 		x := spPool.Get().(*singlePoint)
 
@@ -81,7 +80,7 @@ func (p *naiveProcessor) ProcessBatch(b load.Batch, doLoad bool) {
 			x.Tags[string(t.Key())] = string(t.Value())
 		}
 		p.pvs[i] = x
-		atomic.AddUint64(&metricCount, uint64(event.FieldsLength()))
+		metricCnt += uint64(event.FieldsLength())
 	}
 
 	if doLoad {
@@ -95,4 +94,6 @@ func (p *naiveProcessor) ProcessBatch(b load.Batch, doLoad bool) {
 	for _, p := range p.pvs {
 		spPool.Put(p)
 	}
+
+	return metricCnt, 0
 }
8 changes: 2 additions & 6 deletions cmd/tsbs_load_mongo/main.go
@@ -32,8 +32,7 @@ var (
 
 // Global vars
 var (
-	metricCount uint64
-	loader      *load.BenchmarkRunner
+	loader *load.BenchmarkRunner
 )
 
 // Parse args:
@@ -68,14 +67,11 @@ func main() {
 		benchmark = newNaiveBenchmark(loader, session)
 		workQueues = load.SingleQueue
 	} else {
-		// Pre-create the needed empty subdoc for new aggregate docs
-		generateEmptyHourDoc()
-
 		benchmark = newAggBenchmark(loader, session)
 		workQueues = load.WorkerPerQueue
 	}
 
-	loader.RunBenchmark(benchmark, workQueues, &metricCount, nil)
+	loader.RunBenchmark(benchmark, workQueues)
 }
 
 func createCollection(session *mgo.Session, collectionName string) {
19 changes: 8 additions & 11 deletions cmd/tsbs_load_timescaledb/main.go
@@ -12,7 +12,6 @@ import (
 	"strconv"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"bitbucket.org/440-labs/influxdb-comparisons/load"
@@ -55,9 +54,7 @@ type insertData struct {
 
 // Global vars
 var (
-	metricCount uint64
-	rowCount    uint64
-	loader      *load.BenchmarkRunner
+	loader *load.BenchmarkRunner
 
 	tableCols map[string][]string
 )
@@ -144,7 +141,7 @@ func main() {
 		go OutputReplicationStats(getConnectString(), replicationStatsFile, &replicationStatsWaitGroup)
 	}
 
-	loader.RunBenchmark(&benchmark{}, load.SingleQueue, &metricCount, &rowCount)
+	loader.RunBenchmark(&benchmark{}, load.SingleQueue)
 
 	if len(replicationStatsFile) > 0 {
 		replicationStatsWaitGroup.Wait()
@@ -376,16 +373,16 @@ func (p *processor) Close(doLoad bool) {
 	}
 }
 
-func (p *processor) ProcessBatch(b load.Batch, doLoad bool) {
+func (p *processor) ProcessBatch(b load.Batch, doLoad bool) (uint64, uint64) {
 	batches := b.(*hypertableArr)
-	rowsCnt := 0
+	rowCnt := 0
+	metricCnt := uint64(0)
 	for hypertable, rows := range batches.m {
-		rowsCnt += len(rows)
+		rowCnt += len(rows)
 		if doLoad {
 			start := time.Now()
-			metricCountWorker := processCSI(p.db, hypertable, rows)
+			metricCnt += processCSI(p.db, hypertable, rows)
 			//metricCountWorker := processSplit(db, hypertable, rows)
-			atomic.AddUint64(&metricCount, metricCountWorker)
 
 			if logBatches {
 				now := time.Now()
@@ -397,7 +394,7 @@ func (p *processor) ProcessBatch(b load.Batch, doLoad bool) {
 	}
 	batches.m = map[string][]*insertData{}
 	batches.cnt = 0
-	atomic.AddUint64(&rowCount, uint64(rowsCnt))
+	return metricCnt, uint64(rowCnt)
 }
 
 func createTagsTable(db *sqlx.DB, tags []string) {
53 changes: 27 additions & 26 deletions load/loader.go
@@ -48,7 +48,9 @@ type BenchmarkRunner struct {
 	filename string // TODO implement file reading
 
 	// non-flag fields
-	br *bufio.Reader
+	br        *bufio.Reader
+	metricCnt uint64
+	rowCnt    uint64
 }
 
 var loader = &BenchmarkRunner{}
@@ -91,7 +93,7 @@ func (l *BenchmarkRunner) DoInit() bool {
 
 // RunBenchmark takes in a Benchmark b, a bufio.Reader br, and holders for number of metrics and rows
 // and uses those to run the load benchmark
-func (l *BenchmarkRunner) RunBenchmark(b Benchmark, workQueues uint, metricCount, rowCount *uint64) {
+func (l *BenchmarkRunner) RunBenchmark(b Benchmark, workQueues uint) {
 	l.br = l.GetBufferedReader()
 	var wg sync.WaitGroup
 
@@ -109,19 +111,19 @@ func (l *BenchmarkRunner) RunBenchmark(b Benchmark, workQueues uint, metricCount
 
 	for i := 0; i < int(l.workers); i++ {
 		wg.Add(1)
-		go work(b, &wg, channels[i%len(channels)], i, l.doLoad)
+		go l.work(b, &wg, channels[i%len(channels)], i)
 	}
 
 	start := time.Now()
-	l.scan(b, channels, maxPartitions, metricCount, rowCount)
+	l.scan(b, channels, maxPartitions)
 
 	for _, c := range channels {
 		c.close()
 	}
 	wg.Wait()
 	end := time.Now()
 
-	summary(end.Sub(start), l.workers, metricCount, rowCount)
+	l.summary(end.Sub(start))
 }
 
 // GetBufferedReader returns the buffered Reader that should be used by the loader
@@ -138,64 +140,63 @@ func (l *BenchmarkRunner) GetBufferedReader() *bufio.Reader {
 
 // scan launches any needed reporting mechanism and proceeds to scan input data
 // to distribute to workers
-func (l *BenchmarkRunner) scan(b Benchmark, channels []*duplexChannel, maxPartitions uint, metricCount, rowCount *uint64) int64 {
+func (l *BenchmarkRunner) scan(b Benchmark, channels []*duplexChannel, maxPartitions uint) int64 {
 	if l.reportingPeriod.Nanoseconds() > 0 {
-		go report(l.reportingPeriod, metricCount, rowCount)
+		go l.report(l.reportingPeriod)
 	}
 	return scanWithIndexer(channels, l.batchSize, l.limit, l.br, b.GetPointDecoder(l.br), b.GetBatchFactory(), b.GetPointIndexer(maxPartitions))
 }
 
 // work is the processing function for each worker in the loader
-func work(b Benchmark, wg *sync.WaitGroup, c *duplexChannel, workerNum int, doLoad bool) {
+func (l *BenchmarkRunner) work(b Benchmark, wg *sync.WaitGroup, c *duplexChannel, workerNum int) {
 	proc := b.GetProcessor()
-	proc.Init(workerNum, doLoad)
+	proc.Init(workerNum, l.doLoad)
 	for b := range c.toWorker {
-		proc.ProcessBatch(b, doLoad)
+		metricCnt, rowCnt := proc.ProcessBatch(b, l.doLoad)
+		atomic.AddUint64(&l.metricCnt, metricCnt)
+		atomic.AddUint64(&l.rowCnt, rowCnt)
 		c.sendToScanner()
 	}
 	wg.Done()
 	switch c := proc.(type) {
 	case ProcessorCloser:
-		c.Close(doLoad)
+		c.Close(l.doLoad)
 	}
 }
 
 // summary prints the summary of statistics from loading
-func summary(took time.Duration, workers uint, metricCount, rowCount *uint64) {
-	metricRate := float64(*metricCount) / float64(took.Seconds())
+func (l *BenchmarkRunner) summary(took time.Duration) {
+	metricRate := float64(l.metricCnt) / float64(took.Seconds())
 	fmt.Println("\nSummary:")
-	fmt.Printf("loaded %d metrics in %0.3fsec with %d workers (mean rate %0.2f metrics/sec)\n", *metricCount, took.Seconds(), workers, metricRate)
-	if rowCount != nil {
-		rowRate := float64(*rowCount) / float64(took.Seconds())
-		fmt.Printf("loaded %d rows in %0.3fsec with %d workers (mean rate %0.2f rows/sec)\n", *rowCount, took.Seconds(), workers, rowRate)
+	fmt.Printf("loaded %d metrics in %0.3fsec with %d workers (mean rate %0.2f metrics/sec)\n", l.metricCnt, took.Seconds(), l.workers, metricRate)
+	if l.rowCnt > 0 {
+		rowRate := float64(l.rowCnt) / float64(took.Seconds())
+		fmt.Printf("loaded %d rows in %0.3fsec with %d workers (mean rate %0.2f rows/sec)\n", l.rowCnt, took.Seconds(), l.workers, rowRate)
 	}
 }
 
 // report handles periodic reporting of loading stats
-func report(period time.Duration, metricCount, rowCount *uint64) {
+func (l *BenchmarkRunner) report(period time.Duration) {
 	start := time.Now()
 	prevTime := start
 	prevColCount := uint64(0)
 	prevRowCount := uint64(0)
 
-	rCount := uint64(0)
 	fmt.Printf("time,per. metric/s,metric total,overall metric/s,per. row/s,row total,overall row/s\n")
 	for now := range time.NewTicker(period).C {
-		cCount := atomic.LoadUint64(metricCount)
-		if rowCount != nil {
-			rCount = atomic.LoadUint64(rowCount)
-		}
+		cCount := atomic.LoadUint64(&l.metricCnt)
+		rCount := atomic.LoadUint64(&l.rowCnt)
 
 		sinceStart := now.Sub(start)
 		took := now.Sub(prevTime)
 		colrate := float64(cCount-prevColCount) / float64(took.Seconds())
 		overallColRate := float64(cCount) / float64(sinceStart.Seconds())
-		if rowCount != nil {
+		if rCount > 0 {
 			rowrate := float64(rCount-prevRowCount) / float64(took.Seconds())
 			overallRowRate := float64(rCount) / float64(sinceStart.Seconds())
-			fmt.Printf("%d,%0.3f,%E,%0.3f,%0.3f,%E,%0.3f\n", now.Unix(), colrate, float64(cCount), overallColRate, rowrate, float64(rCount), overallRowRate)
+			fmt.Printf("%d,%0.2f,%E,%0.2f,%0.2f,%E,%0.2f\n", now.Unix(), colrate, float64(cCount), overallColRate, rowrate, float64(rCount), overallRowRate)
 		} else {
-			fmt.Printf("%d,%0.3f,%E,%0.3f,-,-,-\n", now.Unix(), colrate, float64(cCount), overallColRate)
+			fmt.Printf("%d,%0.2f,%E,%0.2f,-,-,-\n", now.Unix(), colrate, float64(cCount), overallColRate)
		}
 
 		prevColCount = cCount
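With metricCnt and rowCnt now fields on BenchmarkRunner, the only synchronization they need is sync/atomic: workers add after each batch, and the reporting goroutine reads concurrently with atomic loads. A runnable toy example of that ownership pattern (all names illustrative, independent of the repo):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type runner struct {
	metricCnt uint64
	rowCnt    uint64
}

// work drains batches and adds each batch's counts to the shared totals,
// as BenchmarkRunner.work now does with ProcessBatch's return values.
func (r *runner) work(batches <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for metrics := range batches {
		atomic.AddUint64(&r.metricCnt, uint64(metrics))
		atomic.AddUint64(&r.rowCnt, 1) // pretend each batch is one row
	}
}

// report reads the totals with atomic loads on a ticker, like
// BenchmarkRunner.report's loop.
func (r *runner) report(period time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Printf("metrics=%d rows=%d\n",
				atomic.LoadUint64(&r.metricCnt), atomic.LoadUint64(&r.rowCnt))
		case <-stop:
			return
		}
	}
}

func main() {
	r := &runner{}
	batches := make(chan int)
	stop := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go r.work(batches, &wg)
	}
	go r.report(50*time.Millisecond, stop)
	for i := 0; i < 1000; i++ {
		batches <- 10 // each batch carries 10 metrics
	}
	close(batches)
	wg.Wait()
	close(stop)
	fmt.Printf("final: metrics=%d rows=%d\n", r.metricCnt, r.rowCnt)
}

The final totals here should read metrics=10000 rows=1000; in the loader, the same invariant is what the Summary block prints after all workers finish.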