Skip to content

Commit

Permalink
refactored code
Browse files Browse the repository at this point in the history
add setters and getters for segment, update segment via setters

fix offset bug in KeyValueEntry creation and other minor bugs
  • Loading branch information
manosriram committed Dec 24, 2023
1 parent 5a86adc commit a973e72
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 73 deletions.
102 changes: 67 additions & 35 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"io/fs"
"log"
"os"
"os/signal"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/google/btree"
Expand Down Expand Up @@ -49,9 +51,8 @@ const (
KeySizeOffset = 21
ValueSizeOffset = 31

StaticChunkSize = 1 + 10 + 10 + 10
BTreeDegree = 10
SegmentInitialOffset = 0
StaticChunkSize = 1 + 10 + 10 + 10
BTreeDegree = 10
)
const (
TotalStaticChunkSize int64 = TstampOffset + KeySizeOffset + ValueSizeOffset + DeleteFlagOffset + StaticChunkSize
Expand All @@ -74,6 +75,9 @@ const (

LRU_SIZE = 50
LRU_TTL = 24 * time.Hour

EXIT_FAIL = 0
INITIAL_SEGMENT_OFFSET = 0
)

type Options struct {
Expand Down Expand Up @@ -106,17 +110,16 @@ func NewKeyDirValue(offset, size, tstamp int64, path string) *KeyDirValue {
}

type Db struct {
dirPath string
dataFilePath string
activeDataFile string
activeDataFilePointer *os.File
inActiveDataFilePointers *sync.Map
keyDir *BTree
opts *Options
lastOffset atomic.Int64
mu sync.RWMutex
segments map[string]*Segment
lru *expirable.LRU[int64, *Block]
dirPath string
dataFilePath string
activeDataFile string
activeDataFilePointer *os.File
keyDir *BTree
opts *Options
lastOffset atomic.Int64
mu sync.RWMutex
segments map[string]*Segment
lru *expirable.LRU[int64, *Block]
}

func NewDb(dirPath string) *Db {
Expand All @@ -126,9 +129,8 @@ func NewDb(dirPath string) *Db {
keyDir: &BTree{
tree: btree.New(BTreeDegree),
},
lru: expirable.NewLRU[int64, *Block](LRU_SIZE, nil, LRU_TTL),
segments: segments,
inActiveDataFilePointers: &sync.Map{},
lru: expirable.NewLRU[int64, *Block](LRU_SIZE, nil, LRU_TTL),
segments: segments,
}

return db
Expand Down Expand Up @@ -193,7 +195,7 @@ func (db *Db) setKeyDir(key []byte, kdValue KeyDirValue) (interface{}, error) {
return nil, ERROR_KEY_VALUE_SIZE_EXCEEDED
}

segment, ok := db.segments[kdValue.path]
segment, ok := db.getSegment(kdValue.path)
if !ok {
newSegment := createNewSegment(&kdValue)
fp, err := db.getSegmentFilePointerFromPath(kdValue.path)
Expand All @@ -205,15 +207,15 @@ func (db *Db) setKeyDir(key []byte, kdValue KeyDirValue) (interface{}, error) {

db.setSegment(kdValue.path, newSegment)
} else {
err := db.updateSegment(&kdValue, segment)
segment, err := db.updateSegment(&kdValue, segment)
if err != nil {
return nil, err
}
db.removeBlockCache(segment.getBlockNumber())
}

db.keyDir.Set(key, kdValue)
db.lastOffset.Store(kdValue.offset + kdValue.size)
db.removeBlockCache(db.getSegmentBlockNumber(kdValue.path))

return kdValue, nil
}
Expand All @@ -237,7 +239,11 @@ func (db *Db) getKeyDir(key []byte) (*KeyValueEntry, error) {
}
}

segment := db.segments[kv.path]
segment, ok := db.getSegment(kv.path)
if !ok {
return nil, ERROR_NO_ACTIVE_FILE_OPENED
}

block, ok := db.getSegmentBlock(kv.path, kv.blockNumber)
if !ok {
return nil, ERROR_CANNOT_READ_FILE
Expand Down Expand Up @@ -266,7 +272,7 @@ func (db *Db) getKeyDir(key []byte) (*KeyValueEntry, error) {
db.keyDir.Delete(key)
return nil, ERROR_KEY_NOT_FOUND
}
db.lru.Add(kv.blockNumber, cacheBlock)
db.setBlockCache(kv.blockNumber, cacheBlock)
return v, nil
}

Expand Down Expand Up @@ -422,7 +428,7 @@ func (db *Db) parseActiveKeyValueEntryFile(filePath string) error {
func (db *Db) createInactiveDatafile(dirPath string) error {
file, err := os.CreateTemp(dirPath, TempInactiveDataFilePattern)
db.setActiveDataFile(file.Name())
db.setLastOffset(0)
db.setLastOffset(INITIAL_SEGMENT_OFFSET)
if err != nil {
return err
}
Expand Down Expand Up @@ -452,21 +458,41 @@ func (db *Db) createActiveDatafile(dirPath string) error {
if err != nil {
return err
}
db.setSegment(inactiveName, db.getSegment(file.Name()))
db.setSegmentPath(inactiveName, inactiveName)
db.setSegmentFp(inactiveName, fp)
segment, ok := db.getSegment(file.Name())
if !ok {
return ERROR_CANNOT_READ_FILE
}
db.setSegment(inactiveName, segment)
segment.setPath(inactiveName)
segment.setFp(fp)
}
}

file, err := os.CreateTemp(dirPath, TempDataFilePattern)
db.setActiveDataFile(file.Name())
db.setLastOffset(SegmentInitialOffset)
db.setLastOffset(INITIAL_SEGMENT_OFFSET)
if err != nil {
return err
}
return nil
}

func (db *Db) handleInterrupt() {
terminateSignal := make(chan os.Signal, 1)
signal.Notify(terminateSignal, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM)
for {
select {
case s := <-terminateSignal:
err := db.Close()
if err != nil {
log.Panicf("error closing DB: %s\n", err.Error())
}
log.Printf("closing DB via interrupt %v", s)
os.Exit(EXIT_FAIL)
}
}
}

func Open(opts *Options) (*Db, error) {
defer utils.Recover()

Expand All @@ -480,6 +506,8 @@ func Open(opts *Options) (*Db, error) {
dirPath = utils.JoinPaths(home, DefaultDataDir)
}
db := NewDb(dirPath)
go db.handleInterrupt()

err := os.MkdirAll(dirPath, os.ModePerm)
if err != nil {
return nil, err
Expand Down Expand Up @@ -527,9 +555,9 @@ func (db *Db) Close() error {
return err
}
for _, segment := range db.segments {
if !segment.closed {
segment.fp.Close()
segment.closed = true
err := segment.closeFp()
if err != nil {
return err
}
}
return nil
Expand Down Expand Up @@ -570,9 +598,13 @@ func (db *Db) deleteKey(key []byte) error {
return ERROR_KEY_NOT_FOUND
}

f := db.segments[v.path]
f.fp.WriteAt([]byte{DELETED_FLAG_SET_VALUE}, v.offset)
db.lru.Remove(v.blockNumber)
segment, ok := db.getSegment(v.path)
if !ok {
return ERROR_CANNOT_READ_FILE
}
fp := segment.getFp()
fp.WriteAt([]byte{DELETED_FLAG_SET_VALUE}, v.offset)
db.removeBlockCache(v.blockNumber)
db.keyDir.Delete(key)

return nil
Expand Down Expand Up @@ -638,7 +670,7 @@ func (db *Db) walk(s string, file fs.DirEntry, err error) error {

path := utils.JoinPaths(db.dirPath, file.Name())
db.setActiveDataFile(path)
db.setLastOffset(SegmentInitialOffset)
db.setLastOffset(INITIAL_SEGMENT_OFFSET)

keyValueEntries, _ := db.getActiveFileKeyValueEntries(path)
if len(keyValueEntries) == 0 {
Expand Down Expand Up @@ -666,7 +698,7 @@ func (db *Db) walk(s string, file fs.DirEntry, err error) error {
func (db *Db) Sync() error {
err := filepath.WalkDir(db.dirPath, db.walk)
if err != nil {
fmt.Println(err)
log.Println(err)
return err
}

Expand Down
10 changes: 6 additions & 4 deletions examples/b.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
// }

// func main() {
// x := make(map[string]int)
// x["z"] = 1

// x["z"] += 100
// x := [5]int{1, 2, 3, 4, 5}
// fmt.Println(x)
// for i, v := range x {
// v += 2
// x[i] = v
// }
// fmt.Println(x)
// }
78 changes: 44 additions & 34 deletions segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,50 +36,60 @@ func (db *Db) getSegmentBlock(path string, blockNumber int64) (*BlockOffsetPair,
return block, true
}

func (db *Db) getSegment(path string) *Segment {
return db.segments[path]
func (db *Db) setSegment(path string, segment *Segment) {
db.segments[path] = segment
}

func (db *Db) getSegmentBlockOffset(path string) int64 {
return db.segments[path].currentBlockOffset
func (db *Db) getSegment(path string) (*Segment, bool) {
segment, ok := db.segments[path]
return segment, ok
}

func (db *Db) getSegmentBlockNumber(path string) int64 {
return db.segments[path].currentBlockNumber
func (seg *Segment) getBlockOffset() int64 {
return seg.currentBlockOffset
}

func (db *Db) setSegment(path string, segment *Segment) {
db.segments[path] = segment
func (seg *Segment) getBlockNumber() int64 {
return seg.currentBlockNumber
}

func (db *Db) setSegmentPath(segmentPath string, path string) {
segment := db.getSegment(segmentPath)
segment.path = path
db.setSegment(path, segment)
func (seg *Segment) getFp() *os.File {
return seg.fp
}

func (db *Db) setSegmentFp(path string, fp *os.File) {
segment := db.getSegment(path)
segment.fp = fp
db.setSegment(path, segment)
func (seg *Segment) getPath() string {
return seg.path
}

func (db *Db) setSegmentBlockNumber(path string, blockNumber int64) {
segment := db.getSegment(path)
segment.currentBlockNumber = blockNumber
db.setSegment(path, segment)
func (seg *Segment) setPath(path string) {
seg.path = path
}

func (db *Db) setSegmentBlockOffset(path string, blockOffset int64) {
segment := db.getSegment(path)
segment.currentBlockOffset = blockOffset
db.setSegment(path, segment)
func (seg *Segment) setFp(fp *os.File) {
seg.fp = fp
}

func (db *Db) setSegmentBlock(path string, blockNumber int64, block *BlockOffsetPair) {
segment := db.getSegment(path)
segment.blocks[blockNumber] = block
db.setSegment(path, segment)
func (seg *Segment) closeFp() error {
if !seg.closed {
err := seg.fp.Close()
if err != nil {
return err
}
}
seg.closed = true
return nil
}

func (seg *Segment) setBlockNumber(blockNumber int64) {
seg.currentBlockNumber = blockNumber
}

func (seg *Segment) setBlockOffset(blockOffset int64) {
seg.currentBlockOffset = blockOffset
}

func (seg *Segment) setBlock(blockNumber int64, block *BlockOffsetPair) {
seg.blocks[blockNumber] = block
}

func (db *Db) getSegmentFilePointerFromPath(keyDirPath string) (*os.File, error) {
Expand All @@ -91,16 +101,16 @@ func (db *Db) getSegmentFilePointerFromPath(keyDirPath string) (*os.File, error)
return f, nil
}

func (db *Db) updateSegment(kdValue *KeyDirValue, segment *Segment) error {
func (db *Db) updateSegment(kdValue *KeyDirValue, segment *Segment) (*Segment, error) {
segmentBlock, ok := db.getSegmentBlock(kdValue.path, segment.currentBlockNumber)
if !ok {
return ERROR_CANNOT_READ_FILE
return nil, ERROR_CANNOT_READ_FILE
}
segmentBlock.endOffset = kdValue.offset + kdValue.size
db.setSegmentBlock(kdValue.path, segment.currentBlockNumber, segmentBlock)
segment.setBlock(segment.currentBlockNumber, segmentBlock)
if segment.currentBlockOffset+kdValue.size <= BlockSize {
kdValue.blockNumber = segment.currentBlockNumber
db.setSegmentBlockOffset(kdValue.path, db.getSegmentBlockOffset(kdValue.path)+kdValue.size)
segment.setBlockOffset(segment.getBlockOffset() + kdValue.size)
} else {
segment.currentBlockNumber += 1
segment.blocks[segment.currentBlockNumber] = &BlockOffsetPair{
Expand All @@ -109,10 +119,10 @@ func (db *Db) updateSegment(kdValue *KeyDirValue, segment *Segment) error {
filePath: kdValue.path,
}
kdValue.blockNumber = segment.currentBlockNumber
db.setSegmentBlockOffset(kdValue.path, kdValue.size)
segment.setBlockOffset(kdValue.size)
}
db.setSegment(kdValue.path, segment)
return nil
return segment, nil
}

func createNewSegment(kdValue *KeyDirValue) *Segment {
Expand Down
1 change: 1 addition & 0 deletions wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Block struct {
func NewKeyValueEntry(deleted byte, offset, ksz, vsz, size int64, k, v []byte) *KeyValueEntry {
return &KeyValueEntry{
deleted: deleted,
offset: offset,
ksz: ksz,
vsz: vsz,
size: size,
Expand Down

0 comments on commit a973e72

Please sign in to comment.