diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml new file mode 100644 index 00000000..d24796fb --- /dev/null +++ b/.github/workflows/integration-test.yml @@ -0,0 +1,65 @@ +name: Integration Test +on: + schedule: + - cron: '1 8 * * 0,2,4' + +jobs: + es-node-integration-test: + runs-on: self-hosted + timeout-minutes: 2880 + + steps: + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: '1.21' + + - name: Use Node.js + uses: actions/setup-node@v3 + with: + node-version: '20.x' + + - name: Install Foundry + uses: foundry-rs/foundry-toolchain@v1 + with: + version: nightly + + - name: Deploy Contracts + run: | + cd ../storage-contracts-v1 + git checkout main + git pull + npm install + git submodule init && git submodule update + npx hardhat run scripts/deployL2-it.js --network qkc_testnet >> deploy.log + echo ES_NODE_CONTRACT_ADDRESS=`cat .caddr` >> "$GITHUB_ENV" + + - name: Build and Run Bootnode Node + run: | + cd ../es-node + make + ./run-l2-it-rpc.sh > es-node-it-bootnode.log& + + - name: Set ENV Parameters + run: | + echo ES_NODE_UPLOADER_PRIVATE_KEY=`cat ../uploader.key` >> "$GITHUB_ENV" + echo ES_NODE_SIGNER_PRIVATE_KEY=`cat ../private.key` >> "$GITHUB_ENV" + echo ES_NODE_STORAGE_MINER=0x5C935469C5592Aeeac3372e922d9bCEabDF8830d >> "$GITHUB_ENV" + + - name: Upload Blobs + run: | + cd ./integration_tests/scripts + npm install --force + node ituploader.js 10800 true > upload.log + cp .data ../../cmd/integration-test-server/.data + + - name: Test + run: | + ./run-l2-it.sh > es-node-it.log& + cd ./integration_tests/scripts + node ituploader.js 12288 false > upload2.log& + cd ../../cmd/integration-test-server + go build + ./integration-test-server --contract_addr $ES_NODE_CONTRACT_ADDRESS > itserver.log diff --git a/cmd/integration-test-server/main.go b/cmd/integration-test-server/main.go new file mode 100644 index 00000000..c3456925 --- /dev/null +++ b/cmd/integration-test-server/main.go @@ -0,0 +1,317 @@ +// Copyright 2022-2023, EthStorage. +// For license information, see https://github.com/ethstorage/es-node/blob/main/LICENSE + +package main + +import ( + "bufio" + "bytes" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "math" + "net/http" + "os" + "time" + + "github.com/crate-crypto/go-proto-danksharding-crypto/eth" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/crypto/kzg4844" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethstorage/go-ethstorage/cmd/es-utils/utils" + es "github.com/ethstorage/go-ethstorage/ethstorage" + "github.com/ethstorage/go-ethstorage/ethstorage/node" + prv "github.com/ethstorage/go-ethstorage/ethstorage/prover" +) + +const ( + expectedSaidHelloTime = 10 * time.Minute + expectedStateRefreshTime = 5 * time.Minute + executionTime = 2 * time.Hour + + kvEntries = 8192 + kvSize = 32 * 4096 + dataSize = 31 * 4096 + + rpcEndpoint = "http://127.0.0.1:9595" + uploadedDataFile = ".data" + shardFile0 = "../../es-data-it/shard-0.dat" + shardFile1 = "../../es-data-it/shard-1.dat" +) + +var ( + portFlag = flag.Int("port", 9096, "Listener port for the es-node to report node status") + contractAddr = flag.String("contract_addr", "", "EthStorage contract address") +) + +var ( + errorMessages = make([]string, 0) + lastQueryTime = time.Now() + lastRecord *node.NodeState + hasConnectedPeer = false + testLog = log.New("IntegrationTest") + prover = prv.NewKZGProver(testLog) + contractAddress = common.Address{} +) + +func HelloHandler(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + addErrorMessage(fmt.Sprintf("Read Hello body failed with error %s", err.Error())) + return + } + log.Info("Get hello from node", "id", string(body)) + + if time.Since(lastQueryTime) > expectedSaidHelloTime { + addErrorMessage(fmt.Sprintf("Get Hello message later then expect time %v real value %v", expectedSaidHelloTime, time.Since(lastQueryTime))) + } + lastQueryTime = time.Now() + log.Info("Get Hello request from es-node", "id", string(body)) + + answer := `{"status":"ok"}` + w.Write([]byte(answer)) +} + +func ReportStateHandler(w http.ResponseWriter, r *http.Request) { + if time.Since(lastQueryTime) > expectedStateRefreshTime*2 { + addErrorMessage(fmt.Sprintf("Get Hello message later then expect time %v real value %v", + expectedStateRefreshTime*2, time.Since(lastQueryTime))) + } + lastQueryTime = time.Now() + + body, err := io.ReadAll(r.Body) + if err != nil { + addErrorMessage(fmt.Sprintf("Read ReportState body failed with error %s", err.Error())) + return + } + + state := &node.NodeState{} + err = json.Unmarshal(body, state) + if err != nil { + addErrorMessage(fmt.Sprintf("Parse node state failed with error %s", err.Error())) + w.Write([]byte(fmt.Sprintf(`{"status":"error", "err message":"%s"}`, err.Error()))) + return + } + + // If no state updated + log.Info("Get state from peer", "peer id", state.Id, "Version", state.Version) + if lastRecord != nil { + checkState(lastRecord, state) + } + + lastRecord = state + + w.Write([]byte(`{"status":"ok"}`)) +} + +func checkState(oldState, newState *node.NodeState) { + if len(oldState.Shards) != len(newState.Shards) { + addErrorMessage(fmt.Sprintf("shards count mismatch between two state, new %d, old %d", len(newState.Shards), len(oldState.Shards))) + return + } + + for _, shardState := range newState.Shards { + check := false + for _, oldShardState := range oldState.Shards { + if shardState.ShardId != oldShardState.ShardId { + continue + } + check = true + if shardState.SyncState.PeerCount > 0 { + hasConnectedPeer = true + } + + if oldShardState.SyncState.SyncProgress < 10000 && + (shardState.SyncState.BlobsSynced <= oldShardState.SyncState.BlobsSynced || + shardState.SyncState.SyncProgress <= oldShardState.SyncState.SyncProgress) { + addErrorMessage(fmt.Sprintf("es-node sync progress do not increase in %f minutes, "+ + "old synced: %d, new synced %d; old progress: %d, new progress: %d", expectedStateRefreshTime.Minutes(), oldShardState.SyncState.BlobsSynced, + shardState.SyncState.BlobsSynced, oldShardState.SyncState.SyncProgress, shardState.SyncState.SyncProgress)) + } + if oldShardState.SyncState.FillEmptyProgress < 10000 && + (shardState.SyncState.EmptyFilled < oldShardState.SyncState.EmptyFilled || + shardState.SyncState.FillEmptyProgress < oldShardState.SyncState.FillEmptyProgress) { + addErrorMessage(fmt.Sprintf("es-node fill empty progress do not increase in %f minutes, "+ + "old filled: %d, new filled %d; old progress: %d, new progress: %d", expectedStateRefreshTime.Minutes(), oldShardState.SyncState.EmptyFilled, + shardState.SyncState.EmptyFilled, oldShardState.SyncState.FillEmptyProgress, shardState.SyncState.FillEmptyProgress)) + } + + if oldShardState.SyncState.FillEmptyProgress == 10000 && oldShardState.SyncState.SyncProgress == 10000 && + (shardState.MiningState.MiningPower == 0 || shardState.MiningState.SamplingTime == 0) { + addErrorMessage("Mining should be start after sync done.") + } + } + if !check { + addErrorMessage(fmt.Sprintf("Shard %d in the new state do not exist in the old state", shardState.ShardId)) + } + } +} + +func checkFinalState(state *node.NodeState) { + if state == nil { + addErrorMessage("No state submitted during the test") + return + } + if !hasConnectedPeer { + addErrorMessage("es-node peer count should larger than 0") + } + + log.Info("Final state", "id", state.Id, "version", state.Version) + for _, shardState := range state.Shards { + if shardState.SyncState.SyncProgress != 10000 { + addErrorMessage("Sync should be finished during the test") + } + if shardState.SyncState.FillEmptyProgress != 10000 { + addErrorMessage("Fill should be finished during the test") + } + if shardState.MiningState.SamplingTime == 0 || shardState.MiningState.MiningPower == 0 { + addErrorMessage("Mining should be start after sync done.") + } + if shardState.SubmissionState.LastSucceededTime == 0 || shardState.SubmissionState.Succeeded == 0 { + addErrorMessage("At lease one block should be mined successfully during the test.") + } + if shardState.SubmissionState.Failed > 0 { + addErrorMessage(fmt.Sprintf("%d submission failed during the test.", shardState.SubmissionState.Failed)) + } + log.Info("Final state", "id", state.Id, "shard", shardState.ShardId, "miner", shardState.Miner, "sync progress", + shardState.SyncState.SyncProgress, "fill progress", shardState.SyncState.FillEmptyProgress, "mining power", + shardState.MiningState.MiningPower, "sampling time", shardState.MiningState.SamplingTime, "succeeded submission", + shardState.SubmissionState.Succeeded, "failed submission", shardState.SubmissionState.Failed, "dropped submission", + shardState.SubmissionState.Dropped, "last succeeded time", shardState.SubmissionState.LastSucceededTime) + } +} + +func createShardManager() (*es.ShardManager, error) { + sm := es.NewShardManager(contractAddress, kvSize, kvEntries, kvSize) + df0, err := es.OpenDataFile(shardFile0) + if err != nil { + return nil, err + } + err = sm.AddDataFileAndShard(df0) + if err != nil { + return nil, err + } + + df1, err := es.OpenDataFile(shardFile1) + if err != nil { + return nil, err + } + err = sm.AddDataFileAndShard(df1) + if err != nil { + return nil, err + } + + return sm, nil +} + +func verifyData() error { + file, err := os.OpenFile(uploadedDataFile, os.O_RDONLY, 0755) + if err != nil { + return err + } + defer file.Close() + + fileScanner := bufio.NewScanner(file) + fileScanner.Buffer(make([]byte, dataSize*2), kvSize*2) + fileScanner.Split(bufio.ScanLines) + + sm, err := createShardManager() + if err != nil { + return err + } + + client, err := rpc.DialHTTP(rpcEndpoint) + if err != nil { + return err + } + defer client.Close() + + i := uint64(0) + for fileScanner.Scan() { + expectedData := common.Hex2Bytes(fileScanner.Text()) + blob := utils.EncodeBlobs(expectedData)[0] + commit, _, _ := sm.TryReadMeta(i) + data, _, err := sm.TryRead(i, kvSize, common.BytesToHash(commit)) + if err != nil { + return errors.New(fmt.Sprintf("read %d from shard fail with err: %s", i, err.Error())) + } + if bytes.Compare(blob[:], data) != 0 { + return errors.New(fmt.Sprintf("compare shard data %d fail, expected data %s; data: %s", + i, common.Bytes2Hex(blob[:64]), common.Bytes2Hex(data[:64]))) + } + + rpcdata, err := downloadBlobFromRPC(client, i, common.BytesToHash(commit)) + if err != nil { + return errors.New(fmt.Sprintf("get data %d from rpc fail with err: %s", i, err.Error())) + } + if bytes.Compare(blob[:], rpcdata) != 0 { + return errors.New(fmt.Sprintf("compare rpc data %d fail, expected data %s; data: %s", + i, common.Bytes2Hex(blob[:64]), common.Bytes2Hex(rpcdata[:64]))) + } + i++ + } + return nil +} + +func downloadBlobFromRPC(client *rpc.Client, kvIndex uint64, hash common.Hash) ([]byte, error) { + var result hexutil.Bytes + err := client.Call(&result, "es_getBlob", kvIndex, hash, 0, 0, 4096*32) + if err != nil { + return nil, err + } + + var blob kzg4844.Blob + copy(blob[:], result) + commit, err := kzg4844.BlobToCommitment(blob) + if err != nil { + return nil, fmt.Errorf("blobToCommitment failed: %w", err) + } + cmt := common.Hash(eth.KZGToVersionedHash(commit)) + if bytes.Compare(cmt[:es.HashSizeInContract], hash[:es.HashSizeInContract]) != 0 { + return nil, fmt.Errorf("invalid blob for %d hash: %s, commit: %s", kvIndex, hash, cmt) + } + + return result, nil +} + +func addErrorMessage(errMessage string) { + log.Warn("Add error message", "msg", errMessage) + errorMessages = append(errorMessages, errMessage+"\n") +} + +func listenAndServe(port int) error { + http.HandleFunc("/hello", HelloHandler) + http.HandleFunc("/reportstate", ReportStateHandler) + return http.ListenAndServe(fmt.Sprintf(":%d", port), nil) +} + +func main() { + // Parse the flags and set up the logger to print everything requested + flag.Parse() + log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(3), log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + + if *portFlag < 0 || *portFlag > math.MaxUint16 { + log.Crit("Invalid port") + } + if *contractAddr == "" { + log.Crit("Invalid contract address") + } else { + contractAddress = common.HexToAddress(*contractAddr) + } + + go listenAndServe(*portFlag) + + time.Sleep(executionTime) + checkFinalState(lastRecord) + if err := verifyData(); err != nil { + addErrorMessage(err.Error()) + } + + if len(errorMessages) > 0 { + log.Crit(fmt.Sprintf("integration test fail %v", errorMessages)) + } +} diff --git a/ethstorage/p2p/protocol/syncclient.go b/ethstorage/p2p/protocol/syncclient.go index e53dfa47..93972302 100644 --- a/ethstorage/p2p/protocol/syncclient.go +++ b/ethstorage/p2p/protocol/syncclient.go @@ -895,12 +895,9 @@ func (s *SyncClient) assignFillEmptyBlobTasks() { s.notifyUpdate() s.wg.Done() }() - t := time.Now() next, err := s.FillFileWithEmptyBlob(start, limit) if err != nil { log.Warn("Fill in empty fail", "err", err.Error()) - } else { - log.Debug("Fill in empty done", "time", time.Now().Sub(t).Seconds()) } filled := next - start diff --git a/init.sh b/init.sh index b99abdbe..9608865d 100755 --- a/init.sh +++ b/init.sh @@ -13,6 +13,7 @@ zkp_impl=1 # ZK prover mode, 1: one proof per sample, 2: one proof for multiple samples. # Note: currently only zk prover mode 2 is supported zkp_mode=2 +data_dir="./es-data" remaining_args="" shards="--shard_index 0" @@ -24,6 +25,9 @@ while [ $# -gt 0 ]; do elif [[ $1 == --miner.zk-prover-mode ]]; then zkp_mode=$2 shift 2 + elif [[ $1 == --datadir ]]; then + data_dir=$2 + shift 2 else if [[ $1 == --shard_index ]]; then shards="" @@ -103,8 +107,6 @@ if [ "$zkp_impl" = 1 ]; then fi -data_dir="./es-data" - es_node_init="$executable init $shards \ --datadir $data_dir \ --l1.rpc http://88.99.30.186:8545 \ diff --git a/integration_tests/scripts/ituploader.js b/integration_tests/scripts/ituploader.js new file mode 100644 index 00000000..fe275444 --- /dev/null +++ b/integration_tests/scripts/ituploader.js @@ -0,0 +1,96 @@ +const {ethers, Contract} = require("ethers"); +const crypto = require('crypto'); +const {EthStorage} = require("ethstorage-sdk"); +const core = require('@actions/core'); +const fs = require('fs'); + +const dotenv = require("dotenv") +dotenv.config() +const privateKey = process.env.ES_NODE_UPLOADER_PRIVATE_KEY; +const contractAddr = process.env.ES_NODE_CONTRACT_ADDRESS; +const RPC = 'http://65.109.20.29:8545'; +const contractABI = [ + "function lastKvIdx() public view returns (uint40)" +] + +const provider = new ethers.JsonRpcProvider(RPC); +const contract = new Contract(contractAddr, contractABI, provider); +const MAX_BLOB = BigInt(process.argv[2]); +const BATCH_SIZE = 6n; +const NEED_WAIT = (process.argv[3] === 'true'); + +async function UploadBlobsForIntegrationTest() { + // put blobs + console.log(contractAddr) + const es = await EthStorage.create({ + rpc: RPC, + privateKey, + address: contractAddr + }) + while (true) { + const currentIndex = await contract.lastKvIdx(); + const totalCount = MAX_BLOB - currentIndex; + console.log("Current Number:", currentIndex, " Total Number:", totalCount, "at", new Date().toLocaleTimeString([], { hour: "2-digit", minute: "2-digit", second: "2-digit" })); + if (totalCount <= 0) { + break; + } + + let keys = []; + let blobs = []; + for (let i = 0; i < BATCH_SIZE && i < totalCount; i++) { + const buf = crypto.randomBytes(126976); + keys[i] = buf.subarray(0,32).toString('hex') + blobs[i] = buf + } + + // write blobs + try { + let status = await es.writeBlobs(keys, blobs); + if (status == false) { + continue + } + console.log(status); + } catch (e) { + console.log("upload blob error:", e.message); + continue + } + for (let i = 0; i < blobs.length; i++) { + fs.writeFileSync(".data", blobs[i].toString('hex')+'\n', { flag: 'a+' }); + } + } + + if (!NEED_WAIT) { + return + } + + let latestBlock + try { + latestBlock = await provider.getBlock(); + console.log("latest block number is", latestBlock.number); + } catch (e) { + core.setFailed(`EthStorage: get latest block failed with message: ${e.message}`); + return + } + + // wait for blobs finalized + var intervalId = setInterval(async function (){ + try { + let finalizedBlock = await provider.getBlock("finalized"); + console.log( + "finalized block number is", + finalizedBlock.number, + "at", + new Date().toLocaleTimeString([], { hour: "2-digit", minute: "2-digit", second: "2-digit" }) + ); + if (latestBlock.number < finalizedBlock.number) { + setTimeout(() => console.log("Upload done!"), 300000) + clearInterval(intervalId); + } + } catch (e) { + console.error(`EthStorage: get finalized block failed!`, e.message); + } + }, 120000); +} + +UploadBlobsForIntegrationTest(); + diff --git a/integration_tests/scripts/package.json b/integration_tests/scripts/package.json new file mode 100644 index 00000000..7e603cef --- /dev/null +++ b/integration_tests/scripts/package.json @@ -0,0 +1,21 @@ +{ + "name": "integration-test", + "version": "1.0.0", + "main": "ituploader.js", + "scripts": { + "build": "rollup -c" + }, + "dependencies": { + "dotenv": "^16.4.5", + "ethers": "^6.13.1", + "ethstorage-sdk": "^2.1.1", + "@actions/core": "^1.10.1" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/ethstorage/es-node.git" + }, + "author": "ethsorage", + "license": "ISC", + "description": "" +} diff --git a/run-l2-it-rpc.sh b/run-l2-it-rpc.sh new file mode 100755 index 00000000..5a68e2ea --- /dev/null +++ b/run-l2-it-rpc.sh @@ -0,0 +1,41 @@ +#!/bin/sh + +data_dir="./es-data-it-bootnode" +storage_file_0="$data_dir/shard-0.dat" +storage_file_1="$data_dir/shard-1.dat" +zkey_file="./build/bin/snark_lib/zkey/blob_poseidon2.zkey" + +if test -d ${data_dir} ; then + rm -r ${data_dir} +fi +mkdir ${data_dir} +echo "8714eb2672bb7ab01089a1060150b30bc374a3b00e18926460f169256d126339" > "${data_dir}/esnode_p2p_priv.txt" + +./init-l2.sh \ + --shard_index 0 \ + --shard_index 1 \ + --encoding_type=0 \ + --datadir $data_dir \ + --storage.l1contract $ES_NODE_CONTRACT_ADDRESS + + +exec ./build/bin/es-node \ + --network integration \ + --datadir $data_dir \ + --storage.files $storage_file_0 \ + --storage.files $storage_file_1 \ + --storage.l1contract $ES_NODE_CONTRACT_ADDRESS \ + --miner.enabled=false \ + --miner.zkey $zkey_file \ + --l1.block_time 2 \ + --l1.rpc http://65.109.20.29:8545 \ + --da.url http://65.109.20.29:8888 \ + --randao.url http://88.99.30.186:8545 \ + --rpc.port 9595 \ + --p2p.listen.udp 30395 \ + --p2p.listen.tcp 9295 \ + --p2p.priv.path $data_dir/esnode_p2p_priv.txt \ + --p2p.peerstore.path $data_dir/esnode_peerstore_db \ + --p2p.discovery.path $data_dir/esnode_discovery_db \ + --rpc.addr 0.0.0.0 + diff --git a/run-l2-it.sh b/run-l2-it.sh new file mode 100755 index 00000000..a8768dd9 --- /dev/null +++ b/run-l2-it.sh @@ -0,0 +1,39 @@ +#!/bin/bash + +data_dir="./es-data-it" +storage_file_0="$data_dir/shard-0.dat" +storage_file_1="$data_dir/shard-1.dat" +zkey_file="./build/bin/snark_lib/zkey/blob_poseidon2.zkey" + +if test -d ${data_dir} ; then + rm -r ${data_dir} +fi +mkdir ${data_dir} + +./init-l2.sh \ + --shard_index 0 \ + --shard_index 1 \ + --datadir $data_dir \ + --storage.l1contract $ES_NODE_CONTRACT_ADDRESS + + +exec ./build/bin/es-node \ + --network integration \ + --datadir $data_dir \ + --storage.files $storage_file_0 \ + --storage.files $storage_file_1 \ + --storage.l1contract $ES_NODE_CONTRACT_ADDRESS \ + --miner.enabled \ + --miner.zkey $zkey_file \ + --l1.block_time 2 \ + --l1.rpc http://65.109.20.29:8545 \ + --da.url http://65.109.20.29:8888 \ + --randao.url http://88.99.30.186:8545 \ + --state.upload.url http://127.0.0.1:9096 \ + --rpc.port 9596 \ + --p2p.listen.udp 30396 \ + --p2p.listen.tcp 9296 \ + --p2p.priv.path $data_dir/esnode_p2p_priv.txt \ + --p2p.peerstore.path $data_dir/esnode_peerstore_db \ + --p2p.discovery.path $data_dir/esnode_discovery_db \ + --p2p.bootnodes enr:-Li4QBp6QW2ji7JF-3yijZrQ54PqPZ-Io_xEtMUslxxcmGS5TAXiiU6hypBZbB_atxh2Pc72-MgonzU5_R-_qd_PBXyGAZDucmwzimV0aHN0b3JhZ2XbAYDY15SXhtonBXvE13WNGfkk7Nj9Y4_Qr8GAgmlkgnY0gmlwhFhjHrqJc2VjcDI1NmsxoQJ8KIsZjyfFPHZOR66JORtqr5ax0QU6QmvT6QE0QllVZIN0Y3CCJE-DdWRwgna7