Skip to content

Commit

Permalink
Node/CCQ/Server: Add permissions file watcher (#3586)
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley authored Dec 11, 2023
1 parent 2a3d4c8 commit fd05cb0
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 167 deletions.
6 changes: 3 additions & 3 deletions node/cmd/ccq/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type httpServer struct {
topic *pubsub.Topic
logger *zap.Logger
env common.Environment
permissions Permissions
permissions *Permissions
signerKey *ecdsa.PrivateKey
pendingResponses *PendingResponses
}
Expand Down Expand Up @@ -79,7 +79,7 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) {
apiKey := strings.ToLower(apiKeys[0])

// Make sure the user is authorized before we go any farther.
permEntry, exists := s.permissions[apiKey]
permEntry, exists := s.permissions.GetUserEntry(apiKey)
if !exists {
s.logger.Error("invalid api key", zap.String("apiKey", apiKey))
http.Error(w, "invalid api key", http.StatusForbidden)
Expand Down Expand Up @@ -202,7 +202,7 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) {
s.pendingResponses.Remove(pendingResponse)
}

func NewHTTPServer(addr string, t *pubsub.Topic, permissions Permissions, signerKey *ecdsa.PrivateKey, p *PendingResponses, logger *zap.Logger, env common.Environment) *http.Server {
func NewHTTPServer(addr string, t *pubsub.Topic, permissions *Permissions, signerKey *ecdsa.PrivateKey, p *PendingResponses, logger *zap.Logger, env common.Environment) *http.Server {
s := &httpServer{
topic: t,
permissions: permissions,
Expand Down
12 changes: 12 additions & 0 deletions node/cmd/ccq/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,16 @@ var (
Help: "Time from request to response published in ms",
Buckets: []float64{10.0, 100.0, 250.0, 500.0, 1000.0, 5000.0, 10000.0, 30000.0},
})

permissionFileReloadsSuccess = promauto.NewCounter(
prometheus.CounterOpts{
Name: "ccq_server_perm_file_reload_success",
Help: "Total number of times the permissions file was successfully reloaded",
})

permissionFileReloadsFailure = promauto.NewCounter(
prometheus.CounterOpts{
Name: "ccq_server_perm_file_reload_failure",
Help: "Total number of times the permissions file failed to reload",
})
)
247 changes: 247 additions & 0 deletions node/cmd/ccq/permissions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
package ccq

import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"os"
"strings"
"sync"

"github.com/certusone/wormhole/node/pkg/common"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"

"gopkg.in/godo.v2/watcher/fswatch"
)

type (
Config struct {
Permissions []User `json:"Permissions"`
}

User struct {
UserName string `json:"userName"`
ApiKey string `json:"apiKey"`
AllowUnsigned bool `json:"allowUnsigned"`
AllowedCalls []AllowedCall `json:"allowedCalls"`
}

AllowedCall struct {
EthCall *EthCall `json:"ethCall"`
EthCallByTimestamp *EthCallByTimestamp `json:"ethCallByTimestamp"`
EthCallWithFinality *EthCallWithFinality `json:"ethCallWithFinality"`
}

EthCall struct {
Chain int `json:"chain"`
ContractAddress string `json:"contractAddress"`
Call string `json:"call"`
}

EthCallByTimestamp struct {
Chain int `json:"chain"`
ContractAddress string `json:"contractAddress"`
Call string `json:"call"`
}

EthCallWithFinality struct {
Chain int `json:"chain"`
ContractAddress string `json:"contractAddress"`
Call string `json:"call"`
}

PermissionsMap map[string]*permissionEntry

permissionEntry struct {
userName string
apiKey string
allowUnsigned bool
allowedCalls allowedCallsForUser // Key is something like "ethCall:2:000000000000000000000000b4fbf271143f4fbf7b91a5ded31805e42b2208d6:06fdde03"
}

allowedCallsForUser map[string]struct{}

Permissions struct {
lock sync.Mutex
permMap PermissionsMap
fileName string
watcher *fswatch.Watcher
}
)

// NewPermissions creates a Permissions object which contains the per-user permissions.
func NewPermissions(fileName string) (*Permissions, error) {
permMap, err := parseConfigFile(fileName)
if err != nil {
return nil, err
}

return &Permissions{
permMap: permMap,
fileName: fileName,
}, nil
}

// StartWatcher creates an fswatcher to watch for updates to the permissions file and reload it when it changes.
func (perms *Permissions) StartWatcher(ctx context.Context, logger *zap.Logger, errC chan error) {
logger = logger.With(zap.String("component", "perms"))
perms.watcher = fswatch.NewWatcher(perms.fileName)
fsChan := perms.watcher.Start()

common.RunWithScissors(ctx, errC, "perm_file_watcher", func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case notif := <-fsChan:
if notif.Path != perms.fileName {
return fmt.Errorf("permissions watcher received an update for an unexpected file: %s", notif.Path)
}

logger.Info("the permissions file has been updated", zap.String("fileName", notif.Path), zap.Int("event", int(notif.Event)))
perms.Reload(logger)
}
}
})
}

