Skip to content

Commit

Permalink
Merge branch 'main' into abhi/acme-enable
Browse files Browse the repository at this point in the history
  • Loading branch information
paulgb authored Jan 17, 2024
2 parents f5f64a1 + 124a42e commit 173d1bc
Show file tree
Hide file tree
Showing 14 changed files with 365 additions and 254 deletions.
389 changes: 181 additions & 208 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/quickstart/supervisord.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ environment=POSTGRES_HOST_AUTH_METHOD=trust
user=postgres

[program:plane-controller]
command=/bin/plane controller --db postgres://postgres@127.0.0.1 --host 0.0.0.0
command=/bin/plane controller --db postgres://postgres@127.0.0.1 --host 0.0.0.0 --controller-url http://localhost:8080
autostart=true
autorestart=true
stderr_logfile=/var/log/plane-controller-stderr.log
Expand Down
10 changes: 7 additions & 3 deletions docs/pages/deploy-to-prod.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,12 @@ Plane implements the [ACME DNS-01](https://letsencrypt.org/docs/challenge-types/
certificate for each proxy. Each proxy generates its own private key, which never leaves that proxy.

For this to work, the `CNAME` record for the `_acme-challenge.<cluster name>` subdomain needs to point to a domain like
`<cluster name>.my-dns-acme-server.com`. The `A` record of `<cluster name>.my-dns-acme-server.com` needs to contain the public
IP of a DNS server that is running the Plane ACME DNS-01 receiver, with port 53 (TCP and UDP) open to the public internet.
`<cluster name>.my-dns-acme-server.com`. The `NS` record of `<cluster name>.my-dns-acme-server.com` needs to point do a domain
whose `A` record is set to the public IP of the Plane ACME DNS-01 receiver. Port 53 (both TCP and UDP) on that IP must be
open to the public internet.

These are basic DNS servers that exist only to serve the ACME DNS-01 challenge, which is required for proxies to update their
Plane’s built-in DNS server exists only to serve the ACME DNS-01 challenge, which is required for proxies to update their
certificates.

As an alternative to setting up Plane’s DNS server, you can obtain certificates for your application on your own and pass them
in to the proxies on startup. Note that under this approach, Plane is not able to refresh certificates on its own.
3 changes: 1 addition & 2 deletions plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,10 @@ tokio = { version = "1.33.0", features = ["macros", "rt-multi-thread", "signal"]
tokio-rustls = "0.24.1"
tokio-stream = "0.1.14"
tokio-tungstenite = { version = "0.20.1", features = ["rustls-tls-webpki-roots"] }
tower-http = { version = "0.4.4", features = ["trace"] }
tower-http = { version = "0.4.4", features = ["trace", "cors"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
trust-dns-server = "0.23.2"
tungstenite = "0.20.1"
url = "2.4.1"
x509-parser = "0.15.1"

22 changes: 11 additions & 11 deletions plane/plane-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ version = "0.2.0"
edition = "2021"

[dependencies]
plane = { path = "../plane-dynamic", package = "plane-dynamic" }
chrono = { version = "0.4.31", features = ["serde"] }
tracing = "0.1.40"
tracing-appender = "0.2.2"
tokio = { version = "1.33.0", features = ["macros", "rt-multi-thread", "signal"] }
anyhow = "1.0.75"
async-trait = "0.1.74"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
serde_json = "1.0.107"
bollard = "0.15.0"
chrono = { version = "0.4.31", features = ["serde"] }
futures-util = "0.3.29"
hyper = { version = "0.14.27", features = ["server"] }
plane = { path = "../plane-dynamic", package = "plane-dynamic" }
plane-test-macro = { path = "plane-test-macro" }
reqwest = { version = "0.11.22", features = ["json", "rustls-tls"], default-features = false }
serde_json = "1.0.107"
thiserror = "1.0.50"
hyper = { version = "0.14.27", features = ["server"] }
anyhow = "1.0.75"
tokio = { version = "1.33.0", features = ["macros", "rt-multi-thread", "signal"] }
tracing = "0.1.40"
tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
url = "2.4.1"
futures-util = "0.3.29"
plane-test-macro = { path = "plane-test-macro" }
82 changes: 82 additions & 0 deletions plane/plane-tests/tests/reuse_key.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use crate::common::timeout::WithTimeout;
use common::test_env::TestEnvironment;
use plane::{
types::{BackendStatus, ConnectRequest, ExecutorConfig, PullPolicy, SpawnConfig},
types::{KeyConfig, ResourceLimits},
};
use plane_test_macro::plane_test;
use serde_json::Map;
use std::collections::HashMap;

mod common;

#[plane_test]
async fn reuse_key(env: TestEnvironment) {
let controller = env.controller().await;
let client = controller.client();
let _drone = env.drone(&controller).await;

// Wait for the drone to register. TODO: this seems long.
tokio::time::sleep(std::time::Duration::from_secs(5)).await;

tracing::info!("Requesting backend.");
let connect_request = ConnectRequest {
spawn_config: Some(SpawnConfig {
executable: ExecutorConfig {
image: "ghcr.io/drifting-in-space/demo-image-drop-four".to_string(),
pull_policy: PullPolicy::IfNotPresent,
env: HashMap::default(),
resource_limits: ResourceLimits::default(),
credentials: None,
},
lifetime_limit_seconds: Some(5),
max_idle_seconds: None,
}),
key: Some(KeyConfig {
name: "reuse-key".to_string(),
namespace: "".to_string(),
tag: "".to_string(),
}),
user: None,
auth: Map::default(),
};

let response = client
.connect(&env.cluster, &connect_request)
.await
.unwrap();
tracing::info!("Got response.");

assert!(response.spawned);

let backend_id = response.backend_id.clone();

let mut backend_status_stream = client
.backend_status_stream(&env.cluster, &backend_id)
.with_timeout(10)
.await
.unwrap()
.unwrap();

let response2 = client
.connect(&env.cluster, &connect_request)
.await
.unwrap();

assert!(!response2.spawned);
assert_eq!(response2.backend_id, backend_id);

loop {
let message = backend_status_stream
.next()
.with_timeout(10)
.await
.unwrap()
.unwrap();

tracing::info!("Got status: {:?}", message);
if message.status == BackendStatus::Terminated {
break;
}
}
}
38 changes: 30 additions & 8 deletions plane/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,30 @@ fn show_error(error: &PlaneClientError) {
error.id.bright_yellow(),
);
}
PlaneClientError::ConnectFailed(message) => {
eprintln!(
"{}: {}",
"Failed to connect to API server".bright_red(),
message.magenta()
);
}
PlaneClientError::BadConfiguration(message) => {
eprintln!(
"{}: {}",
"Bad configuration".bright_red(),
message.magenta()
);
}
PlaneClientError::Tungstenite(error) => {
eprintln!(
"{}: {}",
"WebSocket error".bright_red(),
error.to_string().magenta()
);
}
PlaneClientError::SendFailed => {
eprintln!("{}", "Failed to send message to channel".bright_red());
}
}
}

Expand Down Expand Up @@ -230,15 +254,14 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE
AdminCommand::PutDummyDns { cluster } => {
let connection = client.proxy_connection(&cluster);
let proxy_name = ProxyName::new_random();
let mut conn = connection.connect(&proxy_name).await.unwrap();
let mut conn = connection.connect(&proxy_name).await?;

conn.send(MessageFromProxy::CertManagerRequest(
CertManagerRequest::CertLeaseRequest,
))
.await
.unwrap();
.await?;

let response = conn.recv().await.unwrap();
let response = conn.recv().await.expect("Failed to receive response");

match response {
MessageToProxy::CertManagerResponse(CertManagerResponse::CertLeaseResponse {
Expand All @@ -253,17 +276,16 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE
_ => panic!("Unexpected response"),
}

let message = format!("Dummy message from {}", proxy_name.to_string());
let message = format!("Dummy message from {}", proxy_name);

conn.send(MessageFromProxy::CertManagerRequest(
CertManagerRequest::SetTxtRecord {
txt_value: message.clone(),
},
))
.await
.unwrap();
.await?;

let response = conn.recv().await.unwrap();
let response = conn.recv().await.expect("Failed to receive response");

match response {
MessageToProxy::CertManagerResponse(
Expand Down
12 changes: 12 additions & 0 deletions plane/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ pub enum PlaneClientError {

#[error("API error: {0} ({1})")]
PlaneError(ApiError, StatusCode),

#[error("Failed to connect.")]
ConnectFailed(&'static str),

#[error("Bad configuration.")]
BadConfiguration(&'static str),

#[error("WebSocket error: {0}")]
Tungstenite(#[from] tokio_tungstenite::tungstenite::Error),

#[error("Send error")]
SendFailed,
}

#[derive(Clone)]
Expand Down
10 changes: 9 additions & 1 deletion plane/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
};
use anyhow::Result;
use axum::{
http::{header, Method},
routing::{get, post},
Json, Router, Server,
};
Expand All @@ -24,6 +25,7 @@ use tokio::{
sync::oneshot::{self},
task::JoinHandle,
};
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::{DefaultMakeSpan, DefaultOnRequest, DefaultOnResponse, TraceLayer};
use tracing::Level;
use url::Url;
Expand Down Expand Up @@ -159,6 +161,11 @@ impl ControllerServer {
post(terminate::handle_hard_terminate),
);

let cors_public = CorsLayer::new()
.allow_methods(vec![Method::GET, Method::POST])
.allow_headers(vec![header::CONTENT_TYPE])
.allow_origin(Any);

// Routes that are may be accessed directly from end-user code. These are placed
// under the /pub/ top-level route to make it easier to expose only these routes,
// using a reverse proxy configuration.
Expand All @@ -167,7 +174,8 @@ impl ControllerServer {
.route(
"/c/:cluster/b/:backend/status-stream",
get(handle_backend_status_stream),
);
)
.layer(cors_public.clone());

let app = Router::new()
.nest("/pub", public_routes)
Expand Down
2 changes: 1 addition & 1 deletion plane/src/database/backend_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,6 @@ pub struct BackendKeyResult {

impl BackendKeyResult {
pub fn is_live(&self) -> bool {
self.as_of > self.expires_at
self.as_of < self.expires_at
}
}
3 changes: 2 additions & 1 deletion plane/src/proxy/cert_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ async fn get_certificate(
let account = builder.build().await.context("Building account")?;

let mut builder = OrderBuilder::new(account);
builder.add_dns_identifier(format!("*.{}", cluster));
builder.add_dns_identifier(format!("{}", cluster));
builder.add_dns_identifier(format!("*.{}", cluster)); // wildcard
let order = builder.build().await.context("Building order")?;

let authorizations = order
Expand Down
11 changes: 7 additions & 4 deletions plane/src/proxy/rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,14 @@ fn extract_bearer_token(parts: &mut uri::Parts) -> Option<BearerToken> {
let (token, path) = path_and_query.path().strip_prefix('/')?.split_once('/')?;
let token = BearerToken::from(token.to_string());

let query = path_and_query
.query()
.map(|query| format!("?{}", query))
.unwrap_or_default();

parts.path_and_query = Some(
PathAndQuery::from_str(
format!("/{}{}", path, path_and_query.query().unwrap_or_default()).as_str(),
)
.expect("Path and query is valid."),
PathAndQuery::from_str(format!("/{}{}", path, query).as_str())
.expect("Path and query is valid."),
);

Some(token)
Expand Down
27 changes: 15 additions & 12 deletions plane/src/typed_socket/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use super::{ChannelMessage, Handshake, SocketAction, TypedSocket};
use crate::client::controller_address::AuthorizedAddress;
use crate::client::PlaneClientError;
use crate::names::NodeName;
use crate::{plane_version_info, util::ExponentialBackoff};
use anyhow::{anyhow, Result};
use futures_util::{SinkExt, StreamExt};
use std::marker::PhantomData;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -47,7 +47,7 @@ impl<T: ChannelMessage> TypedSocketConnector<T> {
}
}

pub async fn connect<N: NodeName>(&self, name: &N) -> Result<TypedSocket<T>> {
pub async fn connect<N: NodeName>(&self, name: &N) -> Result<TypedSocket<T>, PlaneClientError> {
let handshake = Handshake {
name: name.to_string(),
version: plane_version_info(),
Expand All @@ -60,15 +60,16 @@ impl<T: ChannelMessage> TypedSocketConnector<T> {
.send(Message::Text(serde_json::to_string(&handshake)?))
.await?;

let msg = socket
.next()
.await
.ok_or_else(|| anyhow!("Socket closed before handshake received."))??;
let msg = socket.next().await.ok_or(PlaneClientError::ConnectFailed(
"Socket closed before handshake received.",
))??;
let msg = match msg {
Message::Text(msg) => msg,
msg => {
tracing::error!("Unexpected handshake message: {:?}", msg);
return Err(anyhow::anyhow!("Handshake message was not text."));
return Err(PlaneClientError::ConnectFailed(
"Handshake message was not text.",
));
}
};

Expand All @@ -87,15 +88,17 @@ impl<T: ChannelMessage> TypedSocketConnector<T> {
}

/// Creates a WebSocket request from an AuthorizedAddress.
fn auth_url_to_request(addr: &AuthorizedAddress) -> Result<hyper::Request<()>> {
fn auth_url_to_request(addr: &AuthorizedAddress) -> Result<hyper::Request<()>, PlaneClientError> {
let mut request = hyper::Request::builder()
.method(hyper::Method::GET)
.uri(addr.url.as_str())
.header(
"Host",
addr.url
.host_str()
.ok_or_else(|| anyhow!("No host in URL."))?
.ok_or(PlaneClientError::BadConfiguration(
"URL does not have a hostname.",
))?
.to_string(),
)
.header("Connection", "Upgrade")
Expand All @@ -106,17 +109,17 @@ fn auth_url_to_request(addr: &AuthorizedAddress) -> Result<hyper::Request<()>> {
if let Some(bearer_header) = addr.bearer_header() {
request = request.header(
hyper::header::AUTHORIZATION,
hyper::header::HeaderValue::from_str(&bearer_header)?,
hyper::header::HeaderValue::from_str(&bearer_header).expect("Bearer header is valid"),
);
}

Ok(request.body(())?)
Ok(request.body(()).expect("Request is valid"))
}

async fn new_client<T: ChannelMessage>(
mut socket: Socket,
remote_handshake: Handshake,
) -> Result<TypedSocket<T>> {
) -> Result<TypedSocket<T>, PlaneClientError> {
let (send_to_client, recv_to_client) = tokio::sync::mpsc::channel::<T::Reply>(100);
let (send_from_client, mut recv_from_client) =
tokio::sync::mpsc::channel::<SocketAction<T>>(100);
Expand Down
Loading

0 comments on commit 173d1bc

Please sign in to comment.