Skip to content

Commit

Permalink
get headers service
Browse files Browse the repository at this point in the history
  • Loading branch information
ucwong committed Aug 21, 2024
1 parent 034e770 commit fb3ebb0
Show file tree
Hide file tree
Showing 10 changed files with 213 additions and 98 deletions.
253 changes: 169 additions & 84 deletions ctxc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ import (
const (
softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
// maxHeadersServe is the maximum number of block headers to serve. This number
// is there to limit the number of disk lookups.
maxHeadersServe = 1024

// txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
Expand Down Expand Up @@ -483,6 +486,170 @@ func (pm *ProtocolManager) handle(p *peer) error {
}
}

func serviceNonContiguousBlockHeaderQuery(chain *core.BlockChain, query *getBlockHeadersData, peer *peer) []rlp.RawValue {
hashMode := query.Origin.Hash != (common.Hash{})
first := true
maxNonCanonical := uint64(100)
// Gather headers until the fetch or network limits is reached
var (
bytes common.StorageSize
headers []rlp.RawValue
unknown bool
lookups int
)
for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit &&
len(headers) < maxHeadersServe && lookups < 2*maxHeadersServe {
lookups++
// Retrieve the next header satisfying the query
var origin *types.Header
if hashMode {
if first {
first = false
origin = chain.GetHeaderByHash(query.Origin.Hash)
if origin != nil {
query.Origin.Number = origin.Number.Uint64()
}
} else {
origin = chain.GetHeader(query.Origin.Hash, query.Origin.Number)
}
} else {
origin = chain.GetHeaderByNumber(query.Origin.Number)
}
if origin == nil {
break
}
if rlpData, err := rlp.EncodeToBytes(origin); err != nil {
log.Crit("Unable to encode our own headers", "err", err)
} else {
headers = append(headers, rlp.RawValue(rlpData))
bytes += common.StorageSize(len(rlpData))
}
// Advance to the next header of the query
switch {
case hashMode && query.Reverse:
// Hash based traversal towards the genesis block
ancestor := query.Skip + 1
if ancestor == 0 {
unknown = true
} else {
query.Origin.Hash, query.Origin.Number = chain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical)
unknown = (query.Origin.Hash == common.Hash{})
}
case hashMode && !query.Reverse:
// Hash based traversal towards the leaf block
var (
current = origin.Number.Uint64()
next = current + query.Skip + 1
)
if next <= current {
infos, _ := json.MarshalIndent(peer.Peer.Info(), "", " ")
peer.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
unknown = true
} else {
if header := chain.GetHeaderByNumber(next); header != nil {
nextHash := header.Hash()
expOldHash, _ := chain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
if expOldHash == query.Origin.Hash {
query.Origin.Hash, query.Origin.Number = nextHash, next
} else {
unknown = true
}
} else {
unknown = true
}
}
case query.Reverse:
// Number based traversal towards the genesis block
if query.Origin.Number >= query.Skip+1 {
query.Origin.Number -= query.Skip + 1
} else {
unknown = true
}
case !query.Reverse:
// Number based traversal towards the leaf block
query.Origin.Number += query.Skip + 1
}
}
return headers
}

