
Commit

Merge pull request #3569 from influxdb/pd-wal
Add initial WAL implementation and tests
pauldix committed Aug 19, 2015
2 parents e5704e4 + 028d0a6 commit c31b88d
Showing 21 changed files with 3,168 additions and 190 deletions.
154 changes: 154 additions & 0 deletions cmd/influx_stress/influx_stress.go
@@ -0,0 +1,154 @@
package main

import (
"flag"
"fmt"
"math/rand"
"net/url"
"runtime"
"sort"
"sync"
"time"

"github.com/influxdb/influxdb/client"
)

var (
batchSize = flag.Int("batchsize", 5000, "number of points per batch")
seriesCount = flag.Int("series", 100000, "number of unique series to create")
pointCount = flag.Int("points", 100, "number of points per series to create")
concurrency = flag.Int("concurrency", 10, "number of simultaneous writes to run")
batchInterval = flag.Duration("batchinterval", 0*time.Second, "duration between batches")
database = flag.String("database", "stress", "name of database")
address = flag.String("addr", "localhost:8086", "IP address and port of database (e.g., localhost:8086)")
)

func main() {
flag.Parse()
runtime.GOMAXPROCS(runtime.NumCPU())

startTime := time.Now()
counter := NewConcurrencyLimiter(*concurrency)

u, _ := url.Parse(fmt.Sprintf("http://%s", *address))
c, err := client.NewClient(client.Config{URL: *u})
if err != nil {
panic(err)
}

var mu sync.Mutex
var wg sync.WaitGroup
responseTimes := make([]int, 0)

totalPoints := 0

for i := 1; i <= *pointCount; i++ {
batch := &client.BatchPoints{
Database: *database,
WriteConsistency: "any",
Time: time.Now(),
Precision: "n",
}

for j := 1; j <= *seriesCount; j++ {
p := client.Point{
Measurement: "cpu",
Tags: map[string]string{"region": "uswest", "host": fmt.Sprintf("host-%d", j)},
Fields: map[string]interface{}{"value": rand.Float64()},
}
batch.Points = append(batch.Points, p)
if len(batch.Points) >= *batchSize {
wg.Add(1)
counter.Increment()
totalPoints += len(batch.Points)
go func(b *client.BatchPoints, total int) {
st := time.Now()
if _, err := c.Write(*b); err != nil {
fmt.Println("ERROR: ", err.Error())
} else {
mu.Lock()
responseTimes = append(responseTimes, int(time.Since(st).Nanoseconds()))
mu.Unlock()
}
wg.Done()
counter.Decrement()
if total%1000000 == 0 {
fmt.Printf("%d total points. %d in %s\n", total, *batchSize, time.Since(st))
}
}(batch, totalPoints)

batch = &client.BatchPoints{
Database: *database,
WriteConsistency: "any",
Precision: "n",
}
}
}
}

wg.Wait()
sort.Sort(sort.Reverse(sort.IntSlice(responseTimes)))

total := int64(0)
for _, t := range responseTimes {
total += int64(t)
}
mean := total / int64(len(responseTimes))

fmt.Printf("Wrote %d points at average rate of %.0f\n", totalPoints, float64(totalPoints)/time.Since(startTime).Seconds())
fmt.Println("Average response time: ", time.Duration(mean))
fmt.Println("Slowest response times:")
for _, r := range responseTimes[:100] {
fmt.Println(time.Duration(r))
}
}

// ConcurrencyLimiter is a goroutine-safe struct that can be used to
// ensure that no more than a specified max number of goroutines are
// executing.
type ConcurrencyLimiter struct {
inc chan chan struct{}
dec chan struct{}
max int
count int
}

// NewConcurrencyLimiter returns a configured limiter that will
// ensure that calls to Increment will block if the max is hit.
func NewConcurrencyLimiter(max int) *ConcurrencyLimiter {
c := &ConcurrencyLimiter{
inc: make(chan chan struct{}),
dec: make(chan struct{}, max),
max: max,
}
go c.handleLimits()
return c
}

// Increment will increase the count of running goroutines by 1.
// If the number is currently at the max, the call to Increment
// will block until another goroutine decrements.
func (c *ConcurrencyLimiter) Increment() {
r := make(chan struct{})
c.inc <- r
<-r
}

// Decrement will reduce the count of running goroutines by 1
func (c *ConcurrencyLimiter) Decrement() {
c.dec <- struct{}{}
}

// handleLimits runs in a goroutine to manage the count of
// running goroutines.
func (c *ConcurrencyLimiter) handleLimits() {
for {
r := <-c.inc
if c.count >= c.max {
<-c.dec
c.count--
}
c.count++
r <- struct{}{}
}
}
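For orientation, a minimal usage sketch of the limiter defined above (not part of the diff); the limit of 4, the loop bound, and the Sleep are placeholders for the batch writes influx_stress actually issues, and it relies on the sync and time imports already in this file:

limiter := NewConcurrencyLimiter(4) // at most 4 goroutines run at once
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
	wg.Add(1)
	limiter.Increment() // blocks here once 4 goroutines are in flight
	go func(n int) {
		defer wg.Done()
		defer limiter.Decrement() // frees a slot so a blocked Increment can proceed
		time.Sleep(10 * time.Millisecond) // stand-in for c.Write(batch)
	}(i)
}
wg.Wait()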
5 changes: 4 additions & 1 deletion cmd/influxd/run/server.go
@@ -68,6 +68,9 @@ type Server struct {
// NewServer returns a new instance of Server built from a config.
func NewServer(c *Config, version string) (*Server, error) {
// Construct base meta store and data store.
tsdbStore := tsdb.NewStore(c.Data.Dir)
tsdbStore.EngineOptions.Config = c.Data

s := &Server{
version: version,
err: make(chan error),
@@ -77,7 +80,7 @@ func NewServer(c *Config, version string) (*Server, error) {
BindAddress: c.Meta.BindAddress,

MetaStore: meta.NewStore(c.Meta),
TSDBStore: tsdb.NewStore(c.Data.Dir),
TSDBStore: tsdbStore,

reportingDisabled: c.ReportingDisabled,
}
2 changes: 2 additions & 0 deletions cmd/influxd/run/server_helpers_test.go
@@ -35,6 +35,7 @@ func NewServer(c *run.Config) *Server {
Server: srv,
Config: c,
}
s.TSDBStore.EngineOptions.Config = c.Data
configureLogging(&s)
return &s
}
@@ -155,6 +156,7 @@ func NewConfig() *run.Config {
c.Meta.CommitTimeout = toml.Duration(5 * time.Millisecond)

c.Data.Dir = MustTempFile()
c.Data.WALDir = MustTempFile()

c.HintedHandoff.Dir = MustTempFile()

27 changes: 27 additions & 0 deletions etc/config.sample.toml
@@ -36,10 +36,37 @@ reporting-disabled = false

[data]
dir = "/var/opt/influxdb/data"

# The following WAL settings are for the b1 storage engine used in 0.9.2. They won't
# apply to any new shards created after upgrading to a version > 0.9.3.
max-wal-size = 104857600 # Maximum size the WAL can reach before a flush. Defaults to 100MB.
wal-flush-interval = "10m" # Maximum time data can sit in WAL before a flush.
wal-partition-flush-delay = "2s" # The delay time between each WAL partition being flushed.

# These are the WAL settings for the storage engine >= 0.9.3
wal-dir = "/var/opt/influxdb/wal"
wal-enable-logging = true

# When a series in the WAL in-memory cache reaches this size in bytes it is marked as ready to
# flush to the index
# wal-ready-series-size = 25600

# Flush and compact a partition once this ratio of series are over the ready size
# wal-compaction-threshold = 0.6

# Force a flush and compaction if any series in a partition gets above this size in bytes
# wal-max-series-size = 2097152

# Force a flush of all series and full compaction if there have been no writes in this
# amount of time. This is useful for ensuring that shards that are cold for writes don't
# keep a bunch of data cached in memory and in the WAL.
# wal-flush-cold-interval = "10m"

# Force a partition to flush its largest series if it reaches this approximate size in
# bytes. Remember there are 5 partitions so you'll need at least 5x this amount of memory.
# The more memory you have, the bigger this can be.
# wal-partition-size-threshold = 20971520

###
### [cluster]
###
44 changes: 43 additions & 1 deletion tsdb/config.go
@@ -16,19 +16,61 @@ const (

// DefaultWALPartitionFlushDelay is the sleep time between WAL partition flushes.
DefaultWALPartitionFlushDelay = 2 * time.Second

// tsdb/engine/wal configuration options

// DefaultReadySeriesSize specifies the size, in bytes, at which a series is eligible to be flushed
DefaultReadySeriesSize = 30 * 1024

// DefaultCompactionThreshold is the ratio of series that must be over the ready size before a partition is flushed and compacted
DefaultCompactionThreshold = 0.5

// DefaultMaxSeriesSize specifies the size at which a series will be forced to flush
DefaultMaxSeriesSize = 1024 * 1024

// DefaultFlushColdInterval specifies how long after a partition has been cold
// for writes that a full flush and compaction are forced
DefaultFlushColdInterval = 5 * time.Minute

// DefaultPartitionSizeThreshold specifies the in-memory size at which a partition
// should start slowing down writes until it gets a chance to compact.
// This forces backpressure on clients that are writing too fast. We need
// this because the WAL can take writes much faster than the index, so eventually
// we have to apply backpressure or we'd fill up memory and die.
// This number multiplied by the partition count is roughly the max possible memory
// size for the in-memory WAL cache.
DefaultPartitionSizeThreshold = 20 * 1024 * 1024 // 20MB
)

type Config struct {
Dir string `toml:"dir"`
Dir string `toml:"dir"`

// WAL config options for b1 (introduced in 0.9.2)
MaxWALSize int `toml:"max-wal-size"`
WALFlushInterval toml.Duration `toml:"wal-flush-interval"`
WALPartitionFlushDelay toml.Duration `toml:"wal-partition-flush-delay"`

// WAL configuration options for bz1 (introduced in 0.9.3)
WALDir string `toml:"wal-dir"`
WALEnableLogging bool `toml:"wal-enable-logging"`
WALReadySeriesSize int `toml:"wal-ready-series-size"`
WALCompactionThreshold float64 `toml:"wal-compaction-threshold"`
WALMaxSeriesSize int `toml:"wal-max-series-size"`
WALFlushColdInterval toml.Duration `toml:"wal-flush-cold-interval"`
WALPartitionSizeThreshold uint64 `toml:"wal-partition-size-threshold"`
}

func NewConfig() Config {
return Config{
MaxWALSize: DefaultMaxWALSize,
WALFlushInterval: toml.Duration(DefaultWALFlushInterval),
WALPartitionFlushDelay: toml.Duration(DefaultWALPartitionFlushDelay),

WALEnableLogging: true,
WALReadySeriesSize: DefaultReadySeriesSize,
WALCompactionThreshold: DefaultCompactionThreshold,
WALMaxSeriesSize: DefaultMaxSeriesSize,
WALFlushColdInterval: toml.Duration(DefaultFlushColdInterval),
WALPartitionSizeThreshold: DefaultPartitionSizeThreshold,
}
}
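A hedged sketch (not part of the diff) of how these options come together at runtime: build a Config, override a couple of the new bz1 WAL knobs, and hand it to the store the same way cmd/influxd/run/server.go does with c.Data in this commit. The concrete values and paths are illustrative only, mirroring etc/config.sample.toml.

cfg := tsdb.NewConfig()            // starts from the WAL defaults above
cfg.Dir = "/var/opt/influxdb/data" // paths mirror etc/config.sample.toml
cfg.WALDir = "/var/opt/influxdb/wal"
cfg.WALFlushColdInterval = toml.Duration(10 * time.Minute) // toml is the influxdb Duration wrapper this file already imports
cfg.WALPartitionSizeThreshold = 20 * 1024 * 1024           // ~20MB per partition; with 5 partitions, roughly a 100MB in-memory cap

store := tsdb.NewStore(cfg.Dir)
store.EngineOptions.Config = cfg // the same wiring NewServer does with c.Data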
37 changes: 35 additions & 2 deletions tsdb/engine.go
@@ -1,10 +1,12 @@
package tsdb

import (
"bytes"
"errors"
"fmt"
"io"
"os"
"sort"
"time"

"github.com/boltdb/bolt"
@@ -16,7 +18,7 @@ var (
)

// DefaultEngine is the default engine used by the shard when initializing.
const DefaultEngine = "b1"
const DefaultEngine = "bz1"

// Engine represents a swappable storage engine for the shard.
type Engine interface {
@@ -52,7 +54,7 @@ func RegisterEngine(name string, fn NewEngineFunc) {
func NewEngine(path string, options EngineOptions) (Engine, error) {
// Create a new engine
if _, err := os.Stat(path); os.IsNotExist(err) {
return newEngineFuncs[DefaultEngine](path, options), nil
return newEngineFuncs[options.EngineVersion](path, options), nil
}

// Only bolt-based backends are currently supported so open it and check the format.
@@ -96,17 +98,22 @@ func NewEngine(path string, options EngineOptions) (Engine, error) {

// EngineOptions represents the options used to initialize the engine.
type EngineOptions struct {
EngineVersion string
MaxWALSize int
WALFlushInterval time.Duration
WALPartitionFlushDelay time.Duration

Config Config
}

// NewEngineOptions returns the default options.
func NewEngineOptions() EngineOptions {
return EngineOptions{
EngineVersion: DefaultEngine,
MaxWALSize: DefaultMaxWALSize,
WALFlushInterval: DefaultWALFlushInterval,
WALPartitionFlushDelay: DefaultWALPartitionFlushDelay,
Config: NewConfig(),
}
}
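Because NewEngine now creates brand-new shard files with options.EngineVersion while existing files keep their on-disk format, a caller can still opt into the older engine for new shards. A hedged sketch; the shard path is illustrative, and the name "b1" assumes the b1 package registers itself under that name:

opts := tsdb.NewEngineOptions() // EngineVersion defaults to DefaultEngine ("bz1")
opts.EngineVersion = "b1"       // assumed registered name for the 0.9.2-era engine
engine, err := tsdb.NewEngine("/var/opt/influxdb/data/db/rp/1", opts) // illustrative shard path
if err != nil {
	panic(err) // sketch-level error handling
}
_ = engine // a shard would normally take ownership of this engine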

@@ -125,3 +132,29 @@ type Cursor interface {
Seek(seek []byte) (key, value []byte)
Next() (key, value []byte)
}

// DedupeEntries returns slices with unique keys (the first 8 bytes).
func DedupeEntries(a [][]byte) [][]byte {
// Convert to a map where the last slice is used.
m := make(map[string][]byte)
for _, b := range a {
m[string(b[0:8])] = b
}

// Convert map back to a slice of byte slices.
other := make([][]byte, 0, len(m))
for _, v := range m {
other = append(other, v)
}

// Sort entries.
sort.Sort(ByteSlices(other))

return other
}

type ByteSlices [][]byte

func (a ByteSlices) Len() int { return len(a) }
func (a ByteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
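A small behavioral sketch of DedupeEntries (not from the diff): entries are keyed on their first 8 bytes, the last entry seen for a key wins, and the result comes back sorted with bytes.Compare. The literal values are made up for illustration:

a := [][]byte{
	append([]byte("00000001"), 'x'),
	append([]byte("00000002"), 'y'),
	append([]byte("00000001"), 'z'), // same 8-byte key as the first entry, so this later value wins
}
for _, e := range tsdb.DedupeEntries(a) {
	fmt.Println(string(e)) // prints "00000001z" then "00000002y"
}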
2 changes: 1 addition & 1 deletion tsdb/engine/b1/b1.go
@@ -194,7 +194,7 @@ func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields
meta = tx.Bucket([]byte("series"))
c = meta.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
series := &tsdb.Series{}
series := tsdb.NewSeries("", nil)
if err := series.UnmarshalBinary(v); err != nil {
return err
}
2 changes: 1 addition & 1 deletion tsdb/engine/b1/b1_test.go
@@ -22,7 +22,7 @@ func TestEngine_WritePoints(t *testing.T) {
mf := &tsdb.MeasurementFields{Fields: make(map[string]*tsdb.Field)}
mf.CreateFieldIfNotExists("value", influxql.Float)
seriesToCreate := []*tsdb.SeriesCreate{
{Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("temperature"), nil))}},
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("temperature"), nil)), nil)},
}

// Parse point.