From 9fe7cc031a805ef50e14c8034f697fb5bf70c018 Mon Sep 17 00:00:00 2001 From: Vicent Marti Date: Wed, 3 Feb 2021 17:36:17 +0100 Subject: [PATCH] cache: remove more unused features Signed-off-by: Vicent Marti --- go/cache/ristretto/cache.go | 64 ++++------- go/cache/ristretto/cache_test.go | 178 +++++++----------------------- go/cache/ristretto/policy.go | 1 + go/cache/ristretto/policy_test.go | 17 +++ go/cache/ristretto/ring.go | 1 + go/cache/ristretto/ring_test.go | 17 +++ go/cache/ristretto/sketch.go | 1 + go/cache/ristretto/sketch_test.go | 17 +++ go/cache/ristretto/store.go | 68 +++--------- go/cache/ristretto/store_test.go | 17 +++ go/cache/ristretto/ttl.go | 147 ------------------------ 11 files changed, 147 insertions(+), 381 deletions(-) delete mode 100644 go/cache/ristretto/ttl.go diff --git a/go/cache/ristretto/cache.go b/go/cache/ristretto/cache.go index 5f31aa09054..62f086f69c2 100644 --- a/go/cache/ristretto/cache.go +++ b/go/cache/ristretto/cache.go @@ -1,5 +1,6 @@ /* * Copyright 2019 Dgraph Labs, Inc. and Contributors + * Copyright 2021 The Vitess Authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -65,7 +66,7 @@ type Cache struct { // onReject is called when an item is rejected via admission policy. onReject itemCallback // onExit is called whenever a value goes out of scope from the cache. - onExit (func(interface{})) + onExit func(interface{}) // KeyToHash function is used to customize the key hashing algorithm. // Each key will be hashed using the provided function. If keyToHash value // is not set, the default keyToHash function is used. @@ -79,8 +80,6 @@ type Cache struct { // ignoreInternalCost dictates whether to ignore the cost of internally storing // the item in the cost calculation. ignoreInternalCost bool - // cleanupTicker is used to periodically check for entries whose TTL has passed. - cleanupTicker *time.Ticker // Metrics contains a running log of important statistics like hits, misses, // and dropped items. Metrics *Metrics @@ -150,13 +149,12 @@ const ( // Item is passed to setBuf so items can eventually be added to the cache. type Item struct { - flag itemFlag - Key uint64 - Conflict uint64 - Value interface{} - Cost int64 - Expiration int64 - wg *sync.WaitGroup + flag itemFlag + Key uint64 + Conflict uint64 + Value interface{} + Cost int64 + wg *sync.WaitGroup } // NewCache returns a new Cache instance and any configuration errors, if any. @@ -179,7 +177,6 @@ func NewCache(config *Config) (*Cache, error) { stop: make(chan struct{}), cost: config.Cost, ignoreInternalCost: config.IgnoreInternalCost, - cleanupTicker: time.NewTicker(time.Duration(bucketDurationSecs) * time.Second / 2), } cache.onExit = func(val interface{}) { if config.OnExit != nil && val != nil { @@ -246,42 +243,26 @@ func (c *Cache) Get(key string) (interface{}, bool) { // its determined that the key-value item isn't worth keeping, but otherwise the // item will be added and other items will be evicted in order to make room. // -// To dynamically evaluate the items cost using the Config.Coster function, set -// the cost parameter to 0 and Coster will be ran when needed in order to find -// the items true cost. +// The cost of the entry will be evaluated lazily by the cache's Cost function. func (c *Cache) Set(key string, value interface{}) bool { - return c.SetWithTTL(key, value, 0, 0*time.Second) + return c.SetWithCost(key, value, 0) } -// SetWithTTL works like Set but adds a key-value pair to the cache that will expire -// after the specified TTL (time to live) has passed. A zero value means the value never -// expires, which is identical to calling Set. A negative value is a no-op and the value -// is discarded. -func (c *Cache) SetWithTTL(key string, value interface{}, cost int64, ttl time.Duration) bool { +// SetWithCost works like Set but adds a key-value pair to the cache with a specific +// cost. The built-in Cost function will not be called to evaluate the object's cost +// and instead the given value will be used. +func (c *Cache) SetWithCost(key string, value interface{}, cost int64) bool { if c == nil || c.isClosed { return false } - var expiration int64 - switch { - case ttl == 0: - // No expiration. - break - case ttl < 0: - // Treat this a a no-op. - return false - default: - expiration = time.Now().Add(ttl).Unix() - } - keyHash, conflictHash := c.keyToHash(key) i := &Item{ - flag: itemNew, - Key: keyHash, - Conflict: conflictHash, - Value: value, - Cost: cost, - Expiration: expiration, + flag: itemNew, + Key: keyHash, + Conflict: conflictHash, + Value: value, + Cost: cost, } // cost is eventually updated. The expiration must also be immediately updated // to prevent items from being prematurely removed from the map. @@ -411,7 +392,10 @@ func (c *Cache) SetCapacity(maxCost int64) { // Evictions returns the number of evictions func (c *Cache) Evictions() int64 { // TODO - return 0 + if c == nil || c.Metrics == nil { + return 0 + } + return int64(c.Metrics.KeysEvicted()) } // ForEach yields all the values currently stored in the cache to the given callback. @@ -488,8 +472,6 @@ func (c *Cache) processItems() { _, val := c.store.Del(i.Key, i.Conflict) c.onExit(val) } - case <-c.cleanupTicker.C: - c.store.Cleanup(c.policy, onEvict) case <-c.stop: return } diff --git a/go/cache/ristretto/cache_test.go b/go/cache/ristretto/cache_test.go index 0d71246f69e..a070c6f785a 100644 --- a/go/cache/ristretto/cache_test.go +++ b/go/cache/ristretto/cache_test.go @@ -1,3 +1,20 @@ +/* + * Copyright 2019 Dgraph Labs, Inc. and Contributors + * Copyright 2021 The Vitess Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package ristretto import ( @@ -27,7 +44,7 @@ func TestCacheKeyToHash(t *testing.T) { }, }) require.NoError(t, err) - if c.SetWithTTL("1", 1, 1, 0) { + if c.SetWithCost("1", 1, 1) { time.Sleep(wait) val, ok := c.Get("1") require.True(t, ok) @@ -71,7 +88,7 @@ func TestCacheMaxCost(t *testing.T) { } else { val = strings.Repeat("a", 1000) } - c.SetWithTTL(key(), val, int64(2+len(val)), 0) + c.SetWithCost(key(), val, int64(2+len(val))) } } } @@ -96,7 +113,7 @@ func TestUpdateMaxCost(t *testing.T) { }) require.NoError(t, err) require.Equal(t, int64(10), c.MaxCapacity()) - require.True(t, c.SetWithTTL("1", 1, 1, 0)) + require.True(t, c.SetWithCost("1", 1, 1)) time.Sleep(wait) _, ok := c.Get("1") // Set is rejected because the cost of the entry is too high @@ -106,7 +123,7 @@ func TestUpdateMaxCost(t *testing.T) { // Update the max cost of the cache and retry. c.SetCapacity(1000) require.Equal(t, int64(1000), c.MaxCapacity()) - require.True(t, c.SetWithTTL("1", 1, 1, 0)) + require.True(t, c.SetWithCost("1", 1, 1)) time.Sleep(wait) val, ok := c.Get("1") require.True(t, ok) @@ -149,7 +166,7 @@ func TestNilCache(t *testing.T) { require.False(t, ok) require.Nil(t, val) - require.False(t, c.SetWithTTL("1", 1, 1, 0)) + require.False(t, c.SetWithCost("1", 1, 1)) c.Delete("1") c.Clear() c.Close() @@ -177,7 +194,7 @@ func TestSetAfterClose(t *testing.T) { require.NotNil(t, c) c.Close() - require.False(t, c.SetWithTTL("1", 1, 1, 0)) + require.False(t, c.SetWithCost("1", 1, 1)) } func TestClearAfterClose(t *testing.T) { @@ -194,7 +211,7 @@ func TestGetAfterClose(t *testing.T) { require.NoError(t, err) require.NotNil(t, c) - require.True(t, c.SetWithTTL("1", 1, 1, 0)) + require.True(t, c.SetWithCost("1", 1, 1)) c.Close() _, ok := c.Get("2") @@ -206,7 +223,7 @@ func TestDelAfterClose(t *testing.T) { require.NoError(t, err) require.NotNil(t, c) - require.True(t, c.SetWithTTL("1", 1, 1, 0)) + require.True(t, c.SetWithCost("1", 1, 1)) c.Close() c.Delete("1") @@ -348,10 +365,10 @@ func TestCacheGet(t *testing.T) { require.Nil(t, val) } -// retrySet calls SetWithTTL until the item is accepted by the cache. -func retrySet(t *testing.T, c *Cache, key string, value int, cost int64, ttl time.Duration) { +// retrySet calls SetWithCost until the item is accepted by the cache. +func retrySet(t *testing.T, c *Cache, key string, value int, cost int64) { for { - if set := c.SetWithTTL(key, value, cost, ttl); !set { + if set := c.SetWithCost(key, value, cost); !set { time.Sleep(wait) continue } @@ -375,9 +392,9 @@ func TestCacheSet(t *testing.T) { }) require.NoError(t, err) - retrySet(t, c, "1", 1, 1, 0) + retrySet(t, c, "1", 1, 1) - c.SetWithTTL("1", 2, 2, 0) + c.SetWithCost("1", 2, 2) val, ok := c.store.Get(defaultStringHash("1")) require.True(t, ok) require.Equal(t, 2, val.(int)) @@ -393,13 +410,13 @@ func TestCacheSet(t *testing.T) { Cost: 1, } } - require.False(t, c.SetWithTTL("2", 2, 1, 0)) + require.False(t, c.SetWithCost("2", 2, 1)) require.Equal(t, uint64(1), c.Metrics.SetsDropped()) close(c.setBuf) close(c.stop) c = nil - require.False(t, c.SetWithTTL("1", 1, 1, 0)) + require.False(t, c.SetWithCost("1", 1, 1)) } func TestCacheInternalCost(t *testing.T) { @@ -413,106 +430,12 @@ func TestCacheInternalCost(t *testing.T) { // Get should return false because the cache's cost is too small to store the item // when accounting for the internal cost. - c.SetWithTTL("1", 1, 1, 0) + c.SetWithCost("1", 1, 1) time.Sleep(wait) _, ok := c.Get("1") require.False(t, ok) } -func TestRecacheWithTTL(t *testing.T) { - c, err := NewCache(&Config{ - NumCounters: 100, - MaxCost: 10, - IgnoreInternalCost: true, - BufferItems: 64, - Metrics: true, - }) - - require.NoError(t, err) - - // Set initial value for key = 1 - insert := c.SetWithTTL("1", 1, 1, 5*time.Second) - require.True(t, insert) - time.Sleep(2 * time.Second) - - // Get value from cache for key = 1 - val, ok := c.Get("1") - require.True(t, ok) - require.NotNil(t, val) - require.Equal(t, 1, val) - - // Wait for expiration - time.Sleep(5 * time.Second) - - // The cached value for key = 1 should be gone - val, ok = c.Get("1") - require.False(t, ok) - require.Nil(t, val) - - // Set new value for key = 1 - insert = c.SetWithTTL("1", 2, 1, 5*time.Second) - require.True(t, insert) - time.Sleep(2 * time.Second) - - // Get value from cache for key = 1 - val, ok = c.Get("1") - require.True(t, ok) - require.NotNil(t, val) - require.Equal(t, 2, val) -} - -func TestCacheSetWithTTL(t *testing.T) { - m := &sync.Mutex{} - evicted := make(map[uint64]struct{}) - c, err := NewCache(&Config{ - NumCounters: 100, - MaxCost: 10, - IgnoreInternalCost: true, - BufferItems: 64, - Metrics: true, - OnEvict: func(item *Item) { - m.Lock() - defer m.Unlock() - evicted[item.Key] = struct{}{} - }, - }) - require.NoError(t, err) - - retrySet(t, c, "1", 1, 1, time.Second) - - // Sleep to make sure the item has expired after execution resumes. - time.Sleep(2 * time.Second) - val, ok := c.Get("1") - require.False(t, ok) - require.Nil(t, val) - - // Sleep to ensure that the bucket where the item was stored has been cleared - // from the expiraton map. - time.Sleep(5 * time.Second) - m.Lock() - require.Equal(t, 1, len(evicted)) - evk, _ := defaultStringHash("1") - _, ok = evicted[evk] - require.True(t, ok) - m.Unlock() - - // Verify that expiration times are overwritten. - retrySet(t, c, "2", 1, 1, time.Second) - retrySet(t, c, "2", 2, 1, 100*time.Second) - time.Sleep(3 * time.Second) - val, ok = c.Get("2") - require.True(t, ok) - require.Equal(t, 2, val.(int)) - - // Verify that entries with no expiration are overwritten. - retrySet(t, c, "3", 1, 1, 0) - retrySet(t, c, "3", 2, 1, time.Second) - time.Sleep(3 * time.Second) - val, ok = c.Get("3") - require.False(t, ok) - require.Nil(t, val) -} - func TestCacheDel(t *testing.T) { c, err := NewCache(&Config{ NumCounters: 100, @@ -521,7 +444,7 @@ func TestCacheDel(t *testing.T) { }) require.NoError(t, err) - c.SetWithTTL("1", 1, 1, 0) + c.SetWithCost("1", 1, 1) c.Delete("1") // The deletes and sets are pushed through the setbuf. It might be possible // that the delete is not processed before the following get is called. So @@ -538,24 +461,6 @@ func TestCacheDel(t *testing.T) { c.Delete("1") } -func TestCacheDelWithTTL(t *testing.T) { - c, err := NewCache(&Config{ - NumCounters: 100, - MaxCost: 10, - IgnoreInternalCost: true, - BufferItems: 64, - }) - require.NoError(t, err) - retrySet(t, c, "3", 1, 1, 10*time.Second) - time.Sleep(1 * time.Second) - // Delete the item - c.Delete("3") - // Ensure the key is deleted. - val, ok := c.Get("3") - require.False(t, ok) - require.Nil(t, val) -} - func TestCacheClear(t *testing.T) { c, err := NewCache(&Config{ NumCounters: 100, @@ -567,7 +472,7 @@ func TestCacheClear(t *testing.T) { require.NoError(t, err) for i := 0; i < 10; i++ { - c.SetWithTTL(strconv.Itoa(i), i, 1, 0) + c.SetWithCost(strconv.Itoa(i), i, 1) } time.Sleep(wait) require.Equal(t, uint64(10), c.Metrics.KeysAdded()) @@ -593,7 +498,7 @@ func TestCacheMetrics(t *testing.T) { require.NoError(t, err) for i := 0; i < 10; i++ { - c.SetWithTTL(strconv.Itoa(i), i, 1, 0) + c.SetWithCost(strconv.Itoa(i), i, 1) } time.Sleep(wait) m := c.Metrics @@ -690,7 +595,7 @@ func TestCacheMetricsClear(t *testing.T) { }) require.NoError(t, err) - c.SetWithTTL("1", 1, 1, 0) + c.SetWithCost("1", 1, 1) stop := make(chan struct{}) go func() { for { @@ -709,11 +614,6 @@ func TestCacheMetricsClear(t *testing.T) { c.Metrics.Clear() } -func init() { - // Set bucketSizeSecs to 1 to avoid waiting too much during the tests. - bucketDurationSecs = 1 -} - // Regression test for bug https://github.com/dgraph-io/ristretto/issues/167 func TestDropUpdates(t *testing.T) { originalSetBugSize := setBufSize @@ -759,7 +659,7 @@ func TestDropUpdates(t *testing.T) { for i := 0; i < 5*setBufSize; i++ { v := fmt.Sprintf("%0100d", i) // We're updating the same key. - if !c.SetWithTTL("0", v, 1, 0) { + if !c.SetWithCost("0", v, 1) { // The race condition doesn't show up without this sleep. time.Sleep(time.Microsecond) droppedMap[i] = struct{}{} @@ -768,7 +668,7 @@ func TestDropUpdates(t *testing.T) { // Wait for all the items to be processed. c.Wait() // This will cause eviction from the cache. - require.True(t, c.SetWithTTL("1", nil, 10, 0)) + require.True(t, c.SetWithCost("1", nil, 10)) c.Close() } diff --git a/go/cache/ristretto/policy.go b/go/cache/ristretto/policy.go index b20b903309c..9ebf0b38d72 100644 --- a/go/cache/ristretto/policy.go +++ b/go/cache/ristretto/policy.go @@ -1,5 +1,6 @@ /* * Copyright 2020 Dgraph Labs, Inc. and Contributors + * Copyright 2021 The Vitess Authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/go/cache/ristretto/policy_test.go b/go/cache/ristretto/policy_test.go index 5e5df1ac1af..c864b6c74d0 100644 --- a/go/cache/ristretto/policy_test.go +++ b/go/cache/ristretto/policy_test.go @@ -1,3 +1,20 @@ +/* + * Copyright 2020 Dgraph Labs, Inc. and Contributors + * Copyright 2021 The Vitess Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package ristretto import ( diff --git a/go/cache/ristretto/ring.go b/go/cache/ristretto/ring.go index 5dbed4cc59c..afc2c1559f8 100644 --- a/go/cache/ristretto/ring.go +++ b/go/cache/ristretto/ring.go @@ -1,5 +1,6 @@ /* * Copyright 2019 Dgraph Labs, Inc. and Contributors + * Copyright 2021 The Vitess Authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/go/cache/ristretto/ring_test.go b/go/cache/ristretto/ring_test.go index 1c729fc3260..0dbe962ccc6 100644 --- a/go/cache/ristretto/ring_test.go +++ b/go/cache/ristretto/ring_test.go @@ -1,3 +1,20 @@ +/* + * Copyright 2020 Dgraph Labs, Inc. and Contributors + * Copyright 2021 The Vitess Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package ristretto import ( diff --git a/go/cache/ristretto/sketch.go b/go/cache/ristretto/sketch.go index f12add3aed5..ce0504a2a83 100644 --- a/go/cache/ristretto/sketch.go +++ b/go/cache/ristretto/sketch.go @@ -1,5 +1,6 @@ /* * Copyright 2019 Dgraph Labs, Inc. and Contributors + * Copyright 2021 The Vitess Authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/go/cache/ristretto/sketch_test.go b/go/cache/ristretto/sketch_test.go index 62142901fbd..f0d523df559 100644 --- a/go/cache/ristretto/sketch_test.go +++ b/go/cache/ristretto/sketch_test.go @@ -1,3 +1,20 @@ +/* + * Copyright 2020 Dgraph Labs, Inc. and Contributors + * Copyright 2021 The Vitess Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package ristretto import ( diff --git a/go/cache/ristretto/store.go b/go/cache/ristretto/store.go index c45f96718d7..44e5ad8b147 100644 --- a/go/cache/ristretto/store.go +++ b/go/cache/ristretto/store.go @@ -1,5 +1,6 @@ /* * Copyright 2019 Dgraph Labs, Inc. and Contributors + * Copyright 2021 The Vitess Authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,15 +19,13 @@ package ristretto import ( "sync" - "time" ) // TODO: Do we need this to be a separate struct from Item? type storeItem struct { - key uint64 - conflict uint64 - value interface{} - expiration int64 + key uint64 + conflict uint64 + value interface{} } // store is the interface fulfilled by all hash map implementations in this @@ -38,8 +37,6 @@ type storeItem struct { type store interface { // Get returns the value associated with the key parameter. Get(uint64, uint64) (interface{}, bool) - // Expiration returns the expiration time for this key. - Expiration(uint64) int64 // Set adds the key-value pair to the Map or updates the value if it's // already present. The key-value pair is passed as a pointer to an // item object. @@ -49,8 +46,6 @@ type store interface { // Update attempts to update the key with a new value and returns true if // successful. Update(*Item) (interface{}, bool) - // Cleanup removes items that have an expired TTL. - Cleanup(policy policy, onEvict itemCallback) // Clear clears all contents of the store. Clear(onEvict itemCallback) // ForEach yields all the values in the store @@ -67,17 +62,15 @@ func newStore() store { const numShards uint64 = 256 type shardedMap struct { - shards []*lockedMap - expiryMap *expirationMap + shards []*lockedMap } func newShardedMap() *shardedMap { sm := &shardedMap{ - shards: make([]*lockedMap, int(numShards)), - expiryMap: newExpirationMap(), + shards: make([]*lockedMap, int(numShards)), } for i := range sm.shards { - sm.shards[i] = newLockedMap(sm.expiryMap) + sm.shards[i] = newLockedMap() } return sm } @@ -86,10 +79,6 @@ func (sm *shardedMap) Get(key, conflict uint64) (interface{}, bool) { return sm.shards[key%numShards].get(key, conflict) } -func (sm *shardedMap) Expiration(key uint64) int64 { - return sm.shards[key%numShards].Expiration(key) -} - func (sm *shardedMap) Set(i *Item) { if i == nil { // If item is nil make this Set a no-op. @@ -107,10 +96,6 @@ func (sm *shardedMap) Update(newItem *Item) (interface{}, bool) { return sm.shards[newItem.Key%numShards].Update(newItem) } -func (sm *shardedMap) Cleanup(policy policy, onEvict itemCallback) { - sm.expiryMap.cleanup(sm, policy, onEvict) -} - func (sm *shardedMap) ForEach(forEach func(interface{}) bool) { for _, shard := range sm.shards { if !shard.foreach(forEach) { @@ -136,13 +121,11 @@ func (sm *shardedMap) Clear(onEvict itemCallback) { type lockedMap struct { sync.RWMutex data map[uint64]storeItem - em *expirationMap } -func newLockedMap(em *expirationMap) *lockedMap { +func newLockedMap() *lockedMap { return &lockedMap{ data: make(map[uint64]storeItem), - em: em, } } @@ -156,20 +139,9 @@ func (m *lockedMap) get(key, conflict uint64) (interface{}, bool) { if conflict != 0 && (conflict != item.conflict) { return nil, false } - - // Handle expired items. - if item.expiration != 0 && time.Now().Unix() > item.expiration { - return nil, false - } return item.value, true } -func (m *lockedMap) Expiration(key uint64) int64 { - m.RLock() - defer m.RUnlock() - return m.data[key].expiration -} - func (m *lockedMap) Set(i *Item) { if i == nil { // If the item is nil make this Set a no-op. @@ -186,18 +158,12 @@ func (m *lockedMap) Set(i *Item) { if i.Conflict != 0 && (i.Conflict != item.conflict) { return } - m.em.update(i.Key, i.Conflict, item.expiration, i.Expiration) - } else { - // The value is not in the map already. There's no need to return anything. - // Simply add the expiration map. - m.em.add(i.Key, i.Conflict, i.Expiration) } m.data[i.Key] = storeItem{ - key: i.Key, - conflict: i.Conflict, - value: i.Value, - expiration: i.Expiration, + key: i.Key, + conflict: i.Conflict, + value: i.Value, } } @@ -213,10 +179,6 @@ func (m *lockedMap) Del(key, conflict uint64) (uint64, interface{}) { return 0, nil } - if item.expiration != 0 { - m.em.del(key, item.expiration) - } - delete(m.data, key) m.Unlock() return item.conflict, item.value @@ -234,12 +196,10 @@ func (m *lockedMap) Update(newItem *Item) (interface{}, bool) { return nil, false } - m.em.update(newItem.Key, newItem.Conflict, item.expiration, newItem.Expiration) m.data[newItem.Key] = storeItem{ - key: newItem.Key, - conflict: newItem.Conflict, - value: newItem.Value, - expiration: newItem.Expiration, + key: newItem.Key, + conflict: newItem.Conflict, + value: newItem.Value, } m.Unlock() diff --git a/go/cache/ristretto/store_test.go b/go/cache/ristretto/store_test.go index 7f9aa052b3f..54634736a72 100644 --- a/go/cache/ristretto/store_test.go +++ b/go/cache/ristretto/store_test.go @@ -1,3 +1,20 @@ +/* + * Copyright 2020 Dgraph Labs, Inc. and Contributors + * Copyright 2021 The Vitess Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package ristretto import ( diff --git a/go/cache/ristretto/ttl.go b/go/cache/ristretto/ttl.go deleted file mode 100644 index 40a91bc1e51..00000000000 --- a/go/cache/ristretto/ttl.go +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright 2020 Dgraph Labs, Inc. and Contributors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ristretto - -import ( - "sync" - "time" -) - -var ( - // TODO: find the optimal value or make it configurable. - bucketDurationSecs = int64(5) -) - -func storageBucket(t int64) int64 { - return (t / bucketDurationSecs) + 1 -} - -func cleanupBucket(t int64) int64 { - // The bucket to cleanup is always behind the storage bucket by one so that - // no elements in that bucket (which might not have expired yet) are deleted. - return storageBucket(t) - 1 -} - -// bucket type is a map of key to conflict. -type bucket map[uint64]uint64 - -// expirationMap is a map of bucket number to the corresponding bucket. -type expirationMap struct { - sync.RWMutex - buckets map[int64]bucket -} - -func newExpirationMap() *expirationMap { - return &expirationMap{ - buckets: make(map[int64]bucket), - } -} - -func (m *expirationMap) add(key, conflict uint64, expiration int64) { - if m == nil { - return - } - - // Items that don't expire don't need to be in the expiration map. - if expiration == 0 { - return - } - - bucketNum := storageBucket(expiration) - m.Lock() - defer m.Unlock() - - b, ok := m.buckets[bucketNum] - if !ok { - b = make(bucket) - m.buckets[bucketNum] = b - } - b[key] = conflict -} - -func (m *expirationMap) update(key, conflict uint64, oldExpTime, newExpTime int64) { - if m == nil { - return - } - - m.Lock() - defer m.Unlock() - - oldBucketNum := storageBucket(oldExpTime) - oldBucket, ok := m.buckets[oldBucketNum] - if ok { - delete(oldBucket, key) - } - - newBucketNum := storageBucket(newExpTime) - newBucket, ok := m.buckets[newBucketNum] - if !ok { - newBucket = make(bucket) - m.buckets[newBucketNum] = newBucket - } - newBucket[key] = conflict -} - -func (m *expirationMap) del(key uint64, expiration int64) { - if m == nil { - return - } - - bucketNum := storageBucket(expiration) - m.Lock() - defer m.Unlock() - _, ok := m.buckets[bucketNum] - if !ok { - return - } - delete(m.buckets[bucketNum], key) -} - -// cleanup removes all the items in the bucket that was just completed. It deletes -// those items from the store, and calls the onEvict function on those items. -// This function is meant to be called periodically. -func (m *expirationMap) cleanup(store store, policy policy, onEvict itemCallback) { - if m == nil { - return - } - - m.Lock() - now := time.Now().Unix() - bucketNum := cleanupBucket(now) - keys := m.buckets[bucketNum] - delete(m.buckets, bucketNum) - m.Unlock() - - for key, conflict := range keys { - // Sanity check. Verify that the store agrees that this key is expired. - if store.Expiration(key) > now { - continue - } - - cost := policy.Cost(key) - policy.Del(key) - _, value := store.Del(key, conflict) - - if onEvict != nil { - onEvict(&Item{Key: key, - Conflict: conflict, - Value: value, - Cost: cost, - }) - } - } -}