func serviceContiguousBlockHeaderQuery(chain *core.BlockChain, query *getBlockHeadersData) []rlp.RawValue {
count := query.Amount
if count > maxHeadersServe {
count = maxHeadersServe
}
if query.Origin.Hash == (common.Hash{}) {
// Number mode, just return the canon chain segment. The backend
// delivers in [N, N-1, N-2..] descending order, so we need to
// accommodate for that.
from := query.Origin.Number
if !query.Reverse {
from = from + count - 1
}
headers := chain.GetHeadersFrom(from, count)
if !query.Reverse {
for i, j := 0, len(headers)-1; i < j; i, j = i+1, j-1 {
headers[i], headers[j] = headers[j], headers[i]
}
}
return headers
}
// Hash mode.
var (
headers []rlp.RawValue
hash = query.Origin.Hash
header = chain.GetHeaderByHash(hash)
)
if header != nil {
rlpData, _ := rlp.EncodeToBytes(header)
headers = append(headers, rlpData)
} else {
// We don't even have the origin header
return headers
}
num := header.Number.Uint64()
if !query.Reverse {
// Theoretically, we are tasked to deliver header by hash H, and onwards.
// However, if H is not canon, we will be unable to deliver any descendants of
// H.
if canonHash := chain.GetCanonicalHash(num); canonHash != hash {
// Not canon, we can't deliver descendants
return headers
}
descendants := chain.GetHeadersFrom(num+count-1, count-1)
for i, j := 0, len(descendants)-1; i < j; i, j = i+1, j-1 {
descendants[i], descendants[j] = descendants[j], descendants[i]
}
headers = append(headers, descendants...)
return headers
}
{ // Last mode: deliver ancestors of H
for i := uint64(1); i < count; i++ {
header = chain.GetHeaderByHash(header.ParentHash)
if header == nil {
break
}
rlpData, _ := rlp.EncodeToBytes(header)
headers = append(headers, rlpData)
}
return headers
}
}

// ServiceGetBlockHeadersQuery assembles the response to a header query. It is
// exposed to allow external packages to test protocol behavior.
func ServiceGetBlockHeadersQuery(chain *core.BlockChain, query *getBlockHeadersData, peer *peer) []rlp.RawValue {
if query.Amount == 0 {
return nil
}
if query.Skip == 0 {
// The fast path: when the request is for a contiguous segment of headers.
return serviceContiguousBlockHeaderQuery(chain, query)
} else {
return serviceNonContiguousBlockHeaderQuery(chain, query, peer)
}
}

// handleMsg is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
func (pm *ProtocolManager) handleMsg(p *peer) error {
Expand All @@ -509,91 +676,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&query); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
hashMode := query.Origin.Hash != (common.Hash{})
first := true
maxNonCanonical := uint64(100)

// Gather headers until the fetch or network limits is reached
var (
bytes common.StorageSize
headers []rlp.RawValue
unknown bool
)
for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
// Retrieve the next header satisfying the query
var origin *types.Header
if hashMode {
if first {
first = false
origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
if origin != nil {
query.Origin.Number = origin.Number.Uint64()
}
} else {
origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)
}
} else {
origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
}
if origin == nil {
break
}

if rlpData, err := rlp.EncodeToBytes(origin); err != nil {
log.Crit("Unable to decode our own headers", "err", err)
} else {
headers = append(headers, rlp.RawValue(rlpData))
bytes += common.StorageSize(len(rlpData))
}

// Advance to the next header of the query
switch {
case hashMode && query.Reverse:
// Hash based traversal towards the genesis block
ancestor := query.Skip + 1
if ancestor == 0 {
unknown = true
} else {
query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical)
unknown = (query.Origin.Hash == common.Hash{})
}
case hashMode && !query.Reverse:
// Hash based traversal towards the leaf block
var (
current = origin.Number.Uint64()
next = current + query.Skip + 1
)
if next <= current {
infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
unknown = true
} else {
if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
nextHash := header.Hash()
expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
if expOldHash == query.Origin.Hash {
query.Origin.Hash, query.Origin.Number = nextHash, next
} else {
unknown = true
}
} else {
unknown = true
}
}
case query.Reverse:
// Number based traversal towards the genesis block
if query.Origin.Number >= query.Skip+1 {
query.Origin.Number -= query.Skip + 1
} else {
unknown = true
}

case !query.Reverse:
// Number based traversal towards the leaf block
query.Origin.Number += query.Skip + 1
}
}
return p.SendBlockHeaders(headers)
response := ServiceGetBlockHeadersQuery(pm.blockchain, &query, p)
return p.SendBlockHeaders(response)

