Merge branch 'v2' into invalidated-data
Masood Adeli committed Sep 22, 2020
2 parents ad1d560 + e5a580e commit f9a9814
Showing 11 changed files with 590 additions and 388 deletions.
README.md: 51 additions & 19 deletions
@@ -20,12 +20,12 @@
```go
mnemosyneManager := mnemosyne.NewMnemosyne(config, nil, nil)
cacheInstance := mnemosyneManager.Select("result-cache")
```

### Working with a CacheInstance
```go
cacheInstance.Set(context, key, value)
var myCachedData myType
err := cacheInstance.Get(context, key, &myCachedData)
// remember: a cache miss is also returned as an error
```
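
For illustration, a minimal miss-then-fill sketch built on these two calls; `UserProfile`, the key format, and `loadUserFromDB` are hypothetical stand-ins, not part of the library:

```go
func getUser(ctx context.Context, id string) (UserProfile, error) {
	var user UserProfile
	if err := cacheInstance.Get(ctx, "user:"+id, &user); err != nil {
		// a miss (or a layer error) comes back as an error, so fall back to the source of truth
		fresh, dbErr := loadUserFromDB(ctx, id) // hypothetical loader
		if dbErr != nil {
			return UserProfile{}, dbErr
		}
		_ = cacheInstance.Set(ctx, "user:"+id, fresh) // best-effort refill of the cache
		return fresh, nil
	}
	return user, nil
}
```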

## Configuration
@@ -55,6 +55,7 @@ cache:
```yaml
      ttl: 24h
      amnesia: 0
      compression: true

  my-user-cache:
    soft-ttl: 2h
    layers:
@@ -75,27 +76,57 @@ cache:
      compression: true
```
**Each cache layer can be of the following types:**
`redis` is used for a single node Redis server.

`gaurdian` [Deprecated] is used for a master-slave Redis cluster configuration, but it is being deprecated in favor of `rediscluster`.

`rediscluster` is an all-encompassing configuration that supports client-side sharding, a clustered (master-slave) Redis setup, or both at the same time.

`memory` uses the BigCache library to provide an efficient and fast in-memory cache.

`tiny` uses the native `sync.Map` data structure to store smaller cache values in memory (used for low-write caches).

_Note:_ all of the cache types are sync-safe, meaning they can be safely used from simultaneously running goroutines.
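
As a rough sketch of what that guarantee allows (assuming a context `ctx` and the `cacheInstance` from above; the key/value shapes are arbitrary), several goroutines can share one instance without additional locking:

```go
var wg sync.WaitGroup
for i := 0; i < 8; i++ {
	wg.Add(1)
	go func(n int) {
		defer wg.Done()
		key := fmt.Sprintf("item-%d", n)
		_ = cacheInstance.Set(ctx, key, fmt.Sprintf("value-%d", n)) // safe to call concurrently
		var got string
		_ = cacheInstance.Get(ctx, key, &got) // may still miss; the error is ignored here
	}(i)
}
wg.Wait()
```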

#### Instance Configs:

**`soft-ttl`** is an instance-wide TTL which, when expired, will **NOT** remove the data from the instance, but warns the caller that the data is old.

#### Common Layer Configs:

**`amnesia`** is a stochastic fall-through mechanism which allows a higher layer to be updated from a lower layer by way of an artificial cache-miss.
An amnesia value of 0 means the layer never misses data it actually has; a value of 10 means that when a key is present in the cache it is returned 90% of the time and treated as a cache-miss the other 10%; a value of 100 effectively turns the layer off. (Default: 0)
_Note:_ 'SET' operations ignore amnesia; to completely turn a layer off, remove its name from the layer list.
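
Internally, the layers added in this commit implement amnesia as one random check at the top of each `Get` (see `c-memory.go` and `c-redis.go` below); conceptually:

```go
// With amnesiaChance == 10, roughly 10% of reads are deliberately reported
// as misses so that a lower layer (or the source of truth) can refresh this one.
if amnesiaChance > rand.Intn(100) {
	return nil, newAmnesiaError(amnesiaChance)
}
```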

**`compression`** dictates whether the data is compressed before being put into the cache memory. Currently only Zlib compression is supported. (Default: false)

**`ttl`** is the hard Time-To-Live for the data in this particular layer, after which the data expires and is expected to be removed.

#### Type-specific Layer Configs:

**`db`** {`redis` - `gaurdian`} is the Redis DB number to be used. (Default: 0)
**`idle-timeout`** {`redis` - `gaurdian`} is the timeout for idle connections to the Redis server (see the Redis documentation). (Default: 0, i.e. no timeout)
**`address`** {`redis` - `gaurdian` - `rediscluster`} is the Redis server's address (the master's address in the case of a cluster).
**`slaves`** {`gaurdian` - `rediscluster`} is a **list** of Redis server addresses pertaining to the slave nodes.
**`max-memory`** {`memory`} is the maximum amount of system memory which can be used by this particular layer.
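
For a `rediscluster` layer, each shard pairs one `address` with a list of `slaves`; a hypothetical two-shard layout (placeholder addresses) would decode into the `RedisClusterAddress` values introduced in `c-redis.go` below:

```go
shards := []*RedisClusterAddress{
	{MasterAddr: "redis-shard0:6379", SlaveAddrs: []string{"redis-shard0-replica:6379"}},
	{MasterAddr: "redis-shard1:6379", SlaveAddrs: []string{"redis-shard1-replica:6379"}},
}
```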


### Epimetheus Integration Guide

Add these two functions to your `container.go` file and register them in `wire.Build()` so the _wire_-generated code can pass the proper timer & counter to Mnemosyne.

```go
func getCommTimer(epi *epimetheus.Epimetheus) mnemosyne.ITimer {
	return epi.CommTimer
}

func getCacheRate(epi *epimetheus.Epimetheus) mnemosyne.ICounter {
	return epi.CacheRate
}
```
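
A hedged sketch of the corresponding injector; everything besides `getCommTimer`, `getCacheRate`, and `mnemosyne.NewMnemosyne` (the config provider, the Epimetheus constructor name, and the `Container` type) is an assumption for illustration:

```go
// In a file guarded by the wireinject build tag.
func InitializeContainer() (*Container, error) {
	wire.Build(
		newConfig,                // hypothetical config provider
		epimetheus.NewEpimetheus, // assumed Epimetheus constructor
		getCommTimer,             // provides mnemosyne.ITimer
		getCacheRate,             // provides mnemosyne.ICounter
		mnemosyne.NewMnemosyne,
		NewContainer, // hypothetical constructor for your Container type
	)
	return nil, nil
}
```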


## Documentation
@@ -121,9 +152,10 @@
We use [SemVer](http://semver.org/) for versioning. For the versions available,

## Authors

* **Ramtin Rostami** [rrostami](https://github.com/rrostami) - *Initial work & Maintaining*
* **Pedram Teymoori** [pedramteymoori](https://github.com/pedramteymoori) - *Initial work & Maintaining*
* **Parsa abdollahi** - *Initial work*
* **Ava Abderezaei** [avv-va](https://github.com/avv-va) - *Tests*

See also the list of [contributors](https://github.com/cafebazaar/Mnemosyne/graphs/contributors) who participated in this project.

c-memory.go: 76 additions & 0 deletions (new file)
@@ -0,0 +1,76 @@
package mnemosyne

import (
	"context"
	"math/rand"
	"time"

	"github.com/allegro/bigcache"
	"github.com/sirupsen/logrus"
)

type inMemoryCache struct {
	baseCache
	base     *bigcache.BigCache
	cacheTTL time.Duration
}

func NewInMemoryCache(opts *CacheOpts) *inMemoryCache {
	internalOpts := bigcache.Config{
		Shards:             1024,
		LifeWindow:         opts.cacheTTL,
		MaxEntriesInWindow: 1100 * 10 * 60,
		MaxEntrySize:       500,
		Verbose:            false,
		HardMaxCacheSize:   opts.memOpts.maxMem,
		CleanWindow:        1 * time.Minute,
	}
	cacheInstance, err := bigcache.NewBigCache(internalOpts)
	if err != nil {
		logrus.Errorf("InMemCache %s Initialization Error: %v", opts.layerName, err)
	}
	return &inMemoryCache{
		baseCache: baseCache{
			layerName:          opts.layerName,
			amnesiaChance:      opts.amnesiaChance,
			compressionEnabled: opts.compressionEnabled,
		},
		base:     cacheInstance,
		cacheTTL: opts.cacheTTL,
	}
}

func (mc *inMemoryCache) Get(ctx context.Context, key string) (*cachableRet, error) {
	if mc.amnesiaChance > rand.Intn(100) {
		return nil, newAmnesiaError(mc.amnesiaChance)
	}
	rawBytes, err := mc.base.Get(key)
	if err != nil {
		return nil, err
	}
	return finalizeCacheResponse(rawBytes, mc.compressionEnabled)
}

func (mc *inMemoryCache) Set(ctx context.Context, key string, value interface{}) error {
	finalData, err := prepareCachePayload(value, mc.compressionEnabled)
	if err != nil {
		return err
	}
	return mc.base.Set(key, finalData)
}

func (mc *inMemoryCache) Delete(ctx context.Context, key string) error {
	return mc.base.Delete(key)
}

func (mc *inMemoryCache) Clear() error {
	return mc.base.Reset()
}

func (mc *inMemoryCache) TTL(ctx context.Context, key string) time.Duration {
	return time.Second * 0
}

func (mc *inMemoryCache) Name() string {
	return mc.layerName
}
c-redis.go: 158 additions & 0 deletions (new file)
@@ -0,0 +1,158 @@
package mnemosyne

import (
	"context"
	"hash/fnv"
	"math/rand"
	"time"

	"github.com/go-redis/redis"
	"github.com/sirupsen/logrus"
)

type RedisClusterAddress struct {
	MasterAddr string   `mapstructure:"address"`
	SlaveAddrs []string `mapstructure:"slaves"`
}

type RedisOpts struct {
	db          int
	idleTimeout time.Duration
	shards      []*RedisClusterAddress
}

type clusterClient struct {
	master *redis.Client
	slaves []*redis.Client
}

type redisCache struct {
	baseCache
	baseClients []*clusterClient
	cacheTTL    time.Duration
	watcher     ITimer
}

func makeClient(addr string, db int, idleTimeout time.Duration) *redis.Client {
	redisOptions := &redis.Options{
		Addr: addr,
		DB:   db,
	}
	if idleTimeout >= time.Second {
		redisOptions.IdleTimeout = idleTimeout
	}
	newClient := redis.NewClient(redisOptions)

	if err := newClient.Ping().Err(); err != nil {
		logrus.WithError(err).WithField("address", addr).Error("error pinging Redis")
	}
	return newClient
}

func NewShardedClusterRedisCache(opts *CacheOpts, watcher ITimer) *redisCache {
	rc := &redisCache{
		baseCache: baseCache{
			layerName:          opts.layerName,
			amnesiaChance:      opts.amnesiaChance,
			compressionEnabled: opts.compressionEnabled,
		},
		cacheTTL: opts.cacheTTL,
		watcher:  watcher,
	}
	rc.baseClients = make([]*clusterClient, len(opts.redisOpts.shards))
	for i, shard := range opts.redisOpts.shards {
		// allocate the shard's client pair before assigning into it
		rc.baseClients[i] = &clusterClient{}
		rc.baseClients[i].master = makeClient(shard.MasterAddr,
			opts.redisOpts.db,
			opts.redisOpts.idleTimeout)

		rc.baseClients[i].slaves = make([]*redis.Client, len(shard.SlaveAddrs))
		for j, slv := range shard.SlaveAddrs {
			rc.baseClients[i].slaves[j] = makeClient(slv,
				opts.redisOpts.db,
				opts.redisOpts.idleTimeout)
		}
	}
	return rc
}

func (rc *redisCache) Get(ctx context.Context, key string) (*cachableRet, error) {
	if rc.amnesiaChance > rand.Intn(100) {
		return nil, newAmnesiaError(rc.amnesiaChance)
	}
	client := rc.pickClient(key, false).WithContext(ctx)
	startMarker := rc.watcher.Start()
	strValue, err := client.Get(key).Result()
	if err == nil {
		rc.watcher.Done(startMarker, rc.layerName, "get", "ok")
	} else if err == redis.Nil {
		rc.watcher.Done(startMarker, rc.layerName, "get", "miss")
	} else {
		rc.watcher.Done(startMarker, rc.layerName, "get", "error")
	}
	if err != nil {
		// propagate the miss (redis.Nil) or connection error instead of decoding an empty payload
		return nil, err
	}
	rawBytes := []byte(strValue)
	return finalizeCacheResponse(rawBytes, rc.compressionEnabled)
}

func (rc *redisCache) Set(ctx context.Context, key string, value interface{}) error {
	finalData, err := prepareCachePayload(value, rc.compressionEnabled)
	if err != nil {
		return err
	}
	client := rc.pickClient(key, true).WithContext(ctx)
	startMarker := rc.watcher.Start()
	setError := client.SetNX(key, finalData, rc.cacheTTL).Err()
	if setError != nil {
		rc.watcher.Done(startMarker, rc.layerName, "set", "error")
	} else {
		rc.watcher.Done(startMarker, rc.layerName, "set", "ok")
	}
	return setError
}

func (rc *redisCache) Delete(ctx context.Context, key string) error {
	client := rc.pickClient(key, true).WithContext(ctx)
	return client.Del(key).Err()
}

func (rc *redisCache) Clear() error {
	for _, cl := range rc.baseClients {
		client := cl.master
		err := client.FlushDB().Err()
		if err != nil {
			return err
		}
	}
	return nil
}

func (rc *redisCache) TTL(ctx context.Context, key string) time.Duration {
	client := rc.pickClient(key, false).WithContext(ctx)
	res, err := client.TTL(key).Result()
	if err != nil {
		return time.Second * 0
	}
	return res
}

func (rc *redisCache) pickClient(key string, modification bool) *redis.Client {
	shard := rc.shardKey(key)
	if modification || len(rc.baseClients[shard].slaves) == 0 {
		return rc.baseClients[shard].master
	}
	cl := rand.Intn(len(rc.baseClients[shard].slaves))
	return rc.baseClients[shard].slaves[cl]
}

func (rc *redisCache) shardKey(key string) int {
	shards := len(rc.baseClients)
	if shards == 1 {
		return 0
	}
	hasher := fnv.New32a()
	hasher.Write([]byte(key))
	keyHash := int(hasher.Sum32())
	return keyHash % shards
}

func (rc *redisCache) Name() string {
	return rc.layerName
}