Skip to content

Commit

Permalink
Diskmap now has explicit support for duplicate keys
Browse files Browse the repository at this point in the history
  • Loading branch information
John Doak authored and John Doak committed Dec 13, 2022
1 parent d49b462 commit a7350ab
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 13 deletions.
71 changes: 60 additions & 11 deletions diskmap/diskmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ Package diskmap provides disk storage of key/value pairs. The data is immutable
In addition the diskmap utilizes mmap on reads to make the random access faster.
On Linux, diskmap uses directio to speed up writes.
Unlike a regular map, keys can have duplicates. If you need this filtered out you
must do it before adding to the diskmap.
Usage is simplistic:
// Create a new diskmap.
Expand Down Expand Up @@ -101,11 +104,19 @@ const reservedHeader = 64
// endian is the endianess of all our binary encodings.
var endian = binary.LittleEndian

// Reader provides read access to the the diskmap file.
// ErrKeyNotFound indicates that a searched for key was not found.
var ErrKeyNotFound = fmt.Errorf("key was not found")

// Reader provides read access to the the diskmap file. If you fake this, you need
// to embed it in your fake.
type Reader interface {
// Get fetches key "k" and returns the value. Errors when key not found. Thread-safe.
// Read fetches key "k" and returns the value. If there are multi-key matches,
// it returns the last key added. Errors when key not found. Thread-safe.
Read(k []byte) ([]byte, error)

// ReadAll fetches all matches to key "k". Does not error if not found. Thread-safe.
ReadAll(k []byte) ([][]byte, error)

// Range allows iteration over all the key/value pairs stored in the diskmap. If not interating
// over all values, Cancel() or a timeout should be used on the Context to prevent a goroutine leak.
Range(ctx context.Context) chan KeyValue
Expand All @@ -115,6 +126,7 @@ type Reader interface {
}

// Writer provides write access to the diskmap file. An error on write makes the Writer unusable.
// If you fake this, you need to embed it in your fake.
type Writer interface {
// Write writes a key/value pair to disk. Thread-safe.
Write(k, v []byte) error
Expand Down Expand Up @@ -267,11 +279,13 @@ func (w *writer) Close() error {
// reader implements Reader.
type reader struct {
// index is the key to offset data mapping.
index map[string]value
index map[string][]value

// file holds the mapping file in mmap.
file *os.File

// The mutex is protecting the *os.File. Otherwise
// we can have the file pointer moving while doing a read operation.
sync.Mutex
}

Expand Down Expand Up @@ -300,7 +314,7 @@ func Open(p string) (Reader, error) {
return nil, fmt.Errorf("cannot seek to index offset: %q", err)
}

kv := make(map[string]value, num)
kv := make(map[string][]value, num)

var dOff, dLen, kLen int64

Expand All @@ -324,10 +338,14 @@ func Open(p string) (Reader, error) {
return nil, fmt.Errorf("error reading in a key from the index: %q", err)
}

kv[string(key)] = value{
offset: dOff,
length: dLen,
strKey := byteSlice2String(key)
if sl, ok := kv[strKey]; ok {
sl = append(sl, value{offset: dOff, length: dLen})
kv[strKey] = sl
continue
}

kv[string(key)] = []value{{offset: dOff, length: dLen}}
}

return &reader{index: kv, file: f}, nil
Expand All @@ -338,18 +356,45 @@ func (r *reader) Read(k []byte) ([]byte, error) {
r.Lock()
defer r.Unlock()

if v, ok := r.index[string(k)]; ok {
vals, ok := r.index[byteSlice2String(k)]
if !ok {
return nil, ErrKeyNotFound
}
// If there are multiple values with the same key, only return the last one.
v := vals[len(vals)-1]

if _, err := r.file.Seek(v.offset, 0); err != nil {
return nil, fmt.Errorf("cannot reach offset for key supplied by the index: %q", err)
}
b := make([]byte, v.length)
if _, err := r.file.Read(b); err != nil {
return nil, fmt.Errorf("error reading value from file: %q", err)
}
return b, nil
}

// ReadAll implements Reader.ReadAll().
func (r *reader) ReadAll(k []byte) ([][]byte, error) {
r.Lock()
defer r.Unlock()

vals, ok := r.index[byteSlice2String(k)]
if !ok {
return nil, nil
}

sl := make([][]byte, 0, len(vals))
for _, v := range vals {
if _, err := r.file.Seek(v.offset, 0); err != nil {
return nil, fmt.Errorf("cannot reach offset for key supplied by the index: %q", err)
}
b := make([]byte, v.length)
if _, err := r.file.Read(b); err != nil {
return nil, fmt.Errorf("error reading value from file: %q", err)
}
return b, nil
sl = append(sl, b)
}

return nil, fmt.Errorf("key was not found")
return sl, nil
}

// Range implements Reader.Range().
Expand Down Expand Up @@ -393,3 +438,7 @@ func UnsafeGetBytes(s string) []byte {
(*reflect.StringHeader)(unsafe.Pointer(&s)).Data),
)[:len(s):len(s)]
}

func byteSlice2String(bs []byte) string {
return *(*string)(unsafe.Pointer(&bs))
}
61 changes: 59 additions & 2 deletions diskmap/diskmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestDiskMap(t *testing.T) {
continue
}

if bytes.Compare(val, v) != 0 {
if !bytes.Equal(val, v) {
t.Errorf("a value was not correctly stored")
}
}
Expand All @@ -65,7 +65,64 @@ func TestDiskMap(t *testing.T) {
}
}

func BenchmarkDiskMap(b *testing.B) {
func TestDiskMapDuplicateKeys(t *testing.T) {
p := path.Join(os.TempDir(), nextSuffix())
w, err := New(p)
if err != nil {
panic(err)
}
defer os.Remove(p)

_1stKey := []byte(nextSuffix())
_1stData := randStringBytes()
dupKey := []byte(nextSuffix())
dupData0 := randStringBytes()
dupData1 := randStringBytes()
_2ndKey := []byte(nextSuffix())
_2ndData := randStringBytes()

for _, kv := range []KeyValue{
{Key: _1stKey, Value: _1stData},
{Key: dupKey, Value: dupData0},
{Key: _2ndKey, Value: _2ndData},
{Key: dupKey, Value: dupData1},
} {
if err := w.Write(kv.Key, kv.Value); err != nil {
t.Fatalf("error writing:\nkey:%q\nvalue:%q\n", kv.Key, kv.Value)
}
}

w.Close()

r, err := Open(p)
if err != nil {
t.Fatalf("error opening diskmap %q", err)
}

got, err := r.Read(dupKey)
if err != nil {
t.Fatalf("TestDiskMapDuplicateKeys(r.Read()): got err == %s, want err == nil", err)
}
if !bytes.Equal(got, dupData1) {
t.Fatalf("TestDiskMapDuplicateKeys(r.Read()): got incorrect data")
}

gotBatch, err := r.ReadAll(dupKey)
if err != nil {
t.Fatalf("TestDiskMapDuplicateKeys(r.ReadAll()): got err == %s, want err == nil", err)
}
if len(gotBatch) != 2 {
t.Fatalf("TestDiskMapDuplicateKeys(r.ReadAll()): got %d return values, want %d", len(gotBatch), 2)
}
want := [][]byte{dupData0, dupData1}
for i := 0; i < len(gotBatch); i++ {
if !bytes.Equal(gotBatch[i], want[i]) {
t.Fatalf("TestDiskMapDuplicateKeys(r.ReadAll()): returned value %d was incorrect", i)
}
}
}

func BenchmarkDiskMapWriter(b *testing.B) {
b.ReportAllocs()

p := path.Join(os.TempDir(), nextSuffix())
Expand Down

0 comments on commit a7350ab

Please sign in to comment.