From 392015bfaf15c923344e2f5f134717669569349d Mon Sep 17 00:00:00 2001 From: kdeme Date: Thu, 1 Feb 2024 17:35:59 +0100 Subject: [PATCH] Add Era1File object and getter for specific block tuple --- fluffy/eth_data/era1.nim | 189 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 185 insertions(+), 4 deletions(-) diff --git a/fluffy/eth_data/era1.nim b/fluffy/eth_data/era1.nim index 32fbc3f039..3d794d654b 100644 --- a/fluffy/eth_data/era1.nim +++ b/fluffy/eth_data/era1.nim @@ -9,7 +9,7 @@ import std/[strformat, typetraits], - results, stew/[endians2, io2, byteutils], + results, stew/[endians2, io2, byteutils, arrayops], stint, snappy, eth/common/eth_types_rlp, beacon_chain/spec/beacon_time, @@ -92,6 +92,41 @@ proc appendHeader(f: IoHandle, typ: Type, dataLen: int): Result[int64, string] = ok(start) +proc checkBytesLeft(f: IoHandle, expected: int64): Result[void, string] = + let size = ? getFileSize(f).mapErr(toString) + if expected > size: + return err("Record extends past end of file") + + let pos = ? getFilePos(f).mapErr(toString) + if expected > size - pos: + return err("Record extends past end of file") + + ok() + +proc readFileExact(f: IoHandle, buf: var openArray[byte]): Result[void, string] = + if (? f.readFile(buf).mapErr(toString)) != buf.len().uint: + return err("missing data") + ok() + +proc readHeader(f: IoHandle): Result[Header, string] = + var buf: array[10, byte] + ? readFileExact(f, buf.toOpenArray(0, 7)) + + var + typ: Type + discard typ.copyFrom(buf) + + # Conversion safe because we had only 4 bytes of length data + let len = (uint32.fromBytesLE(buf.toOpenArray(2, 5))).int64 + + # No point reading these.. + if len > int.high(): return err("header length exceeds int.high") + + # Must have at least that much data, or header is invalid + ? f.checkBytesLeft(len) + + ok(Header(typ: typ, len: int(len))) + ## Following types & procs are era1 specific type @@ -103,9 +138,9 @@ type # As stated, not really a time unit but nevertheless, need the borrows ethTimeUnit Era1 -# Note: appendIndex and appendRecord for BlockIndex are only different from -# its consensus layer counter parts because of usage of slot vs blockNumber. -# In practise, they do the same thing. +# Note: appendIndex, appendRecord and readIndex for BlockIndex are only +# different from its consensus layer counter parts because of usage of slot vs +# blockNumber. In practise, they do the same thing. proc appendIndex*( f: IoHandle, startNumber: uint64, offsets: openArray[int64]): Result[int64, string] = @@ -125,6 +160,45 @@ proc appendIndex*( proc appendRecord(f: IoHandle, index: BlockIndex): Result[int64, string] = f.appendIndex(index.startNumber, index.offsets) +proc readBlockIndex*(f: IoHandle): Result[BlockIndex, string] = + let + startPos = ? f.getFilePos().mapErr(toString) + fileSize = ? f.getFileSize().mapErr(toString) + header = ? f.readHeader() + + if header.typ != E2BlockIndex: return err("not an index") + if header.len < 16: return err("index entry too small") + if header.len mod 8 != 0: return err("index length invalid") + + var buf: array[8, byte] + ? f.readFileExact(buf) + let + blockNumber = uint64.fromBytesLE(buf) + count = header.len div 8 - 2 + + var offsets = newSeqUninitialized[int64](count) + for i in 0.. fileSize: return err("Invalid offset") + offsets[i] = absolute + + ? f.readFileExact(buf) + if uint64(count) != uint64.fromBytesLE(buf): return err("invalid count") + + # technically not an error, but we'll throw this sanity check in here.. + if blockNumber > int32.high().uint64: return err("fishy slot") + + ok(BlockIndex(startNumber: blockNumber, offsets: offsets)) + proc toCompressedRlpBytes(item: auto): seq[byte] = snappy.encodeFramed(rlp.encode(item)) @@ -191,3 +265,110 @@ func era1FileName*(network: string, era: Era1, eraRoot: Digest): string = &"{network}-{era.uint64:05}-{shortLog(eraRoot)}.era1" except ValueError as exc: raiseAssert exc.msg + +# Helpers to directly read objects from era1 files +# TODO: Might want to var parameters to avoid copying as is done for era files. + +type + Era1File* = ref object + handle: Opt[IoHandle] + blockIdx: BlockIndex + +proc open*(_: type Era1File, name: string): Result[Era1File, string] = + var + f = Opt[IoHandle].ok(? openFile(name, {OpenFlags.Read}).mapErr(ioErrorMsg)) + + defer: + if f.isSome(): discard closeFile(f[]) + + # Indices can be found at the end of each era file - we only support + # single-era files for now + ? f[].setFilePos(0, SeekPosition.SeekEnd).mapErr(ioErrorMsg) + + # Last in the file is the block index + let + blockIdxPos = ? f[].findIndexStartOffset() + ? f[].setFilePos(blockIdxPos, SeekPosition.SeekCurrent).mapErr(ioErrorMsg) + + let + blockIdx = ? f[].readBlockIndex() + if blockIdx.offsets.len() != 8192: + return err("Block index length invalid") + + let res = Era1File(handle: f, blockIdx: blockIdx) + reset(f) + ok res + +proc close*(f: Era1File) = + if f.handle.isSome(): + discard closeFile(f.handle.get()) + reset(f.handle) + +proc getBlockHeader(f: Era1File): Result[BlockHeader, string] = + var bytes: seq[byte] + + let header = ? f[].handle.get().readRecord(bytes) + if header.typ != CompressedHeader: + return err("Invalid era file: didn't find block header at index position") + + try: + ok(rlp.decode(decodeFramed(bytes, checkIntegrity = false), BlockHeader)) + except RlpError as e: + err("Invalid block header" & e.msg) + +proc getBlockBody(f: Era1File): Result[BlockBody, string] = + var bytes: seq[byte] + + let header = ? f[].handle.get().readRecord(bytes) + if header.typ != CompressedBody: + return err("Invalid era file: didn't find block body at index position") + + try: + ok(rlp.decode(decodeFramed(bytes, checkIntegrity = false), BlockBody)) + except RlpError as e: + err("Invalid block body" & e.msg) + +proc getReceipts(f: Era1File): Result[seq[Receipt], string] = + var bytes: seq[byte] + + let header = ? f[].handle.get().readRecord(bytes) + if header.typ != CompressedReceipts: + return err("Invalid era file: didn't find receipts at index position") + + try: + ok(rlp.decode(decodeFramed(bytes, checkIntegrity = false), seq[Receipt])) + except RlpError as e: + err("Invalid receipts" & e.msg) + +proc getTotalDifficulty(f: Era1File): Result[UInt256, string] = + var bytes: seq[byte] + + let header = ? f[].handle.get().readRecord(bytes) + if header.typ != TotalDifficulty: + return err("Invalid era file: didn't find total difficulty at index position") + + if bytes.len != 32: + return err("Invalid total difficulty length") + + ok(UInt256.fromBytesLE(bytes)) + +proc getBlockTuple*( + f: Era1File, blockNumber: uint64 + ): Result[(BlockHeader, BlockBody, seq[Receipt], UInt256), string] = + doAssert not isNil(f) and f[].handle.isSome + doAssert( + blockNumber >= f[].blockIdx.startNumber and + blockNumber < f[].blockIdx.startNumber + 8192, + "Wrong era1 file for selected block number") + + let pos = f[].blockIdx.offsets[blockNumber - f[].blockIdx.startNumber] + + ? f[].handle.get().setFilePos(pos, SeekPosition.SeekBegin).mapErr(ioErrorMsg) + + let + blockHeader = ? getBlockHeader(f) + blockBody = ? getBlockBody(f) + receipts = ? getReceipts(f) + totalDifficulty = ? getTotalDifficulty(f) + + ok((blockHeader, blockBody, receipts, totalDifficulty))