From 25f43e459f4f6eafffd82d2e20d8e9b2b7f1ee8b Mon Sep 17 00:00:00 2001 From: Martin Raszyk Date: Sun, 24 Mar 2024 09:40:42 +0100 Subject: [PATCH] chore: introduce icx-proxy library --- rs/boundary_node/icx_proxy/BUILD.bazel | 30 +- rs/boundary_node/icx_proxy/Cargo.toml | 4 + rs/boundary_node/icx_proxy/src/core.rs | 307 +++++++++++++++++++ rs/boundary_node/icx_proxy/src/lib.rs | 20 ++ rs/boundary_node/icx_proxy/src/main.rs | 318 +------------------- rs/boundary_node/icx_proxy/src/proxy/mod.rs | 20 +- rs/boundary_node/icx_proxy/src/validate.rs | 2 +- 7 files changed, 382 insertions(+), 319 deletions(-) create mode 100644 rs/boundary_node/icx_proxy/src/core.rs create mode 100644 rs/boundary_node/icx_proxy/src/lib.rs diff --git a/rs/boundary_node/icx_proxy/BUILD.bazel b/rs/boundary_node/icx_proxy/BUILD.bazel index 9964a59df86..6b68f2d56b7 100644 --- a/rs/boundary_node/icx_proxy/BUILD.bazel +++ b/rs/boundary_node/icx_proxy/BUILD.bazel @@ -1,4 +1,4 @@ -load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_test") +load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_library", "rust_test") package(default_visibility = ["//visibility:public"]) @@ -64,7 +64,7 @@ rust_binary( srcs = glob(["src/**/*.rs"]), aliases = ALIASES, proc_macro_deps = MACRO_DEPENDENCIES, - deps = DEPENDENCIES, + deps = DEPENDENCIES + [":icx_proxy"], ) rust_binary( @@ -73,7 +73,7 @@ rust_binary( aliases = ALIASES, crate_features = ["dev_proxy"], proc_macro_deps = MACRO_DEPENDENCIES, - deps = DEPENDENCIES, + deps = DEPENDENCIES + [":icx_proxy_dev"], ) rust_test( @@ -83,3 +83,27 @@ rust_test( proc_macro_deps = MACRO_DEPENDENCIES + MACRO_DEV_DEPENDENCIES, deps = DEPENDENCIES + DEV_DEPENDENCIES, ) + +rust_library( + name = "icx_proxy", + srcs = glob( + ["src/**"], + exclude = ["src/main.rs"], + ), + aliases = ALIASES, + proc_macro_deps = MACRO_DEPENDENCIES + MACRO_DEV_DEPENDENCIES, + deps = DEPENDENCIES + DEV_DEPENDENCIES, +) + +rust_library( + name = "icx_proxy_dev", + srcs = glob( + ["src/**"], + exclude = ["src/main.rs"], + ), + aliases = ALIASES, + crate_features = ["dev_proxy"], + crate_name = "icx_proxy", + proc_macro_deps = MACRO_DEPENDENCIES + MACRO_DEV_DEPENDENCIES, + deps = DEPENDENCIES + DEV_DEPENDENCIES, +) diff --git a/rs/boundary_node/icx_proxy/Cargo.toml b/rs/boundary_node/icx_proxy/Cargo.toml index 8c289039ce4..140be9725a5 100644 --- a/rs/boundary_node/icx_proxy/Cargo.toml +++ b/rs/boundary_node/icx_proxy/Cargo.toml @@ -16,6 +16,10 @@ version.workspace = true name = "icx-proxy" path = "src/main.rs" +[lib] +name = "icx_proxy" +path = "src/lib.rs" + [dependencies] anyhow = { workspace = true } arc-swap = "1" diff --git a/rs/boundary_node/icx_proxy/src/core.rs b/rs/boundary_node/icx_proxy/src/core.rs new file mode 100644 index 00000000000..a7e9b736481 --- /dev/null +++ b/rs/boundary_node/icx_proxy/src/core.rs @@ -0,0 +1,307 @@ +use std::{collections::HashSet, fs, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; + +use anyhow::{Context, Error}; +use candid::Principal; +use clap::{builder::ValueParser, Parser}; +use futures::try_join; +use hyperlocal_next::Uri as UnixUri; +use tracing::{error, warn, Instrument}; + +use crate::{ + canister_alias::{parse_canister_alias, CanisterAlias}, + canister_id, + domain_addr::{parse_domain_addr, DomainAddr}, + http_client, logging, + metrics::{self, MetricParams, WithMetrics}, + proxy::{self, ListenProto}, + validate::Validator, +}; + +// TODO: Remove after inspect_err stabilizes (rust-lang/rust#91345) +trait InspectErr { + type E; + fn inspect_err(self, f: F) -> Self; +} + +impl InspectErr for Result { + type E = E; + fn inspect_err(self, f: F) -> Self { + if let Err(ref e) = self { + f(e); + } + + self + } +} + +// Generic function to load a list of canister ids from a text file into a HashSet +pub fn load_canister_list(path: PathBuf) -> Result, Error> { + let data = fs::read_to_string(path).context("failed to read canisters file")?; + let set = data + .lines() + .filter(|x| !x.trim().is_empty()) + .map(Principal::from_text) + .collect::, _>>()?; + Ok(set) +} + +#[derive(Parser)] +pub struct Opts { + /// The address to bind to. + #[clap(long)] + address: Option, + + /// Unix socket to listen on. If address is specified too - it takes precedence + #[clap(long)] + unix_socket: Option, + + /// A list of replica mappings from domains to socket addresses for replica upstreams. + /// Format: [|:] + #[clap(long, value_parser = ValueParser::new(parse_domain_addr), default_value = "http://localhost:8000/")] + replica: Vec, + + /// Replica's Unix Socket. Overrides `--replica` + #[clap(long)] + replica_unix_socket: Option, + + /// A list of domains that can be served. These are used for canister resolution. + #[clap(long, default_value = "localhost")] + domain: Vec, + + /// Regex to match domain allowed to serve system subnets canisters + #[clap(long)] + domain_system_regex: Vec, + + /// Domain allowed to serve normal canisters + #[clap(long)] + domain_app_regex: Vec, + + /// A list of mappings from canister names to canister principals. + /// Format: name:principal + #[clap(long, value_parser = ValueParser::new(parse_canister_alias))] + canister_alias: Vec, + + /// The list of custom root HTTPS certificates to use to talk to the replica. This can be used + /// to connect to an IC that has a self-signed certificate, or to limit the certificates. Do not use this + /// when talking to the Internet Computer blockchain mainnet unless you know what you're doing. + #[clap(long)] + ssl_root_certificate: Vec, + + /// The root key to use to communicate with the replica back end. By default, the Internet Computer + /// mainnet key is used (embedded in the binary). Ensure you know what you are doing before + /// using this option. + #[clap(long, conflicts_with("fetch_root_key"))] + root_key: Option, + + /// Path to a list of pre-isolation canister ids, one per line + #[clap(long)] + pre_isolation_canisters: Option, + + /// Path to a MaxMind GeoIP2 database + #[clap(long)] + geoip_db: Option, + + /// Denylist URL + #[clap(long)] + denylist_url: Option, + + /// Path to an initial denylist snapshot + #[clap(long)] + denylist_initial: Option, + + /// Interval to update denylist in seconds + #[clap(long, default_value = "60")] + denylist_interval: u64, + + /// Allowlist that takes precedence over denylist + #[clap(long)] + allowlist: Option, + + /// Whether or not to fetch the root key from the replica back end. Do not use this when + /// talking to the Internet Computer blockchain mainnet as it is unsecure. + #[clap(long)] + fetch_root_key: bool, + + /// Allows HTTPS connection to replicas with invalid HTTPS certificates. This can be used to + /// connect to an IC that has a self-signed certificate, for example. Do not use this when + /// talking to the Internet Computer blockchain mainnet as it is *VERY* unsecure. + #[clap(long)] + danger_accept_invalid_ssl: bool, + + /// Whether or not this is run in a debug context (e.g. errors returned in responses + /// should show full stack and error details). + #[clap(long)] + debug: bool, + + /// The options for logging + #[clap(flatten)] + log: logging::LoggingOpts, + + /// The options for metrics + #[clap(flatten)] + metrics: metrics::MetricsOpts, +} + +pub fn main(opts: Opts) -> Result<(), Error> { + let Opts { + address, + unix_socket, + replica, + replica_unix_socket, + domain, + domain_system_regex, + domain_app_regex, + canister_alias, + pre_isolation_canisters, + geoip_db, + denylist_url, + denylist_initial, + denylist_interval, + allowlist, + ssl_root_certificate, + fetch_root_key, + danger_accept_invalid_ssl, + debug, + log, + metrics, + root_key, + } = opts; + + let _span = logging::setup(log); + + // Setup Metrics + let (meter, metrics) = metrics::setup(metrics); + + // Setup domain-canister matching + let domain_match = pre_isolation_canisters.map(|x| { + if domain_app_regex.is_empty() || domain_system_regex.is_empty() { + panic!("if --pre-isolation-canisters list is specified then --domain-app-regex and --domain-system-regex should also be"); + } + + let pre_isolation_canisters = load_canister_list(x).expect("uname to load pre-isolation canisters"); + warn!("{} pre-isolation canisters loaded", pre_isolation_canisters.len()); + + Ok::<_, Error>(Arc::new(proxy::domain_canister::DomainCanisterMatcher::new( + pre_isolation_canisters, + domain_app_regex, + domain_system_regex, + )?)) + }).transpose()?; + + // Setup GeoIP + let geoip = geoip_db + .map(proxy::geoip::GeoIp::new) + .transpose()? + .map(Arc::new); + + // Setup denylisting + let denylist = if denylist_url.is_some() || denylist_initial.is_some() { + let allowlist = allowlist + .map(load_canister_list) + .transpose()? + .unwrap_or_default(); + + let dl = proxy::denylist::Denylist::new(denylist_url, allowlist); + + // Load initial list if provided + if let Some(v) = denylist_initial { + let data = fs::read(v).context("unable to read initial denylist")?; + let count = dl.load_json(&data)?; + warn!("Initial denylist loaded with {count} canisters"); + } + + Some(Arc::new(dl)) + } else { + None + }; + + // Setup Canister ID Resolver + let resolver = canister_id::setup(canister_id::CanisterIdOpts { + canister_alias, + domain, + })?; + + // Setup Validator + let validator = Validator::new(); + let validator = WithMetrics(validator, MetricParams::new(&meter, "validator")); + + let listen = address + .map(ListenProto::Tcp) + .or(unix_socket.map(ListenProto::Unix)) + .expect("must specify either address or unix_socket"); + + let proxy = if let Some(v) = replica_unix_socket { + let uri = UnixUri::new(v, "/"); + let client = http_client::setup_unix_socket(uri.into())?; + + proxy::setup_unix_socket( + proxy::SetupArgs { + resolver, + validator, + domain_match, + geoip, + denylist: denylist.clone(), + client, + meter: meter.clone(), + }, + proxy::ProxyOpts { + listen, + replicas: vec![], + debug, + fetch_root_key, + root_key, + }, + )? + } else { + let client = http_client::setup(http_client::HttpClientOpts { + ssl_root_certificate, + danger_accept_invalid_ssl, + replicas: &replica, + })?; + + proxy::setup( + proxy::SetupArgs { + resolver, + validator, + domain_match, + geoip, + denylist: denylist.clone(), + client, + meter: meter.clone(), + }, + proxy::ProxyOpts { + listen, + replicas: replica, + debug, + fetch_root_key, + root_key, + }, + )? + }; + + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()?; + + rt.block_on( + async move { + try_join!( + metrics.run().in_current_span(), + proxy.run().in_current_span(), + async { + if let Some(v) = denylist { + v.run(Duration::from_secs(denylist_interval), &meter).await + } else { + Ok(()) + } + } + ) + .context("Runtime crashed") + .inspect_err(|e| error!("{e}"))?; + Ok::<_, Error>(()) + } + .in_current_span(), + )?; + + Ok(()) +} diff --git a/rs/boundary_node/icx_proxy/src/lib.rs b/rs/boundary_node/icx_proxy/src/lib.rs new file mode 100644 index 00000000000..5d6c75bf510 --- /dev/null +++ b/rs/boundary_node/icx_proxy/src/lib.rs @@ -0,0 +1,20 @@ +mod canister_alias; +mod canister_id; +mod config; +mod core; +mod domain_addr; +mod error; +mod http; +mod http_client; +mod logging; +mod metrics; +mod proxy; +mod validate; + +use crate::domain_addr::DomainAddr; + +pub use crate::canister_id::ResolverState; +pub use crate::config::dns_canister_config::DnsCanisterConfig; +pub use crate::core::{main, Opts}; +pub use crate::proxy::{agent_handler, AppState}; +pub use crate::validate::Validator; diff --git a/rs/boundary_node/icx_proxy/src/main.rs b/rs/boundary_node/icx_proxy/src/main.rs index fd8e6c6f9a4..7d3588f4cad 100644 --- a/rs/boundary_node/icx_proxy/src/main.rs +++ b/rs/boundary_node/icx_proxy/src/main.rs @@ -1,325 +1,15 @@ // TODO: Remove after inspect_err stabilizes (rust-lang/rust#91345) - #![allow(unstable_name_collisions)] -use std::{collections::HashSet, fs, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; - -use anyhow::{Context, Error}; -use candid::Principal; -use clap::{builder::ValueParser, Parser}; -use futures::try_join; -use hyperlocal_next::Uri as UnixUri; +use anyhow::Error; +use clap::Parser; use jemallocator::Jemalloc; -use tracing::{error, warn, Instrument}; - -mod canister_alias; -mod canister_id; -mod config; -mod domain_addr; -mod error; -mod http; -mod http_client; -mod logging; -mod metrics; -mod proxy; -mod validate; - -use crate::{ - canister_alias::{parse_canister_alias, CanisterAlias}, - domain_addr::{parse_domain_addr, DomainAddr}, - metrics::{MetricParams, WithMetrics}, - proxy::ListenProto, - validate::Validator, -}; #[global_allocator] static GLOBAL: Jemalloc = Jemalloc; -// TODO: Remove after inspect_err stabilizes (rust-lang/rust#91345) -trait InspectErr { - type E; - fn inspect_err(self, f: F) -> Self; -} - -impl InspectErr for Result { - type E = E; - fn inspect_err(self, f: F) -> Self { - if let Err(ref e) = self { - f(e); - } - - self - } -} - -// Generic function to load a list of canister ids from a text file into a HashSet -pub fn load_canister_list(path: PathBuf) -> Result, Error> { - let data = fs::read_to_string(path).context("failed to read canisters file")?; - let set = data - .lines() - .filter(|x| !x.trim().is_empty()) - .map(Principal::from_text) - .collect::, _>>()?; - Ok(set) -} - -#[derive(Parser)] -struct Opts { - /// The address to bind to. - #[clap(long)] - address: Option, - - /// Unix socket to listen on. If address is specified too - it takes precedence - #[clap(long)] - unix_socket: Option, - - /// A list of replica mappings from domains to socket addresses for replica upstreams. - /// Format: [|:] - #[clap(long, value_parser = ValueParser::new(parse_domain_addr), default_value = "http://localhost:8000/")] - replica: Vec, - - /// Replica's Unix Socket. Overrides `--replica` - #[clap(long)] - replica_unix_socket: Option, - - /// A list of domains that can be served. These are used for canister resolution. - #[clap(long, default_value = "localhost")] - domain: Vec, - - /// Regex to match domain allowed to serve system subnets canisters - #[clap(long)] - domain_system_regex: Vec, - - /// Domain allowed to serve normal canisters - #[clap(long)] - domain_app_regex: Vec, - - /// A list of mappings from canister names to canister principals. - /// Format: name:principal - #[clap(long, value_parser = ValueParser::new(parse_canister_alias))] - canister_alias: Vec, - - /// The list of custom root HTTPS certificates to use to talk to the replica. This can be used - /// to connect to an IC that has a self-signed certificate, or to limit the certificates. Do not use this - /// when talking to the Internet Computer blockchain mainnet unless you know what you're doing. - #[clap(long)] - ssl_root_certificate: Vec, - - /// The root key to use to communicate with the replica back end. By default, the Internet Computer - /// mainnet key is used (embedded in the binary). Ensure you know what you are doing before - /// using this option. - #[clap(long, conflicts_with("fetch_root_key"))] - root_key: Option, - - /// Path to a list of pre-isolation canister ids, one per line - #[clap(long)] - pre_isolation_canisters: Option, - - /// Path to a MaxMind GeoIP2 database - #[clap(long)] - geoip_db: Option, - - /// Denylist URL - #[clap(long)] - denylist_url: Option, - - /// Path to an initial denylist snapshot - #[clap(long)] - denylist_initial: Option, - - /// Interval to update denylist in seconds - #[clap(long, default_value = "60")] - denylist_interval: u64, - - /// Allowlist that takes precedence over denylist - #[clap(long)] - allowlist: Option, - - /// Whether or not to fetch the root key from the replica back end. Do not use this when - /// talking to the Internet Computer blockchain mainnet as it is unsecure. - #[clap(long)] - fetch_root_key: bool, - - /// Allows HTTPS connection to replicas with invalid HTTPS certificates. This can be used to - /// connect to an IC that has a self-signed certificate, for example. Do not use this when - /// talking to the Internet Computer blockchain mainnet as it is *VERY* unsecure. - #[clap(long)] - danger_accept_invalid_ssl: bool, - - /// Whether or not this is run in a debug context (e.g. errors returned in responses - /// should show full stack and error details). - #[clap(long)] - debug: bool, - - /// The options for logging - #[clap(flatten)] - log: logging::LoggingOpts, - - /// The options for metrics - #[clap(flatten)] - metrics: metrics::MetricsOpts, -} - fn main() -> Result<(), Error> { - let Opts { - address, - unix_socket, - replica, - replica_unix_socket, - domain, - domain_system_regex, - domain_app_regex, - canister_alias, - pre_isolation_canisters, - geoip_db, - denylist_url, - denylist_initial, - denylist_interval, - allowlist, - ssl_root_certificate, - fetch_root_key, - danger_accept_invalid_ssl, - debug, - log, - metrics, - root_key, - } = Opts::parse(); - - let _span = logging::setup(log); - - // Setup Metrics - let (meter, metrics) = metrics::setup(metrics); - - // Setup domain-canister matching - let domain_match = pre_isolation_canisters.map(|x| { - if domain_app_regex.is_empty() || domain_system_regex.is_empty() { - panic!("if --pre-isolation-canisters list is specified then --domain-app-regex and --domain-system-regex should also be"); - } - - let pre_isolation_canisters = load_canister_list(x).expect("uname to load pre-isolation canisters"); - warn!("{} pre-isolation canisters loaded", pre_isolation_canisters.len()); - - Ok::<_, Error>(Arc::new(proxy::domain_canister::DomainCanisterMatcher::new( - pre_isolation_canisters, - domain_app_regex, - domain_system_regex, - )?)) - }).transpose()?; - - // Setup GeoIP - let geoip = geoip_db - .map(proxy::geoip::GeoIp::new) - .transpose()? - .map(Arc::new); - - // Setup denylisting - let denylist = if denylist_url.is_some() || denylist_initial.is_some() { - let allowlist = allowlist - .map(load_canister_list) - .transpose()? - .unwrap_or_default(); - - let dl = proxy::denylist::Denylist::new(denylist_url, allowlist); - - // Load initial list if provided - if let Some(v) = denylist_initial { - let data = fs::read(v).context("unable to read initial denylist")?; - let count = dl.load_json(&data)?; - warn!("Initial denylist loaded with {count} canisters"); - } - - Some(Arc::new(dl)) - } else { - None - }; - - // Setup Canister ID Resolver - let resolver = canister_id::setup(canister_id::CanisterIdOpts { - canister_alias, - domain, - })?; - - // Setup Validator - let validator = Validator::new(); - let validator = WithMetrics(validator, MetricParams::new(&meter, "validator")); - - let listen = address - .map(ListenProto::Tcp) - .or(unix_socket.map(ListenProto::Unix)) - .expect("must specify either address or unix_socket"); - - let proxy = if let Some(v) = replica_unix_socket { - let uri = UnixUri::new(v, "/"); - let client = http_client::setup_unix_socket(uri.into())?; - - proxy::setup_unix_socket( - proxy::SetupArgs { - resolver, - validator, - domain_match, - geoip, - denylist: denylist.clone(), - client, - meter: meter.clone(), - }, - proxy::ProxyOpts { - listen, - replicas: vec![], - debug, - fetch_root_key, - root_key, - }, - )? - } else { - let client = http_client::setup(http_client::HttpClientOpts { - ssl_root_certificate, - danger_accept_invalid_ssl, - replicas: &replica, - })?; - - proxy::setup( - proxy::SetupArgs { - resolver, - validator, - domain_match, - geoip, - denylist: denylist.clone(), - client, - meter: meter.clone(), - }, - proxy::ProxyOpts { - listen, - replicas: replica, - debug, - fetch_root_key, - root_key, - }, - )? - }; - - let rt = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build()?; - - rt.block_on( - async move { - try_join!( - metrics.run().in_current_span(), - proxy.run().in_current_span(), - async { - if let Some(v) = denylist { - v.run(Duration::from_secs(denylist_interval), &meter).await - } else { - Ok(()) - } - } - ) - .context("Runtime crashed") - .inspect_err(|e| error!("{e}"))?; - Ok::<_, Error>(()) - } - .in_current_span(), - )?; + let opts = icx_proxy::Opts::parse(); - Ok(()) + icx_proxy::main(opts) } diff --git a/rs/boundary_node/icx_proxy/src/proxy/mod.rs b/rs/boundary_node/icx_proxy/src/proxy/mod.rs index 4b1a2fe161f..6797b524871 100644 --- a/rs/boundary_node/icx_proxy/src/proxy/mod.rs +++ b/rs/boundary_node/icx_proxy/src/proxy/mod.rs @@ -67,7 +67,8 @@ pub struct ProxyOpts { pub root_key: Option, } -use agent::{handler as agent_handler, Pool}; +pub use agent::{handler as agent_handler, Pool}; + trait HandleError { type B; fn handle_error(self, debug: bool) -> Response; @@ -264,6 +265,23 @@ struct AppStateInner { } impl AppState { + pub fn new_for_testing( + replicas: Vec<(Agent, Uri)>, + resolver: ResolverState, + validator: V, + ) -> Self { + Self(Arc::new(AppStateInner { + agent: None, + domain_match: None, + geoip: None, + denylist: None, + replica_pool: Pool::new(replicas), + resolver, + validator, + debug: true, + })) + } + pub fn pool(&self) -> &Pool { &self.0.replica_pool } diff --git a/rs/boundary_node/icx_proxy/src/validate.rs b/rs/boundary_node/icx_proxy/src/validate.rs index 2099960bfca..51d137868e0 100644 --- a/rs/boundary_node/icx_proxy/src/validate.rs +++ b/rs/boundary_node/icx_proxy/src/validate.rs @@ -20,7 +20,7 @@ pub trait Validate: Sync + Send { ) -> Result, Cow<'static, str>>; } -#[derive(Clone)] +#[derive(Clone, Default)] pub struct Validator {} impl Validator {