Skip to content

Commit

Permalink
Merge pull request #5 from flashbots/proxy
Browse files Browse the repository at this point in the history
Add proxy layer to forward calls to op-geth
  • Loading branch information
avalonche authored Sep 21, 2024
2 parents 609bd39 + 68b037a commit f1922fa
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 25 deletions.
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
JWT_TOKEN=688f5d737bad920bdfb2fc2f488d6b6209eebda1dae949a8de91398d932c517a
L2_URL=http://localhost:8551
BUILDER_URL=http://localhost:8552
31 changes: 31 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
name: Linting

on:
push:
branches:
- main
pull_request:

jobs:
build:
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v2

- name: Set up Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
components: rustfmt

- name: Build
run: cargo build --verbose

- name: Lint
run: cargo clippy -- -D warnings

- name: Format code
run: cargo fmt -- --check
71 changes: 52 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ clap = { version = "4", features = ["derive", "env"] }
jsonrpsee = {version = "0.24.4", features = ["server", "http-client", "macros"]}
reqwest = "0.12.5"
http = "1.1.0"
dotenv = "0.15.0"
tower = "0.4.13"
http-body = "0.4.5"
hyper = { version = "1.4.1", features = ["full"] }
hyper-util = { version = "0.1", features = ["full"] }

[dev-dependencies]
anyhow = "1.0"
13 changes: 10 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ use std::net::SocketAddr;
use std::sync::Arc;

use clap::{arg, Parser};
use dotenv::dotenv;
use error::Error;
use http::HeaderMap;
use http::{header::AUTHORIZATION, HeaderValue};
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::server::Server;
use jsonrpsee::RpcModule;
use proxy::ProxyLayer;
use server::{EthApiServer, EthEngineApi};
use tracing::{info, Level};
use tracing_subscriber::EnvFilter;

mod error;
mod proxy;
mod server;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -51,6 +54,8 @@ type Result<T> = core::result::Result<T, Error>;

#[tokio::main]
async fn main() -> Result<()> {
// Load .env file
dotenv().ok();
let args: Args = Args::parse();

// Initialize logging
Expand All @@ -69,7 +74,7 @@ async fn main() -> Result<()> {
);
let l2_client = HttpClientBuilder::new()
.set_headers(headers.clone())
.build(args.l2_url)
.build(args.l2_url.clone())
.unwrap();

let builder_client = HttpClientBuilder::new()
Expand All @@ -78,14 +83,16 @@ async fn main() -> Result<()> {
.unwrap();

let eth_engine_api = EthEngineApi::new(Arc::new(l2_client), Arc::new(builder_client));
let mut module = RpcModule::new(());
let mut module: RpcModule<()> = RpcModule::new(());
module
.merge(eth_engine_api.into_rpc())
.map_err(|e| Error::InitRPCServerError(e.to_string()))?;

// server setup
info!("Starting server on :{}", args.rpc_port);
let service_builder = tower::ServiceBuilder::new().layer(ProxyLayer::new(args.l2_url));
let server = Server::builder()
.set_http_middleware(service_builder)
.build(
format!("{}:{}", args.rpc_host, args.rpc_port)
.parse::<SocketAddr>()
Expand All @@ -94,7 +101,7 @@ async fn main() -> Result<()> {
.await
.map_err(|e| Error::InitRPCServerError(e.to_string()))?;
let handle = server.start(module);
tokio::spawn(handle.stopped());
handle.stopped().await;

Ok(())
}
70 changes: 70 additions & 0 deletions src/proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use hyper::Response;
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use jsonrpsee::core::BoxError;
use jsonrpsee::http_client::{HttpBody, HttpRequest, HttpResponse};
use std::task::{Context, Poll};
use std::{future::Future, pin::Pin};
use tower::{Layer, Service};
use tracing::error;

#[derive(Debug, Clone)]
pub struct ProxyLayer {
target_url: String,
}

impl ProxyLayer {
pub fn new(target_url: String) -> Self {
ProxyLayer { target_url }
}
}

impl<S> Layer<S> for ProxyLayer {
type Service = ProxyService<S>;

fn layer(&self, inner: S) -> Self::Service {
ProxyService {
inner,
client: Client::builder(TokioExecutor::new()).build_http(),
target_url: self.target_url.clone(),
}
}
}

#[derive(Debug, Clone)]
pub struct ProxyService<S> {
inner: S,
client: Client<HttpConnector, HttpBody>,
target_url: String,
}

impl<S> Service<HttpRequest<HttpBody>> for ProxyService<S>
where
S: Service<HttpRequest<HttpBody>, Response = Response<HttpBody>>,
S::Response: 'static,
S::Error: Into<BoxError> + 'static,
S::Future: Send + 'static,
{
type Response = HttpResponse<hyper::body::Incoming>;
type Error = BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, mut req: HttpRequest<HttpBody>) -> Self::Future {
let target_url = self.target_url.clone();
let client = self.client.clone();
let fut = async move {
*req.uri_mut() = target_url.parse().unwrap();
client.request(req).await.map_err(|e| {
error!("Error proxying request: {}", e);
Box::new(e) as Box<dyn std::error::Error + Send + Sync>
})
};
Box::pin(fut)
}
}
Loading

0 comments on commit f1922fa

Please sign in to comment.