-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathlocker.go
143 lines (127 loc) · 3.38 KB
/
locker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package geche
import (
"sync"
"sync/atomic"
)
// Locker wraps any Geche implementation and hands out Tx objects through
// its Lock() and RLock() methods; a Tx exposes the Geche operations on the
// wrapped cache.
//
// A Tx is not a real transaction: there is no commit or rollback, and no
// isolation level above READ COMMITTED. Its only purpose is to let a caller
// perform several cache operations atomically.
type Locker[K comparable, V any] struct {
	// cache is the wrapped cache that every Tx created by this Locker uses.
	cache Geche[K, V]

	// mux is acquired by Lock()/RLock() and handed to each Tx so that the
	// Tx can release it later.
	mux *sync.RWMutex
}
// NewLocker returns a Locker that guards the given cache with a freshly
// allocated read/write mutex.
func NewLocker[K comparable, V any](cache Geche[K, V]) *Locker[K, V] {
	return &Locker[K, V]{
		cache: cache,
		mux:   new(sync.RWMutex),
	}
}
// Tx is the "transaction" handle returned by Locker.Lock() and
// Locker.RLock(). See Locker for what it does and does not guarantee.
type Tx[K comparable, V any] struct {
	// cache is the underlying cache that the Tx methods delegate to.
	cache Geche[K, V]

	// mux is the Locker's mutex; Unlock() releases it.
	mux *sync.RWMutex

	// writable is true for a Tx created by Lock() and false for one created
	// by RLock(). Mutating methods panic when it is false.
	writable bool

	// unlocked is set to 1 by Unlock(). It is accessed through sync/atomic.
	unlocked int32
}
// Lock acquires the Locker's write lock and returns a writable Tx over the
// wrapped cache. The lock stays held until Unlock is called on the
// returned Tx.
func (t *Locker[K, V]) Lock() *Tx[K, V] {
	t.mux.Lock()

	rw := new(Tx[K, V])
	rw.cache = t.cache
	rw.mux = t.mux
	rw.writable = true
	return rw
}
// RLock acquires the Locker's read lock and returns a read-only Tx over the
// wrapped cache. The lock stays held until Unlock is called on the
// returned Tx.
func (t *Locker[K, V]) RLock() *Tx[K, V] {
	t.mux.RLock()

	ro := new(Tx[K, V])
	ro.cache = t.cache
	ro.mux = t.mux
	ro.writable = false
	return ro
}
// Unlock releases the lock held by this Tx: the write lock when the Tx is
// writable, the read lock otherwise.
//
// Unlock must be called exactly once per Tx. Any further call panics with
// "unlocking already unlocked transaction".
func (tx *Tx[K, V]) Unlock() {
	// Flip the flag with a single compare-and-swap so that the check and the
	// update cannot be interleaved. With a separate load and store, two
	// concurrent Unlock calls could both pass the check and release the
	// mutex twice.
	if !atomic.CompareAndSwapInt32(&tx.unlocked, 0, 1) {
		panic("unlocking already unlocked transaction")
	}

	if tx.writable {
		tx.mux.Unlock()
		return
	}
	tx.mux.RUnlock()
}
// Set stores the key-value pair in the underlying locked cache.
// It panics if the Tx has already been unlocked or if the Tx is read-only
// (obtained via RLock).
func (tx *Tx[K, V]) Set(key K, value V) {
	// Cases are evaluated top to bottom, so the unlocked check wins over the
	// read-only check.
	switch {
	case atomic.LoadInt32(&tx.unlocked) == 1:
		panic("cannot use unlocked transaction")
	case !tx.writable:
		panic("cannot set in read-only transaction")
	}
	tx.cache.Set(key, value)
}
// SetIfPresent forwards to SetIfPresent of the underlying locked cache and
// returns its results unchanged.
// It panics if the Tx has already been unlocked or if the Tx is read-only
// (obtained via RLock).
func (tx *Tx[K, V]) SetIfPresent(key K, value V) (V, bool) {
	if released := atomic.LoadInt32(&tx.unlocked) == 1; released {
		panic("cannot use unlocked transaction")
	}
	if readOnly := !tx.writable; readOnly {
		panic("cannot set in read-only transaction")
	}

	result, ok := tx.cache.SetIfPresent(key, value)
	return result, ok
}
// Get looks up key in the underlying locked cache and returns whatever the
// cache's Get returns.
// It panics if the Tx has already been unlocked.
func (tx *Tx[K, V]) Get(key K) (V, error) {
	if state := atomic.LoadInt32(&tx.unlocked); state == 1 {
		panic("cannot use unlocked transaction")
	}

	value, err := tx.cache.Get(key)
	return value, err
}
// Del removes key from the underlying locked cache and returns the error
// reported by the cache's Del.
// It panics if the Tx has already been unlocked or if the Tx is read-only
// (obtained via RLock).
func (tx *Tx[K, V]) Del(key K) error {
	// Cases are evaluated top to bottom, so the unlocked check wins over the
	// read-only check.
	switch {
	case atomic.LoadInt32(&tx.unlocked) == 1:
		panic("cannot use unlocked transaction")
	case !tx.writable:
		panic("cannot del in read-only transaction")
	}
	return tx.cache.Del(key)
}
// Snapshot returns the map produced by the underlying cache's Snapshot
// (described by the original author as a shallow copy of the cache data).
// It panics if the Tx has already been unlocked.
func (tx *Tx[K, V]) Snapshot() map[K]V {
	if state := atomic.LoadInt32(&tx.unlocked); state == 1 {
		panic("cannot use unlocked transaction")
	}

	data := tx.cache.Snapshot()
	return data
}
// Len reports the element count returned by the underlying cache's Len.
// It panics if the Tx has already been unlocked.
func (tx *Tx[K, V]) Len() int {
	if state := atomic.LoadInt32(&tx.unlocked); state == 1 {
		panic("cannot use unlocked transaction")
	}

	count := tx.cache.Len()
	return count
}
// ListByPrefix forwards to ListByPrefix of the underlying cache, which must
// be a *KV[V].
// It panics if the Tx has already been unlocked or if the underlying cache
// is of any other type.
func (tx *Tx[K, V]) ListByPrefix(prefix string) ([]V, error) {
	if state := atomic.LoadInt32(&tx.unlocked); state == 1 {
		panic("cannot use unlocked transaction")
	}

	// Only the KV cache implements prefix listing; go through `any` so the
	// interface value can be asserted to the concrete type.
	if store, isKV := any(tx.cache).(*KV[V]); isKV {
		return store.ListByPrefix(prefix)
	}
	panic("cache does not support ListByPrefix")
}