Skip to content

Commit

Permalink
Improved Watch Manager and Store API compatibility (#1046)
Browse files Browse the repository at this point in the history
  • Loading branch information
JyotinderSingh authored Oct 9, 2024
1 parent b0c9fe5 commit b577b4e
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 49 deletions.
6 changes: 4 additions & 2 deletions internal/store/constants.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package store

const (
Set string = "set"
Del string = "del"
Set string = "SET"
Del string = "DEL"
Get string = "GET"
Rename string = "RENAME"
)
6 changes: 3 additions & 3 deletions internal/store/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// TODO: Make it efficient by doing thorough sampling
func evictFirst(store *Store) {
store.store.All(func(k string, obj *object.Obj) bool {
store.delByPtr(k)
store.delByPtr(k, WithDelCmd(Del))
// stop after iterating over the first element
return false
})
Expand All @@ -25,7 +25,7 @@ func evictAllkeysRandom(store *Store) {
// Iteration of Golang dictionary can be considered as a random
// because it depends on the hash of the inserted key
store.store.All(func(k string, obj *object.Obj) bool {
store.delByPtr(k)
store.delByPtr(k, WithDelCmd(Del))
evictCount--
// continue if evictCount > 0
return evictCount > 0
Expand Down Expand Up @@ -116,7 +116,7 @@ func EvictAllkeysLRUOrLFU(store *Store) {
if item == nil {
return
}
store.DelByPtr(item.keyPtr)
store.DelByPtr(item.keyPtr, WithDelCmd(Del))
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/store/expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func expireSample(store *Store) float32 {

// Delete the keys outside the read lock
for _, keyPtr := range keysToDelete {
store.DelByPtr(keyPtr)
store.DelByPtr(keyPtr, WithDelCmd(Del))
}

return float32(expiredCount) / float32(20.0)
Expand Down
54 changes: 21 additions & 33 deletions internal/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@ func (store *Store) ResetStore() {
store.expires = NewExpireMap()
}

type PutOptions struct {
KeepTTL bool
}

func (store *Store) Put(k string, obj *object.Obj, opts ...PutOption) {
store.putHelper(k, obj, opts...)
}
Expand All @@ -106,20 +102,6 @@ func (store *Store) IncrementKeyCount() {
store.numKeys++
}

func getDefaultOptions() *PutOptions {
return &PutOptions{
KeepTTL: false,
}
}

type PutOption func(*PutOptions)

func WithKeepTTL(value bool) PutOption {
return func(po *PutOptions) {
po.KeepTTL = value
}
}

func (store *Store) PutAll(data map[string]*object.Obj) {
for k, obj := range data {
store.putHelper(k, obj)
Expand All @@ -131,7 +113,7 @@ func (store *Store) GetNoTouch(k string) *object.Obj {
}

func (store *Store) putHelper(k string, obj *object.Obj, opts ...PutOption) {
options := getDefaultOptions()
options := getDefaultPutOptions()

for _, optApplier := range opts {
optApplier(options)
Expand Down Expand Up @@ -160,7 +142,7 @@ func (store *Store) putHelper(k string, obj *object.Obj, opts ...PutOption) {
store.notifyQueryManager(k, Set, *obj)
}
if store.cmdWatchChan != nil {
store.notifyWatchManager("SET", k)
store.notifyWatchManager(options.PutCmd, k)
}
}

Expand Down Expand Up @@ -198,16 +180,16 @@ func (store *Store) GetAll(keys []string) []*object.Obj {
return response
}

func (store *Store) Del(k string) bool {
func (store *Store) Del(k string, opts ...DelOption) bool {
v, ok := store.store.Get(k)
if ok {
return store.deleteKey(k, v)
return store.deleteKey(k, v, opts...)
}
return false
}

func (store *Store) DelByPtr(ptr string) bool {
return store.delByPtr(ptr)
func (store *Store) DelByPtr(ptr string, opts ...DelOption) bool {
return store.delByPtr(ptr, opts...)
}

func (store *Store) Keys(p string) ([]string, error) {
Expand Down Expand Up @@ -246,13 +228,13 @@ func (store *Store) Rename(sourceKey, destKey string) bool {
sourceObj, _ := store.store.Get(sourceKey)
if sourceObj == nil || hasExpired(sourceObj, store) {
if sourceObj != nil {
store.deleteKey(sourceKey, sourceObj)
store.deleteKey(sourceKey, sourceObj, WithDelCmd(Rename))
}
return false
}

// Use putHelper to handle putting the object at the destination key
store.putHelper(destKey, sourceObj)
store.putHelper(destKey, sourceObj, WithPutCmd(Set))

// Remove the source key
store.store.Delete(sourceKey)
Expand All @@ -263,7 +245,7 @@ func (store *Store) Rename(sourceKey, destKey string) bool {
store.notifyQueryManager(sourceKey, Del, *sourceObj)
}
if store.cmdWatchChan != nil {
store.notifyWatchManager("DEL", sourceKey)
store.notifyWatchManager(Rename, sourceKey)
}

return true
Expand All @@ -273,12 +255,12 @@ func (store *Store) Get(k string) *object.Obj {
return store.getHelper(k, true)
}

func (store *Store) GetDel(k string) *object.Obj {
func (store *Store) GetDel(k string, opts ...DelOption) *object.Obj {
var v *object.Obj
v, _ = store.store.Get(k)
if v != nil {
expired := hasExpired(v, store)
store.deleteKey(k, v)
store.deleteKey(k, v, opts...)
if expired {
v = nil
}
Expand All @@ -299,7 +281,13 @@ func (store *Store) SetUnixTimeExpiry(obj *object.Obj, exUnixTimeSec int64) {
store.expires.Put(obj, uint64(exUnixTimeSec*1000))
}

func (store *Store) deleteKey(k string, obj *object.Obj) bool {
func (store *Store) deleteKey(k string, obj *object.Obj, opts ...DelOption) bool {
options := getDefaultDelOptions()

for _, optApplier := range opts {
optApplier(options)
}

if obj != nil {
store.store.Delete(k)
store.expires.Delete(obj)
Expand All @@ -309,7 +297,7 @@ func (store *Store) deleteKey(k string, obj *object.Obj) bool {
store.notifyQueryManager(k, Del, *obj)
}
if store.cmdWatchChan != nil {
store.notifyWatchManager("DEL", k)
store.notifyWatchManager(options.DelCmd, k)
}

return true
Expand All @@ -318,10 +306,10 @@ func (store *Store) deleteKey(k string, obj *object.Obj) bool {
return false
}

func (store *Store) delByPtr(ptr string) bool {
func (store *Store) delByPtr(ptr string, opts ...DelOption) bool {
if obj, ok := store.store.Get(ptr); ok {
key := ptr
return store.deleteKey(key, obj)
return store.deleteKey(key, obj, opts...)
}
return false
}
Expand Down
45 changes: 45 additions & 0 deletions internal/store/store_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package store

type PutOptions struct {
KeepTTL bool
PutCmd string
}

func getDefaultPutOptions() *PutOptions {
return &PutOptions{
KeepTTL: false,
PutCmd: Set,
}
}

type PutOption func(*PutOptions)

func WithKeepTTL(value bool) PutOption {
return func(po *PutOptions) {
po.KeepTTL = value
}
}

func WithPutCmd(cmd string) PutOption {
return func(po *PutOptions) {
po.PutCmd = cmd
}
}

type DelOptions struct {
DelCmd string
}

func getDefaultDelOptions() *DelOptions {
return &DelOptions{
DelCmd: Del,
}
}

type DelOption func(*DelOptions)

func WithDelCmd(cmd string) DelOption {
return func(po *DelOptions) {
po.DelCmd = cmd
}
}
13 changes: 3 additions & 10 deletions internal/watchmanager/watch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ type (
var (
CmdWatchSubscriptionChan chan WatchSubscription
affectedCmdMap = map[string]map[string]struct{}{
"SET": {"GET": struct{}{}},
"DEL": {"GET": struct{}{}},
dstore.Set: {dstore.Get: struct{}{}},
dstore.Del: {dstore.Get: struct{}{}},
dstore.Rename: {dstore.Get: struct{}{}},
}
)

Expand Down Expand Up @@ -109,10 +110,6 @@ func (m *Manager) handleUnsubscription(sub WatchSubscription) {
delete(m.tcpSubscriptionMap, fingerprint)
// Also remove the fingerprint from fingerprintCmdMap
delete(m.fingerprintCmdMap, fingerprint)
} else {
// Update the map with the new set of clients
// TODO: Is this actually required?
m.tcpSubscriptionMap[fingerprint] = clients
}
}

Expand All @@ -125,10 +122,6 @@ func (m *Manager) handleUnsubscription(sub WatchSubscription) {
// If there are no more fingerprints listening to this key, remove it from the map
if len(fingerprints) == 0 {
delete(m.querySubscriptionMap, key)
} else {
// Update the map with the new set of fingerprints.
// TODO: Is this actually required?
m.querySubscriptionMap[key] = fingerprints
}
}
}
Expand Down

0 comments on commit b577b4e

Please sign in to comment.