diff --git a/README.md b/README.md
index bb61172..f05e5aa 100644
--- a/README.md
+++ b/README.md
@@ -1,8 +1,109 @@
## nimbusdb
-
Persistent Key-Value store based on Bitcask paper.
-Readme to be updated.
+nimbusdb is a fast, lightweight, and scalable key-value store written in golang, based on bitcask.
+
+nimbusdb maintains an active datafile to which data is written. When it crosses a threshold, the datafile is made inactive and new datafile is created.
+As time passes, expired/deleted keys take up space which is not useful; Hence, a process called `merge` is done which removes all expired/deleted keys and frees up space.
+
+## Features
+
+
+ Thread-Safe
+
+ All operations are thread-safe. Read and Write operations can handle multiple operations from multiple goroutines at the same time with consistency.
+
+
+
+
+ Portable
+
+ Data is extremely portable since it is only a bunch of files. All you have to do is move the folder and open an DB connection at that path.
+
+
+
+
+ Custom Expiry
+
+ Supports custom expiry for keys. Default expiry is 1 week.
+
+
+
+
+ Supports Merge
+
+ Supports `Sync` which can be called periodically to remove expired/deleted keys from disk and free-up more space.
+
+
+
+
+ Single disk-seek write
+
+ Writes are just one disk seek since we're appending to the file.
+
+
+
+
+ Block cache for faster reads.
+
+ Blocks are cached for faster reads. Default size of an Block is 32KB.
+
+
+## Documentation
+#### Open DB connection
+```go
+d, err := nimbusdb.Open(&nimbusdb.Options{Path: "/path/to/data/directory"})
+if err != nil {
+ // handle error
+}
+```
+
+#### Set
+```go
+kvPair := &nimbusdb.KeyValuePair{
+ Key: []byte("key"),
+ Value: []byte("value"),
+ Ttl: 5 * time.Minute, // Optional, default is 1 week
+}
+setValue, err := d.Set(kvPair)
+if err != nil {
+ // handle error
+}
+```
+
+#### Get
+
+```go
+value, err := d.Get([]byte("key"))
+if err != nil {
+ // handle error
+}
+```
+
+#### Delete
+
+```go
+value, err := d.Get([]byte("key"))
+if err != nil {
+ // handle error
+}
+```
+
+#### Sync
+This does the merge process. This can be an expensive operation, hence it is better to run this periodically and whenever the traffic is low.
+
+```go
+err := d.Sync()
+if err != nil {
+ // handle error
+}
+```
+
+## Benchmarks
+![Screenshot 2023-12-23 at 4 08 56 AM](https://github.com/manosriram/nimbusdb/assets/38112857/76720f68-ad16-44ee-a408-12b06e6c051a)
+
+
+
[Progress Board](https://trello.com/b/2eDSLLb3/nimbusdb) | [Streams](https://youtube.com/playlist?list=PLJALjJgNSDVo5veOf2apgMIE1QgN7IEfk) | [godoc](https://pkg.go.dev/github.com/manosriram/nimbusdb)
diff --git a/btree.go b/btree.go
index 24a5b1c..dce047e 100644
--- a/btree.go
+++ b/btree.go
@@ -8,16 +8,9 @@ import (
"github.com/manosriram/nimbusdb/utils"
)
-type BlockOffsetPair struct {
- startOffset int64
- endOffset int64
- filePath string
-}
-
type BTree struct {
- tree *btree.BTree
- blockOffsets map[int64]BlockOffsetPair
- mu sync.RWMutex
+ tree *btree.BTree
+ mu sync.RWMutex
}
type item struct {
@@ -39,16 +32,6 @@ func (b *BTree) Get(key []byte) *KeyDirValue {
func (b *BTree) Set(key []byte, value KeyDirValue) *KeyDirValue {
i := b.tree.ReplaceOrInsert(&item{key: key, v: value})
- y, ok := b.blockOffsets[value.blockNumber]
- if !ok {
- y.startOffset = value.offset
- y.endOffset = value.offset + value.size
- y.filePath = value.path
- b.blockOffsets[value.blockNumber] = y
- } else {
- y.endOffset = value.offset + value.size
- b.blockOffsets[value.blockNumber] = y
- }
if i != nil {
return &i.(*item).v
}
diff --git a/db.go b/db.go
index 39f42dc..e60362e 100644
--- a/db.go
+++ b/db.go
@@ -1,7 +1,3 @@
-/*
- block's filePath should be same for all the entries in it.
-*/
-
package nimbusdb
import (
@@ -61,11 +57,12 @@ const (
)
const (
- KEY_EXPIRES_IN_DEFAULT = 24 * time.Hour
+ KEY_EXPIRES_IN_DEFAULT = 168 * time.Hour // 1 week
KEY_NOT_FOUND = "key expired or does not exist"
NO_ACTIVE_FILE_OPENED = "no file opened for writing"
OFFSET_EXCEEDED_FILE_SIZE = "offset exceeded file size"
+ CANNOT_READ_FILE = "error reading file"
DELETED_FLAG_BYTE_VALUE = byte(0x31)
DELETED_FLAG_SET_VALUE = byte(0x01)
@@ -97,34 +94,40 @@ type KeyDirValue struct {
size int64
path string
tstamp int64
- B int64
+}
+
+func NewKeyDirValue(offset, size, tstamp int64, path string) *KeyDirValue {
+ return &KeyDirValue{
+ offset: offset,
+ size: size,
+ tstamp: tstamp,
+ path: path,
+ }
}
type Db struct {
dirPath string
- mu sync.RWMutex
dataFilePath string
+ activeDataFile string
activeDataFilePointer *os.File
inActiveDataFilePointers *sync.Map
- activeDataFile string
- lastOffset atomic.Int64
keyDir *BTree
opts *Options
+ lastOffset atomic.Int64
+ mu sync.RWMutex
+ segments map[string]*Segment
lru *expirable.LRU[int64, *Block]
-
- currentBlockOffset atomic.Int64
- currentBlockNumber atomic.Int64
}
func NewDb(dirPath string) *Db {
- off := make(map[int64]BlockOffsetPair)
+ segments := make(map[string]*Segment)
db := &Db{
dirPath: dirPath,
keyDir: &BTree{
- tree: btree.New(BTreeDegree),
- blockOffsets: off,
+ tree: btree.New(BTreeDegree),
},
lru: expirable.NewLRU[int64, *Block](LRU_SIZE, nil, LRU_TTL),
+ segments: segments,
inActiveDataFilePointers: &sync.Map{},
}
@@ -135,7 +138,7 @@ func (db *Db) setLastOffset(v int64) {
db.lastOffset.Store(v)
}
-func (db *Db) LastOffset() int64 {
+func (db *Db) getLastOffset() int64 {
return db.lastOffset.Load()
}
@@ -146,8 +149,15 @@ func (db *Db) getActiveDataFilePointer() (*os.File, error) {
return db.activeDataFilePointer, nil
}
+func (db *Db) closeActiveDataFilePointer() error {
+ if db.activeDataFilePointer != nil {
+ return db.activeDataFilePointer.Close()
+ }
+ return nil
+}
+
func (db *Db) setActiveDataFile(activeDataFile string) error {
- err := db.Close() // close existing active datafile
+ err := db.closeActiveDataFilePointer()
if err != nil {
return err
}
@@ -170,33 +180,54 @@ func (db *Db) setKeyDir(key []byte, kdValue KeyDirValue) (interface{}, error) {
return nil, errors.New(KEY_VALUE_SIZE_EXCEEDED)
}
- block, ok := db.keyDir.blockOffsets[db.currentBlockNumber.Load()]
+ segment, ok := db.segments[kdValue.path]
if !ok {
- db.keyDir.blockOffsets[kdValue.blockNumber] = BlockOffsetPair{
- startOffset: 0,
- endOffset: 0,
- filePath: kdValue.path,
+ newSegment := &Segment{
+ blocks: map[int64]*BlockOffsetPair{
+ 0: {startOffset: kdValue.offset,
+ endOffset: kdValue.offset + kdValue.size,
+ filePath: kdValue.path,
+ },
+ },
+ path: kdValue.path,
+ currentBlockNumber: 0,
+ currentBlockOffset: 0,
}
- }
- if kdValue.path != block.filePath {
- db.currentBlockNumber.Add(1)
- db.currentBlockOffset.Store(0)
- kdValue.blockNumber = db.currentBlockNumber.Load()
+ fp, err := db.getSegmentFilePointerFromPath(kdValue.path)
+ if err != nil {
+ return nil, err
+ }
+ newSegment.fp = fp
+ newSegment.closed = false
+
+ db.setSegment(kdValue.path, newSegment)
} else {
- if db.currentBlockOffset.Load()+kdValue.size <= BlockSize {
- kdValue.blockNumber = db.currentBlockNumber.Load()
- db.currentBlockOffset.Add(kdValue.size)
+ segmentBlock, ok := db.getSegmentBlock(kdValue.path, segment.currentBlockNumber)
+ if !ok {
+ return nil, errors.New(CANNOT_READ_FILE)
+ }
+ segmentBlock.endOffset = kdValue.offset + kdValue.size
+ db.setSegmentBlock(kdValue.path, segment.currentBlockNumber, segmentBlock)
+ if segment.currentBlockOffset+kdValue.size <= BlockSize {
+ kdValue.blockNumber = segment.currentBlockNumber
+ db.setSegmentBlockOffset(kdValue.path, db.getSegmentBlockOffset(kdValue.path)+kdValue.size)
} else {
- db.currentBlockNumber.Add(1)
- kdValue.blockNumber = db.currentBlockNumber.Load()
- db.currentBlockOffset.Store(kdValue.size)
+ segment.currentBlockNumber += 1
+ segment.blocks[segment.currentBlockNumber] = &BlockOffsetPair{
+ startOffset: kdValue.offset,
+ endOffset: kdValue.offset + kdValue.size,
+ filePath: kdValue.path,
+ }
+ kdValue.blockNumber = segment.currentBlockNumber
+ db.setSegmentBlockOffset(kdValue.path, kdValue.size)
}
+ db.setSegment(kdValue.path, segment)
}
db.keyDir.Set(key, kdValue)
db.lastOffset.Store(kdValue.offset + kdValue.size)
- db.lru.Remove(db.currentBlockNumber.Load())
+ db.lru.Remove(db.getSegmentBlockNumber(kdValue.path))
return kdValue, nil
}
@@ -222,8 +253,12 @@ func (db *Db) getKeyDir(key []byte) (*KeyValueEntry, error) {
}
var v *KeyValueEntry
- block := db.keyDir.blockOffsets[kv.blockNumber]
- data, err := db.getKeyValueEntryFromOffsetViaFilePath(block.startOffset, block.endOffset-block.startOffset, block.filePath)
+ segment := db.segments[kv.path]
+ block, ok := db.getSegmentBlock(kv.path, kv.blockNumber)
+ if !ok {
+ return nil, errors.New(CANNOT_READ_FILE)
+ }
+ data, err := db.getKeyValueEntryFromOffsetViaFilePath(segment.path)
if err != nil {
return nil, err
}
@@ -263,17 +298,13 @@ func (db *Db) getKeyDir(key []byte) (*KeyValueEntry, error) {
return nil, errors.New(KEY_NOT_FOUND)
}
-func (db *Db) getKeyValueEntryFromOffsetViaFilePath(offset int64, sz int64, path string) ([]byte, error) {
- // TODO: improve dfile and idfile recognizing
+func (db *Db) getKeyValueEntryFromOffsetViaFilePath(keyDirPath string) ([]byte, error) {
+ // TODO: improve file handling here
var data []byte
- a := filepath.Join(db.dirPath, fmt.Sprintf("%s.idfile", path))
- data, err := os.ReadFile(a)
+ path := filepath.Join(db.dirPath, keyDirPath)
+ data, err := os.ReadFile(path)
if err != nil {
- b := filepath.Join(db.dirPath, fmt.Sprintf("%s.dfile", path))
- data, err = os.ReadFile(b)
- if err != nil {
- return nil, err
- }
+ return nil, err
}
return data, nil
}
@@ -332,17 +363,10 @@ func getKeyValueEntryFromOffsetViaData(offset int64, data []byte) (*KeyValueEntr
func (db *Db) seekOffsetFromDataFile(kdValue KeyDirValue) (*KeyValueEntry, error) {
defer utils.Recover()
- // TODO: improve dfile and idfile recognizing
- a := filepath.Join(db.dirPath, fmt.Sprintf("%s.idfile", kdValue.path))
- f, err := os.OpenFile(a, os.O_RDONLY, 0644)
+ f, err := db.getSegmentFilePointerFromPath(kdValue.path)
if err != nil {
- b := filepath.Join(db.dirPath, fmt.Sprintf("%s.dfile", kdValue.path))
- f, err = os.OpenFile(b, os.O_RDONLY, 0644)
- if err != nil {
- return nil, err
- }
+ return nil, err
}
- defer f.Close()
data := make([]byte, kdValue.size)
f.Seek(kdValue.offset, io.SeekCurrent)
@@ -410,7 +434,7 @@ func (db *Db) getActiveFileKeyValueEntries(filePath string) ([]*KeyValueEntry, e
kdValue := KeyDirValue{
offset: keyValueEntry.offset,
size: keyValueEntry.size,
- path: strings.Split(fileName, ".")[0],
+ path: fileName,
tstamp: keyValueEntry.tstamp,
}
_, err := db.setKeyDir(keyValueEntry.k, kdValue) // TODO: use Set here?
@@ -448,7 +472,7 @@ func (db *Db) parseActiveKeyValueEntryFile(filePath string) error {
keyValueEntry.fileID = strings.Split(utils.GetFilenameWithoutExtension(filePath), ".")[0]
hasTimestampExpired := utils.HasTimestampExpired(keyValueEntry.tstamp)
if !hasTimestampExpired {
- fileName := strings.Split(utils.GetFilenameWithoutExtension(filePath), ".")[0]
+ fileName := utils.GetFilenameWithoutExtension(filePath)
kdValue := KeyDirValue{
offset: keyValueEntry.offset,
size: keyValueEntry.size,
@@ -472,7 +496,7 @@ func (db *Db) parseActiveKeyValueEntryFile(filePath string) error {
return nil
}
-func (db *Db) CreateInactiveDatafile(dirPath string) error {
+func (db *Db) createInactiveDatafile(dirPath string) error {
file, err := os.CreateTemp(dirPath, TempInactiveDataFilePattern)
db.setActiveDataFile(file.Name())
db.setLastOffset(0)
@@ -482,7 +506,7 @@ func (db *Db) CreateInactiveDatafile(dirPath string) error {
return nil
}
-func (db *Db) CreateActiveDatafile(dirPath string) error {
+func (db *Db) createActiveDatafile(dirPath string) error {
defer utils.Recover()
dir, err := os.ReadDir(dirPath)
@@ -500,6 +524,14 @@ func (db *Db) CreateActiveDatafile(dirPath string) error {
oldPath := filepath.Join(dirPath, file.Name())
newPath := filepath.Join(dirPath, inactiveName)
os.Rename(oldPath, newPath)
+
+ fp, err := db.getSegmentFilePointerFromPath(inactiveName)
+ if err != nil {
+ return err
+ }
+ db.setSegment(inactiveName, db.getSegment(file.Name()))
+ db.setSegmentPath(inactiveName, inactiveName)
+ db.setSegmentFp(inactiveName, fp)
}
}
@@ -512,11 +544,19 @@ func (db *Db) CreateActiveDatafile(dirPath string) error {
return nil
}
+// Closes the database. Closes the file pointer used to read/write the activeDataFile.
+// Closes all file inactiveDataFile pointers and marks them as closed.
func (db *Db) Close() error {
if db.activeDataFilePointer != nil {
err := db.activeDataFilePointer.Close()
return err
}
+ for _, segment := range db.segments {
+ if !segment.closed {
+ segment.fp.Close()
+ segment.closed = true
+ }
+ }
return nil
}
@@ -545,7 +585,7 @@ func Open(opts *Options) (*Db, error) {
// Empty path, starting new
if len(dir) == 0 {
- err = db.CreateActiveDatafile(dirPath)
+ err = db.createActiveDatafile(dirPath)
if err != nil {
return nil, err
}
@@ -576,7 +616,7 @@ func (db *Db) All() []*KeyValuePair {
return db.keyDir.List()
}
-func (db *Db) LimitDatafileToThreshold(newKeyValueEntry *KeyValueEntry, opts *Options) {
+func (db *Db) limitDatafileToThreshold(newKeyValueEntry *KeyValueEntry, opts *Options) {
var sz os.FileInfo
var err error
f, err := db.getActiveDataFilePointer()
@@ -588,10 +628,10 @@ func (db *Db) LimitDatafileToThreshold(newKeyValueEntry *KeyValueEntry, opts *Op
if size+newKeyValueEntry.size > DatafileThreshold {
if opts.IsMerge {
- db.CreateInactiveDatafile(db.dirPath)
+ db.createInactiveDatafile(db.dirPath)
os.Remove(opts.MergeFilePath)
} else {
- db.CreateActiveDatafile(db.dirPath)
+ db.createActiveDatafile(db.dirPath)
newKeyValueEntry.offset = 0
}
}
@@ -607,25 +647,16 @@ func (db *Db) deleteKey(key []byte) error {
return errors.New(KEY_NOT_FOUND)
}
- a := filepath.Join(db.dirPath, fmt.Sprintf("%s.idfile", v.path))
- f, err := os.OpenFile(a, os.O_WRONLY, 0644)
- if err != nil {
- b := filepath.Join(db.dirPath, fmt.Sprintf("%s.dfile", v.path))
- f, err = os.OpenFile(b, os.O_WRONLY, 0644)
- if err != nil {
- return err
- }
- }
- defer f.Close()
-
- f.WriteAt([]byte{DELETED_FLAG_SET_VALUE}, v.offset)
-
+ f := db.segments[v.path]
+ f.fp.WriteAt([]byte{DELETED_FLAG_SET_VALUE}, v.offset)
db.lru.Remove(v.blockNumber)
db.keyDir.Delete(key)
return nil
}
+// Gets a key-value pair.
+// Returns the value if the key exists and error if any.
func (db *Db) Get(key []byte) ([]byte, error) {
db.mu.Lock()
defer db.mu.Unlock()
@@ -637,42 +668,38 @@ func (db *Db) Get(key []byte) ([]byte, error) {
return v.v, nil
}
+// Sets a key-value pair.
+// Returns the value if set succeeds, else returns an error.
func (db *Db) Set(kv *KeyValuePair) (interface{}, error) {
- db.mu.Lock()
- defer db.mu.Unlock()
intKSz := int64(len(kv.Key))
intVSz := int64(len(utils.Encode(kv.Value)))
- newKeyValueEntry := &KeyValueEntry{
- deleted: DELETED_FLAG_UNSET_VALUE,
- ksz: int64(len(kv.Key)),
- vsz: int64(len(utils.Encode(kv.Value))),
- k: kv.Key,
- v: utils.Encode(kv.Value),
- size: int64(StaticChunkSize + intKSz + intVSz),
- offset: db.LastOffset(),
- }
+ newKeyValueEntry := NewKeyValueEntry(
+ DELETED_FLAG_UNSET_VALUE,
+ db.getLastOffset(),
+ int64(len(kv.Key)),
+ int64(len(utils.Encode(kv.Value))),
+ int64(StaticChunkSize+intKSz+intVSz),
+ kv.Key,
+ utils.Encode(kv.Value),
+ )
if kv.Ttl > 0 {
newKeyValueEntry.tstamp = int64(time.Now().Add(kv.Ttl).UnixNano())
} else {
newKeyValueEntry.tstamp = int64(time.Now().Add(KEY_EXPIRES_IN_DEFAULT).UnixNano())
}
- db.LimitDatafileToThreshold(newKeyValueEntry, &Options{})
- err := db.WriteKeyValueEntry(newKeyValueEntry)
+ db.mu.Lock()
+ defer db.mu.Unlock()
+ db.limitDatafileToThreshold(newKeyValueEntry, &Options{})
+ err := db.writeKeyValueEntry(newKeyValueEntry)
if err != nil {
return nil, err
}
- kdValue := KeyDirValue{
- offset: newKeyValueEntry.offset,
- size: newKeyValueEntry.size,
- path: strings.Split(utils.GetFilenameWithoutExtension(db.activeDataFile), ".")[0],
- tstamp: newKeyValueEntry.tstamp,
- }
-
- _, err = db.setKeyDir(kv.Key, kdValue)
+ kdValue := NewKeyDirValue(newKeyValueEntry.offset, newKeyValueEntry.size, newKeyValueEntry.tstamp, utils.GetFilenameWithoutExtension(db.activeDataFile))
+ _, err = db.setKeyDir(kv.Key, *kdValue)
if err != nil {
return nil, err
}
@@ -680,6 +707,8 @@ func (db *Db) Set(kv *KeyValuePair) (interface{}, error) {
return kv.Value, err
}
+// Deletes a key-value pair.
+// Returns error if any.
func (db *Db) Delete(key []byte) error {
err := db.deleteKey(key)
return err
@@ -703,11 +732,11 @@ func (db *Db) walk(s string, file fs.DirEntry, err error) error {
}
for _, keyValueEntry := range keyValueEntries {
- db.LimitDatafileToThreshold(keyValueEntry, &Options{
+ db.limitDatafileToThreshold(keyValueEntry, &Options{
IsMerge: true,
MergeFilePath: path,
})
- err := db.WriteKeyValueEntry(keyValueEntry)
+ err := db.writeKeyValueEntry(keyValueEntry)
if err != nil {
return err
}
@@ -715,6 +744,8 @@ func (db *Db) walk(s string, file fs.DirEntry, err error) error {
return nil
}
+// Syncs the database. Will remove all expired/deleted keys from disk.
+// Since items are removed, disk usage will reduce.
func (db *Db) Sync() error {
err := filepath.WalkDir(db.dirPath, db.walk)
if err != nil {
diff --git a/examples/b.go b/examples/b.go
new file mode 100644
index 0000000..226f9c1
--- /dev/null
+++ b/examples/b.go
@@ -0,0 +1,15 @@
+// package main
+
+// import "fmt"
+
+// type y struct {
+// name string
+// }
+
+// func main() {
+// x := make(map[string]int)
+// x["z"] = 1
+
+// x["z"] += 100
+// fmt.Println(x)
+// }
diff --git a/examples/db.go b/examples/db.go
index 5d88c41..72f42d2 100644
--- a/examples/db.go
+++ b/examples/db.go
@@ -56,8 +56,6 @@ func main() {
fmt.Println(err)
}
fmt.Println(string(z))
- } else if text == "stat" {
- d.CreateActiveDatafile(DirPath)
} else if text == "sync" {
d.Sync()
}
diff --git a/segment.go b/segment.go
new file mode 100644
index 0000000..a0530dc
--- /dev/null
+++ b/segment.go
@@ -0,0 +1,92 @@
+package nimbusdb
+
+import (
+ "os"
+ "path/filepath"
+)
+
+// Segment represents an entire file. It is divided into Blocks.
+// Each Segment is a collection of Blocks of size 32KB. A file pointer is kept opened for reading purposes.
+// closed represents the state of the Segment's file pointer.
+type Segment struct {
+ closed bool
+ currentBlockNumber int64
+ currentBlockOffset int64
+ path string
+ blocks map[int64]*BlockOffsetPair
+ fp *os.File
+}
+
+// BlockOffsetPair contains metadata about the Block. The start and ending offsets of the Block, and the path.
+type BlockOffsetPair struct {
+ startOffset int64
+ endOffset int64
+ filePath string
+}
+
+func (db *Db) getSegmentBlock(path string, blockNumber int64) (*BlockOffsetPair, bool) {
+ segment, ok := db.segments[path]
+ if !ok {
+ return nil, ok
+ }
+ block, ok := segment.blocks[blockNumber]
+ if !ok {
+ return nil, ok
+ }
+ return block, true
+}
+
+func (db *Db) getSegment(path string) *Segment {
+ return db.segments[path]
+}
+
+func (db *Db) getSegmentBlockOffset(path string) int64 {
+ return db.segments[path].currentBlockOffset
+}
+
+func (db *Db) getSegmentBlockNumber(path string) int64 {
+ return db.segments[path].currentBlockNumber
+}
+
+func (db *Db) setSegment(path string, segment *Segment) {
+ db.segments[path] = segment
+}
+
+func (db *Db) setSegmentPath(segmentPath string, path string) {
+ segment := db.getSegment(segmentPath)
+ segment.path = path
+ db.setSegment(path, segment)
+}
+
+func (db *Db) setSegmentFp(path string, fp *os.File) {
+ segment := db.getSegment(path)
+ segment.fp = fp
+ db.setSegment(path, segment)
+}
+
+func (db *Db) setSegmentBlockNumber(path string, blockNumber int64) {
+ segment := db.getSegment(path)
+ segment.currentBlockNumber = blockNumber
+ db.setSegment(path, segment)
+}
+
+func (db *Db) setSegmentBlockOffset(path string, blockOffset int64) {
+ segment := db.getSegment(path)
+ segment.currentBlockOffset = blockOffset
+ db.setSegment(path, segment)
+}
+
+func (db *Db) setSegmentBlock(path string, blockNumber int64, block *BlockOffsetPair) {
+ segment := db.getSegment(path)
+ segment.blocks[blockNumber] = block
+ db.setSegment(path, segment)
+}
+
+func (db *Db) getSegmentFilePointerFromPath(keyDirPath string) (*os.File, error) {
+ path := filepath.Join(db.dirPath, keyDirPath)
+ f, err := os.OpenFile(path, os.O_RDONLY, 0644)
+ if err != nil {
+ return nil, err
+ }
+ return f, nil
+}
diff --git a/tests/db_test.go b/tests/db_test.go
index 687601a..ca11abf 100644
--- a/tests/db_test.go
+++ b/tests/db_test.go
@@ -90,7 +90,7 @@ func Test_InMemory_Stress_SetGet(t *testing.T) {
assert.Nil(t, err)
}
- for i := 0; i < 100000; i++ {
+ for i := 0; i < 1000; i++ {
kv := &nimbusdb.KeyValuePair{
Key: []byte(utils.GetTestKey(i)),
Value: []byte("testkey"),
diff --git a/wal.go b/wal.go
index 9418e0a..50da522 100644
--- a/wal.go
+++ b/wal.go
@@ -1,16 +1,14 @@
package nimbusdb
import (
- "os"
- "time"
-
"github.com/manosriram/nimbusdb/utils"
)
+// KeyValueEntry is the raw and complete uncompressed data existing on the disk.
+// KeyValueEntry is stored in Blocks in cache for faster reads.
type KeyValueEntry struct {
deleted byte
blockNumber int64
- fileID string
offset int64
size int64 // Equals StaticChunkSize + keysize + valuesize
tstamp int64
@@ -18,14 +16,28 @@ type KeyValueEntry struct {
vsz int64
k []byte
v []byte
+ fileID string
}
+// Block represents a single block of disk memory. Default size is 32KB.
+// Each Segment is a collection of blocks; Each block is a collection of KeyValueEntries.
type Block struct {
entries []*KeyValueEntry
blockNumber int64
blockOffset int64
}
+func NewKeyValueEntry(deleted byte, offset, ksz, vsz, size int64, k, v []byte) *KeyValueEntry {
+ return &KeyValueEntry{
+ deleted: deleted,
+ ksz: ksz,
+ vsz: vsz,
+ size: size,
+ k: k,
+ v: v,
+ }
+}
+
func (s *KeyValueEntry) StaticChunkSize() int {
return StaticChunkSize + len(s.k) + len(s.v)
}
@@ -54,7 +66,7 @@ func (s *KeyValueEntry) ToByte() []byte {
return keyValueEntryInBytes
}
-func (db *Db) WriteKeyValueEntry(keyValueEntry *KeyValueEntry) error {
+func (db *Db) writeKeyValueEntry(keyValueEntry *KeyValueEntry) error {
f, err := db.getActiveDataFilePointer()
if err != nil {
return err
@@ -62,18 +74,3 @@ func (db *Db) WriteKeyValueEntry(keyValueEntry *KeyValueEntry) error {
_, err = f.Write(keyValueEntry.ToByte())
return err
}
-
-func (db *Db) ExpireKey(offset int64) error {
- f, err := os.OpenFile(db.activeDataFile, os.O_RDWR, 0644)
- if err != nil {
- return err
- }
-
- expireTstamp := time.Now().Add(-1 * time.Hour).UnixNano()
- _, err = f.WriteAt(utils.Int64ToByte(expireTstamp), int64(offset))
- if err != nil {
- return err
- }
-
- return nil
-}