// Reload reloads the permissions file.
func (perms *Permissions) Reload(logger *zap.Logger) {
permMap, err := parseConfigFile(perms.fileName)
if err != nil {
logger.Error("failed to reload the permissions file, sticking with the old one", zap.String("fileName", perms.fileName), zap.Error(err))
permissionFileReloadsFailure.Inc()
return
}

logger.Info("successfully reloaded the permissions file, switching to it", zap.String("fileName", perms.fileName))
perms.lock.Lock()
perms.permMap = permMap
perms.lock.Unlock()
permissionFileReloadsSuccess.Inc()
}

// StopWatcher stops the permissions file watcher.
func (perms *Permissions) StopWatcher() {
if perms.watcher != nil {
perms.watcher.Stop()
}
}

// GetUserEntry returns the permissions entry for a given API key. It uses the lock to protect against updates.
func (perms *Permissions) GetUserEntry(apiKey string) (*permissionEntry, bool) {
perms.lock.Lock()
defer perms.lock.Unlock()
userEntry, exists := perms.permMap[apiKey]
return userEntry, exists
}

const ETH_CALL_SIG_LENGTH = 4

// parseConfigFile parses the permissions config file into a map keyed by API key.
func parseConfigFile(fileName string) (PermissionsMap, error) {
jsonFile, err := os.Open(fileName)
if err != nil {
return nil, fmt.Errorf(`failed to open permissions file "%s": %w`, fileName, err)
}
defer jsonFile.Close()

byteValue, err := io.ReadAll(jsonFile)
if err != nil {
return nil, fmt.Errorf(`failed to read permissions file "%s": %w`, fileName, err)
}

retVal, err := parseConfig(byteValue)
if err != nil {
return retVal, fmt.Errorf(`failed to parse permissions file "%s": %w`, fileName, err)
}

return retVal, err
}

