From 1b0b619cd52c997d9bf6ab9f5d9dd3753cdd0b10 Mon Sep 17 00:00:00 2001
From: Zsolt Felfoldi
Date: Mon, 10 Aug 2020 22:40:26 +0200
Subject: [PATCH] les: restored setConnectedBias function

---
 les/api.go                        |  5 ++---
 les/clientpool.go                 | 25 +++++++++++++++++++------
 les/clientpool_test.go            | 24 ++++++++++++------------
 les/lespay/server/prioritypool.go | 17 +++++++++++++----
 les/server.go                     |  2 +-
 les/test_helper.go                |  2 +-
 6 files changed, 48 insertions(+), 27 deletions(-)

diff --git a/les/api.go b/les/api.go
index 1b3605ac6d78..aacf4abb82c3 100644
--- a/les/api.go
+++ b/les/api.go
@@ -192,14 +192,13 @@ func (api *PrivateLightServerAPI) SetDefaultParams(params map[string]interface{}
 // So that already connected client won't be kicked out very soon and we can ensure all
 // connected clients can have enough time to request or sync some data.
 // When the input parameter `bias` < 0 (illegal), return error.
-/*func (api *PrivateLightServerAPI) SetConnectedBias(bias time.Duration) error {
+func (api *PrivateLightServerAPI) SetConnectedBias(bias time.Duration) error {
 	if bias < time.Duration(0) {
 		return fmt.Errorf("bias illegal: %v less than 0", bias)
 	}
 	api.server.clientPool.setConnectedBias(bias)
 	return nil
-}*/
-//TODO fix regression
+}
 
 // AddBalance updates the balance of a client (either overwrites it or adds to it).
 // It also updates the balance meta info string.
diff --git a/les/clientpool.go b/les/clientpool.go
index a4c608397f95..6fae2e952f16 100644
--- a/les/clientpool.go
+++ b/les/clientpool.go
@@ -36,15 +36,15 @@ const (
 	defaultPosExpTC = 36000 // default time constant (in seconds) for exponentially reducing positive balance
 	defaultNegExpTC = 3600  // default time constant (in seconds) for exponentially reducing negative balance
 
-	// activeBias is applied to already connected clients So that
+	// defaultConnectedBias is applied to already connected clients so that
 	// already connected client won't be kicked out very soon and we
 	// can ensure all connected clients can have enough time to request
 	// or sync some data.
 	//
 	// todo(rjl493456442) make it configurable. It can be the option of
 	// free trial time!
-	activeBias      = time.Minute * 3
-	inactiveTimeout = time.Second * 10
+	defaultConnectedBias = time.Minute * 3
+	inactiveTimeout      = time.Second * 10
 )
 
 var (
@@ -93,6 +93,7 @@ type clientPool struct {
 	defaultPosFactors, defaultNegFactors lps.PriceFactors
 	posExpTC, negExpTC                   uint64
 	minCap                               uint64 // The minimal capacity value allowed for any client
+	connectedBias                        time.Duration
 	capLimit                             uint64
 	freeClientCap                        uint64 // The capacity value of each free client
 }
@@ -121,7 +122,7 @@ type clientInfo struct {
 }
 
 // newClientPool creates a new client pool
-func newClientPool(lespayDb ethdb.Database, minCap, freeClientCap uint64, activeBias time.Duration, clock mclock.Clock, removePeer func(enode.ID)) *clientPool {
+func newClientPool(lespayDb ethdb.Database, minCap, freeClientCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID)) *clientPool {
 	if minCap > freeClientCap {
 		panic(nil)
 	}
@@ -132,11 +133,12 @@ func newClientPool(lespayDb ethdb.Database, minCap, freeClientCap uint64, active
 		PriorityPoolSetup: priorityPoolSetup,
 		clock:             clock,
 		minCap:            minCap,
+		connectedBias:     connectedBias,
 		freeClientCap:     freeClientCap,
 		removePeer:        removePeer,
 	}
 	pool.bt = lps.NewBalanceTracker(ns, balanceTrackerSetup, lespayDb, clock, &utils.Expirer{}, &utils.Expirer{})
-	pool.pp = lps.NewPriorityPool(ns, priorityPoolSetup, clock, minCap, activeBias, 4)
+	pool.pp = lps.NewPriorityPool(ns, priorityPoolSetup, clock, minCap, connectedBias, 4)
 
 	// set default expiration constants used by tests
 	// Note: server overwrites this if token sale is active
@@ -242,7 +244,7 @@ func (f *clientPool) connect(peer clientPoolPeer, reqCapacity uint64) (uint64, e
 	}
 	f.ns.SetState(node, f.InactiveFlag, nodestate.Flags{}, 0)
-	if _, allowed := f.pp.RequestCapacity(node, reqCapacity, activeBias, true); allowed {
+	if _, allowed := f.pp.RequestCapacity(node, reqCapacity, f.connectedBias, true); allowed {
 		return reqCapacity, nil
 	}
 	if !peer.allowInactive() {
@@ -251,6 +253,17 @@ func (f *clientPool) connect(peer clientPoolPeer, reqCapacity uint64) (uint64, e
 	return 0, nil
 }
 
+// setConnectedBias sets the connection bias, which is applied to already connected clients
+// so that they won't be kicked out very soon and all connected clients have
+// enough time to request or sync some data.
+func (f *clientPool) setConnectedBias(bias time.Duration) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	f.connectedBias = bias
+	f.pp.SetActiveBias(bias)
+}
+
 // disconnect should be called when a connection is terminated. If the disconnection
 // was initiated by the pool itself using disconnectFn then calling disconnect is
 // not necessary but permitted.
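As a usage sketch (not part of the diff above), the restored setter could be exercised next to the existing clientpool tests. The test name is hypothetical; everything else (newClientPool, setConnectedBias, defaultConnectedBias, the mclock/rawdb helpers) comes from the code in this patch:

package les

import (
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/common/mclock"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/p2p/enode"
)

// TestSetConnectedBias is a hypothetical sketch: it checks that the bias of a
// live pool can be changed and that the new value is stored on the pool (it is
// also forwarded to the underlying PriorityPool via SetActiveBias).
func TestSetConnectedBias(t *testing.T) {
	var (
		clock mclock.Simulated
		db    = rawdb.NewMemoryDatabase()
	)
	pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
	defer pool.stop()
	pool.setLimits(10, uint64(10))

	// Shrink the grace period for already connected clients from the default
	// three minutes down to ten seconds.
	pool.setConnectedBias(10 * time.Second)
	if pool.connectedBias != 10*time.Second {
		t.Fatalf("connected bias not updated, got %v", pool.connectedBias)
	}
}
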
diff --git a/les/clientpool_test.go b/les/clientpool_test.go
index 3e326e99032c..ffa6baa04b3e 100644
--- a/les/clientpool_test.go
+++ b/les/clientpool_test.go
@@ -218,7 +218,7 @@ func TestConnectPaidClient(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(db, 1, 1, activeBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
 	defer pool.stop()
 	pool.setLimits(10, uint64(10))
 	pool.setDefaultFactors(lps.PriceFactors{1, 0, 1}, lps.PriceFactors{1, 0, 1})
@@ -236,7 +236,7 @@ func TestConnectPaidClientToSmallPool(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(db, 1, 1, activeBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
 	pool.setDefaultFactors(lps.PriceFactors{1, 0, 1}, lps.PriceFactors{1, 0, 1})
@@ -256,7 +256,7 @@ func TestConnectPaidClientToFullPool(t *testing.T) {
 		db = rawdb.NewMemoryDatabase()
 	)
 	removeFn := func(enode.ID) {} // Noop
-	pool := newClientPool(db, 1, 1, activeBias, &clock, removeFn)
+	pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, removeFn)
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
 	pool.setDefaultFactors(lps.PriceFactors{1, 0, 1}, lps.PriceFactors{1, 0, 1})
@@ -285,7 +285,7 @@ func TestPaidClientKickedOut(t *testing.T) {
 	removeFn := func(id enode.ID) {
 		kickedCh <- int(id[0])
 	}
-	pool := newClientPool(db, 1, 1, activeBias, &clock, removeFn)
+	pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, removeFn)
 	pool.bt.SetExpirationTCs(0, 0)
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -296,7 +296,7 @@ func TestPaidClientKickedOut(t *testing.T) {
 		pool.connect(newPoolTestPeer(i, kickedCh), 1)
 		clock.Run(time.Millisecond)
 	}
-	clock.Run(activeBias + time.Second*11)
+	clock.Run(defaultConnectedBias + time.Second*11)
 	if cap, _ := pool.connect(newPoolTestPeer(11, kickedCh), 0); cap == 0 {
 		t.Fatalf("Free client should be accepted")
 	}
@@ -315,7 +315,7 @@ func TestConnectFreeClient(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(db, 1, 1, activeBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
 	defer pool.stop()
 	pool.setLimits(10, uint64(10))
 	pool.setDefaultFactors(lps.PriceFactors{1, 0, 1}, lps.PriceFactors{1, 0, 1})
@@ -333,7 +333,7 @@ func TestConnectFreeClientToFullPool(t *testing.T) {
 		db = rawdb.NewMemoryDatabase()
 	)
 	removeFn := func(enode.ID) {} // Noop
-	pool := newClientPool(db, 1, 1, activeBias, &clock, removeFn)
+	pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, removeFn)
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
 	pool.setDefaultFactors(lps.PriceFactors{1, 0, 1}, lps.PriceFactors{1, 0, 1})
@@ -362,7 +362,7 @@ func TestFreeClientKickedOut(t *testing.T) {
 		kicked = make(chan int, 100)
 	)
 	removeFn := func(id enode.ID) { kicked <- int(id[0]) }
-	pool := newClientPool(db, 1, 1, activeBias, &clock, removeFn)
+	pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, removeFn)
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
 	pool.setDefaultFactors(lps.PriceFactors{1, 0, 1}, lps.PriceFactors{1, 0, 1})
@@ -403,7 +403,7 @@ func TestPositiveBalanceCalculation(t *testing.T) {
 		kicked = make(chan int, 10)
 	)
 	removeFn := func(id enode.ID) { kicked <- int(id[0]) } // Noop
-	pool := newClientPool(db, 1, 1, activeBias, &clock, removeFn)
+	pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, removeFn)
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
 	pool.setDefaultFactors(lps.PriceFactors{1, 0, 1}, lps.PriceFactors{1, 0, 1})
@@ -426,7 +426,7 @@ func TestDowngradePriorityClient(t *testing.T) {
 		kicked = make(chan int, 10)
 	)
 	removeFn := func(id enode.ID) { kicked <- int(id[0]) } // Noop
-	pool := newClientPool(db, 1, 1, activeBias, &clock, removeFn)
+	pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, removeFn)
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
 	pool.setDefaultFactors(lps.PriceFactors{1, 0, 1}, lps.PriceFactors{1, 0, 1})
@@ -460,7 +460,7 @@ func TestNegativeBalanceCalculation(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(db, 1, 1, activeBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
 	pool.setDefaultFactors(lps.PriceFactors{1e-3, 0, 1}, lps.PriceFactors{1e-3, 0, 1})
@@ -495,7 +495,7 @@ func TestInactiveClient(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(db, 1, 1, activeBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
 	defer pool.stop()
 
 	pool.setLimits(2, uint64(2))
diff --git a/les/lespay/server/prioritypool.go b/les/lespay/server/prioritypool.go
index 4c66914df6ca..ab3fdb3e46b6 100644
--- a/les/lespay/server/prioritypool.go
+++ b/les/lespay/server/prioritypool.go
@@ -99,7 +99,7 @@ type PriorityPool struct {
 	activeCount, activeCap uint64
 	maxCount, maxCap       uint64
 	minCap                 uint64
-	minBias                time.Duration
+	activeBias             time.Duration
 	capacityStepDiv        uint64
 }
 
@@ -126,7 +126,7 @@ type ppNodeInfo struct {
 }
 
 // NewPriorityPool creates a new PriorityPool
-func NewPriorityPool(ns *nodestate.NodeStateMachine, setup PriorityPoolSetup, clock mclock.Clock, minCap uint64, minBias time.Duration, capacityStepDiv uint64) *PriorityPool {
+func NewPriorityPool(ns *nodestate.NodeStateMachine, setup PriorityPoolSetup, clock mclock.Clock, minCap uint64, activeBias time.Duration, capacityStepDiv uint64) *PriorityPool {
 	pp := &PriorityPool{
 		ns:                ns,
 		PriorityPoolSetup: setup,
@@ -134,7 +134,7 @@ func NewPriorityPool(ns *nodestate.NodeStateMachine, setup PriorityPoolSetup, cl
 		activeQueue:       prque.NewLazyQueue(activeSetIndex, activePriority, activeMaxPriority, clock, lazyQueueRefresh),
 		inactiveQueue:     prque.New(inactiveSetIndex),
 		minCap:            minCap,
-		minBias:           minBias,
+		activeBias:        activeBias,
 		capacityStepDiv:   capacityStepDiv,
 	}
 
@@ -246,6 +246,15 @@ func (pp *PriorityPool) SetLimits(maxCount, maxCap uint64) {
 	}
 }
 
+// SetActiveBias sets the bias applied when trying to activate inactive nodes
+func (pp *PriorityPool) SetActiveBias(bias time.Duration) {
+	pp.lock.Lock()
+	defer pp.lock.Unlock()
+
+	pp.activeBias = bias
+	pp.tryActivate()
+}
+
 // ActiveCapacity returns the total capacity of currently active nodes
 func (pp *PriorityPool) ActiveCapacity() uint64 {
 	pp.lock.Lock()
@@ -452,7 +461,7 @@ func (pp *PriorityPool) tryActivate() []capUpdate {
 		c := pp.inactiveQueue.PopItem().(*ppNodeInfo)
 		pp.markForChange(c)
 		pp.setCapacity(c, pp.minCap)
-		c.bias = pp.minBias
+		c.bias = pp.activeBias
 		pp.activeQueue.Push(c)
 		pp.enforceLimits()
 		if c.capacity > 0 {
diff --git a/les/server.go b/les/server.go
index 68295b59d44c..0dfe008fccd4 100644
--- a/les/server.go
+++ b/les/server.go
@@ -117,7 +117,7 @@ func NewLesServer(node *node.Node, e *eth.Ethereum, config *eth.Config) (*LesSer
 		srv.maxCapacity = totalRecharge
 	}
 	srv.fcManager.SetCapacityLimits(srv.freeCapacity, srv.maxCapacity, srv.freeCapacity*2)
-	srv.clientPool = newClientPool(srv.chainDb, srv.minCapacity, srv.freeCapacity, activeBias, mclock.System{}, func(id enode.ID) { go srv.peers.disconnect(id.String()) })
+	srv.clientPool = newClientPool(srv.chainDb, srv.minCapacity, srv.freeCapacity, defaultConnectedBias, mclock.System{}, func(id enode.ID) { go srv.peers.disconnect(id.String()) })
 	srv.clientPool.setDefaultFactors(lps.PriceFactors{0, 1, 1}, lps.PriceFactors{0, 1, 1})
 
 	checkpoint := srv.latestLocalCheckpoint()
diff --git a/les/test_helper.go b/les/test_helper.go
index f5c21c781ff1..c9cb6a54af32 100644
--- a/les/test_helper.go
+++ b/les/test_helper.go
@@ -284,7 +284,7 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
 	}
 	server.costTracker, server.freeCapacity = newCostTracker(db, server.config)
 	server.costTracker.testCostList = testCostList(0) // Disable flow control mechanism.
-	server.clientPool = newClientPool(db, testBufRecharge, testBufRecharge, activeBias, clock, func(id enode.ID) {})
+	server.clientPool = newClientPool(db, testBufRecharge, testBufRecharge, defaultConnectedBias, clock, func(id enode.ID) {})
 	server.clientPool.setLimits(10000, 10000) // Assign enough capacity for clientpool
 	server.handler = newServerHandler(server, simulation.Blockchain(), db, txpool, func() bool { return true })
 	if server.oracle != nil {
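
With the method restored, the knob is reachable over RPC again; passing a negative duration returns an error, per the check in SetConnectedBias above. Below is a hedged usage sketch from the client side. It assumes PrivateLightServerAPI is exposed under the les namespace (so the method surfaces as les_setConnectedBias) and a locally running light server; the IPC path is illustrative:

package main

import (
	"log"
	"time"

	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	// Connect to a running geth light server; adjust the IPC path to your datadir.
	client, err := rpc.Dial("/tmp/geth.ipc") // hypothetical path
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// A time.Duration argument marshals as a nanosecond count, which the server
	// decodes back into a time.Duration. Raise the grace period to five minutes.
	if err := client.Call(nil, "les_setConnectedBias", 5*time.Minute); err != nil {
		log.Fatal(err)
	}
}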