Replace EventLog with Logger (#1203)
Fixes #1193
gabru-md authored Apr 13, 2020
1 parent 4f6763c commit 09dd2e1
Showing 7 changed files with 29 additions and 75 deletions.
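For context: Badger already ships a pluggable Logger on Options, and this commit routes everything that previously went to a golang.org/x/net/trace EventLog through it instead — each elog.Printf below becomes db.opt.Debugf, and elog.Errorf becomes db.opt.Errorf. A minimal sketch of that plumbing, assuming the four-method interface from logger.go and a nil-safe forwarding helper (the cut-down Options struct here is illustrative, not Badger's real one):

```go
package logsketch

// Logger mirrors Badger's pluggable logging interface (logger.go);
// any implementation can be installed via Options.WithLogger.
type Logger interface {
	Errorf(string, ...interface{})
	Warningf(string, ...interface{})
	Infof(string, ...interface{})
	Debugf(string, ...interface{})
}

// Options stands in for Badger's Options struct, reduced to the one
// field that matters for this commit.
type Options struct {
	Logger Logger
}

// Debugf forwards to the configured Logger and is a no-op when none
// is set — a sketch of how the db.opt.Debugf calls in this diff
// presumably behave, not a copy of Badger's source.
func (opt *Options) Debugf(format string, v ...interface{}) {
	if opt.Logger == nil {
		return
	}
	opt.Logger.Debugf(format, v...)
}
```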
41 changes: 16 additions & 25 deletions db.go
@@ -38,7 +38,6 @@ import (
 	"github.com/dgraph-io/ristretto"
 	humanize "github.com/dustin/go-humanize"
 	"github.com/pkg/errors"
-	"golang.org/x/net/trace"
 )

 var (
@@ -68,7 +67,6 @@ type DB struct {
 	valueDirGuard *directoryLockGuard

 	closers closers
-	elog    trace.EventLog
 	mt      *skl.Skiplist   // Our latest (actively written) in-memory table
 	imm     []*skl.Skiplist // Add here only AFTER pushing to flushChan.
 	opt     Options
@@ -110,7 +108,7 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {

 	toLSM := func(nk []byte, vs y.ValueStruct) {
 		for err := db.ensureRoomForWrite(); err != nil; err = db.ensureRoomForWrite() {
-			db.elog.Printf("Replay: Making room for writes")
+			db.opt.Debugf("Replay: Making room for writes")
 			time.Sleep(10 * time.Millisecond)
 		}
 		db.mt.Put(nk, vs)
@@ -119,7 +117,7 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
 	first := true
 	return func(e Entry, vp valuePointer) error { // Function for replaying.
 		if first {
-			db.elog.Printf("First key=%q\n", e.Key)
+			db.opt.Debugf("First key=%q\n", e.Key)
 		}
 		first = false
 		db.orc.Lock()
@@ -282,18 +280,12 @@ func Open(opt Options) (db *DB, err error) {
 		}
 	}()

-	elog := y.NoEventLog
-	if opt.EventLogging {
-		elog = trace.NewEventLog("Badger", "DB")
-	}
-
 	db = &DB{
 		imm:           make([]*skl.Skiplist, 0, opt.NumMemtables),
 		flushChan:     make(chan flushTask, opt.NumMemtables),
 		writeCh:       make(chan *request, kvWriteChCapacity),
 		opt:           opt,
 		manifest:      manifestFile,
-		elog:          elog,
 		dirLockGuard:  dirLockGuard,
 		valueDirGuard: valueDirLockGuard,
 		orc:           newOracle(opt),
@@ -440,7 +432,7 @@ func (db *DB) Close() error {
 }

 func (db *DB) close() (err error) {
-	db.elog.Printf("Closing database")
+	db.opt.Debugf("Closing database")

 	atomic.StoreInt32(&db.blockWrites, 1)

@@ -468,7 +460,7 @@ func (db *DB) close() (err error) {
 	// trying to push stuff into the memtable. This will also resolve the value
 	// offset problem: as we push into memtable, we update value offsets there.
 	if !db.mt.Empty() {
-		db.elog.Printf("Flushing memtable")
+		db.opt.Debugf("Flushing memtable")
 		for {
 			pushedFlushTask := func() bool {
 				db.Lock()
@@ -478,7 +470,7 @@
 			case db.flushChan <- flushTask{mt: db.mt, vptr: db.vhead}:
 				db.imm = append(db.imm, db.mt) // Flusher will attempt to remove this from s.imm.
 				db.mt = nil                    // Will segfault if we try writing!
-				db.elog.Printf("pushed to flush chan\n")
+				db.opt.Debugf("pushed to flush chan\n")
 				return true
 			default:
 				// If we fail to push, we need to unlock and wait for a short while.
@@ -514,13 +506,12 @@ func (db *DB) close() (err error) {
 	if lcErr := db.lc.close(); err == nil {
 		err = errors.Wrap(lcErr, "DB.Close")
 	}
-	db.elog.Printf("Waiting for closer")
+	db.opt.Debugf("Waiting for closer")
 	db.closers.updateSize.SignalAndWait()
 	db.orc.Stop()
 	db.blockCache.Close()
 	db.bfCache.Close()

-	db.elog.Finish()
 	if db.opt.InMemory {
 		return
 	}
@@ -718,16 +709,16 @@ func (db *DB) writeRequests(reqs []*request) error {
 			r.Wg.Done()
 		}
 	}
-	db.elog.Printf("writeRequests called. Writing to value log")
+	db.opt.Debugf("writeRequests called. Writing to value log")
 	err := db.vlog.write(reqs)
 	if err != nil {
 		done(err)
 		return err
 	}

-	db.elog.Printf("Sending updates to subscribers")
+	db.opt.Debugf("Sending updates to subscribers")
 	db.pub.sendUpdates(reqs)
-	db.elog.Printf("Writing to memtable")
+	db.opt.Debugf("Writing to memtable")
 	var count int
 	for _, b := range reqs {
 		if len(b.Entries) == 0 {
@@ -738,7 +729,7 @@ func (db *DB) writeRequests(reqs []*request) error {
 		for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
 			i++
 			if i%100 == 0 {
-				db.elog.Printf("Making room for writes")
+				db.opt.Debugf("Making room for writes")
 			}
 			// We need to poll a bit because both hasRoomForWrite and the flusher need access to s.imm.
 			// When flushChan is full and you are blocked there, and the flusher is trying to update s.imm,
@@ -756,7 +747,7 @@ func (db *DB) writeRequests(reqs []*request) error {
 		db.updateHead(b.Ptrs)
 	}
 	done(nil)
-	db.elog.Printf("%d entries written", count)
+	db.opt.Debugf("%d entries written", count)
 	return nil
 }

@@ -970,7 +961,7 @@ func (db *DB) handleFlushTask(ft flushTask) error {

 	// Store badger head even if vptr is zero, need it for readTs
 	db.opt.Debugf("Storing value log head: %+v\n", ft.vptr)
-	db.elog.Printf("Storing offset: %+v\n", ft.vptr)
+	db.opt.Debugf("Storing offset: %+v\n", ft.vptr)
 	val := ft.vptr.Encode()

 	// Pick the max commit ts, so in case of crash, our read ts would be higher than all the
@@ -1008,17 +999,17 @@ func (db *DB) handleFlushTask(ft flushTask) error {
 	go func() { dirSyncCh <- db.syncDir(db.opt.Dir) }()

 	if _, err = fd.Write(tableData); err != nil {
-		db.elog.Errorf("ERROR while writing to level 0: %v", err)
+		db.opt.Errorf("ERROR while writing to level 0: %v", err)
 		return err
 	}

 	if dirSyncErr := <-dirSyncCh; dirSyncErr != nil {
 		// Do dir sync as best effort. No need to return due to an error there.
-		db.elog.Errorf("ERROR while syncing level directory: %v", dirSyncErr)
+		db.opt.Errorf("ERROR while syncing level directory: %v", dirSyncErr)
 	}
 	tbl, err := table.OpenTable(fd, bopts)
 	if err != nil {
-		db.elog.Printf("ERROR while opening table: %v", err)
+		db.opt.Debugf("ERROR while opening table: %v", err)
 		return err
 	}
 	// We own a ref on tbl.
@@ -1101,7 +1092,7 @@ func (db *DB) calculateSize() {
 		return nil
 	})
 	if err != nil {
-		db.elog.Printf("Got error while calculating total size of directory: %s", dir)
+		db.opt.Debugf("Got error while calculating total size of directory: %s", dir)
 	}
 	return lsmSize, vlogSize
 }
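One practical consequence of the db.go changes above: DefaultOptions installs a logger at INFO level (see the options.go hunk below), so messages that used to appear on the EventLog's /debug/events page are now suppressed unless a more verbose logger is configured. A hedged usage sketch — the path is illustrative:

```go
package main

import (
	"log"

	badger "github.com/dgraph-io/badger/v2"
)

func main() {
	// With stock options the Logger is defaultLogger(INFO), so the
	// Debugf calls introduced above stay silent; only opt.Errorf
	// output (e.g. level-0 write failures) reaches the log.
	db, err := badger.Open(badger.DefaultOptions("/tmp/badger-demo"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Each write batch now goes through the Debugf-instrumented
	// writeRequests path instead of an EventLog.
	if err := db.Update(func(txn *badger.Txn) error {
		return txn.Set([]byte("key"), []byte("value"))
	}); err != nil {
		log.Fatal(err)
	}
}
```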
12 changes: 5 additions & 7 deletions levels.go
@@ -37,7 +37,6 @@ import (

 type levelsController struct {
 	nextFileID uint64 // Atomic
-	elog       trace.EventLog

 	// The following are initialized once and const.
 	levels []*levelHandler
@@ -62,7 +61,7 @@ func revertToManifest(kv *DB, mf *Manifest, idMap map[uint64]struct{}) error {
 	// 2. Delete files that shouldn't exist.
 	for id := range idMap {
 		if _, ok := mf.Tables[id]; !ok {
-			kv.elog.Printf("Table file %d not referenced in MANIFEST\n", id)
+			kv.opt.Debugf("Table file %d not referenced in MANIFEST\n", id)
 			filename := table.NewFilename(id, kv.opt.Dir)
 			if err := os.Remove(filename); err != nil {
 				return y.Wrapf(err, "While removing table %d", id)
@@ -77,7 +76,6 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
 	y.AssertTrue(db.opt.NumLevelZeroTablesStall > db.opt.NumLevelZeroTables)
 	s := &levelsController{
 		kv:     db,
-		elog:   db.elog,
 		levels: make([]*levelHandler, db.opt.MaxLevels),
 	}
 	s.cstatus.levels = make([]*levelCompactStatus, db.opt.MaxLevels)
@@ -934,10 +932,10 @@ func (s *levelsController) addLevel0Table(t *table.Table) error {
 		// Stall. Make sure all levels are healthy before we unstall.
 		var timeStart time.Time
 		{
-			s.elog.Printf("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled))
+			s.kv.opt.Debugf("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled))
 			s.cstatus.RLock()
 			for i := 0; i < s.kv.opt.MaxLevels; i++ {
-				s.elog.Printf("level=%d. Status=%s Size=%d\n",
+				s.kv.opt.Debugf("level=%d. Status=%s Size=%d\n",
 					i, s.cstatus.levels[i].debug(), s.levels[i].getTotalSize())
 			}
 			s.cstatus.RUnlock()
@@ -955,12 +953,12 @@
 		time.Sleep(10 * time.Millisecond)
 		if i%100 == 0 {
 			prios := s.pickCompactLevels()
-			s.elog.Printf("Waiting to add level 0 table. Compaction priorities: %+v\n", prios)
+			s.kv.opt.Debugf("Waiting to add level 0 table. Compaction priorities: %+v\n", prios)
 			i = 0
 		}
 	}
 	{
-		s.elog.Printf("UNSTALLED UNSTALLED UNSTALLED: %v\n", time.Since(timeStart))
+		s.kv.opt.Debugf("UNSTALLED UNSTALLED UNSTALLED: %v\n", time.Since(timeStart))
 		s.lastUnstalled = time.Now()
 	}
 }
1 change: 0 additions & 1 deletion levels_test.go
@@ -548,7 +548,6 @@ func TestL0Stall(t *testing.T) {
 	}

 	opt := DefaultOptions("")
-	opt.EventLogging = false
 	// Disable all compactions.
 	opt.NumCompactors = 0
 	// Number of level zero tables.
12 changes: 0 additions & 12 deletions options.go
@@ -48,7 +48,6 @@ type Options struct {
 	Truncate     bool
 	Logger       Logger
 	Compression  options.CompressionType
-	EventLogging bool
 	InMemory     bool

 	// Fine tuning options.
@@ -156,7 +155,6 @@ func DefaultOptions(path string) Options {
 		Truncate:                      false,
 		Logger:                        defaultLogger(INFO),
 		LogRotatesToFlush:             2,
-		EventLogging:                  false,
 		EncryptionKey:                 []byte{},
 		EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days.
 	}
@@ -305,16 +303,6 @@ func (opt Options) WithLogger(val Logger) Options {
 	return opt
 }

-// WithEventLogging returns a new Options value with EventLogging set to the given value.
-//
-// EventLogging provides a way to enable or disable trace.EventLog logging.
-//
-// The default value of EventLogging is false.
-func (opt Options) WithEventLogging(enabled bool) Options {
-	opt.EventLogging = enabled
-	return opt
-}
-
 // WithMaxTableSize returns a new Options value with MaxTableSize set to the given value.
 //
 // MaxTableSize sets the maximum size in bytes for each LSM table or file.
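With WithEventLogging removed, callers that previously enabled event logging get equivalent visibility by supplying a sufficiently verbose Logger instead. A sketch assuming badger.Logger carries the four methods shown in logger.go; verboseLogger and the path here are illustrative, not part of Badger:

```go
package main

import (
	"log"
	"os"

	badger "github.com/dgraph-io/badger/v2"
)

// verboseLogger adapts the standard library logger to badger.Logger
// and prints everything, so the Debugf calls added in this commit
// become visible again.
type verboseLogger struct{ *log.Logger }

func (l verboseLogger) Errorf(f string, v ...interface{})   { l.Printf("ERROR: "+f, v...) }
func (l verboseLogger) Warningf(f string, v ...interface{}) { l.Printf("WARNING: "+f, v...) }
func (l verboseLogger) Infof(f string, v ...interface{})    { l.Printf("INFO: "+f, v...) }
func (l verboseLogger) Debugf(f string, v ...interface{})   { l.Printf("DEBUG: "+f, v...) }

func main() {
	vl := verboseLogger{log.New(os.Stderr, "badger ", log.LstdFlags)}
	opt := badger.DefaultOptions("/tmp/badger-verbose").WithLogger(vl)
	db, err := badger.Open(opt)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```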
4 changes: 2 additions & 2 deletions txn.go
@@ -70,8 +70,8 @@ func newOracle(opt Options) *oracle {
 		txnMark: &y.WaterMark{Name: "badger.TxnTimestamp"},
 		closer:  y.NewCloser(2),
 	}
-	orc.readMark.Init(orc.closer, opt.EventLogging)
-	orc.txnMark.Init(orc.closer, opt.EventLogging)
+	orc.readMark.Init(orc.closer)
+	orc.txnMark.Init(orc.closer)
 	return orc
 }

15 changes: 5 additions & 10 deletions value.go
@@ -832,7 +832,6 @@ type lfDiscardStats struct {

 type valueLog struct {
 	dirPath string
-	elog    trace.EventLog

 	// guards our view of which files exist, which to be deleted, how many active iterators
 	filesLock sync.RWMutex
@@ -1072,10 +1071,7 @@ func (vlog *valueLog) init(db *DB) {
 		return
 	}
 	vlog.dirPath = vlog.opt.ValueDir
-	vlog.elog = y.NoEventLog
-	if vlog.opt.EventLogging {
-		vlog.elog = trace.NewEventLog("Badger", "Valuelog")
-	}
+
 	vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
 	vlog.lfDiscardStats = &lfDiscardStats{
 		m: make(map[uint32]int64),
@@ -1229,8 +1225,7 @@ func (vlog *valueLog) Close() error {
 	// close flushDiscardStats.
 	vlog.lfDiscardStats.closer.SignalAndWait()

-	vlog.elog.Printf("Stopping garbage collection of values.")
-	defer vlog.elog.Finish()
+	vlog.opt.Debugf("Stopping garbage collection of values.")

 	var err error
 	for id, f := range vlog.filesMap {
@@ -1379,15 +1374,15 @@ func (vlog *valueLog) write(reqs []*request) error {
 		if buf.Len() == 0 {
 			return nil
 		}
-		vlog.elog.Printf("Flushing buffer of size %d to vlog", buf.Len())
+		vlog.opt.Debugf("Flushing buffer of size %d to vlog", buf.Len())
 		n, err := curlf.fd.Write(buf.Bytes())
 		if err != nil {
 			return errors.Wrapf(err, "Unable to write to value log file: %q", curlf.path)
 		}
 		buf.Reset()
 		y.NumWrites.Add(1)
 		y.NumBytesWritten.Add(int64(n))
-		vlog.elog.Printf("Done")
+		vlog.opt.Debugf("Done")
 		atomic.AddUint32(&vlog.writableLogOffset, uint32(n))
 		atomic.StoreUint32(&curlf.size, vlog.writableLogOffset)
 		return nil
@@ -1717,7 +1712,7 @@ func (vlog *valueLog) doRunGC(lf *logFile, discardRatio float64, tr trace.Trace)
 			// This is still the active entry. This would need to be rewritten.

 		} else {
-			vlog.elog.Printf("Reason=%+v\n", r)
+			vlog.opt.Debugf("Reason=%+v\n", r)
 			buf, lf, err := vlog.readValueBytes(vp, s)
 			// we need to decide, whether to unlock the lock file immediately based on the
 			// loading mode. getUnlockCallback will take care of it.
19 changes: 1 addition & 18 deletions y/watermark.go
@@ -20,8 +20,6 @@ import (
 	"container/heap"
 	"context"
 	"sync/atomic"
-
-	"golang.org/x/net/trace"
 )

 type uint64Heap []uint64
@@ -64,17 +62,11 @@ type WaterMark struct {
 	lastIndex uint64
 	Name      string
 	markCh    chan mark
-	elog      trace.EventLog
 }

 // Init initializes a WaterMark struct. MUST be called before using it.
-func (w *WaterMark) Init(closer *Closer, eventLogging bool) {
+func (w *WaterMark) Init(closer *Closer) {
 	w.markCh = make(chan mark, 100)
-	if eventLogging {
-		w.elog = trace.NewEventLog("Watermark", w.Name)
-	} else {
-		w.elog = NoEventLog
-	}
 	go w.process(closer)
 }

@@ -150,7 +142,6 @@ func (w *WaterMark) process(closer *Closer) {
 	waiters := make(map[uint64][]chan struct{})

 	heap.Init(&indices)
-	var loop uint64

 	processOne := func(index uint64, done bool) {
 		// If not already done, then set. Otherwise, don't undo a done entry.
@@ -165,13 +156,6 @@
 		}
 		pending[index] = prev + delta

-		loop++
-		if len(indices) > 0 && loop%10000 == 0 {
-			min := indices[0]
-			w.elog.Printf("WaterMark %s: Done entry %4d. Size: %4d Watermark: %-4d Looking for: "+
-				"%-4d. Value: %d\n", w.Name, index, len(indices), w.DoneUntil(), min, pending[min])
-		}
-
 		// Update mark by going through all indices in order; and checking if they have
 		// been done. Stop at the first index, which isn't done.
 		doneUntil := w.DoneUntil()
@@ -197,7 +181,6 @@

 		if until != doneUntil {
 			AssertTrue(atomic.CompareAndSwapUint64(&w.doneUntil, doneUntil, until))
-			w.elog.Printf("%s: Done until %d. Loops: %d\n", w.Name, until, loops)
 		}

 		notifyAndRemove := func(idx uint64, toNotify []chan struct{}) {
