Skip to content

Commit

Permalink
functions: restore dynamic routes
Browse files Browse the repository at this point in the history
Addresses partially #77
  • Loading branch information
ucirello committed Nov 15, 2016
1 parent d65a84d commit c0610fd
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 21 deletions.
100 changes: 100 additions & 0 deletions api/server/routecache/lru.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Package routecache is meant to assist in resolving the most used routes at
// an application. Implemented as a LRU, it returns always its full context for
// iteration at the router handler.
package routecache

// based on groupcache's LRU

import (
"container/list"

"github.com/iron-io/functions/api/models"
)

const defaultMaxEntries = 100

// Cache holds an internal linkedlist for hotness management. It is not safe
// for concurrent use, must be guarded externally.
type Cache struct {
maxentries int

ll *list.List
cache map[*models.Route]*list.Element
values []*models.Route
}

type routecacheentry struct {
route *models.Route
}

// New returns a route cache.
func New() *Cache {
return &Cache{
maxentries: defaultMaxEntries,
ll: list.New(),
cache: make(map[*models.Route]*list.Element),
}
}

// Routes is an ordered slice with the hottest routes at the beginning, so to
// increase the likelihood of matching with incoming requests.
func (c *Cache) Routes() []*models.Route {
return c.values
}

// Refresh updates internal linkedlist either adding a new route to the top, or
// moving it to the top when used. It will discard seldom used routes.
func (c *Cache) Refresh(route *models.Route) {
if c.cache == nil {
c.cache = make(map[*models.Route]*list.Element)
c.ll = list.New()
}
if ee, ok := c.cache[route]; ok {
c.ll.MoveToFront(ee)
ee.Value.(*routecacheentry).route = route
c.updatevalues()
return
}
ele := c.ll.PushFront(&routecacheentry{route})
c.cache[route] = ele
if c.maxentries != 0 && c.ll.Len() > c.maxentries {
c.removeOldest()
return
}

c.updatevalues()
}

func (c *Cache) updatevalues() {
c.values = make([]*models.Route, 0, c.ll.Len())
for e := c.ll.Front(); e != nil; e = e.Next() {
route := e.Value.(*routecacheentry).route
c.values = append(c.values, route)
}
}

func (c *Cache) remove(key *models.Route) {
if c.cache == nil {
return
}
if ele, hit := c.cache[key]; hit {
c.removeElement(ele)
}
}

func (c *Cache) removeOldest() {
if c.cache == nil {
return
}
ele := c.ll.Back()
if ele != nil {
c.removeElement(ele)
}
}

func (c *Cache) removeElement(e *list.Element) {
c.ll.Remove(e)
kv := e.Value.(*routecacheentry)
delete(c.cache, kv.route)
c.updatevalues()
}
4 changes: 3 additions & 1 deletion api/server/routes_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/iron-io/runner/common"
)

func handleRouteDelete(c *gin.Context) {
func handleRouteDelete(c *gin.Context, resetcache func(appname string)) {
ctx := c.MustGet("ctx").(context.Context)
log := common.Logger(ctx)

Expand All @@ -23,5 +23,7 @@ func handleRouteDelete(c *gin.Context) {
return
}

resetcache(appName)

c.JSON(http.StatusOK, gin.H{"message": "Route deleted"})
}
66 changes: 49 additions & 17 deletions api/server/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/iron-io/functions/api/models"
"github.com/iron-io/functions/api/runner"
"github.com/iron-io/runner/common"
"github.com/iron-io/runner/drivers"
uuid "github.com/satori/go.uuid"
)

Expand All @@ -35,7 +34,7 @@ func ToEnvName(envtype, name string) string {
return fmt.Sprintf("%s_%s", envtype, name)
}

