This repository has been archived by the owner on Nov 23, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathchainstore.go
161 lines (142 loc) · 2.96 KB
/
chainstore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package chainstore
import (
"regexp"
"golang.org/x/net/context"
)
var (
	// keyInvalidator matches any single character that is NOT permitted in a
	// key. Permitted characters are ASCII letters of either case, ASCII
	// digits, and the punctuation '/', '_', '-', ':' and '.'.
	//
	// The pattern used to begin with `(i?)`, which is an optional literal "i"
	// capture group rather than the case-insensitive flag `(?i)`, so upper-case
	// letters were treated as invalid. The class now lists A-Z explicitly
	// instead of relying on `(?i)`, because Unicode case folding would also
	// let through non-ASCII look-alikes such as U+212A (KELVIN SIGN).
	keyInvalidator = regexp.MustCompile(`[^a-zA-Z0-9/_\-:.]`)
)
// maxKeyLen is the longest key, measured in bytes, that IsValidKey accepts.
const maxKeyLen = 256
// Store represents a store that can be used as a chainstore link.
//
// Chain itself is returned as a Store by New and Async, so chains can be
// nested inside other chains.
type Store interface {
	// Open is invoked by Chain.Open on each store in chain order; the first
	// non-nil error stops the sequence.
	Open() error
	// Close is invoked by Chain.Close on every store, regardless of whether
	// an earlier store failed to close.
	Close() error
	// Put is expected to store val under key.
	Put(ctx context.Context, key string, val []byte) error
	// Get is expected to return the value stored under key. Chain.Get treats
	// a nil error together with an empty value as a miss and moves on to the
	// next store.
	Get(ctx context.Context, key string) ([]byte, error)
	// Del is expected to remove key.
	Del(ctx context.Context, key string) error
}
// Chain represents a store chain: an ordered list of stores that is itself
// usable as a Store.
type Chain struct {
	// stores are the links of the chain, in the order Open, Put, Get and Del
	// visit them.
	stores []Store
	// async, when true, makes Put and Del run their work in a background
	// goroutine instead of blocking the caller.
	async bool
	// errCallback, when non-nil, is handed errors coming back from Put, Get
	// and Del calls on the underlying stores.
	errCallback func(error)
}
// New builds a chain over the given stores, in the order supplied. The chain
// is synchronous (Put and Del block until done) and has no error callback.
func New(stores ...Store) Store {
	chain := &Chain{
		stores:      stores,
		async:       false,
		errCallback: nil,
	}
	return chain
}
// Async builds an asynchronous chain over the given stores: Put and Del run
// in a background goroutine. Because those calls return before the work is
// done, errCallback (which may be nil) is the only way to observe their
// failures.
func Async(errCallback func(error), stores ...Store) Store {
	chain := &Chain{
		stores:      stores,
		async:       true,
		errCallback: errCallback,
	}
	return chain
}
// Open opens the stores in chain order. It stops at the first store whose
// Open fails and returns that error; stores that were already opened are
// left open. It returns nil when every store opened (or the chain is empty).
func (c *Chain) Open() (err error) {
	// The loop condition doubles as the early exit: once err is set no
	// further store is touched.
	for i := 0; i < len(c.stores) && err == nil; i++ {
		err = c.stores[i].Open()
	}
	return err
}
// Close closes every store in the chain, continuing past failures so that one
// bad store does not keep the others open. All errors encountered are
// returned together as a fewerrors value; the result is nil when every store
// closed cleanly.
func (c *Chain) Close() error {
	var failed fewerrors
	for i := range c.stores {
		if closeErr := c.stores[i].Close(); closeErr != nil {
			failed = append(failed, closeErr)
		}
	}
	// Return a literal nil rather than an empty fewerrors so callers'
	// `err != nil` checks behave.
	if len(failed) == 0 {
		return nil
	}
	return failed
}
// Put writes val under key to each store in chain order.
//
// An invalid key yields ErrInvalidKey without touching any store. Writing
// stops at the first store that fails; that error is handed to the error
// callback when one is set. On a synchronous chain the same error is also
// returned to the caller. On an async chain the writes run in a background
// goroutine and Put returns nil right away.
func (c *Chain) Put(ctx context.Context, key string, val []byte) (err error) {
	if !IsValidKey(key) {
		return ErrInvalidKey
	}

	writeAll := func() error {
		for _, store := range c.stores {
			putErr := store.Put(ctx, key, val)
			if putErr == nil {
				continue
			}
			if c.errCallback != nil {
				c.errCallback(putErr)
			}
			return putErr
		}
		return nil
	}

	if c.async {
		go writeAll()
		return nil
	}
	return writeAll()
}
// Get returns the value identified by the given key.
//
// The stores are scanned sequentially in chain order and the first non-empty
// value wins. When the hit comes from a store other than the first, the value
// is copied in the background into every store that precedes it in the chain
// (the ones that were just asked and missed), nearest store first.
//
// An invalid key yields ErrInvalidKey. If a store's Get fails, the scan stops,
// the error is handed to the error callback (when set) and returned. If no
// store has the key, Get returns an empty value and a nil error.
func (c *Chain) Get(ctx context.Context, key string) (val []byte, err error) {
	if !IsValidKey(key) {
		return nil, ErrInvalidKey
	}

	for i, s := range c.stores {
		val, err = s.Get(ctx, key)
		if err != nil {
			if c.errCallback != nil {
				c.errCallback(err)
			}
			return val, err
		}
		if len(val) == 0 {
			continue // miss: try the next store down the chain
		}

		if i > 0 {
			// Back-fill the stores that missed. The goroutine gets its inputs
			// as arguments so it does not share the named results or the loop
			// variable with this function.
			// NOTE(review): the goroutine reuses the caller's ctx; if the
			// caller cancels it once Get returns, a store that honours
			// cancellation may abort the back-fill — confirm that is intended.
			go func(missed []Store, found []byte) {
				for n := len(missed) - 1; n >= 0; n-- {
					putErr := missed[n].Put(ctx, key, found)
					// Only report real failures. The callback used to be
					// invoked unconditionally, i.e. with a nil error after
					// every successful back-fill.
					if putErr != nil && c.errCallback != nil {
						c.errCallback(putErr)
					}
				}
			}(c.stores[:i], val)
		}

		// Return the first value found on the chain.
		return val, nil
	}
	return val, err
}
// Del removes key from each store in chain order.
//
// An invalid key yields ErrInvalidKey without touching any store. Deletion
// stops at the first store that fails; that error is handed to the error
// callback when one is set. On a synchronous chain the same error is also
// returned to the caller. On an async chain the deletes run in a background
// goroutine and Del returns nil right away.
func (c *Chain) Del(ctx context.Context, key string) (err error) {
	if !IsValidKey(key) {
		return ErrInvalidKey
	}

	deleteAll := func() error {
		for _, store := range c.stores {
			delErr := store.Del(ctx, key)
			if delErr == nil {
				continue
			}
			if c.errCallback != nil {
				c.errCallback(delErr)
			}
			return delErr
		}
		return nil
	}

	if c.async {
		go deleteAll()
		return nil
	}
	return deleteAll()
}
// IsValidKey reports whether key may be used with a Chain: it must be at most
// maxKeyLen bytes long and must not contain any character matched by
// keyInvalidator. The empty string passes both checks.
func IsValidKey(key string) bool {
	if len(key) > maxKeyLen {
		return false
	}
	return !keyInvalidator.MatchString(key)
}