Skip to content

Commit

Permalink
Merge branch 'main' into paulgb/update-dep-versions
Browse files Browse the repository at this point in the history
  • Loading branch information
paulgb authored Jan 16, 2024
2 parents 722d9dc + 597add8 commit 545a70f
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 22 deletions.
38 changes: 30 additions & 8 deletions plane/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,30 @@ fn show_error(error: &PlaneClientError) {
error.id.bright_yellow(),
);
}
PlaneClientError::ConnectFailed(message) => {
eprintln!(
"{}: {}",
"Failed to connect to API server".bright_red(),
message.magenta()
);
}
PlaneClientError::BadConfiguration(message) => {
eprintln!(
"{}: {}",
"Bad configuration".bright_red(),
message.magenta()
);
}
PlaneClientError::Tungstenite(error) => {
eprintln!(
"{}: {}",
"WebSocket error".bright_red(),
error.to_string().magenta()
);
}
PlaneClientError::SendFailed => {
eprintln!("{}", "Failed to send message to channel".bright_red());
}
}
}

Expand Down Expand Up @@ -230,15 +254,14 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE
AdminCommand::PutDummyDns { cluster } => {
let connection = client.proxy_connection(&cluster);
let proxy_name = ProxyName::new_random();
let mut conn = connection.connect(&proxy_name).await.unwrap();
let mut conn = connection.connect(&proxy_name).await?;

conn.send(MessageFromProxy::CertManagerRequest(
CertManagerRequest::CertLeaseRequest,
))
.await
.unwrap();
.await?;

let response = conn.recv().await.unwrap();
let response = conn.recv().await.expect("Failed to receive response");

match response {
MessageToProxy::CertManagerResponse(CertManagerResponse::CertLeaseResponse {
Expand All @@ -253,17 +276,16 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE
_ => panic!("Unexpected response"),
}

let message = format!("Dummy message from {}", proxy_name.to_string());
let message = format!("Dummy message from {}", proxy_name);

conn.send(MessageFromProxy::CertManagerRequest(
CertManagerRequest::SetTxtRecord {
txt_value: message.clone(),
},
))
.await
.unwrap();
.await?;

let response = conn.recv().await.unwrap();
let response = conn.recv().await.expect("Failed to receive response");

match response {
MessageToProxy::CertManagerResponse(
Expand Down
12 changes: 12 additions & 0 deletions plane/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ pub enum PlaneClientError {

#[error("API error: {0} ({1})")]
PlaneError(ApiError, StatusCode),

#[error("Failed to connect.")]
ConnectFailed(&'static str),

#[error("Bad configuration.")]
BadConfiguration(&'static str),

#[error("WebSocket error: {0}")]
Tungstenite(#[from] tokio_tungstenite::tungstenite::Error),

#[error("Send error")]
SendFailed,
}

#[derive(Clone)]
Expand Down
27 changes: 15 additions & 12 deletions plane/src/typed_socket/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use super::{ChannelMessage, Handshake, SocketAction, TypedSocket};
use crate::client::controller_address::AuthorizedAddress;
use crate::client::PlaneClientError;
use crate::names::NodeName;
use crate::{plane_version_info, util::ExponentialBackoff};
use anyhow::{anyhow, Result};
use futures_util::{SinkExt, StreamExt};
use std::marker::PhantomData;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -47,7 +47,7 @@ impl<T: ChannelMessage> TypedSocketConnector<T> {
}
}

pub async fn connect<N: NodeName>(&self, name: &N) -> Result<TypedSocket<T>> {
pub async fn connect<N: NodeName>(&self, name: &N) -> Result<TypedSocket<T>, PlaneClientError> {
let handshake = Handshake {
name: name.to_string(),
version: plane_version_info(),
Expand All @@ -60,15 +60,16 @@ impl<T: ChannelMessage> TypedSocketConnector<T> {
.send(Message::Text(serde_json::to_string(&handshake)?))
.await?;

let msg = socket
.next()
.await
.ok_or_else(|| anyhow!("Socket closed before handshake received."))??;
let msg = socket.next().await.ok_or(PlaneClientError::ConnectFailed(
"Socket closed before handshake received.",
))??;
let msg = match msg {
Message::Text(msg) => msg,
msg => {
tracing::error!("Unexpected handshake message: {:?}", msg);
return Err(anyhow::anyhow!("Handshake message was not text."));
return Err(PlaneClientError::ConnectFailed(
"Handshake message was not text.",
));
}
};

Expand All @@ -87,15 +88,17 @@ impl<T: ChannelMessage> TypedSocketConnector<T> {
}

/// Creates a WebSocket request from an AuthorizedAddress.
fn auth_url_to_request(addr: &AuthorizedAddress) -> Result<hyper::Request<()>> {
fn auth_url_to_request(addr: &AuthorizedAddress) -> Result<hyper::Request<()>, PlaneClientError> {
let mut request = hyper::Request::builder()
.method(hyper::Method::GET)
.uri(addr.url.as_str())
.header(
"Host",
addr.url
.host_str()
.ok_or_else(|| anyhow!("No host in URL."))?
.ok_or(PlaneClientError::BadConfiguration(
"URL does not have a hostname.",
))?
.to_string(),
)
.header("Connection", "Upgrade")
Expand All @@ -106,17 +109,17 @@ fn auth_url_to_request(addr: &AuthorizedAddress) -> Result<hyper::Request<()>> {
if let Some(bearer_header) = addr.bearer_header() {
request = request.header(
hyper::header::AUTHORIZATION,
hyper::header::HeaderValue::from_str(&bearer_header)?,
hyper::header::HeaderValue::from_str(&bearer_header).expect("Bearer header is valid"),
);
}

Ok(request.body(())?)
Ok(request.body(()).expect("Request is valid"))
}

async fn new_client<T: ChannelMessage>(
mut socket: Socket,
remote_handshake: Handshake,
) -> Result<TypedSocket<T>> {
) -> Result<TypedSocket<T>, PlaneClientError> {
let (send_to_client, recv_to_client) = tokio::sync::mpsc::channel::<T::Reply>(100);
let (send_from_client, mut recv_from_client) =
tokio::sync::mpsc::channel::<SocketAction<T>>(100);
Expand Down
8 changes: 6 additions & 2 deletions plane/src/typed_socket/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::client::PlaneClientError;
use crate::PlaneVersionInfo;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -66,8 +67,11 @@ impl<A: Debug> TypedSocketSender<A> {
}

impl<T: ChannelMessage> TypedSocket<T> {
pub async fn send(&mut self, message: T) -> anyhow::Result<()> {
self.send.send(SocketAction::Send(message)).await?;
pub async fn send(&mut self, message: T) -> Result<(), PlaneClientError> {
self.send
.send(SocketAction::Send(message))
.await
.map_err(|_| PlaneClientError::SendFailed)?;
Ok(())
}

Expand Down

0 comments on commit 545a70f

Please sign in to comment.