From a675fb708d5d478639fb8c0d250433e5e14877b0 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Wed, 17 Mar 2021 02:29:13 +0100 Subject: [PATCH] les: rebase fix --- les/clientpool.go | 453 ----------------------- les/server.go | 3 +- les/vflux/server/clientpool.go | 32 +- les/vflux/server/clientpool_test.go | 4 + tests/fuzzers/vflux/clientpool-fuzzer.go | 2 +- 5 files changed, 36 insertions(+), 458 deletions(-) delete mode 100644 les/clientpool.go diff --git a/les/clientpool.go b/les/clientpool.go deleted file mode 100644 index 3965d54508db..000000000000 --- a/les/clientpool.go +++ /dev/null @@ -1,453 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package les - -import ( - "fmt" - "sync" - "time" - - "github.com/ethereum/go-ethereum/common/mclock" - "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/les/utils" - "github.com/ethereum/go-ethereum/les/vflux" - vfs "github.com/ethereum/go-ethereum/les/vflux/server" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/enr" - "github.com/ethereum/go-ethereum/p2p/nodestate" - "github.com/ethereum/go-ethereum/rlp" -) - -const ( - defaultNegExpTC = 3600 // default time constant (in seconds) for exponentially reducing negative balance - - // defaultConnectedBias is applied to already connected clients So that - // already connected client won't be kicked out very soon and we - // can ensure all connected clients can have enough time to request - // or sync some data. - // - // todo(rjl493456442) make it configurable. It can be the option of - // free trial time! - defaultConnectedBias = time.Minute * 3 - inactiveTimeout = time.Second * 10 -) - -// clientPool implements a client database that assigns a priority to each client -// based on a positive and negative balance. Positive balance is externally assigned -// to prioritized clients and is decreased with connection time and processed -// requests (unless the price factors are zero). If the positive balance is zero -// then negative balance is accumulated. -// -// Balance tracking and priority calculation for connected clients is done by -// balanceTracker. activeQueue ensures that clients with the lowest positive or -// highest negative balance get evicted when the total capacity allowance is full -// and new clients with a better balance want to connect. -// -// Already connected nodes receive a small bias in their favor in order to avoid -// accepting and instantly kicking out clients. In theory, we try to ensure that -// each client can have several minutes of connection time. -// -// Balances of disconnected clients are stored in nodeDB including positive balance -// and negative banalce. Boeth positive balance and negative balance will decrease -// exponentially. 
If the balance is low enough, then the record will be dropped. -type clientPool struct { - vfs.BalanceTrackerSetup - vfs.PriorityPoolSetup - lock sync.Mutex - clock mclock.Clock - closed bool - removePeer func(enode.ID) - synced func() bool - ns *nodestate.NodeStateMachine - pp *vfs.PriorityPool - bt *vfs.BalanceTracker - - defaultPosFactors, defaultNegFactors vfs.PriceFactors - posExpTC, negExpTC uint64 - minCap uint64 // The minimal capacity value allowed for any client - connectedBias time.Duration - capLimit uint64 -} - -// clientPoolPeer represents a client peer in the pool. -// Positive balances are assigned to node key while negative balances are assigned -// to freeClientId. Currently network IP address without port is used because -// clients have a limited access to IP addresses while new node keys can be easily -// generated so it would be useless to assign a negative value to them. -type clientPoolPeer interface { - Node() *enode.Node - freeClientId() string - updateCapacity(uint64) - freeze() - allowInactive() bool -} - -// clientInfo defines all information required by clientpool. -type clientInfo struct { - node *enode.Node - address string - peer clientPoolPeer - connected, priority bool - connectedAt mclock.AbsTime - balance *vfs.NodeBalance -} - -// newClientPool creates a new client pool -func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID), synced func() bool) *clientPool { - pool := &clientPool{ - ns: ns, - BalanceTrackerSetup: balanceTrackerSetup, - PriorityPoolSetup: priorityPoolSetup, - clock: clock, - minCap: minCap, - connectedBias: connectedBias, - removePeer: removePeer, - synced: synced, - } - pool.bt = vfs.NewBalanceTracker(ns, balanceTrackerSetup, lesDb, clock, &utils.Expirer{}, &utils.Expirer{}) - pool.pp = vfs.NewPriorityPool(ns, priorityPoolSetup, clock, minCap, connectedBias, 4) - - // set default expiration constants used by tests - // Note: server overwrites this if token sale is active - pool.bt.SetExpirationTCs(0, defaultNegExpTC) - - ns.SubscribeState(pool.InactiveFlag.Or(pool.PriorityFlag), func(node *enode.Node, oldState, newState nodestate.Flags) { - if newState.Equals(pool.InactiveFlag) { - ns.AddTimeout(node, pool.InactiveFlag, inactiveTimeout) - } - if oldState.Equals(pool.InactiveFlag) && newState.Equals(pool.InactiveFlag.Or(pool.PriorityFlag)) { - ns.SetStateSub(node, pool.InactiveFlag, nodestate.Flags{}, 0) // remove timeout - } - }) - - ns.SubscribeState(pool.ActiveFlag.Or(pool.PriorityFlag), func(node *enode.Node, oldState, newState nodestate.Flags) { - c, _ := ns.GetField(node, clientInfoField).(*clientInfo) - if c == nil { - return - } - c.priority = newState.HasAll(pool.PriorityFlag) - if newState.Equals(pool.ActiveFlag) { - cap, _ := ns.GetField(node, pool.CapacityField).(uint64) - if cap > minCap { - pool.pp.RequestCapacity(node, minCap, 0, true) - } - } - }) - - ns.SubscribeState(pool.InactiveFlag.Or(pool.ActiveFlag), func(node *enode.Node, oldState, newState nodestate.Flags) { - if oldState.IsEmpty() { - clientConnectedMeter.Mark(1) - log.Debug("Client connected", "id", node.ID()) - } - if oldState.Equals(pool.InactiveFlag) && newState.Equals(pool.ActiveFlag) { - clientActivatedMeter.Mark(1) - log.Debug("Client activated", "id", node.ID()) - } - if oldState.Equals(pool.ActiveFlag) && newState.Equals(pool.InactiveFlag) { - clientDeactivatedMeter.Mark(1) - log.Debug("Client deactivated", "id", node.ID()) - c, _ := 
ns.GetField(node, clientInfoField).(*clientInfo) - if c == nil || !c.peer.allowInactive() { - pool.removePeer(node.ID()) - } - } - if newState.IsEmpty() { - clientDisconnectedMeter.Mark(1) - log.Debug("Client disconnected", "id", node.ID()) - pool.removePeer(node.ID()) - } - }) - - var totalConnected uint64 - ns.SubscribeField(pool.CapacityField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) { - oldCap, _ := oldValue.(uint64) - newCap, _ := newValue.(uint64) - totalConnected += newCap - oldCap - totalConnectedGauge.Update(int64(totalConnected)) - c, _ := ns.GetField(node, clientInfoField).(*clientInfo) - if c != nil { - c.peer.updateCapacity(newCap) - } - }) - return pool -} - -// stop shuts the client pool down -func (f *clientPool) stop() { - f.lock.Lock() - f.closed = true - f.lock.Unlock() - f.ns.ForEach(nodestate.Flags{}, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) { - // enforces saving all balances in BalanceTracker - f.disconnectNode(node) - }) - f.bt.Stop() -} - -// connect should be called after a successful handshake. If the connection was -// rejected, there is no need to call disconnect. -func (f *clientPool) connect(peer clientPoolPeer) (uint64, error) { - f.lock.Lock() - defer f.lock.Unlock() - - // Short circuit if clientPool is already closed. - if f.closed { - return 0, fmt.Errorf("Client pool is already closed") - } - // Dedup connected peers. - node, freeID := peer.Node(), peer.freeClientId() - if f.ns.GetField(node, clientInfoField) != nil { - log.Debug("Client already connected", "address", freeID, "id", node.ID().String()) - return 0, fmt.Errorf("Client already connected address=%s id=%s", freeID, node.ID().String()) - } - now := f.clock.Now() - c := &clientInfo{ - node: node, - address: freeID, - peer: peer, - connected: true, - connectedAt: now, - } - f.ns.SetField(node, clientInfoField, c) - f.ns.SetField(node, connAddressField, freeID) - if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*vfs.NodeBalance); c.balance == nil { - f.disconnect(peer) - return 0, nil - } - c.balance.SetPriceFactors(f.defaultPosFactors, f.defaultNegFactors) - - f.ns.SetState(node, f.InactiveFlag, nodestate.Flags{}, 0) - var allowed bool - f.ns.Operation(func() { - _, allowed = f.pp.RequestCapacity(node, f.minCap, f.connectedBias, true) - }) - if allowed { - return f.minCap, nil - } - if !peer.allowInactive() { - f.disconnect(peer) - } - return 0, nil -} - -// setConnectedBias sets the connection bias, which is applied to already connected clients -// So that already connected client won't be kicked out very soon and we can ensure all -// connected clients can have enough time to request or sync some data. -func (f *clientPool) setConnectedBias(bias time.Duration) { - f.lock.Lock() - defer f.lock.Unlock() - - f.connectedBias = bias - f.pp.SetActiveBias(bias) -} - -// disconnect should be called when a connection is terminated. If the disconnection -// was initiated by the pool itself using disconnectFn then calling disconnect is -// not necessary but permitted. 
-func (f *clientPool) disconnect(p clientPoolPeer) { - f.disconnectNode(p.Node()) -} - -// disconnectNode removes node fields and flags related to connected status -func (f *clientPool) disconnectNode(node *enode.Node) { - f.ns.SetField(node, connAddressField, nil) - f.ns.SetField(node, clientInfoField, nil) -} - -// setDefaultFactors sets the default price factors applied to subsequently connected clients -func (f *clientPool) setDefaultFactors(posFactors, negFactors vfs.PriceFactors) { - f.lock.Lock() - defer f.lock.Unlock() - - f.defaultPosFactors = posFactors - f.defaultNegFactors = negFactors -} - -// capacityInfo returns the total capacity allowance, the total capacity of connected -// clients and the total capacity of connected and prioritized clients -func (f *clientPool) capacityInfo() (uint64, uint64, uint64) { - f.lock.Lock() - defer f.lock.Unlock() - - // total priority active cap will be supported when the token issuer module is added - _, activeCap := f.pp.Active() - return f.capLimit, activeCap, 0 -} - -// setLimits sets the maximum number and total capacity of connected clients, -// dropping some of them if necessary. -func (f *clientPool) setLimits(totalConn int, totalCap uint64) { - f.lock.Lock() - defer f.lock.Unlock() - - f.capLimit = totalCap - f.pp.SetLimits(uint64(totalConn), totalCap) -} - -// setCapacity sets the assigned capacity of a connected client -func (f *clientPool) setCapacity(node *enode.Node, freeID string, capacity uint64, bias time.Duration, setCap bool) (uint64, error) { - c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo) - if c == nil { - if setCap { - return 0, fmt.Errorf("client %064x is not connected", node.ID()) - } - c = &clientInfo{node: node} - f.ns.SetField(node, clientInfoField, c) - f.ns.SetField(node, connAddressField, freeID) - if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*vfs.NodeBalance); c.balance == nil { - log.Error("BalanceField is missing", "node", node.ID()) - return 0, fmt.Errorf("BalanceField of %064x is missing", node.ID()) - } - defer func() { - f.ns.SetField(node, connAddressField, nil) - f.ns.SetField(node, clientInfoField, nil) - }() - } - var ( - minPriority int64 - allowed bool - ) - f.ns.Operation(func() { - if !setCap || c.priority { - // check clientInfo.priority inside Operation to ensure thread safety - minPriority, allowed = f.pp.RequestCapacity(node, capacity, bias, setCap) - } - }) - if allowed { - return 0, nil - } - missing := c.balance.PosBalanceMissing(minPriority, capacity, bias) - if missing < 1 { - // ensure that we never return 0 missing and insufficient priority error - missing = 1 - } - return missing, errNoPriority -} - -// setCapacityLocked is the equivalent of setCapacity used when f.lock is already locked -func (f *clientPool) setCapacityLocked(node *enode.Node, freeID string, capacity uint64, minConnTime time.Duration, setCap bool) (uint64, error) { - f.lock.Lock() - defer f.lock.Unlock() - - return f.setCapacity(node, freeID, capacity, minConnTime, setCap) -} - -// forClients calls the supplied callback for either the listed node IDs or all connected -// nodes. It passes a valid clientInfo to the callback and ensures that the necessary -// fields and flags are set in order for BalanceTracker and PriorityPool to work even if -// the node is not connected. 
-func (f *clientPool) forClients(ids []enode.ID, cb func(client *clientInfo)) { - f.lock.Lock() - defer f.lock.Unlock() - - if len(ids) == 0 { - f.ns.ForEach(nodestate.Flags{}, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) { - c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo) - if c != nil { - cb(c) - } - }) - } else { - for _, id := range ids { - node := f.ns.GetNode(id) - if node == nil { - node = enode.SignNull(&enr.Record{}, id) - } - c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo) - if c != nil { - cb(c) - } else { - c = &clientInfo{node: node} - f.ns.SetField(node, clientInfoField, c) - f.ns.SetField(node, connAddressField, "") - if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*vfs.NodeBalance); c.balance != nil { - cb(c) - } else { - log.Error("BalanceField is missing") - } - f.ns.SetField(node, connAddressField, nil) - f.ns.SetField(node, clientInfoField, nil) - } - } - } -} - -// serveCapQuery serves a vflux capacity query. It receives multiple token amount values -// and a bias time value. For each given token amount it calculates the maximum achievable -// capacity in case the amount is added to the balance. -func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []byte { - var req vflux.CapacityQueryReq - if rlp.DecodeBytes(data, &req) != nil { - return nil - } - if l := len(req.AddTokens); l == 0 || l > vflux.CapacityQueryMaxLen { - return nil - } - result := make(vflux.CapacityQueryReply, len(req.AddTokens)) - if !f.synced() { - capacityQueryZeroMeter.Mark(1) - reply, _ := rlp.EncodeToBytes(&result) - return reply - } - - node := f.ns.GetNode(id) - if node == nil { - node = enode.SignNull(&enr.Record{}, id) - } - c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo) - if c == nil { - c = &clientInfo{node: node} - f.ns.SetField(node, clientInfoField, c) - f.ns.SetField(node, connAddressField, freeID) - defer func() { - f.ns.SetField(node, connAddressField, nil) - f.ns.SetField(node, clientInfoField, nil) - }() - if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*vfs.NodeBalance); c.balance == nil { - log.Error("BalanceField is missing", "node", node.ID()) - return nil - } - } - // use vfs.CapacityCurve to answer request for multiple newly bought token amounts - curve := f.pp.GetCapacityCurve().Exclude(id) - bias := time.Second * time.Duration(req.Bias) - if f.connectedBias > bias { - bias = f.connectedBias - } - pb, _ := c.balance.GetBalance() - for i, addTokens := range req.AddTokens { - add := addTokens.Int64() - result[i] = curve.MaxCapacity(func(capacity uint64) int64 { - return c.balance.EstimatePriority(capacity, add, 0, bias, false) / int64(capacity) - }) - if add <= 0 && uint64(-add) >= pb && result[i] > f.minCap { - result[i] = f.minCap - } - if result[i] < f.minCap { - result[i] = 0 - } - } - // add first result to metrics (don't care about priority client multi-queries yet) - if result[0] == 0 { - capacityQueryZeroMeter.Mark(1) - } else { - capacityQueryNonZeroMeter.Mark(1) - } - reply, _ := rlp.EncodeToBytes(&result) - return reply -} diff --git a/les/server.go b/les/server.go index d4f0c3e8d56b..8bb4513119c1 100644 --- a/les/server.go +++ b/les/server.go @@ -138,7 +138,8 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les } srv.fcManager.SetCapacityLimits(srv.minCapacity, srv.maxCapacity, srv.minCapacity*2) srv.clientPool = vfs.NewClientPool(lesDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, issync) - 
srv.clientPool.AddMetrics(totalConnectedGauge, clientConnectedMeter, clientDisconnectedMeter, clientActivatedMeter, clientDeactivatedMeter) + srv.clientPool.AddMetrics(totalConnectedGauge, clientConnectedMeter, clientDisconnectedMeter, + clientActivatedMeter, clientDeactivatedMeter, capacityQueryZeroMeter, capacityQueryNonZeroMeter) srv.clientPool.Start() srv.clientPool.SetDefaultFactors(defaultPosFactors, defaultNegFactors) srv.vfluxServer.Register(srv.clientPool, "les", "Ethereum light client service") diff --git a/les/vflux/server/clientpool.go b/les/vflux/server/clientpool.go index dc862503966a..a747adf302af 100644 --- a/les/vflux/server/clientpool.go +++ b/les/vflux/server/clientpool.go @@ -75,6 +75,7 @@ type ClientPool struct { clock mclock.Clock closed bool ns *nodestate.NodeStateMachine + synced func() bool lock sync.RWMutex defaultPosFactors, defaultNegFactors PriceFactors @@ -82,6 +83,8 @@ type ClientPool struct { minCap uint64 // the minimal capacity value allowed for any client capReqNode *enode.Node // node that is requesting capacity change; only used inside NSM operation + + capacityQueryZeroMeter, capacityQueryNonZeroMeter metrics.Meter } // clientPeer represents a peer in the client pool. None of the callbacks should block. @@ -96,7 +99,7 @@ type clientPeer interface { type clientPeerInstance struct{ clientPeer } // the NodeStateMachine type system needs this wrapper // NewClientPool creates a new client pool -func NewClientPool(balanceDb ethdb.KeyValueStore, minCap uint64, connectedBias time.Duration, clock mclock.Clock) *ClientPool { +func NewClientPool(balanceDb ethdb.KeyValueStore, minCap uint64, connectedBias time.Duration, clock mclock.Clock, synced func() bool) *ClientPool { ns := nodestate.NewNodeStateMachine(nil, nil, clock, serverSetup) cp := &ClientPool{ ns: ns, @@ -105,6 +108,7 @@ func NewClientPool(balanceDb ethdb.KeyValueStore, minCap uint64, connectedBias t clock: clock, minCap: minCap, connectedBias: connectedBias, + synced: synced, } ns.SubscribeState(nodestate.MergeFlags(ppSetup.ActiveFlag, ppSetup.InactiveFlag, btSetup.PriorityFlag), func(node *enode.Node, oldState, newState nodestate.Flags) { @@ -160,7 +164,9 @@ func NewClientPool(balanceDb ethdb.KeyValueStore, minCap uint64, connectedBias t // AddMetrics adds metrics to the client pool. Should be called before Start(). func (cp *ClientPool) AddMetrics(totalConnectedGauge metrics.Gauge, - clientConnectedMeter, clientDisconnectedMeter, clientActivatedMeter, clientDeactivatedMeter metrics.Meter) { + clientConnectedMeter, clientDisconnectedMeter, clientActivatedMeter, clientDeactivatedMeter, + capacityQueryZeroMeter, capacityQueryNonZeroMeter metrics.Meter) { + cp.ns.SubscribeState(nodestate.MergeFlags(ppSetup.ActiveFlag, ppSetup.InactiveFlag), func(node *enode.Node, oldState, newState nodestate.Flags) { if oldState.IsEmpty() && !newState.IsEmpty() { clientConnectedMeter.Mark(1) @@ -177,6 +183,8 @@ func (cp *ClientPool) AddMetrics(totalConnectedGauge metrics.Gauge, _, connected := cp.Active() totalConnectedGauge.Update(int64(connected)) }) + cp.capacityQueryZeroMeter = capacityQueryZeroMeter + cp.capacityQueryNonZeroMeter = capacityQueryNonZeroMeter } // Start starts the client pool. Should be called before Register/Unregister. 
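For reviewers, a minimal sketch of the wiring these hunks establish: `NewClientPool` now takes a `synced func() bool` callback and `AddMetrics` grows two capacity-query meters. The metric names below are hypothetical placeholders (the real server passes the meters defined in the les package, and its `issync` callback comes from the backend); `memorydb.New()` matches what the fuzzer below uses.

```go
// Sketch only: shows the new constructor and AddMetrics signatures.
package main

import (
	"time"

	"github.com/ethereum/go-ethereum/common/mclock"
	"github.com/ethereum/go-ethereum/ethdb/memorydb"
	vfs "github.com/ethereum/go-ethereum/les/vflux/server"
	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	db := memorydb.New()
	// The synced callback gates capacity queries: until it returns true,
	// serveCapQuery answers every query with zero capacities.
	synced := func() bool { return true }

	pool := vfs.NewClientPool(db, 10, 3*time.Minute, mclock.System{}, synced)
	// AddMetrics must be called before Start(), per its doc comment.
	pool.AddMetrics(
		metrics.NewRegisteredGauge("demo/totalConnected", nil),
		metrics.NewRegisteredMeter("demo/clientConnected", nil),
		metrics.NewRegisteredMeter("demo/clientDisconnected", nil),
		metrics.NewRegisteredMeter("demo/clientActivated", nil),
		metrics.NewRegisteredMeter("demo/clientDeactivated", nil),
		metrics.NewRegisteredMeter("demo/capacityQueryZero", nil),
		metrics.NewRegisteredMeter("demo/capacityQueryNonZero", nil),
	)
	pool.Start()
	defer pool.Stop()
}
```

Note that the new meters are kept optional: `serveCapQuery` nil-checks them before marking, which is what lets the fuzzer and the unit tests construct a pool without registering any metrics.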
@@ -309,6 +317,15 @@ func (cp *ClientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []b if l := len(req.AddTokens); l == 0 || l > vflux.CapacityQueryMaxLen { return nil } + result := make(vflux.CapacityQueryReply, len(req.AddTokens)) + if !cp.synced() { + if cp.capacityQueryZeroMeter != nil { + cp.capacityQueryZeroMeter.Mark(1) + } + reply, _ := rlp.EncodeToBytes(&result) + return reply + } + bias := time.Second * time.Duration(req.Bias) cp.lock.RLock() if cp.connectedBias > bias { @@ -318,7 +335,6 @@ func (cp *ClientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []b // use CapacityCurve to answer request for multiple newly bought token amounts curve := cp.GetCapacityCurve().Exclude(id) - result := make(vflux.CapacityQueryReply, len(req.AddTokens)) cp.BalanceOperation(id, freeID, func(balance AtomicBalanceOperator) { pb, _ := balance.GetBalance() for i, addTokens := range req.AddTokens { @@ -334,6 +350,16 @@ func (cp *ClientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []b } } }) + // add first result to metrics (don't care about priority client multi-queries yet) + if result[0] == 0 { + if cp.capacityQueryZeroMeter != nil { + cp.capacityQueryZeroMeter.Mark(1) + } + } else { + if cp.capacityQueryNonZeroMeter != nil { + cp.capacityQueryNonZeroMeter.Mark(1) + } + } reply, _ := rlp.EncodeToBytes(&result) return reply } diff --git a/les/vflux/server/clientpool_test.go b/les/vflux/server/clientpool_test.go index 3eac72fff4ed..c65dc90a7750 100644 --- a/les/vflux/server/clientpool_test.go +++ b/les/vflux/server/clientpool_test.go @@ -130,6 +130,10 @@ func disconnect(pool *ClientPool, peer *poolTestPeer) { pool.Unregister(peer) } +func alwaysTrueFn() bool { + return true +} + func testClientPool(t *testing.T, activeLimit, clientCount, paidCount int, randomDisconnect bool) { rand.Seed(time.Now().UnixNano()) var ( diff --git a/tests/fuzzers/vflux/clientpool-fuzzer.go b/tests/fuzzers/vflux/clientpool-fuzzer.go index ba45c2ddb41d..3d71f5b7b1d8 100644 --- a/tests/fuzzers/vflux/clientpool-fuzzer.go +++ b/tests/fuzzers/vflux/clientpool-fuzzer.go @@ -201,7 +201,7 @@ func FuzzClientPool(input []byte) int { } clock := &mclock.Simulated{} db := memorydb.New() - pool := vfs.NewClientPool(db, 10, f.randomDelay(), clock) + pool := vfs.NewClientPool(db, 10, f.randomDelay(), clock, func() bool { return true }) pool.Start() defer pool.Stop()
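To illustrate the not-synced short circuit added to `serveCapQuery` above: before the server has synced, the query is answered with a freshly allocated, all-zero `vflux.CapacityQueryReply`, so clients learn nothing about achievable capacities but still receive a well-formed RLP reply. A self-contained sketch of that encoding path, assuming `CapacityQueryReply` is the `[]uint64` reply type from `les/vflux` used in the hunks above:

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/les/vflux"
	"github.com/ethereum/go-ethereum/rlp"
)

func main() {
	// As if the client asked about three token amounts on an unsynced server:
	// make() yields zero-valued entries, i.e. zero capacity for every amount.
	result := make(vflux.CapacityQueryReply, 3)
	reply, err := rlp.EncodeToBytes(&result)
	if err != nil {
		panic(err)
	}
	fmt.Printf("zero reply: %x\n", reply) // still a valid RLP list
}
```

This zero-valued reply shape is exactly what `capacityQueryZeroMeter` counts, both in the early-return branch and when the first computed result turns out to be zero.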