func handleRequest(c *gin.Context, enqueue models.Enqueue) {
func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) {
if strings.HasPrefix(c.Request.URL.Path, "/v1") {
c.Status(http.StatusNotFound)
return
Expand Down Expand Up @@ -77,21 +76,41 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) {
c.JSON(http.StatusBadRequest, simpleError(models.ErrAppsNotFound))
return
}
route := c.Param("route")
if route == "" {
route = c.Request.URL.Path
rawroute := c.Param("route")
if rawroute == "" {
rawroute = c.Request.URL.Path
}

log.WithFields(logrus.Fields{"app": appName, "path": route}).Debug("Finding route on datastore")

app, err := Api.Datastore.GetApp(appName)
if err != nil || app == nil {
log.WithError(err).Error(models.ErrAppsNotFound)
c.JSON(http.StatusNotFound, simpleError(models.ErrAppsNotFound))
return
}

routes, err := Api.Datastore.GetRoutesByApp(appName, &models.RouteFilter{AppName: appName, Path: route})
log.WithFields(logrus.Fields{"app": appName, "path": rawroute}).Debug("Finding route on cache")
var found bool
routes := s.loadcache(appName)
for _, r := range routes {
ok := processRoute(c, log, appName, r, app, rawroute, reqID, payload, enqueue)
if ok {
found = true
break
}
}
log.WithFields(logrus.Fields{"app": appName, "path": rawroute}).Debug("Done")
if found {
return
}

log.WithFields(logrus.Fields{"app": appName, "path": rawroute}).Debug("Finding route on datastore")
routes, err = Api.Datastore.GetRoutesByApp(
appName,
&models.RouteFilter{
AppName: appName,
Path: rawroute,
},
)
if err != nil {
log.WithError(err).Error(models.ErrRoutesList)
c.JSON(http.StatusInternalServerError, simpleError(models.ErrRoutesList))
Expand All @@ -106,15 +125,26 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) {
return
}

found := routes[0]
log = log.WithFields(logrus.Fields{
"app": appName, "route": found.Path, "image": found.Image})
for _, r := range routes {
ok := processRoute(c, log, appName, r, app, rawroute, reqID, payload, enqueue)
if ok {
found = true
s.refreshcache(appName, r)
break
}
}
if !found {
log.Error(models.ErrRunnerRouteNotFound)
c.JSON(http.StatusNotFound, simpleError(models.ErrRunnerRouteNotFound))
}
}

func processRoute(c *gin.Context, log logrus.FieldLogger, appName string, found *models.Route, app *models.App, route, reqID string, payload io.Reader, enqueue models.Enqueue) (ok bool) {
log = log.WithFields(logrus.Fields{"app": appName, "route": found.Path, "image": found.Image})

params, match := matchRoute(found.Path, route)
if !match {
log.WithError(err).Error(models.ErrRunnerRouteNotFound)
c.JSON(http.StatusNotFound, simpleError(models.ErrRunnerRouteNotFound))
return
return false
}

var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader
Expand Down Expand Up @@ -156,15 +186,14 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) {
Stdin: payload,
}

var result drivers.RunResult
switch found.Type {
case "async":
// Read payload
pl, err := ioutil.ReadAll(cfg.Stdin)
if err != nil {
log.WithError(err).Error(models.ErrInvalidPayload)
c.JSON(http.StatusBadRequest, simpleError(models.ErrInvalidPayload))
return
return true
}

// Create Task
Expand All @@ -182,7 +211,8 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) {
log.Info("Added new task to queue")

default:
if result, err = Api.Runner.Run(c, cfg); err != nil {
result, err := Api.Runner.Run(c, cfg)
if err != nil {
break
}
for k, v := range found.Headers {
Expand All @@ -195,6 +225,8 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) {
c.AbortWithStatus(http.StatusInternalServerError)
}
}

return true
}

var fakeHandler = func(http.ResponseWriter, *http.Request, Params) {}
Expand Down
44 changes: 41 additions & 3 deletions api/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (
"io/ioutil"
"net/http"
"path"
"sync"

"github.com/Sirupsen/logrus"
"github.com/gin-gonic/gin"
"github.com/iron-io/functions/api/ifaces"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/functions/api/runner"
"github.com/iron-io/functions/api/server/routecache"
"github.com/iron-io/runner/common"
)

Expand All @@ -27,6 +29,9 @@ type Server struct {
MQ models.MessageQueue
AppListeners []ifaces.AppListener
SpecialHandlers []ifaces.SpecialHandler

mu sync.Mutex
hotroutes map[string]*routecache.Cache
}

func New(ds models.Datastore, mq models.MessageQueue, r *runner.Runner) *Server {
Expand All @@ -35,6 +40,7 @@ func New(ds models.Datastore, mq models.MessageQueue, r *runner.Runner) *Server
Datastore: ds,
MQ: mq,
Runner: r,
hotroutes: make(map[string]*routecache.Cache),
}
return Api
}
Expand Down Expand Up @@ -80,7 +86,7 @@ func (s *Server) UseSpecialHandlers(ginC *gin.Context) error {
}
}
// now call the normal runner call
handleRequest(ginC, nil)
s.handleRequest(ginC, nil)
return nil
}

Expand All @@ -90,7 +96,37 @@ func (s *Server) handleRunnerRequest(c *gin.Context) {
ctx, _ := common.LoggerWithFields(c, logrus.Fields{"call_id": task.ID})
return s.MQ.Push(ctx, task)
}
handleRequest(c, enqueue)
s.handleRequest(c, enqueue)

}

func (s *Server) loadcache(appname string) []*models.Route {
s.mu.Lock()
cache, ok := s.hotroutes[appname]
if !ok {
s.mu.Unlock()
return nil
}
routes := cache.Routes()
s.mu.Unlock()
return routes
}

func (s *Server) refreshcache(appname string, route *models.Route) {
s.mu.Lock()
cache, ok := s.hotroutes[appname]
if !ok {
s.hotroutes[appname] = routecache.New()
cache = s.hotroutes[appname]
}
cache.Refresh(route)
s.mu.Unlock()
}

func (s *Server) resetcache(appname string) {
s.mu.Lock()
s.hotroutes[appname] = routecache.New()
s.mu.Unlock()
}

func (s *Server) handleTaskRequest(c *gin.Context) {
Expand Down Expand Up @@ -174,7 +210,9 @@ func (s *Server) bindHandlers() {
apps.POST("/routes", handleRouteCreate)
apps.GET("/routes/*route", handleRouteGet)
apps.PUT("/routes/*route", handleRouteUpdate)
apps.DELETE("/routes/*route", handleRouteDelete)
apps.DELETE("/routes/*route", func(c *gin.Context) {
handleRouteDelete(c, s.resetcache)
})
}
}

Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func main() {
}

svr := &supervisor.Supervisor{
MaxRestarts: supervisor.AlwaysRestart,
Log: func(msg interface{}) {
log.Debug("supervisor: ", msg)
},
Expand Down

0 comments on commit c0610fd

Please sign in to comment.