diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 2ebaa639..a5f5fea4 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -14,37 +14,39 @@ jobs: go-version: 1.22 - name: Run tests run: make test - #build: - # # needs: test - # runs-on: ubuntu-latest - # steps: - # - name: Checkout - # uses: actions/checkout@v4 - # - name: Configure AWS credentials - # uses: aws-actions/configure-aws-credentials@v4 - # with: - # aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} - # aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} - # aws-region: ${{ secrets.AWS_REGION }} - # - name: Login to Amazon ECR - # id: login-ecr - # uses: aws-actions/amazon-ecr-login@v2 - # - name: Set up QEMU - # uses: docker/setup-qemu-action@v2 - # - name: Set up Docker Buildx - # uses: docker/setup-buildx-action@v3 - # - name: Build, tag, and push docker image to Amazon ECR - # env: - # REGISTRY: "767397703211.dkr.ecr.us-east-1.amazonaws.com" - # REPOSITORY: ${{ github.event.repository.name }} - # IMAGE_TAG: ${{ github.sha }} - # PLATFORMS: "linux/amd64" - # run: | - # if [[ $GITHUB_REF == refs/heads/master ]]; then - # docker buildx build --platform $PLATFORMS -t $REGISTRY/$REPOSITORY:$IMAGE_TAG -t $REGISTRY/$REPOSITORY:latest --push . - # else - # docker buildx build --platform $PLATFORMS -t $REGISTRY/$REPOSITORY:$IMAGE_TAG --push . - # fi + build: + # needs: test + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: ${{ secrets.AWS_REGION }} + - name: Login to Amazon ECR + id: login-ecr-public + uses: aws-actions/amazon-ecr-login@v2 + with: + registry-type: public + - name: Set up QEMU + uses: docker/setup-qemu-action@v2 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Build, tag, and push docker image to Amazon ECR + env: + REGISTRY: "public.ecr.aws/z6g0f8n7" + REPOSITORY: ${{ github.event.repository.name }} + IMAGE_TAG: ${{ github.sha }} + PLATFORMS: "linux/amd64,linux/arm64" + run: | + if [[ $GITHUB_REF == refs/heads/master ]]; then + docker buildx build --platform $PLATFORMS -t $REGISTRY/$REPOSITORY:$IMAGE_TAG -t $REGISTRY/$REPOSITORY:latest --push . + else + docker buildx build --platform $PLATFORMS -t $REGISTRY/$REPOSITORY:$IMAGE_TAG --push . + fi #build-mainnet: # # needs: test # if: github.ref == 'refs/heads/master' diff --git a/Makefile b/Makefile index 55274af4..cff90b9f 100644 --- a/Makefile +++ b/Makefile @@ -40,8 +40,11 @@ build/cmd/sidecar: .PHONY: build build: build/cmd/sidecar +docker-buildx-self: + docker buildx build -t go-sidecar:latest -t go-sidecar:latest . + docker-buildx: - docker-buildx build --platform linux/amd64 --push -t 767397703211.dkr.ecr.us-east-1.amazonaws.com/blocklake:$(shell date +%s) -t 767397703211.dkr.ecr.us-east-1.amazonaws.com/blocklake:latest . + docker-buildx build --platform linux/amd64 --push -t 767397703211.dkr.ecr.us-east-1.amazonaws.com/go-sidecar:$(shell date +%s) -t 767397703211.dkr.ecr.us-east-1.amazonaws.com/go-sidecar:latest . .PHONY: test test: diff --git a/README.md b/README.md index eecf2c8f..1868f8fd 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,55 @@ +## Running + +### Directly using Go +```bash +SIDECAR_DEBUG=false \ +SIDECAR_ETHEREUM_RPC_BASE_URL="http://54.198.82.217:8545" \ +SIDECAR_ENVIRONMENT="testnet" \ +SIDECAR_NETWORK="holesky" \ +SIDECAR_ETHERSCAN_API_KEYS="" \ +SIDECAR_STATSD_URL="localhost:8125" \ +SIDECAR_SQLITE_DB_FILE_PATH="./sqlite/sidecar.db" \ +go run cmd/sidecar/main.go +``` + +### Using the public Docker container +```bash +# Create the directory to hold the sqlite database +mkdir ./sqlite || true + +docker run -it --rm \ + -v $(pwd)/sqlite:/sqlite \ + -e SIDECAR_DEBUG=false \ + -e SIDECAR_ETHEREUM_RPC_BASE_URL="http://54.198.82.217:8545" \ + -e SIDECAR_ENVIRONMENT="testnet" \ + -e SIDECAR_NETWORK="holesky" \ + -e SIDECAR_ETHERSCAN_API_KEYS="" \ + -e SIDECAR_STATSD_URL="localhost:8125" \ + -e SIDECAR_SQLITE_DB_FILE_PATH="/sqlite/sidecar.db" \ + public.ecr.aws/z6g0f8n7/go-sidecar:latest \ + /build/bin/sidecar +``` + +### Build and run a container locally +```bash +# Create the directory to hold the sqlite database +mkdir ./sqlite || true + +make docker-buildx-self + +docker run \ + -e "SIDECAR_DEBUG=false" \ + -e "SIDECAR_ETHEREUM_RPC_BASE_URL=http://54.198.82.217:8545" \ + -e "SIDECAR_ENVIRONMENT=testnet" \ + -e "SIDECAR_NETWORK=holesky" \ + -e "SIDECAR_ETHERSCAN_API_KEYS=''" \ + -e "SIDECAR_STATSD_URL=localhost:8125" \ + -e "SIDECAR_SQLITE_DB_FILE_PATH=/sqlite/sidecar.db" \ + --interactive --tty \ + -v "$(pwd)/sqlite:/sqlite" \ + go-sidecar:latest /build/bin/cmd/sidecar +``` + ## RPC Routes ### Get current block height diff --git a/cmd/sidecar/main.go b/cmd/sidecar/main.go index 88469f28..d097d9b9 100644 --- a/cmd/sidecar/main.go +++ b/cmd/sidecar/main.go @@ -96,5 +96,6 @@ func main() { shutdown.ListenForShutdown(gracefulShutdown, done, func() { l.Sugar().Info("Shutting down...") rpcChannel <- true + sidecar.ShutdownChan <- true }, time.Second*5, l) } diff --git a/internal/eigenState/stateManager/stateManager.go b/internal/eigenState/stateManager/stateManager.go index c832614a..9cd07874 100644 --- a/internal/eigenState/stateManager/stateManager.go +++ b/internal/eigenState/stateManager/stateManager.go @@ -81,6 +81,17 @@ func (e *EigenStateManager) CommitFinalState(blockNumber uint64) error { return nil } +func (e *EigenStateManager) CleanupBlock(blockNumber uint64) error { + for _, index := range e.GetSortedModelIndexes() { + state := e.StateModels[index] + err := state.ClearAccumulatedState(blockNumber) + if err != nil { + return err + } + } + return nil +} + func (e *EigenStateManager) GenerateStateRoot(blockNumber uint64, blockHash string) (types.StateRoot, error) { sortedIndexes := e.GetSortedModelIndexes() roots := [][]byte{ diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 5da8813f..c7f94c15 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -34,7 +34,7 @@ func NewPipeline( } func (p *Pipeline) RunForBlock(ctx context.Context, blockNumber uint64) error { - p.Logger.Sugar().Infow("Running pipeline for block", zap.Uint64("blockNumber", blockNumber)) + p.Logger.Sugar().Debugw("Running pipeline for block", zap.Uint64("blockNumber", blockNumber)) /* - Fetch block @@ -63,7 +63,7 @@ func (p *Pipeline) RunForBlock(ctx context.Context, blockNumber uint64) error { p.Logger.Sugar().Errorw("Failed to init processing for block", zap.Uint64("blockNumber", blockNumber), zap.Error(err)) return err } - p.Logger.Sugar().Infow("Initialized processing for block", zap.Uint64("blockNumber", blockNumber)) + p.Logger.Sugar().Debugw("Initialized processing for block", zap.Uint64("blockNumber", blockNumber)) // Parse all transactions and logs for the block. // - If a transaction is not calling to a contract, it is ignored @@ -77,7 +77,7 @@ func (p *Pipeline) RunForBlock(ctx context.Context, blockNumber uint64) error { ) return err } - p.Logger.Sugar().Infow("Parsed transactions", zap.Uint64("blockNumber", blockNumber), zap.Int("count", len(parsedTransactions))) + p.Logger.Sugar().Debugw("Parsed transactions", zap.Uint64("blockNumber", blockNumber), zap.Int("count", len(parsedTransactions))) // With only interesting transactions/logs parsed, insert them into the database for _, pt := range parsedTransactions { @@ -90,7 +90,7 @@ func (p *Pipeline) RunForBlock(ctx context.Context, blockNumber uint64) error { ) return err } - p.Logger.Debug("Indexed transaction", zap.Uint64("blockNumber", blockNumber), zap.String("transactionHash", indexedTransaction.TransactionHash)) + p.Logger.Sugar().Debugw("Indexed transaction", zap.Uint64("blockNumber", blockNumber), zap.String("transactionHash", indexedTransaction.TransactionHash)) for _, log := range pt.Logs { indexedLog, err := p.Indexer.IndexLog( @@ -162,12 +162,14 @@ func (p *Pipeline) RunForBlock(ctx context.Context, blockNumber uint64) error { return err } - if sr, err := p.stateManager.WriteStateRoot(blockNumber, block.Block.Hash.Value(), stateRoot); err != nil { + sr, err := p.stateManager.WriteStateRoot(blockNumber, block.Block.Hash.Value(), stateRoot) + if err != nil { p.Logger.Sugar().Errorw("Failed to write state root", zap.Uint64("blockNumber", blockNumber), zap.Error(err)) - return err } else { - p.Logger.Sugar().Infow("Wrote state root", zap.Uint64("blockNumber", blockNumber), zap.Any("stateRoot", sr)) + p.Logger.Sugar().Debugw("Wrote state root", zap.Uint64("blockNumber", blockNumber), zap.Any("stateRoot", sr)) } - return nil + _ = p.stateManager.CleanupBlock(blockNumber) + + return err } diff --git a/internal/sidecar/blockIndexer.go b/internal/sidecar/blockIndexer.go index ef85e20f..be7947e4 100644 --- a/internal/sidecar/blockIndexer.go +++ b/internal/sidecar/blockIndexer.go @@ -2,6 +2,7 @@ package sidecar import ( "context" + "fmt" "go.uber.org/zap" "sync" "time" @@ -48,6 +49,8 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error { if latestBlock == 0 { s.Logger.Sugar().Infow("No blocks indexed, starting from genesis block", zap.Uint64("genesisBlock", s.Config.GenesisBlockNumber)) latestBlock = int64(s.Config.GenesisBlockNumber) + } else { + latestBlock += 1 } blockNumber, err := s.EthereumClient.GetBlockNumberUint64(ctx) @@ -63,9 +66,25 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error { ct := currentTip{CurrentTip: blockNumber} + shouldShutdown := false + + go func() { + for { + select { + case <-s.ShutdownChan: + s.Logger.Sugar().Infow("Received shutdown signal") + shouldShutdown = true + } + } + }() + go func() { for { time.Sleep(time.Second * 30) + if shouldShutdown { + s.Logger.Sugar().Infow("Shutting down block listener...") + return + } latestTip, err := s.EthereumClient.GetBlockNumberUint64(ctx) if err != nil { s.Logger.Sugar().Errorw("Failed to get latest tip", zap.Error(err)) @@ -80,7 +99,30 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error { } } }() + blocksProcessed := 0 + runningAvg := 0 + totalDurationMs := 0 for i := latestBlock; i <= int64(ct.Get()); i++ { + if shouldShutdown { + s.Logger.Sugar().Infow("Shutting down block processor") + return nil + } + tip := ct.Get() + pctComplete := float64(i) / float64(tip) * 100 + blocksRemaining := tip - uint64(i) + estTimeRemainingMs := runningAvg * int(blocksRemaining) + estTimeRemainingHours := float64(estTimeRemainingMs) / 1000 / 60 / 60 + + if i%10 == 0 { + s.Logger.Sugar().Infow("Progress", + zap.String("percentComplete", fmt.Sprintf("%.2f", pctComplete)), + zap.Uint64("blocksRemaining", blocksRemaining), + zap.Float64("estimatedTimeRemaining (hrs)", estTimeRemainingHours), + zap.Float64("avgBlockProcessTime (ms)", float64(runningAvg)), + ) + } + + startTime := time.Now() if err := s.Pipeline.RunForBlock(ctx, uint64(i)); err != nil { s.Logger.Sugar().Errorw("Failed to run pipeline for block", zap.Int64("currentBlockNumber", i), @@ -88,6 +130,19 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error { ) return err } + delta := time.Since(startTime).Milliseconds() + blocksProcessed++ + + totalDurationMs += int(delta) + runningAvg = totalDurationMs / blocksProcessed + + s.Logger.Sugar().Debugw("Processed block", + zap.Int64("blockNumber", i), + zap.Int64("duration", delta), + zap.Int("avgDuration", runningAvg), + ) } + + // TODO(seanmcgary): transition to listening for new blocks return nil } diff --git a/internal/sidecar/sidecar.go b/internal/sidecar/sidecar.go index 66818343..4fe81853 100644 --- a/internal/sidecar/sidecar.go +++ b/internal/sidecar/sidecar.go @@ -35,6 +35,7 @@ type Sidecar struct { Storage storage.BlockStore Pipeline *pipeline.Pipeline EthereumClient *ethereum.Client + ShutdownChan chan bool } func NewSidecar( @@ -52,6 +53,7 @@ func NewSidecar( Storage: s, Pipeline: p, EthereumClient: ethClient, + ShutdownChan: make(chan bool), } } diff --git a/internal/sqlite/sqlite.go b/internal/sqlite/sqlite.go index f6f98f2b..562c7728 100644 --- a/internal/sqlite/sqlite.go +++ b/internal/sqlite/sqlite.go @@ -20,9 +20,12 @@ func NewGormSqliteFromSqlite(sqlite gorm.Dialector) (*gorm.DB, error) { return nil, err } + // https://phiresky.github.io/blog/2020/sqlite-performance-tuning/ pragmas := []string{ `PRAGMA foreign_keys = ON;`, `PRAGMA journal_mode = WAL;`, + `PRAGMA synchronous = normal;`, + `pragma mmap_size = 30000000000;`, } for _, pragma := range pragmas {