refactor: cache interface with context
iwpnd committed Nov 26, 2024
1 parent 93b917e commit 5577bbd
Showing 16 changed files with 320 additions and 122 deletions.
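At a glance, the refactor threads a context.Context through every method of the cache interface. The sketch below summarizes the new cache.Interface from cache/cache.go in this diff, with the previous signatures left as comments for comparison:

// cache.Interface after this commit; the commented lines show the old signatures
type Interface interface {
	// previously: Get(key *Key) (val []byte, hit bool, err error)
	Get(ctx context.Context, key *Key) (val []byte, hit bool, err error)
	// previously: Set(key *Key, val []byte) error
	Set(ctx context.Context, key *Key, val []byte) error
	// previously: Purge(key *Key) error
	Purge(ctx context.Context, key *Key) error
}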
12 changes: 6 additions & 6 deletions atlas/atlas.go
@@ -147,15 +147,15 @@ func (a *Atlas) SeedMapTile(ctx context.Context, m Map, z, x, y uint) error {
Y: y,
}

return a.cacher.Set(&key, b)
return a.cacher.Set(ctx, &key, b)
}

// PurgeMapTile will purge a map tile from the configured cache backend
func (a *Atlas) PurgeMapTile(m Map, tile *tegola.Tile) error {
func (a *Atlas) PurgeMapTile(ctx context.Context, m Map, tile *tegola.Tile) error {
if a == nil {
// Use the default Atlas if a, is nil. This way the empty value is
// still useful.
return defaultAtlas.PurgeMapTile(m, tile)
return defaultAtlas.PurgeMapTile(ctx, m, tile)
}

if len(m.Params) > 0 {
@@ -174,7 +174,7 @@ func (a *Atlas) PurgeMapTile(m Map, tile *tegola.Tile) error {
Y: tile.Y,
}

return a.cacher.Purge(&key)
return a.cacher.Purge(ctx, &key)
}

// Map looks up a Map by name and returns a copy of the Map
@@ -337,8 +337,8 @@ func SeedMapTile(ctx context.Context, m Map, z, x, y uint) error {

// PurgeMapTile will purge a map tile from the configured cache backend
// for the defaultAtlas
func PurgeMapTile(m Map, tile *tegola.Tile) error {
return defaultAtlas.PurgeMapTile(m, tile)
func PurgeMapTile(ctx context.Context, m Map, tile *tegola.Tile) error {
return defaultAtlas.PurgeMapTile(ctx, m, tile)
}

// SetObservability sets the observability backend for the defaultAtlas
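For illustration only (not part of this diff): callers of the package-level atlas helpers now supply the context, so seeding or purging a tile can be bounded by a deadline. The map value, tile value, tile coordinates, and timeout below are assumptions.

// assumes imports: context, log, time; m (Map) and tile (*tegola.Tile) are
// assumed to have been obtained elsewhere
func seedAndPurge(m Map, tile *tegola.Tile) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// seed one tile, then purge another, both honoring the deadline
	if err := SeedMapTile(ctx, m, 10, 163, 395); err != nil {
		log.Printf("seed failed: %v", err)
	}
	if err := PurgeMapTile(ctx, m, tile); err != nil {
		log.Printf("purge failed: %v", err)
	}
}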
33 changes: 14 additions & 19 deletions cache/azblob/azblob.go
@@ -4,7 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io/ioutil"
"io"
"net/http"
"net/url"
"path/filepath"
@@ -34,7 +34,7 @@ const (
)

const (
BlobHeaderLen = 8 // bytes
BlobHeaderLen = 8 // bytes
BlobReqMaxLen = 4194304 // ~4MB
)

@@ -124,10 +124,12 @@ func New(config dict.Dicter) (cache.Interface, error) {
Y: 0,
}

ctx := context.Background()

// seperate test for read only caches
if azCache.ReadOnly {
// read test file
_, _, err := azCache.Get(&key)
_, _, err := azCache.Get(ctx, &key)
if err != nil {
return nil, cache.ErrGettingFromCache{
Err: err,
@@ -136,7 +138,7 @@ func New(config dict.Dicter) (cache.Interface, error) {
}
} else {
// write test file
err = azCache.Set(&key, []byte(testMsg))
err = azCache.Set(ctx, &key, []byte(testMsg))
if err != nil {
return nil, cache.ErrSettingToCache{
Err: err,
Expand All @@ -145,7 +147,7 @@ func New(config dict.Dicter) (cache.Interface, error) {
}

// read test file
byt, hit, err := azCache.Get(&key)
byt, hit, err := azCache.Get(ctx, &key)
if err != nil {
return nil, cache.ErrGettingFromCache{
Err: err,
@@ -167,7 +169,7 @@ func New(config dict.Dicter) (cache.Interface, error) {
}
}

err = azCache.Purge(&key)
err = azCache.Purge(ctx, &key)
if err != nil {
return nil, cache.ErrPurgingCache{
Err: err,
@@ -186,13 +188,11 @@ type Cache struct {
Container azblob.ContainerURL
}

func (azb *Cache) Set(key *cache.Key, val []byte) error {
func (azb *Cache) Set(ctx context.Context, key *cache.Key, val []byte) error {
if key.Z > azb.MaxZoom || azb.ReadOnly {
return nil
}

ctx := context.Background()

httpHeaders := azblob.BlobHTTPHeaders{
ContentType: "application/x-protobuf",
}
@@ -211,18 +211,15 @@ func (azb *Cache) Set(key *cache.Key, val []byte) error {
return nil
}

func (azb *Cache) Get(key *cache.Key) ([]byte, bool, error) {
func (azb *Cache) Get(ctx context.Context, key *cache.Key) ([]byte, bool, error) {
if key.Z > azb.MaxZoom {
return nil, false, nil
}

ctx := context.Background()

res, err := azb.makeBlob(key).
ToBlockBlobURL().
Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false)


if err != nil {
// check if 404
resErr, ok := err.(azblob.ResponseError)
@@ -237,24 +234,22 @@ func (azb *Cache) Get(key *cache.Key) ([]byte, bool, error) {
body := res.Body(azblob.RetryReaderOptions{})
defer body.Close()

blobSlice, err := ioutil.ReadAll(body)
blobSlice, err := io.ReadAll(body)
if err != nil {
return nil, false, err
}

return blobSlice, true, nil
}

func (azb *Cache) Purge(key *cache.Key) error {
func (azb *Cache) Purge(ctx context.Context, key *cache.Key) error {
if azb.ReadOnly {
return nil
}

ctx := context.Background()

_, err := azb.makeBlob(key).
_, err := azb.makeBlob(key).
Delete(ctx, azblob.DeleteSnapshotsOptionNone,
azblob.BlobAccessConditions{})
azblob.BlobAccessConditions{})

if err != nil {
return err
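The net effect in this backend: Get, Set, and Purge no longer build their own context.Background(); whatever context the caller passes now reaches the underlying azblob Download, Upload, and Delete calls. A minimal sketch (assuming c is a configured cache, for example this package's Cache type, exposed through cache.Interface):

// assumes imports: context and the tegola cache package
func getWithCancel(c cache.Interface) ([]byte, bool, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// cancelling ctx (for example when the originating request is aborted)
	// now aborts the in-flight blob download as well
	key := cache.Key{Z: 0, X: 0, Y: 0}
	return c.Get(ctx, &key)
}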
27 changes: 15 additions & 12 deletions cache/azblob/azblob_test.go
@@ -1,13 +1,13 @@
package azblob_test

import (
"context"
"crypto/rand"
"fmt"
"os"
"reflect"
"testing"

"math/rand"

"github.com/go-spatial/tegola/cache"
"github.com/go-spatial/tegola/cache/azblob"
"github.com/go-spatial/tegola/dict"
@@ -93,6 +93,7 @@ func TestSetGetPurge(t *testing.T) {
expected []byte
}

ctx := context.Background()
fn := func(tc tcase) func(*testing.T) {
return func(t *testing.T) {

@@ -103,12 +104,12 @@ func TestSetGetPurge(t *testing.T) {
}

// test write
if err = fc.Set(&tc.key, tc.expected); err != nil {
if err = fc.Set(ctx, &tc.key, tc.expected); err != nil {
t.Errorf("write failed. err: %v", err)
return
}

output, hit, err := fc.Get(&tc.key)
output, hit, err := fc.Get(ctx, &tc.key)
if err != nil {
t.Errorf("read failed. err: %v", err)
return
@@ -124,7 +125,7 @@ func TestSetGetPurge(t *testing.T) {
}

// test purge
if err = fc.Purge(&tc.key); err != nil {
if err = fc.Purge(ctx, &tc.key); err != nil {
t.Errorf("purge failed. err: %v", err)
return
}
@@ -186,6 +187,7 @@ func TestSetOverwrite(t *testing.T) {
expected []byte
}

ctx := context.Background()
fn := func(tc tcase) func(*testing.T) {
return func(t *testing.T) {
// This test must be run in series otherwise
@@ -200,19 +202,19 @@ func TestSetOverwrite(t *testing.T) {
}

// test write1
if err = fc.Set(&tc.key, tc.bytes1); err != nil {
if err = fc.Set(ctx, &tc.key, tc.bytes1); err != nil {
t.Errorf("write 1 failed. err: %v", err)
return
}

// test write2
if err = fc.Set(&tc.key, tc.bytes2); err != nil {
if err = fc.Set(ctx, &tc.key, tc.bytes2); err != nil {
t.Errorf("write 2 failed. err: %v", err)
return
}

// fetch the cache entry
output, hit, err := fc.Get(&tc.key)
output, hit, err := fc.Get(ctx, &tc.key)
if err != nil {
t.Errorf("read failed. err: %v", err)
return
Expand All @@ -228,7 +230,7 @@ func TestSetOverwrite(t *testing.T) {
}

// clean up
if err = fc.Purge(&tc.key); err != nil {
if err = fc.Purge(ctx, &tc.key); err != nil {
t.Errorf("purge failed. err: %v", err)
return
}
@@ -268,6 +270,7 @@ func TestMaxZoom(t *testing.T) {
expectedHit bool
}

ctx := context.Background()
fn := func(tc tcase) func(*testing.T) {
return func(t *testing.T) {
// This test must be run in series otherwise
@@ -282,13 +285,13 @@ func TestMaxZoom(t *testing.T) {
}

// test set
if err = fc.Set(&tc.key, tc.bytes); err != nil {
if err = fc.Set(ctx, &tc.key, tc.bytes); err != nil {
t.Errorf("write failed. err: %v", err)
return
}

// fetch the cache entry
_, hit, err := fc.Get(&tc.key)
_, hit, err := fc.Get(ctx, &tc.key)
if err != nil {
t.Errorf("read failed. err: %v", err)
return
@@ -300,7 +303,7 @@ func TestMaxZoom(t *testing.T) {

// clean up
if tc.expectedHit {
if err != fc.Purge(&tc.key) {
if err != fc.Purge(ctx, &tc.key) {
t.Errorf("error cleaning %v", err)
return
}
7 changes: 4 additions & 3 deletions cache/cache.go
@@ -1,6 +1,7 @@
package cache

import (
"context"
"fmt"
"log"
"path/filepath"
@@ -15,9 +16,9 @@ import (

// Interface defines a cache back end
type Interface interface {
Get(key *Key) (val []byte, hit bool, err error)
Set(key *Key, val []byte) error
Purge(key *Key) error
Get(ctx context.Context, key *Key) (val []byte, hit bool, err error)
Set(ctx context.Context, key *Key, val []byte) error
Purge(ctx context.Context, key *Key) error
}

// Wrapped Cache are for cache backend that wrap other cache backends
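For backend authors, a minimal in-memory sketch of what an implementation of the updated interface could look like (illustrative only, not part of this commit); it checks ctx.Err() before touching its store, mirroring the pattern the file backend below adopts:

// assumes imports: context, sync, and the tegola cache package
type memCache struct {
	mu   sync.RWMutex
	data map[string][]byte
}

func newMemCache() *memCache {
	return &memCache{data: make(map[string][]byte)}
}

func (m *memCache) Get(ctx context.Context, key *cache.Key) ([]byte, bool, error) {
	if err := ctx.Err(); err != nil {
		return nil, false, err
	}
	m.mu.RLock()
	defer m.mu.RUnlock()
	val, ok := m.data[key.String()]
	return val, ok, nil
}

func (m *memCache) Set(ctx context.Context, key *cache.Key, val []byte) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[key.String()] = val
	return nil
}

func (m *memCache) Purge(ctx context.Context, key *cache.Key) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.data, key.String())
	return nil
}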
31 changes: 22 additions & 9 deletions cache/file/file.go
@@ -1,8 +1,9 @@
package file

import (
"context"
"errors"
"io/ioutil"
"io"
"os"
"path/filepath"

@@ -28,9 +29,8 @@ func init() {

// New instantiates a Cache. The config expects the following params:
//
// basepath (string): a path to where the cache will be written
// max_zoom (int): max zoom to use the cache. beyond this zoom cache Set() calls will be ignored
//
// basepath (string): a path to where the cache will be written
// max_zoom (int): max zoom to use the cache. beyond this zoom cache Set() calls will be ignored
func New(config dict.Dicter) (cache.Interface, error) {
var err error

@@ -68,10 +68,11 @@ type Cache struct {
MaxZoom uint
}

// Get reads a z,x,y entry from the cache and returns the contents
// Get reads a z,x,y entry from the cache and returns the contents
//
// if there is a hit. the second argument denotes a hit or miss
// so the consumer does not need to sniff errors for cache read misses
func (fc *Cache) Get(key *cache.Key) ([]byte, bool, error) {
func (fc *Cache) Get(ctx context.Context, key *cache.Key) ([]byte, bool, error) {
path := filepath.Join(fc.Basepath, key.String())

f, err := os.Open(path)
@@ -84,22 +85,30 @@ func (fc *Cache) Get(key *cache.Key) ([]byte, bool, error) {
}
defer f.Close()

val, err := ioutil.ReadAll(f)
if err := ctx.Err(); err != nil {
return nil, false, err
}

val, err := io.ReadAll(f)
if err != nil {
return nil, false, err
}

return val, true, nil
}

func (fc *Cache) Set(key *cache.Key, val []byte) error {
func (fc *Cache) Set(ctx context.Context, key *cache.Key, val []byte) error {
var err error

// check for maxzoom
if key.Z > fc.MaxZoom {
return nil
}

if err := ctx.Err(); err != nil {
return err
}

// the tmpPath uses the destPath with a simple "-tmp" suffix. we're going to do
// a Rename at the end of this method and according to the os.Rename() docs:
// "If newpath already exists and is not a directory, Rename replaces it.
@@ -135,14 +144,18 @@ func (fc *Cache) Set(key *cache.Key, val []byte) error {
return os.Rename(tmpPath, destPath)
}

func (fc *Cache) Purge(key *cache.Key) error {
func (fc *Cache) Purge(ctx context.Context, key *cache.Key) error {
path := filepath.Join(fc.Basepath, key.String())

// check if we have a file. if no file exists, return
if _, err := os.Stat(path); os.IsNotExist(err) {
return nil
}

if err := ctx.Err(); err != nil {
return err
}

// remove the locker key on purge
return os.Remove(path)
}
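A consequence of the ctx.Err() checks above: when the caller's context is already cancelled or past its deadline, Get, Set, and Purge return that error instead of carrying out the read, write, or delete. A hedged usage sketch (fc is assumed to be a file-backed cache already obtained from this package's New):

// assumes imports: context, time, and the tegola cache package
func seedOne(ctx context.Context, fc cache.Interface) error {
	ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
	defer cancel()

	key := cache.Key{Z: 3, X: 5, Y: 6}
	if err := fc.Set(ctx, &key, []byte("tile bytes")); err != nil {
		// e.g. context.DeadlineExceeded if the deadline passed before the write
		return err
	}
	if _, _, err := fc.Get(ctx, &key); err != nil {
		return err
	}
	return fc.Purge(ctx, &key)
}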