Skip to content

Commit

Permalink
Fix tests by retaining process shutdown signal
Browse files Browse the repository at this point in the history
  • Loading branch information
olix0r committed Nov 18, 2020
1 parent 2699866 commit 056c177
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 38 deletions.
5 changes: 2 additions & 3 deletions linkerd/app/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,8 @@ impl<RspB: Default + hyper::body::HttpBody> respond::Respond<http::Response<RspB
// Gracefully teardown the serverside connection.
if should_teardown_connection(&*error) {
if let Some(c) = self.close.as_ref() {
//debug!("Closing serverside connection");
//c.close();
let _ = c;
debug!("Closing serverside connection");
c.close();
}
}

Expand Down
90 changes: 55 additions & 35 deletions linkerd/app/outbound/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,36 @@ use linkerd2_app_core::{
proxy::{identity::Name, tap},
svc::{self, NewService},
transport::{io, listen},
Addr,
Addr, Error,
};
use std::{net::SocketAddr, str::FromStr, time::Duration};
use tokio::time;
use tower::ServiceExt;
use tower::{Service, ServiceExt};

fn build_server<I>(
cfg: Config,
profiles: resolver::Profiles<SocketAddr>,
resolver: resolver::Dst<Addr, resolver::Metadata>,
connect: Connect<Endpoint>,
) -> impl svc::NewService<
listen::Addrs,
Service = impl tower::Service<
I,
Response = (),
Error = impl Into<linkerd2_app_core::Error>,
Future = impl Send + 'static,
> + Send
+ 'static,
> + Send
+ 'static
) -> (
impl svc::NewService<
listen::Addrs,
Service = impl tower::Service<
I,
Response = (),
Error = impl Into<linkerd2_app_core::Error>,
Future = impl Send + 'static,
> + Send
+ 'static,
> + Send
+ 'static,
drain::Signal,
)
where
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Unpin + Send + 'static,
{
let (metrics, _) = metrics::Metrics::new(Duration::from_secs(10));
let (_, drain) = drain::channel();
let (drain_tx, drain) = drain::channel();

let (_, tap, _) = tap::new();
let router = super::logical::stack(
Expand All @@ -50,7 +53,7 @@ where
resolver.clone(),
metrics.outbound.clone(),
);
crate::server::stack(
let svc = crate::server::stack(
&cfg,
profiles,
resolver,
Expand All @@ -59,7 +62,8 @@ where
metrics.outbound,
None,
drain,
)
);
(svc, drain_tx)
}

#[tokio::test(core_threads = 1)]
Expand All @@ -69,6 +73,11 @@ async fn profile_endpoint_propagates_conn_errors() {
let _trace = support::trace_init();

let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550);
let addrs = listen::Addrs::new(
([127, 0, 0, 1], 4140).into(),
([127, 0, 0, 1], 666).into(),
Some(ep1),
);

let cfg = default_config(ep1);
let id_name = Name::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local")
Expand Down Expand Up @@ -102,40 +111,51 @@ async fn profile_endpoint_propagates_conn_errors() {
let resolver = support::resolver::<Addr, support::resolver::Metadata>();

// Build the outbound server
let mut server = build_server(cfg, profiles, resolver, connect);

let svc = server.new_service(listen::Addrs::new(
([127, 0, 0, 1], 4140).into(),
([127, 0, 0, 1], 666).into(),
Some(ep1),
));
let (mut s, shutdown) = build_server(cfg, profiles, resolver, connect);
let server = s.new_service(addrs);

let (client_io, proxy_io) = support::io::duplex(4096);
tokio::spawn(svc.oneshot(proxy_io));
let (client_io, server_io) = support::io::duplex(4096);
tokio::spawn(async move {
let res = server.oneshot(server_io).err_into::<Error>().await;
tracing::info!(?res, "Server complete");
res
});
let (mut client, conn) = hyper::client::conn::Builder::new()
.handshake(client_io)
.await
.expect("Client must connect");
let client_task = tokio::spawn(conn);
let mut client_task = tokio::spawn(async move {
let res = conn.await;
tracing::info!(?res, "Client connection complete");
res
});

let rsp = client
.send_request(
.ready_and()
.await
.expect("Client must not fail")
.call(
hyper::Request::builder()
.header("Host", "foo.ns1.service.cluster.local")
.body(hyper::Body::default())
.unwrap(),
)
.await
.expect("Request must succeed");
tracing::info!(?rsp);
assert_eq!(rsp.status(), http::StatusCode::BAD_GATEWAY);

const TIMEOUT: time::Duration = time::Duration::from_secs(10);
match time::timeout(TIMEOUT, client_task)
.await
.expect("Must not not timeout")
.expect("Task completes")
{
Ok(()) => {}
Err(e) => panic!("Client failed: {}", e),
tokio::select! {
_ = time::delay_for(time::Duration::from_secs(10)) => {
panic!("timeout");
}
res = &mut client_task => {
tracing::info!(?res, "Client task completed");
res.expect("Client task must not fail").expect("Client must close gracefully");
}
}

drop(client_task);
drop(client);
drop(shutdown);
}

0 comments on commit 056c177

Please sign in to comment.