// parseConfig parses the permissions config from a buffer into a map keyed by API key.
func parseConfig(byteValue []byte) (PermissionsMap, error) {
var config Config
if err := json.Unmarshal(byteValue, &config); err != nil {
return nil, fmt.Errorf(`failed to unmarshal json: %w`, err)
}

ret := make(PermissionsMap)
userNames := map[string]struct{}{}
for _, user := range config.Permissions {
// Since we log user names in all our error messages, make sure they are unique.
if _, exists := userNames[user.UserName]; exists {
return nil, fmt.Errorf(`UserName "%s" is a duplicate`, user.UserName)
}
userNames[user.UserName] = struct{}{}

apiKey := strings.ToLower(user.ApiKey)
if _, exists := ret[apiKey]; exists {
return nil, fmt.Errorf(`API key "%s" is a duplicate`, apiKey)
}

// Build the list of allowed calls for this API key.
allowedCalls := make(allowedCallsForUser)
for _, ac := range user.AllowedCalls {
var chain int
var callType, contractAddressStr, callStr string
// var contractAddressStr string
if ac.EthCall != nil {
callType = "ethCall"
chain = ac.EthCall.Chain
contractAddressStr = ac.EthCall.ContractAddress
callStr = ac.EthCall.Call
} else if ac.EthCallByTimestamp != nil {
callType = "ethCallByTimestamp"
chain = ac.EthCallByTimestamp.Chain
contractAddressStr = ac.EthCallByTimestamp.ContractAddress
callStr = ac.EthCallByTimestamp.Call
} else if ac.EthCallWithFinality != nil {
callType = "ethCallWithFinality"
chain = ac.EthCallWithFinality.Chain
contractAddressStr = ac.EthCallWithFinality.ContractAddress
callStr = ac.EthCallWithFinality.Call
} else {
return nil, fmt.Errorf(`unsupported call type for user "%s", must be "ethCall", "ethCallByTimestamp" or "ethCallWithFinality"`, user.UserName)
}

// Convert the contract address into a standard format like "000000000000000000000000b4fbf271143f4fbf7b91a5ded31805e42b2208d6".
contractAddress, err := vaa.StringToAddress(contractAddressStr)
if err != nil {
return nil, fmt.Errorf(`invalid contract address "%s" for user "%s"`, contractAddressStr, user.UserName)
}

// The call should be the ABI four byte hex hash of the function signature. Parse it into a standard form of "06fdde03".
call, err := hex.DecodeString(strings.TrimPrefix(callStr, "0x"))
if err != nil {
return nil, fmt.Errorf(`invalid eth call "%s" for user "%s"`, callStr, user.UserName)
}
if len(call) != ETH_CALL_SIG_LENGTH {
return nil, fmt.Errorf(`eth call "%s" for user "%s" has an invalid length, must be %d bytes`, callStr, user.UserName, ETH_CALL_SIG_LENGTH)
}

// The permission key is the chain, contract address and call formatted as a colon separated string.
callKey := fmt.Sprintf("%s:%d:%s:%s", callType, chain, contractAddress, hex.EncodeToString(call))

if _, exists := allowedCalls[callKey]; exists {
return nil, fmt.Errorf(`"%s" is a duplicate allowed call for user "%s"`, callKey, user.UserName)
}

allowedCalls[callKey] = struct{}{}
}

pe := &permissionEntry{
userName: user.UserName,
apiKey: apiKey,
allowUnsigned: user.AllowUnsigned,
allowedCalls: allowedCalls,
}

ret[apiKey] = pe
}

return ret, nil
}
23 changes: 18 additions & 5 deletions node/cmd/ccq/query_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func runQueryServer(cmd *cobra.Command, args []string) {
logger.Fatal("Please specify --ethContract")
}

permissions, err := parseConfigFile(*permFile)
permissions, err := NewPermissions(*permFile)
if err != nil {
logger.Fatal("Failed to load permissions file", zap.String("permFile", *permFile), zap.Error(err))
}
Expand Down Expand Up @@ -213,11 +213,24 @@ func runQueryServer(cmd *cobra.Command, args []string) {
cancel()
}()

<-ctx.Done()
logger.Info("Context cancelled, exiting...")
// Start watching for permissions file updates.
errC := make(chan error)
permissions.StartWatcher(ctx, logger, errC)

// Cleanly shutdown
// Without this the same host won't properly discover peers until some timeout
// Wait for either a shutdown or a fatal error from the permissions watcher.
select {
case <-ctx.Done():
logger.Info("Context cancelled, exiting...")
break
case err := <-errC:
logger.Error("Encountered an error, exiting", zap.Error(err))
break
}

// Stop the permissions file watcher.
permissions.StopWatcher()

// Shutdown p2p. Without this the same host won't properly discover peers until some timeout
p2p.sub.Cancel()
if err := p2p.topic_req.Close(); err != nil {
logger.Error("Error closing the request topic", zap.Error(err))
Expand Down
Loading

0 comments on commit fd05cb0

Please sign in to comment.