Skip to content

Commit

Permalink
Add projectName to disk cache
Browse files Browse the repository at this point in the history
  • Loading branch information
gjasny committed May 26, 2019
1 parent 6ca8e0e commit 183f3c5
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 54 deletions.
6 changes: 3 additions & 3 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ type Cache interface {

// Put stores a stream of `size` bytes from `r` into the cache. If `expectedSha256` is
// not the empty string, and the contents don't match it, an error is returned
Put(kind EntryKind, hash string, size int64, r io.Reader) error
Put(kind EntryKind, projectName string, hash string, size int64, r io.Reader) error

// Get writes the content of the cache item stored under `key` to `w`. If the item is
// not found, it returns ok = false.
Get(kind EntryKind, hash string) (data io.ReadCloser, sizeBytes int64, err error)
Get(kind EntryKind, projectName string, hash string) (data io.ReadCloser, sizeBytes int64, err error)

// Contains returns true if the `key` exists.
Contains(kind EntryKind, hash string) (ok bool)
Contains(kind EntryKind, projectName string, hash string) (ok bool)

// MaxSize returns the maximum cache size in bytes.
MaxSize() int64
Expand Down
30 changes: 18 additions & 12 deletions cache/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,10 @@ func (c *diskCache) loadExistingFiles() error {
return nil
}

func (c *diskCache) Put(kind cache.EntryKind, hash string, size int64, r io.Reader) (err error) {
func (c *diskCache) Put(kind cache.EntryKind, projectName string, hash string, size int64, r io.Reader) (err error) {
c.mux.Lock()

key := cacheKey(kind, hash)
key := cacheKey(kind, projectName, hash)

// If there's an ongoing upload (i.e. cache key is present in uncommitted state),
// we drop the upload and discard the incoming stream. We do accept uploads
Expand Down Expand Up @@ -229,7 +229,7 @@ func (c *diskCache) Put(kind cache.EntryKind, hash string, size int64, r io.Read
}

// Rename to the final path
filePath := cacheFilePath(kind, c.dir, hash)
filePath := cacheFilePath(kind, c.dir, projectName, hash)
err = os.Rename(f.Name(), filePath)
// Only commit if renaming succeeded
if err == nil {
Expand All @@ -240,12 +240,12 @@ func (c *diskCache) Put(kind cache.EntryKind, hash string, size int64, r io.Read
return
}

func (c *diskCache) Get(kind cache.EntryKind, hash string) (data io.ReadCloser, sizeBytes int64, err error) {
if !c.Contains(kind, hash) {
func (c *diskCache) Get(kind cache.EntryKind, projectName string, hash string) (data io.ReadCloser, sizeBytes int64, err error) {
if !c.Contains(kind, projectName, hash) {
return
}

blobPath := cacheFilePath(kind, c.dir, hash)
blobPath := cacheFilePath(kind, c.dir, projectName, hash)

fileInfo, err := os.Stat(blobPath)
if err != nil {
Expand All @@ -261,11 +261,11 @@ func (c *diskCache) Get(kind cache.EntryKind, hash string) (data io.ReadCloser,
return
}

func (c *diskCache) Contains(kind cache.EntryKind, hash string) (ok bool) {
func (c *diskCache) Contains(kind cache.EntryKind, projectName string, hash string) (ok bool) {
c.mux.Lock()
defer c.mux.Unlock()

val, found := c.lru.Get(cacheKey(kind, hash))
val, found := c.lru.Get(cacheKey(kind, projectName, hash))
// Uncommitted (i.e. uploading items) should be reported as not ok
return found && val.(*lruItem).committed
}
Expand Down Expand Up @@ -297,10 +297,16 @@ func ensureDirExists(path string) {
}
}

func cacheKey(kind cache.EntryKind, hash string) string {
return filepath.Join(kind.String(), hash[:2], hash)
func cacheKey(kind cache.EntryKind, projectName string, hash string) string {
if kind == cache.CAS {
return filepath.Join(kind.String(), hash[:2], hash)
} else if projectName == "" {
return filepath.Join(kind.String(), hash[:2], hash)
} else {
return filepath.Join(kind.String(), hash[:2], hash+"_"+projectName)
}
}

func cacheFilePath(kind cache.EntryKind, cacheDir string, hash string) string {
return filepath.Join(cacheDir, cacheKey(kind, hash))
func cacheFilePath(kind cache.EntryKind, cacheDir string, projectName string, hash string) string {
return filepath.Join(cacheDir, cacheKey(kind, projectName, hash))
}
92 changes: 67 additions & 25 deletions cache/disk/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func checkItems(t *testing.T, cache *diskCache, expSize int64, expNum int) {
}
}

const NO_PROJECT = ""
const KEY = "a-key"
const CONTENTS = "hello"
const CONTENTS_HASH = "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
Expand All @@ -67,7 +68,7 @@ func TestCacheBasics(t *testing.T) {
checkItems(t, testCache.(*diskCache), 0, 0)

// Non-existing item
data, sizeBytes, err := testCache.Get(cache.CAS, CONTENTS_HASH)
data, sizeBytes, err := testCache.Get(cache.CAS, NO_PROJECT, CONTENTS_HASH)
if err != nil {
t.Fatal(err)
}
Expand All @@ -76,7 +77,7 @@ func TestCacheBasics(t *testing.T) {
}

// Add an item
err = testCache.Put(cache.CAS, CONTENTS_HASH, int64(len(CONTENTS)), strings.NewReader(CONTENTS))
err = testCache.Put(cache.CAS, NO_PROJECT, CONTENTS_HASH, int64(len(CONTENTS)), strings.NewReader(CONTENTS))
if err != nil {
t.Fatal(err)
}
Expand All @@ -86,7 +87,7 @@ func TestCacheBasics(t *testing.T) {
checkItems(t, testCache.(*diskCache), int64(len(CONTENTS)), 1)

// Get the item back
data, sizeBytes, err = testCache.Get(cache.CAS, CONTENTS_HASH)
data, sizeBytes, err = testCache.Get(cache.CAS, NO_PROJECT, CONTENTS_HASH)
if err != nil {
t.Fatal(err)
}
Expand All @@ -113,7 +114,7 @@ func TestCacheEviction(t *testing.T) {
}

for i, thisExp := range expectedSizesNumItems {
err := testCache.Put(cache.AC, fmt.Sprintf("aa-%d", i), int64(i), strings.NewReader("hello"))
err := testCache.Put(cache.AC, NO_PROJECT, fmt.Sprintf("aa-%d", i), int64(i), strings.NewReader("hello"))
if err != nil {
t.Fatal(err)
}
Expand All @@ -139,21 +140,30 @@ func expectContentEquals(t *testing.T, data io.ReadCloser, sizeBytes int64, expe
}
}

func putGetCompare(kind cache.EntryKind, hash string, content string, testCache cache.Cache,
func put(kind cache.EntryKind, projectName string, hash string, content string, testCache cache.Cache,
t *testing.T) {
err := testCache.Put(kind, hash, int64(len(content)), strings.NewReader(content))
err := testCache.Put(kind, projectName, hash, int64(len(content)), strings.NewReader(content))
if err != nil {
t.Fatal(err)
}
}

data, sizeBytes, err := testCache.Get(kind, hash)
func getCompare(kind cache.EntryKind, projectName string, hash string, content string, testCache cache.Cache,
t *testing.T) {
data, sizeBytes, err := testCache.Get(kind, projectName, hash)
if err != nil {
t.Fatal(err)
}
// Get the item back
expectContentEquals(t, data, sizeBytes, []byte(content))
}

func putGetCompare(kind cache.EntryKind, projectName string, hash string, content string, testCache cache.Cache,
t *testing.T) {
put(kind, projectName, hash, content, testCache, t)
getCompare(kind, projectName, hash, content, testCache, t)
}

func hashStr(content string) string {
hashBytes := sha256.Sum256([]byte(content))
return hex.EncodeToString(hashBytes[:])
Expand All @@ -165,11 +175,36 @@ func TestOverwrite(t *testing.T) {
defer os.RemoveAll(cacheDir)
testCache := New(cacheDir, 10)

putGetCompare(cache.CAS, hashStr("hello"), "hello", testCache, t)
putGetCompare(cache.CAS, hashStr("hello"), "hello", testCache, t)
putGetCompare(cache.CAS, NO_PROJECT, hashStr("hello"), "hello", testCache, t)
putGetCompare(cache.CAS, NO_PROJECT, hashStr("hello"), "hello", testCache, t)

putGetCompare(cache.AC, NO_PROJECT, hashStr("world"), "world1", testCache, t)
putGetCompare(cache.AC, NO_PROJECT, hashStr("world"), "world2", testCache, t)
}

// Make sure that we can overwrite items if we upload the same key again.
func TestRespectProjectName(t *testing.T) {
cacheDir := tempDir(t)
defer os.RemoveAll(cacheDir)
testCache := New(cacheDir, 30)

hash := hashStr("just some common hash")

put(cache.AC, NO_PROJECT, hash, "content", testCache, t)
put(cache.AC, "a", hash, "contentA", testCache, t)
put(cache.AC, "b", hash, "contentB", testCache, t)

// expect different content for different project names

putGetCompare(cache.AC, hashStr("world"), "world1", testCache, t)
putGetCompare(cache.AC, hashStr("world"), "world2", testCache, t)
getCompare(cache.AC, NO_PROJECT, hash, "content", testCache, t)
getCompare(cache.AC, "a", hash, "contentA", testCache, t)
getCompare(cache.AC, "b", hash, "contentB", testCache, t)

anotherTestCache := New(cacheDir, 30)

getCompare(cache.AC, NO_PROJECT, hash, "content", anotherTestCache, t)
getCompare(cache.AC, "a", hash, "contentA", anotherTestCache, t)
getCompare(cache.AC, "b", hash, "contentB", anotherTestCache, t)
}

func TestCacheExistingFiles(t *testing.T) {
Expand Down Expand Up @@ -201,12 +236,12 @@ func TestCacheExistingFiles(t *testing.T) {
checkItems(t, testCache.(*diskCache), expectedSize, 3)

// Adding a new file should evict items[0] (the oldest)
err := testCache.Put(cache.CAS, CONTENTS_HASH, int64(len(CONTENTS)), strings.NewReader(CONTENTS))
err := testCache.Put(cache.CAS, NO_PROJECT, CONTENTS_HASH, int64(len(CONTENTS)), strings.NewReader(CONTENTS))
if err != nil {
t.Fatal(err)
}
checkItems(t, testCache.(*diskCache), expectedSize, 3)
found := testCache.Contains(cache.CAS, "f53b46209596d170f7659a414c9ff9f6b545cf77ffd6e1cbe9bcc57e1afacfbd")
found := testCache.Contains(cache.CAS, NO_PROJECT, "f53b46209596d170f7659a414c9ff9f6b545cf77ffd6e1cbe9bcc57e1afacfbd")
if found {
t.Fatalf("%s should have been evicted", items[0])
}
Expand All @@ -219,7 +254,7 @@ func TestCacheBlobTooLarge(t *testing.T) {
defer os.RemoveAll(cacheDir)
testCache := New(cacheDir, 100)

err := testCache.Put(cache.AC, hashStr("foo"), 10000, strings.NewReader(CONTENTS))
err := testCache.Put(cache.AC, NO_PROJECT, hashStr("foo"), 10000, strings.NewReader(CONTENTS))
if err == nil {
t.Fatal("Expected an error")
}
Expand All @@ -239,7 +274,7 @@ func TestCacheCorruptedFile(t *testing.T) {
defer os.RemoveAll(cacheDir)
testCache := New(cacheDir, 1000)

err := testCache.Put(cache.CAS, hashStr("foo"), int64(len(CONTENTS)), strings.NewReader(CONTENTS))
err := testCache.Put(cache.CAS, NO_PROJECT, hashStr("foo"), int64(len(CONTENTS)), strings.NewReader(CONTENTS))
if err == nil {
t.Fatal("expected hash mismatch error")
}
Expand All @@ -265,13 +300,13 @@ func TestMigrateFromOldDirectoryStructure(t *testing.T) {
if testCache.NumItems() != 3 {
t.Fatalf("Expected test cache size 3 but was %d", testCache.NumItems())
}
if !testCache.Contains(cache.AC, acHash) {
if !testCache.Contains(cache.AC, NO_PROJECT, acHash) {
t.Fatalf("Expected cache to contain AC entry '%s'", acHash)
}
if !testCache.Contains(cache.CAS, casHash1) {
if !testCache.Contains(cache.CAS, NO_PROJECT, casHash1) {
t.Fatalf("Expected cache to contain CAS entry '%s'", casHash1)
}
if !testCache.Contains(cache.CAS, casHash2) {
if !testCache.Contains(cache.CAS, NO_PROJECT, casHash2) {
t.Fatalf("Expected cache to contain CAS entry '%s'", casHash2)
}
}
Expand All @@ -280,23 +315,30 @@ func TestLoadExistingEntries(t *testing.T) {
// Test that loading existing items works
cacheDir := testutils.TempDir(t)
defer os.RemoveAll(cacheDir)
acHash, err := testutils.CreateCacheFile(cacheDir+"/ac/", 1024)
acHash, err := testutils.CreateCacheFile(cacheDir+"/ac/", NO_PROJECT, 1024)
if err != nil {
t.Fatal(err)
}
casHash, err := testutils.CreateCacheFile(cacheDir+"/cas/", 1024)
acWithProjectHash, err := testutils.CreateCacheFile(cacheDir+"/ac/", "someProject", 1024)
if err != nil {
t.Fatal(err)
}
casHash, err := testutils.CreateCacheFile(cacheDir+"/cas/", NO_PROJECT, 1024)
if err != nil {
t.Fatal(err)
}

testCache := New(cacheDir, 2048)
if testCache.NumItems() != 2 {
t.Fatalf("Expected test cache size 2 but was %d", testCache.NumItems())
testCache := New(cacheDir, 3072)
if testCache.NumItems() != 3 {
t.Fatalf("Expected test cache size 3 but was %d", testCache.NumItems())
}
if !testCache.Contains(cache.AC, acHash) {
if !testCache.Contains(cache.AC, NO_PROJECT, acHash) {
t.Fatalf("Expected cache to contain AC entry '%s'", acHash)
}
if !testCache.Contains(cache.CAS, casHash) {
if !testCache.Contains(cache.AC, "someProject", acWithProjectHash) {
t.Fatalf("Expected cache to contain AC entry '%s'", acWithProjectHash)
}
if !testCache.Contains(cache.CAS, NO_PROJECT, casHash) {
t.Fatalf("Expected cache to contain CAS entry '%s'", casHash)
}
}
22 changes: 13 additions & 9 deletions server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (h *httpCache) CacheHandler(w http.ResponseWriter, r *http.Request) {
h.accessLogger.Printf("%4s %d %15s %s", r.Method, code, clientAddress, r.URL.Path)
}

kind, _, hash, err := parseRequestURL(r.URL.Path)
kind, projectName, hash, err := parseRequestURL(r.URL.Path)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
logResponse(http.StatusBadRequest)
Expand All @@ -107,14 +107,14 @@ func (h *httpCache) CacheHandler(w http.ResponseWriter, r *http.Request) {

switch m := r.Method; m {
case http.MethodGet:
data, sizeBytes, err := h.cache.Get(kind, hash)
data, sizeBytes, err := h.cache.Get(kind, projectName, hash)
if err != nil {
if e, ok := err.(*cache.Error); ok {
http.Error(w, e.Error(), e.Code)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
h.errorLogger.Printf("GET %s: %s", path(kind, hash), err)
h.errorLogger.Printf("GET %s: %s", path(kind, projectName, hash), err)
return
}

Expand All @@ -133,26 +133,26 @@ func (h *httpCache) CacheHandler(w http.ResponseWriter, r *http.Request) {
case http.MethodPut:
if r.ContentLength == -1 {
// We need the content-length header to make sure we have enough disk space.
msg := fmt.Sprintf("PUT without Content-Length (key = %s)", path(kind, hash))
msg := fmt.Sprintf("PUT without Content-Length (key = %s)", path(kind, projectName, hash))
http.Error(w, msg, http.StatusBadRequest)
h.errorLogger.Printf("%s", msg)
return
}

err := h.cache.Put(kind, hash, r.ContentLength, r.Body)
err := h.cache.Put(kind, projectName, hash, r.ContentLength, r.Body)
if err != nil {
if cerr, ok := err.(*cache.Error); ok {
http.Error(w, err.Error(), cerr.Code)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
h.errorLogger.Printf("PUT %s: %s", path(kind, hash), err)
h.errorLogger.Printf("PUT %s: %s", path(kind, projectName, hash), err)
} else {
logResponse(http.StatusOK)
}

case http.MethodHead:
ok := h.cache.Contains(kind, hash)
ok := h.cache.Contains(kind, projectName, hash)
if !ok {
http.Error(w, "Not found", http.StatusNotFound)
logResponse(http.StatusNotFound)
Expand Down Expand Up @@ -184,6 +184,10 @@ func (h *httpCache) StatusPageHandler(w http.ResponseWriter, r *http.Request) {
})
}

func path(kind cache.EntryKind, hash string) string {
return fmt.Sprintf("/%s/%s", kind, hash)
func path(kind cache.EntryKind, projectName string, hash string) string {
if projectName != "" {
return fmt.Sprintf("/%s/%s/%s", projectName, kind, hash)
} else {
return fmt.Sprintf("/%s/%s", kind, hash)
}
}
Loading

0 comments on commit 183f3c5

Please sign in to comment.