diff --git a/README.md b/README.md index 5942269..2416b4d 100644 --- a/README.md +++ b/README.md @@ -20,12 +20,12 @@ mnemosyneManager := mnemosyne.NewMnemosyne(config, nil, nil) cacheInstance := mnemosyneManager.select("result-cache") ``` -### Working with a cacheInstance +### Working with a CacheInstance ```go cacheInstance.Set(context, key, value) var myCachedData myType err := cacheInstance.Get(context, key, &myCachedData) - // cache miss is also an Error + // remember: a cache miss is also an Error ``` ## Configuration @@ -55,6 +55,7 @@ cache: ttl: 24h amnesia: 0 compression: true + my-user-cache: soft-ttl: 2h layers: @@ -75,27 +76,57 @@ cache: compression: true ``` -`soft-ttl` is an instance-wide TTL which when expired will **NOT** remove the data from the instance, but warns that the data is old -Each cache layer can be of types `redis`, `gaurdian`, `memory` or `tiny`. `redis` is used for a single node Redis server, `gaurdian` is used for a master-slave Redis cluster configuration, `memory` uses the BigCache library to provide an efficient and fast in-memory cache, `tiny` uses the native sync.map data structure to store smaller cache values in memory (used for low-write caches). -Note: all of the cache types are sync-safe, meaning they can be safely used from simultaneously running goroutines. +**Each cache layer can be of the following types:** + +`redis` is used for a single-node Redis server. + +`gaurdian` [Deprecated] is used for a master-slave Redis cluster configuration; it is being deprecated in favor of `rediscluster`. + +`rediscluster` is an all-encompassing configuration covering client-side sharding, clustered Redis, or both at the same time. + +`memory` uses the BigCache library to provide an efficient and fast in-memory cache. + +`tiny` uses the native `sync.Map` data structure to store smaller cache values in memory (used for low-write caches). + +_Note:_ all of the cache types are sync-safe, meaning they can be safely used from simultaneously running goroutines. + +#### Instance Configs: -#### Common layer configs: +**`soft-ttl`** is an instance-wide TTL which, when expired, will **NOT** remove the data from the instance but warns that the data is old. -`amnesia` is a stochastic fall-through mechanism which allows for a higher layer to be updated from a lower layer by the way of an artificial cache-miss, -a 0 amnesia means that the layers will never miss a data that they actually have, a 10 amnesia means when a key is present in the cache, 90% of the time it is returned but 10% of the time it is ignored and is treated as a cache-miss. a 100 amnesia effectively turns the layer off. (Default: 0) +#### Common Layer Configs: -`compression` is whther the data is compressed before being put into the cache memory. Currently only Zlib compression is supported. (Default: false) +**`amnesia`** is a stochastic fall-through mechanism which allows a higher layer to be updated from a lower layer by way of an artificial cache-miss: +an amnesia value of 0 means that a layer will never miss data it actually has; an amnesia value of 10 means that when a key is present in the cache, 90% of the time it is returned but 10% of the time it is ignored and treated as a cache-miss; an amnesia value of 100 effectively turns the layer off. (Default: 0) +_Note:_ 'SET' operations ignore amnesia; to completely turn off a layer, remove its name from the layer list.
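To make the odds concrete, here is a minimal, runnable sketch of the amnesia roll as performed by the layer `Get` implementations later in this diff; the `shouldForget` helper and the simulation around it are illustrative only, not part of the package.

```go
package main

import (
	"fmt"
	"math/rand"
)

// shouldForget reports whether a lookup on a layer with the given amnesia
// value (0-100) should be treated as an artificial cache-miss. This mirrors
// the `amnesiaChance > rand.Intn(100)` check used by the layers in this diff.
func shouldForget(amnesiaChance int) bool {
	// rand.Intn(100) is uniform over 0..99, so amnesia 0 never misses,
	// 10 misses roughly 10% of the time, and 100 always misses.
	return amnesiaChance > rand.Intn(100)
}

func main() {
	served := 0
	for i := 0; i < 1000; i++ {
		if !shouldForget(10) {
			served++
		}
	}
	fmt.Printf("served %d of 1000 lookups (~900 expected with amnesia 10)\n", served)
}
```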
-`ttl` is the hard Time To Live for the data in this particular layer, after which the data is expired and is expected to be removed. +**`compression`** dictates whether the data is compressed before being put into the cache memory. Currently only Zlib compression is supported. (Default: false) -#### Type-spesific layer configs: +**`ttl`** is the hard Time-To-Live for the data in this particular layer, after which the data is expired and is expected to be removed. -`db` [`redis` - `gaurdian`] is the Redis DB number to be used. (Default:0) -`idle-timeout` [`redis` - `gaurdian`] is the timeout for idle connections to the Redis Server (see Redis documentation) (Default:0 - no timeout) -`address` [`redis` - `gaurdian`] is the Redis Server's Address (the master's address in case of a cluster) -`slaves` [`gaurdian`] is a **list** of Redis servers addresses pertaining to the slave nodes. -`max-memory` [`memory`] is the maximum amount of system memory which can be used by this particular layer. +#### Type-specific Layer Configs: + +**`db`** {`redis` - `gaurdian`} is the Redis DB number to be used. (Default: 0) +**`idle-timeout`** {`redis` - `gaurdian`} is the timeout for idle connections to the Redis server (see Redis documentation). (Default: 0 - no timeout) +**`address`** {`redis` - `gaurdian` - `rediscluster`} is the Redis server's address (the master's address in the case of a cluster). +**`slaves`** {`gaurdian` - `rediscluster`} is a **list** of Redis server addresses pertaining to the slave nodes. +**`max-memory`** {`memory`} is the maximum amount of system memory (in megabytes) which can be used by this particular layer. + + +### Epimetheus Integration Guide + +Add these two functions to your `container.go` file as well as to the `wire.Build()` call so _wire-gen_ can recognize the proper timer & counter to pass to Mnemosyne (an illustrative wiring sketch follows after this README diff). + +```go +func getCommTimer(epi *epimetheus.Epimetheus) mnemosyne.ITimer { + return epi.CommTimer +} + +func getCacheRate(epi *epimetheus.Epimetheus) mnemosyne.ICounter { + return epi.CacheRate +} +``` ## Documentation @@ -121,9 +152,10 @@ We use [SemVer](http://semver.org/) for versioning. For the versions available, ## Authors -* **Ramtin Rostami** - *Initial work* - [rrostami](https://github.com/rrostami) -* **Pedram Teymoori** - *Initial work* - [pedramteymoori](https://github.com/pedramteymoori) -* **Parsa abdollahi** - *Initial work* - []() +* **Ramtin Rostami** [rrostami](https://github.com/rrostami) - *Initial work & maintenance* +* **Pedram Teymoori** [pedramteymoori](https://github.com/pedramteymoori) - *Initial work & maintenance* +* **Parsa abdollahi** - *Initial work* +* **Ava Abderezaei** [avv-va](https://github.com/avv-va) - *Tests* See also the list of [contributors](https://github.com/cafebazaar/Mnemosyne/graphs/contributors) who participated in this project.
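As referenced in the Epimetheus Integration Guide above, the following is an illustrative sketch of how the two provider functions could sit inside a `wire.Build` injector. The `InitializeCacheManager` injector, the `container` package name, the import paths, and the `epimetheus.NewEpimetheus` constructor are assumptions for illustration; only `getCommTimer`, `getCacheRate`, and `wire.Build` come from the guide itself, and `mnemosyne.NewMnemosyne` is assumed to accept the timer and counter that wire resolves.

```go
//go:build wireinject
// +build wireinject

package container

import (
	"github.com/google/wire"
	"github.com/spf13/viper"

	// import paths below are assumed for illustration
	"github.com/cafebazaar/epimetheus"
	"github.com/cafebazaar/mnemosyne"
)

// InitializeCacheManager is a hypothetical wire injector: wire-gen resolves
// mnemosyne.ITimer and mnemosyne.ICounter through the two providers from the
// Epimetheus Integration Guide and feeds them to mnemosyne.NewMnemosyne.
func InitializeCacheManager(config *viper.Viper) *mnemosyne.Mnemosyne {
	wire.Build(
		epimetheus.NewEpimetheus, // assumed constructor for *epimetheus.Epimetheus
		getCommTimer,
		getCacheRate,
		mnemosyne.NewMnemosyne, // assumed to accept the resolved timer and counter
	)
	return nil // replaced by generated code
}
```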
diff --git a/c-memory.go b/c-memory.go new file mode 100644 index 0000000..d75936f --- /dev/null +++ b/c-memory.go @@ -0,0 +1,76 @@ +package mnemosyne + +import ( + "context" + "math/rand" + "time" + + "github.com/allegro/bigcache" + "github.com/sirupsen/logrus" +) + +type inMemoryCache struct { + baseCache + base *bigcache.BigCache + cacheTTL time.Duration +} + +func NewInMemoryCache(opts *CacheOpts) *inMemoryCache { + internalOpts := bigcache.Config{ + Shards: 1024, + LifeWindow: opts.cacheTTL, + MaxEntriesInWindow: 1100 * 10 * 60, + MaxEntrySize: 500, + Verbose: false, + HardMaxCacheSize: opts.memOpts.maxMem, + CleanWindow: 1 * time.Minute, + } + cacheInstance, err := bigcache.NewBigCache(internalOpts) + if err != nil { + logrus.Errorf("InMemCache %s Initialization Error: %v", opts.layerName, err) + } + return &inMemoryCache{ + baseCache: baseCache{ + layerName: opts.layerName, + amnesiaChance: opts.amnesiaChance, + compressionEnabled: opts.compressionEnabled, + }, + base: cacheInstance, + cacheTTL: opts.cacheTTL, + } +} + +func (mc *inMemoryCache) Get(ctx context.Context, key string) (*cachableRet, error) { + if mc.amnesiaChance > rand.Intn(100) { + return nil, newAmnesiaError(mc.amnesiaChance) + } + rawBytes, err := mc.base.Get(key) + if err != nil { + return nil, err + } + return finalizeCacheResponse(rawBytes, mc.compressionEnabled) +} + +func (mc *inMemoryCache) Set(ctx context.Context, key string, value interface{}) error { + finalData, err := prepareCachePayload(value, mc.compressionEnabled) + if err != nil { + return err + } + return mc.base.Set(key, finalData) +} + +func (mc *inMemoryCache) Delete(ctx context.Context, key string) error { + return mc.base.Delete(key) +} + +func (mc *inMemoryCache) Clear() error { + return mc.base.Reset() +} + +func (mc *inMemoryCache) TTL(ctx context.Context, key string) time.Duration { + return time.Second * 0 +} + +func (mc *inMemoryCache) Name() string { + return mc.layerName +} diff --git a/c-redis.go b/c-redis.go new file mode 100644 index 0000000..3cf0c26 --- /dev/null +++ b/c-redis.go @@ -0,0 +1,158 @@ +package mnemosyne + +import ( + "context" + "hash/fnv" + "math/rand" + "time" + + "github.com/go-redis/redis" + "github.com/sirupsen/logrus" +) + +type RedisClusterAddress struct { + MasterAddr string `mapstructure:"address"` + SlaveAddrs []string `mapstructure:"slaves"` +} + +type RedisOpts struct { + db int + idleTimeout time.Duration + shards []*RedisClusterAddress +} + +type clusterClient struct { + master *redis.Client + slaves []*redis.Client +} + +type redisCache struct { + baseCache + baseClients []*clusterClient + cacheTTL time.Duration + watcher ITimer +} + +func makeClient(addr string, db int, idleTimeout time.Duration) *redis.Client { + redisOptions := &redis.Options{ + Addr: addr, + DB: db, + } + if idleTimeout >= time.Second { + redisOptions.IdleTimeout = idleTimeout + } + newClient := redis.NewClient(redisOptions) + + if err := newClient.Ping().Err(); err != nil { + logrus.WithError(err).WithField("address", addr).Error("error pinging Redis") + } + return newClient +} + +func NewShardedClusterRedisCache(opts *CacheOpts, watcher ITimer) *redisCache { + rc := &redisCache{ + baseCache: baseCache{ + layerName: opts.layerName, + amnesiaChance: opts.amnesiaChance, + compressionEnabled: opts.compressionEnabled, + }, + cacheTTL: opts.cacheTTL, + watcher: watcher, + } + rc.baseClients = make([]*clusterClient, len(opts.redisOpts.shards)) + for i, shard := range opts.redisOpts.shards { + rc.baseClients[i].master = 
makeClient(shard.MasterAddr, + opts.redisOpts.db, + opts.redisOpts.idleTimeout) + + rc.baseClients[i].slaves = make([]*redis.Client, len(shard.SlaveAddrs)) + for j, slv := range shard.SlaveAddrs { + rc.baseClients[i].slaves[j] = makeClient(slv, + opts.redisOpts.db, + opts.redisOpts.idleTimeout) + } + } + return rc +} + +func (rc *redisCache) Get(ctx context.Context, key string) (*cachableRet, error) { + if rc.amnesiaChance > rand.Intn(100) { + return nil, newAmnesiaError(rc.amnesiaChance) + } + client := rc.pickClient(key, false).WithContext(ctx) + startMarker := rc.watcher.Start() + strValue, err := client.Get(key).Result() + if err == nil { + rc.watcher.Done(startMarker, rc.layerName, "get", "ok") + } else if err == redis.Nil { + rc.watcher.Done(startMarker, rc.layerName, "get", "miss") + } else { + rc.watcher.Done(startMarker, rc.layerName, "get", "error") + } + rawBytes := []byte(strValue) + return finalizeCacheResponse(rawBytes, rc.compressionEnabled) +} + +func (rc *redisCache) Set(ctx context.Context, key string, value interface{}) error { + finalData, err := prepareCachePayload(value, rc.compressionEnabled) + if err != nil { + return err + } + client := rc.pickClient(key, true).WithContext(ctx) + startMarker := rc.watcher.Start() + setError := client.SetNX(key, finalData, rc.cacheTTL).Err() + if setError != nil { + rc.watcher.Done(startMarker, rc.layerName, "set", "error") + } else { + rc.watcher.Done(startMarker, rc.layerName, "set", "ok") + } + return setError +} +func (rc *redisCache) Delete(ctx context.Context, key string) error { + client := rc.pickClient(key, true).WithContext(ctx) + return client.Del(key).Err() +} + +func (rc *redisCache) Clear() error { + for _, cl := range rc.baseClients { + client := cl.master + err := client.FlushDB().Err() + if err != nil { + return err + } + } + return nil +} + +func (rc *redisCache) TTL(ctx context.Context, key string) time.Duration { + client := rc.pickClient(key, false).WithContext(ctx) + res, err := client.TTL(key).Result() + if err != nil { + return time.Second * 0 + } + return res +} + +func (rc *redisCache) pickClient(key string, modification bool) *redis.Client { + shard := rc.shardKey(key) + if modification || len(rc.baseClients[shard].slaves) == 0 { + return rc.baseClients[shard].master + } + cl := rand.Intn(len(rc.baseClients[shard].slaves)) + return rc.baseClients[shard].slaves[cl] +} + +func (rc *redisCache) shardKey(key string) int { + shards := len(rc.baseClients) + if shards == 1 { + return 0 + } + hasher := fnv.New32a() + hasher.Write([]byte(key)) + keyHash := int(hasher.Sum32()) + return keyHash % shards +} + +func (rc *redisCache) Name() string { + return rc.layerName +} diff --git a/c-tiny.go b/c-tiny.go new file mode 100644 index 0000000..2eae2d3 --- /dev/null +++ b/c-tiny.go @@ -0,0 +1,70 @@ +package mnemosyne + +import ( + "context" + "errors" + "math/rand" + "sync" + "time" +) + +type tinyCache struct { + baseCache + base *sync.Map +} + +func NewTinyCache(opts *CacheOpts) *tinyCache { + data := sync.Map{} + return &tinyCache{ + baseCache: baseCache{ + layerName: opts.layerName, + amnesiaChance: opts.amnesiaChance, + compressionEnabled: opts.compressionEnabled, + }, + base: &data, + } +} + +func (tc *tinyCache) Get(ctx context.Context, key string) (*cachableRet, error) { + if tc.amnesiaChance > rand.Intn(100) { + return nil, newAmnesiaError(tc.amnesiaChance) + } + var rawBytes []byte + val, ok := tc.base.Load(key) + if !ok { + return nil, errors.New("Failed to load from syncmap") + } else { + rawBytes, ok = 
val.([]byte) + if !ok { + return nil, errors.New("Failed to load from syncmap") + } + } + return finalizeCacheResponse(rawBytes, tc.compressionEnabled) +} + +func (tc *tinyCache) Set(ctx context.Context, key string, value interface{}) error { + finalData, err := prepareCachePayload(value, tc.compressionEnabled) + if err != nil { + return err + } + tc.base.Store(key, finalData) + return nil +} + +func (tc *tinyCache) Delete(ctx context.Context, key string) error { + tc.base.Delete(key) + return nil +} + +func (tc *tinyCache) Clear() error { + tc.base = &sync.Map{} + return nil +} + +func (tc *tinyCache) TTL(ctx context.Context, key string) time.Duration { + return time.Second * 0 +} + +func (tc *tinyCache) Name() string { + return tc.layerName +} diff --git a/cache.go b/cache.go index 1eb063c..d4727bc 100644 --- a/cache.go +++ b/cache.go @@ -2,275 +2,49 @@ package mnemosyne import ( "context" - "encoding/json" - "errors" - "fmt" - "math/rand" - "sync" "time" - "github.com/allegro/bigcache" - "github.com/go-redis/redis" "github.com/sirupsen/logrus" ) -type cache struct { - layerName string - baseRedisClient *redis.Client - slaveRedisClients []*redis.Client - inMemCache *bigcache.BigCache - syncmap *sync.Map - amnesiaChance int - compressionEnabled bool - cacheTTL time.Duration - ctx context.Context - watcher ITimer -} - -func newCacheRedis(layerName string, addr string, db int, TTL time.Duration, redisIdleTimeout time.Duration, amnesiaChance int, compressionEnabled bool, watcher ITimer) *cache { - redisOptions := &redis.Options{ - Addr: addr, - DB: db, - } - if redisIdleTimeout >= time.Second { - redisOptions.IdleTimeout = redisIdleTimeout - } - redisClient := redis.NewClient(redisOptions) - - err := redisClient.Ping().Err() - if err != nil { - logrus.WithError(err).Error("error while connecting to Redis") - } - return &cache{ - layerName: layerName, - baseRedisClient: redisClient, - amnesiaChance: amnesiaChance, - compressionEnabled: compressionEnabled, - cacheTTL: TTL, - ctx: context.Background(), - watcher: watcher, - } -} - -func newCacheClusterRedis(layerName string, masterAddr string, slaveAddrs []string, db int, TTL time.Duration, redisIdleTimeout time.Duration, amnesiaChance int, compressionEnabled bool, watcher ITimer) *cache { - slaveClients := make([]*redis.Client, len(slaveAddrs)) - for i, addr := range slaveAddrs { - redisOptions := &redis.Options{ - Addr: addr, - DB: db, - } - if redisIdleTimeout >= time.Second { - redisOptions.IdleTimeout = redisIdleTimeout - } - slaveClients[i] = redis.NewClient(redisOptions) - } - - redisOptions := &redis.Options{ - Addr: masterAddr, - DB: db, - } - redisClient := redis.NewClient(redisOptions) - - if err := redisClient.Ping().Err(); err != nil { - logrus.WithError(err).Error("error while connecting to Redis Master") - } - - return &cache{ - layerName: layerName, - baseRedisClient: redisClient, - slaveRedisClients: slaveClients, - amnesiaChance: amnesiaChance, - compressionEnabled: compressionEnabled, - cacheTTL: TTL, - ctx: context.Background(), - watcher: watcher, - } +type ICache interface { + Get(context.Context, string) (*cachableRet, error) + Set(context.Context, string, interface{}) error + Delete(context.Context, string) error + Clear() error + TTL(context.Context, string) time.Duration + Name() string } -func newCacheInMem(layerName string, maxMem int, TTL time.Duration, amnesiaChance int, compressionEnabled bool) *cache { - opts := bigcache.Config{ - Shards: 1024, - LifeWindow: TTL, - MaxEntriesInWindow: 1100 * 10 * 60, - 
MaxEntrySize: 500, - Verbose: false, - HardMaxCacheSize: maxMem, - CleanWindow: 1 * time.Minute, - } - cacheInstance, err := bigcache.NewBigCache(opts) - if err != nil { - logrus.Errorf("InMemCache Error: %v", err) - } - return &cache{ - layerName: layerName, - inMemCache: cacheInstance, - amnesiaChance: amnesiaChance, - compressionEnabled: compressionEnabled, - cacheTTL: TTL, - ctx: context.Background(), - } +type MemoryOpts struct { + maxMem int } -func newCacheTiny(layerName string, amnesiaChance int, compressionEnabled bool) *cache { - data := sync.Map{} - return &cache{ - layerName: layerName, - syncmap: &data, - amnesiaChance: amnesiaChance, - compressionEnabled: compressionEnabled, - cacheTTL: time.Hour * 9999, - ctx: context.Background(), - } -} - -func (cr *cache) withContext(ctx context.Context) *cache { - return &cache{ - layerName: cr.layerName, - baseRedisClient: cr.baseRedisClient, - slaveRedisClients: cr.slaveRedisClients, - inMemCache: cr.inMemCache, - syncmap: cr.syncmap, - amnesiaChance: cr.amnesiaChance, - compressionEnabled: cr.compressionEnabled, - cacheTTL: cr.cacheTTL, - ctx: ctx, - watcher: cr.watcher, - } -} - -func (cr *cache) get(key string) (*cachableRet, error) { - if cr.amnesiaChance > rand.Intn(100) { - return nil, errors.New("Had Amnesia") - } - var rawBytes []byte - var err error - if cr.syncmap != nil { - val, ok := cr.syncmap.Load(key) - if !ok { - err = errors.New("Failed to load from syncmap") - } else { - rawBytes, ok = val.([]byte) - if !ok { - err = errors.New("Failed to load from syncmap") - } - } - } else if cr.inMemCache != nil { - rawBytes, err = cr.inMemCache.Get(key) - } else { - var strValue string - client := cr.pickClient().WithContext(cr.ctx) - startMarker := cr.watcher.Start() - strValue, err = client.Get(key).Result() - if err == nil { - cr.watcher.Done(startMarker, cr.layerName, "get", "ok") - } else if err == redis.Nil { - cr.watcher.Done(startMarker, cr.layerName, "get", "miss") - } else { - cr.watcher.Done(startMarker, cr.layerName, "get", "error") - } - rawBytes = []byte(strValue) - } - if err != nil { - return nil, err - } - var finalBytes []byte - if cr.compressionEnabled { - finalBytes = DecompressZlib(rawBytes) - } else { - finalBytes = rawBytes - } - var finalObject cachableRet - unmarshalErr := json.Unmarshal(finalBytes, &finalObject) - if unmarshalErr != nil { - return nil, fmt.Errorf("failed to unmarshall cached value : %v", unmarshalErr) - } - return &finalObject, nil -} - -func (cr *cache) set(key string, value interface{}) (setError error) { - if cr.amnesiaChance == 100 { - return errors.New("Had Amnesia") - } - defer func() { - if r := recover(); r != nil { - //json.Marshal panics under heavy-load which is not repeated with the same values - setError = fmt.Errorf("panic in cache-set: %v", r) - } - }() - rawData, err := json.Marshal(value) - if err != nil { - return err - } - var finalData []byte - if cr.compressionEnabled { - finalData = CompressZlib(rawData) - } else { - finalData = rawData - } - if cr.syncmap != nil { - cr.syncmap.Store(key, finalData) - return nil - } else if cr.inMemCache != nil { - return cr.inMemCache.Set(key, finalData) - } - client := cr.baseRedisClient.WithContext(cr.ctx) - startMarker := cr.watcher.Start() - setError = client.Set(key, finalData, cr.cacheTTL).Err() - if setError != nil { - cr.watcher.Done(startMarker, cr.layerName, "set", "error") - } else { - cr.watcher.Done(startMarker, cr.layerName, "set", "ok") - } - return -} - -func (cr *cache) delete(ctx context.Context, key string) error 
{ - if cr.amnesiaChance == 100 { - return errors.New("Had Amnesia") - } - if cr.syncmap != nil { - cr.syncmap.Delete(key) - } else if cr.inMemCache != nil { - return cr.inMemCache.Delete(key) - } - client := cr.baseRedisClient.WithContext(ctx) - err := client.Del(key).Err() - return err -} - -func (cr *cache) clear() error { - if cr.amnesiaChance == 100 { - return errors.New("Had Amnesia") - } - if cr.syncmap != nil { - cr.syncmap = &sync.Map{} - } else if cr.inMemCache != nil { - return cr.inMemCache.Reset() - } - client := cr.baseRedisClient - err := client.FlushDB().Err() - return err +type CacheOpts struct { + layerName string + layerType string + redisOpts RedisOpts + memOpts MemoryOpts + amnesiaChance int + compressionEnabled bool + cacheTTL time.Duration } -func (cr *cache) getTTL(key string) time.Duration { - if cr.inMemCache != nil || cr.syncmap != nil { - return time.Second * 0 - } - client := cr.pickClient().WithContext(cr.ctx) - res, err := client.TTL(key).Result() - if err != nil { - return time.Second * 0 - } - return res +type baseCache struct { + layerName string + amnesiaChance int + compressionEnabled bool } -func (cr *cache) pickClient() *redis.Client { - if len(cr.slaveRedisClients) == 0 { - return cr.baseRedisClient - } - cl := rand.Intn(len(cr.slaveRedisClients) + 1) - if cl == 0 { - return cr.baseRedisClient +func NewCacheLayer(opts *CacheOpts, watcher ITimer) ICache { + layerType := opts.layerType + if layerType == "memory" { + return NewInMemoryCache(opts) + } else if layerType == "tiny" { + return NewTinyCache(opts) + } else if layerType == "redis" { + return NewShardedClusterRedisCache(opts, watcher) } - return cr.slaveRedisClients[cl-1] + logrus.Errorf("Malformed: Unknown cache type %s", layerType) + return nil } diff --git a/core.go b/core.go index 9a69e73..2bafcd2 100644 --- a/core.go +++ b/core.go @@ -22,7 +22,7 @@ type Mnemosyne struct { // MnemosyneInstance is an instance of a multi-layer cache type MnemosyneInstance struct { name string - cacheLayers []*cache + cacheLayers []ICache cacheWatcher ICounter softTTL time.Duration } @@ -62,51 +62,38 @@ func (m *Mnemosyne) Select(cacheName string) *MnemosyneInstance { func newMnemosyneInstance(name string, config *viper.Viper, commTimer ITimer, hitCounter ICounter) *MnemosyneInstance { configKeyPrefix := fmt.Sprintf("cache.%s", name) layerNames := config.GetStringSlice(configKeyPrefix + ".layers") - cacheLayers := make([]*cache, len(layerNames)) + cacheLayers := make([]ICache, len(layerNames)) for i, layerName := range layerNames { keyPrefix := configKeyPrefix + "." 
+ layerName - layerType := config.GetString(keyPrefix + ".type") - if layerType == "memory" { - cacheLayers[i] = newCacheInMem( - layerName, - config.GetInt(keyPrefix+".max-memory"), - config.GetDuration(keyPrefix+".ttl"), - config.GetInt(keyPrefix+".amnesia"), - config.GetBool(keyPrefix+".compression"), - ) - } else if layerType == "redis" { - cacheLayers[i] = newCacheRedis( - layerName, - config.GetString(keyPrefix+".address"), - config.GetInt(keyPrefix+".db"), - config.GetDuration(keyPrefix+".ttl"), - config.GetDuration(keyPrefix+".idle-timeout"), - config.GetInt(keyPrefix+".amnesia"), - config.GetBool(keyPrefix+".compression"), - commTimer, - ) - } else if layerType == "gaurdian" { - cacheLayers[i] = newCacheClusterRedis( - layerName, - config.GetString(keyPrefix+".address"), - config.GetStringSlice(keyPrefix+".slaves"), - config.GetInt(keyPrefix+".db"), - config.GetDuration(keyPrefix+".ttl"), - config.GetDuration(keyPrefix+".idle-timeout"), - config.GetInt(keyPrefix+".amnesia"), - config.GetBool(keyPrefix+".compression"), - commTimer, - ) - } else if layerType == "tiny" { - cacheLayers[i] = newCacheTiny( - layerName, - config.GetInt(keyPrefix+".amnesia"), - config.GetBool(keyPrefix+".compression"), - ) - } else { - logrus.Errorf("Malformed Config: Unknown cache type %s", layerType) - return nil + layerOptions := &CacheOpts{ + layerType: config.GetString(keyPrefix + ".type"), + layerName: layerName, + cacheTTL: config.GetDuration(keyPrefix + ".ttl"), + amnesiaChance: config.GetInt(keyPrefix + ".amnesia"), + compressionEnabled: config.GetBool(keyPrefix + ".compression"), + memOpts: MemoryOpts{ + maxMem: config.GetInt(keyPrefix + ".max-memory"), + }, + redisOpts: RedisOpts{ + db: config.GetInt(keyPrefix + ".db"), + idleTimeout: config.GetDuration(keyPrefix + ".idle-timeout"), + shards: make([]*RedisClusterAddress, 1), + }, } + if layerOptions.layerType == "redis" { + layerOptions.redisOpts.shards[0].MasterAddr = config.GetString(keyPrefix + ".address") + } else if layerOptions.layerType == "gaurdian" { + // to preserve backward-compatibility + layerOptions.redisOpts.shards[0].MasterAddr = config.GetString(keyPrefix + ".address") + layerOptions.redisOpts.shards[0].SlaveAddrs = config.GetStringSlice(keyPrefix + ".slaves") + } else if layerOptions.layerType == "rediscluster" { + err := config.UnmarshalKey(keyPrefix+".cluster", &layerOptions.redisOpts.shards) + if err != nil { + logrus.WithError(err).Error("Error reading redis cluster config") + } + } + cacheLayers[i] = NewCacheLayer(layerOptions, commTimer) + } return &MnemosyneInstance{ name: name, @@ -120,10 +107,12 @@ func (mn *MnemosyneInstance) get(ctx context.Context, key string) (*cachableRet, cacheErrors := make([]error, len(mn.cacheLayers)) var result *cachableRet for i, layer := range mn.cacheLayers { - result, cacheErrors[i] = layer.withContext(ctx).get(key) + result, cacheErrors[i] = layer.Get(ctx, key) if cacheErrors[i] == nil { - go mn.cacheWatcher.Inc(mn.name, fmt.Sprintf("layer%d", i)) - go mn.fillUpperLayers(key, result, i) + go func() { + mn.fillUpperLayers(key, result, i) + mn.cacheWatcher.Inc(mn.name, fmt.Sprintf("layer%d", i)) + }() return result, nil } } @@ -168,7 +157,7 @@ func (mn *MnemosyneInstance) GetAndShouldUpdate(ctx context.Context, key string, if err != nil { return false, err } - dataAge := time.Now().Sub(cachableObj.Time) + dataAge := time.Since(cachableObj.Time) go mn.monitorDataHotness(dataAge) shouldUpdate := dataAge > mn.softTTL return shouldUpdate, nil @@ -188,7 +177,7 @@ func (mn *MnemosyneInstance) 
ShouldUpdate(ctx context.Context, key string) (bool return false, errors.New("nil found") } - shouldUpdate := time.Now().Sub(cachableObj.Time) > mn.softTTL + shouldUpdate := time.Since(cachableObj.Time) > mn.softTTL return shouldUpdate, nil } @@ -207,7 +196,7 @@ func (mn *MnemosyneInstance) Set(ctx context.Context, key string, value interfac errorStrings := make([]string, len(mn.cacheLayers)) haveErorr := false for i, layer := range mn.cacheLayers { - cacheErrors[i] = layer.withContext(ctx).set(key, toCache) + cacheErrors[i] = layer.Set(ctx, key, toCache) if cacheErrors[i] != nil { errorStrings[i] = cacheErrors[i].Error() haveErorr = true @@ -220,9 +209,9 @@ func (mn *MnemosyneInstance) Set(ctx context.Context, key string, value interfac } // TTL returns the TTL of the first accessible data instance as well as the layer it was found on -func (mn *MnemosyneInstance) TTL(key string) (int, time.Duration) { +func (mn *MnemosyneInstance) TTL(ctx context.Context, key string) (int, time.Duration) { for i, layer := range mn.cacheLayers { - dur := layer.getTTL(key) + dur := layer.TTL(ctx, key) if dur > 0 { return i, dur } @@ -236,7 +225,7 @@ func (mn *MnemosyneInstance) Delete(ctx context.Context, key string) error { errorStrings := make([]string, len(mn.cacheLayers)) haveErorr := false for i, layer := range mn.cacheLayers { - cacheErrors[i] = layer.delete(ctx, key) + cacheErrors[i] = layer.Delete(ctx, key) if cacheErrors[i] != nil { errorStrings[i] = cacheErrors[i].Error() haveErorr = true @@ -251,19 +240,21 @@ func (mn *MnemosyneInstance) Delete(ctx context.Context, key string) error { // Flush completly clears a single layer of the cache func (mn *MnemosyneInstance) Flush(targetLayerName string) error { for _, layer := range mn.cacheLayers { - if layer.layerName == targetLayerName { - return layer.clear() + if layer.Name() == targetLayerName { + return layer.Clear() } } return fmt.Errorf("Layer Named: %v Not Found", targetLayerName) } func (mn *MnemosyneInstance) fillUpperLayers(key string, value *cachableRet, layer int) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() for i := layer - 1; i >= 0; i-- { if value == nil { continue } - err := mn.cacheLayers[i].set(key, *value) + err := mn.cacheLayers[i].Set(ctx, key, *value) if err != nil { logrus.Errorf("failed to fill layer %d : %v", i, err) } diff --git a/data.go b/data.go deleted file mode 100644 index 32b57ba..0000000 --- a/data.go +++ /dev/null @@ -1,16 +0,0 @@ -package mnemosyne - -import ( - "encoding/json" - "time" -) - -type cachable struct { - Time time.Time - CachedObject interface{} -} - -type cachableRet struct { - Time time.Time - CachedObject *json.RawMessage -} diff --git a/datamanip.go b/datamanip.go new file mode 100644 index 0000000..c2116e8 --- /dev/null +++ b/datamanip.go @@ -0,0 +1,73 @@ +package mnemosyne + +import ( + "bytes" + "compress/zlib" + "encoding/json" + "fmt" + "io" + "time" +) + +type cachable struct { + Time time.Time + CachedObject interface{} +} + +type cachableRet struct { + Time time.Time + CachedObject *json.RawMessage +} + +func finalizeCacheResponse(rawBytes []byte, compress bool) (*cachableRet, error) { + var finalBytes []byte + if compress { + finalBytes = decompressZlib(rawBytes) + } else { + finalBytes = rawBytes + } + var finalObject cachableRet + unmarshalErr := json.Unmarshal(finalBytes, &finalObject) + if unmarshalErr != nil { + return nil, fmt.Errorf("failed to unmarshall cached value : %w", unmarshalErr) + } + return &finalObject, nil +} + +func 
prepareCachePayload(value interface{}, compress bool) (finalData []byte, prepError error) { + defer func() { + if r := recover(); r != nil { + //json.Marshal panics under heavy-load which is not repeated with the same values + prepError = fmt.Errorf("panic in cache-set: %v", r) + } + }() + rawData, err := json.Marshal(value) + if err != nil { + prepError = err + return + } + if compress { + finalData = compressZlib(rawData) + } else { + finalData = rawData + } + return +} + +func compressZlib(input []byte) []byte { + var buf bytes.Buffer + w := zlib.NewWriter(&buf) + w.Write(input) + w.Close() + compressed := buf.Bytes() + return compressed +} + +func decompressZlib(input []byte) []byte { + var out bytes.Buffer + r, _ := zlib.NewReader(bytes.NewBuffer(input)) + io.Copy(&out, r) + r.Close() + original := out.Bytes() + return original +} diff --git a/examples/config.yml b/examples/config.yml new file mode 100644 index 0000000..7a63564 --- /dev/null +++ b/examples/config.yml @@ -0,0 +1,70 @@ +cache: + result: + soft-ttl: 2h + layers: + - result-memory + - result-gaurdian + result-memory: + type: memory + max-memory: 2560 + ttl: 3h + amnesia: 0 + compression: true + result-gaurdian: + type: gaurdian + address: "bazaar-octopus-redis.redis:6379" + slaves: + - "bazaar-octopus-redis-readonly.redis:6379" + - "bazaar-octopus-redis-readonly.redis:6379" + db: 8 + ttl: 72h + amnesia: 0 + compression: true + idle-timeout: 5s + + package-info: + soft-ttl: 2h + layers: + - pkg-memory + - pkg-gaurdian + pkg-memory: + type: memory + max-memory: 512 + ttl: 3h + amnesia: 0 + compression: true + pkg-gaurdian: + type: gaurdian + address: "bazaar-octopus-redis.redis:6379" + slaves: + - "bazaar-octopus-redis-readonly.redis:6379" + - "bazaar-octopus-redis-readonly.redis:6379" + db: 5 + ttl: 72h + amnesia: 0 + compression: true + idle-timeout: 5s + + spell-checker: + soft-ttl: 60h + layers: + - spell-checker-cluster + spell-checker-cluster: + type: cluster + cluster: + - address: "bazaar-octopus-redis-one.redis:6379" + slaves: + - "bazaar-octopus-redis-readonly-one.redis:6379" + - "bazaar-octopus-redis-readonly-one.redis:6379" + - address: "bazaar-octopus-redis-two.redis:6379" + slaves: + - "bazaar-octopus-redis-readonly-two.redis:6379" + - "bazaar-octopus-redis-readonly-two.redis:6379" + - address: "bazaar-octopus-redis-three.redis:6379" + slaves: + - "bazaar-octopus-redis-readonly-three.redis:6379" + - "bazaar-octopus-redis-readonly-three.redis:6379" + db: 1 + ttl: 120h + amnesia: 0 + compression: false diff --git a/tests/cache_test.go b/tests/cache_test.go index 204e508..3adfb52 100644 --- a/tests/cache_test.go +++ b/tests/cache_test.go @@ -54,17 +54,15 @@ func TestGetAndShouldUpdate(t *testing.T) { assert.Equal(t, testCache, myCachedData) - assert.Equal(t, err, nil) + assert.Equal(t, nil, err) - assert.Equal(t, shouldUpdate, false) + assert.Equal(t, false, shouldUpdate) wayback := time.Now().Add(time.Hour * 3) patch := monkey.Patch(time.Now, func() time.Time { return wayback }) defer patch.Unpatch() shouldUpdate, _ = cacheInstance.GetAndShouldUpdate(cacheCtx, "test_item1", &myCachedData) - - assert.Equal(t, shouldUpdate, true) -} - + assert.Equal(t, true, shouldUpdate) +} diff --git a/utils.go b/utils.go index 36b6bdb..16bfdb7 100644 --- a/utils.go +++ b/utils.go @@ -1,45 +1,21 @@ package mnemosyne import ( - "bytes" - "compress/zlib" - "io" + "fmt" "strings" ) -func MakeKey(keys ...string) string { - return strings.Join(keys, ";") +type amnesiaError struct { + Chance int } -// func CompressLz4(input []byte) 
[]byte { -// var buf bytes.Buffer -// w := lz4.NewWriter(&buf) -// w.Write(input) -// w.Close() -// compressed := buf.Bytes() -// return compressed -// } -// func DecompressLz4(input []byte) []byte { -// var out bytes.Buffer -// r := lz4.NewReader(bytes.NewBuffer(input)) -// io.Copy(&out, r) -// original := out.Bytes() -// return original -// } - -func CompressZlib(input []byte) []byte { - var buf bytes.Buffer - w := zlib.NewWriter(&buf) - w.Write(input) - w.Close() - compressed := buf.Bytes() - return compressed +func (e *amnesiaError) Error() string { + return fmt.Sprintf("Had Amnesia (Chance:%d)", e.Chance) +} +func newAmnesiaError(c int) *amnesiaError { + return &amnesiaError{Chance: c} } -func DecompressZlib(input []byte) []byte { - var out bytes.Buffer - r, _ := zlib.NewReader(bytes.NewBuffer(input)) - io.Copy(&out, r) - r.Close() - original := out.Bytes() - return original + +func MakeKey(keys ...string) string { + return strings.Join(keys, ";") }
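For reference, a small usage sketch of the `MakeKey` helper kept in `utils.go`; the import path is assumed for illustration, and the output follows directly from the `strings.Join(keys, ";")` implementation above.

```go
package main

import (
	"fmt"

	// module path is assumed for illustration
	"github.com/cafebazaar/mnemosyne"
)

func main() {
	// MakeKey joins its parts with ";" per the implementation in utils.go.
	key := mnemosyne.MakeKey("package-info", "com.example.app", "v2")
	fmt.Println(key) // package-info;com.example.app;v2
}
```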