case msg.Code == ctxc.BlockHeadersMsg:
// A batch of headers arrived to one of our previous requests
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0
github.com/CortexFoundation/inference v1.0.2-0.20230307032835-9197d586a4e8
github.com/CortexFoundation/statik v0.0.0-20210315012922-8bb8a7b5dc66
github.com/CortexFoundation/torrentfs v1.0.68-0.20240820152357-a6ca6adae8f2
github.com/CortexFoundation/torrentfs v1.0.68-0.20240821125451-fb71840dd86b
github.com/VictoriaMetrics/fastcache v1.12.2
github.com/arsham/figurine v1.3.0
github.com/aws/aws-sdk-go-v2 v1.30.4
Expand Down Expand Up @@ -232,7 +232,7 @@ require (
github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
github.com/zeebo/xxh3 v1.0.3-0.20230502181907-3808c706a06a // indirect
go.etcd.io/bbolt v1.3.10 // indirect
go.etcd.io/bbolt v1.3.11 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ github.com/CortexFoundation/statik v0.0.0-20210315012922-8bb8a7b5dc66/go.mod h1:
github.com/CortexFoundation/torrentfs v1.0.13-0.20200623060705-ce027f43f2f8/go.mod h1:Ma+tGhPPvz4CEZHaqEJQMOEGOfHeQBiAoNd1zyc/w3Q=
github.com/CortexFoundation/torrentfs v1.0.14-0.20200703071639-3fcabcabf274/go.mod h1:qnb3YlIJmuetVBtC6Lsejr0Xru+1DNmDCdTqnwy7lhk=
github.com/CortexFoundation/torrentfs v1.0.20-0.20200810031954-d36d26f82fcc/go.mod h1:N5BsicP5ynjXIi/Npl/SRzlJ630n1PJV2sRj0Z0t2HA=
github.com/CortexFoundation/torrentfs v1.0.68-0.20240820152357-a6ca6adae8f2 h1://r8A2+SuR2FHwL0sUZ9TVkHhBqBMT6AAzjs6/f92Fg=
github.com/CortexFoundation/torrentfs v1.0.68-0.20240820152357-a6ca6adae8f2/go.mod h1:U8sGySmc4Pf5sboRXmPqyQxs0yUIjkiWp8vsT23vWPY=
github.com/CortexFoundation/torrentfs v1.0.68-0.20240821125451-fb71840dd86b h1:/arIJ6MHfBtiPxcIAzRhsQxH41SpzbLC1Zhj/XfsXjo=
github.com/CortexFoundation/torrentfs v1.0.68-0.20240821125451-fb71840dd86b/go.mod h1:U8sGySmc4Pf5sboRXmPqyQxs0yUIjkiWp8vsT23vWPY=
github.com/CortexFoundation/wormhole v0.0.2-0.20240624201423-33e289eb7662 h1:rmM5WDx5UX7V4LF1D8LtAOPDzcCKulpZ++NkP8/+Ook=
github.com/CortexFoundation/wormhole v0.0.2-0.20240624201423-33e289eb7662/go.mod h1:ipzmPabDgzYKUbXkGVe2gTkBEp+MsDx6pXGiuYzmP6s=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
Expand Down Expand Up @@ -1297,8 +1297,8 @@ go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/bbolt v1.3.5-0.20200424005604-a8af23b57f67/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/bbolt v1.3.10 h1:+BqfJTcCzTItrop8mq/lbzL8wSGtj94UO/3U31shqG0=
go.etcd.io/bbolt v1.3.10/go.mod h1:bK3UQLPJZly7IlNmV7uVHJDxfe5aK9Ll93e/74Y9oEQ=
go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0=
go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/go.etcd.io/bbolt/.go-version

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions vendor/go.etcd.io/bbolt/Makefile

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions vendor/go.etcd.io/bbolt/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit fb3ebb0

Please sign in to comment.