diff --git a/Cargo.lock b/Cargo.lock index 695a4ba5c..276905000 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1624,7 +1624,7 @@ checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" [[package]] name = "plane" -version = "0.4.0" +version = "0.4.1" dependencies = [ "acme2-eab", "anyhow", diff --git a/dev/controller.sh b/dev/controller.sh index e2fc65f70..fc67d2814 100755 --- a/dev/controller.sh +++ b/dev/controller.sh @@ -1,3 +1,3 @@ #!/bin/sh -cargo run -- controller --db postgres://postgres@localhost "$@" +cargo run -- controller --db postgres://postgres@localhost "$@" --default-cluster localhost:9090 diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index f9bf498a7..a81bdcb42 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -7,7 +7,7 @@ services: dockerfile: docker/Dockerfile context: ../ command: - "controller --db postgres://postgres@postgres --host 0.0.0.0" + "controller --db postgres://postgres@postgres --host 0.0.0.0 --default-cluster=localhost:9090" networks: - plane-dev ports: diff --git a/docker/quickstart/supervisord.conf b/docker/quickstart/supervisord.conf index 495da8b8d..d46d2fec3 100644 --- a/docker/quickstart/supervisord.conf +++ b/docker/quickstart/supervisord.conf @@ -13,7 +13,7 @@ environment=POSTGRES_HOST_AUTH_METHOD=trust user=postgres [program:plane-controller] -command=/bin/plane controller --db postgres://postgres@127.0.0.1 --host 0.0.0.0 --controller-url http://localhost:8080 +command=/bin/plane controller --db postgres://postgres@127.0.0.1 --host 0.0.0.0 --controller-url http://localhost:8080 --default-cluster localhost:9090 autostart=true autorestart=true stderr_logfile=/var/log/plane-controller-stderr.log diff --git a/docs/pages/quickstart-guide.mdx b/docs/pages/quickstart-guide.mdx index 3deadff1b..e69de29bb 100644 --- a/docs/pages/quickstart-guide.mdx +++ b/docs/pages/quickstart-guide.mdx @@ -1,108 +0,0 @@ -import { Callout } from 'nextra/components' - -# Quickstart Guide - -The easiest way to try out Plane is to use Docker Compose. These instructions are for a Linux or Mac environment -with Docker installed. - -## Quickstart Steps - -### 1. Clone Plane’s [git repo](https://github.com/drifting-in-space/plane) - -```bash -git clone https://github.com/drifting-in-space/plane.git -cd plane -``` - -### 2. Start Plane - -```bash -docker compose -f docker/docker-compose.yml up -``` - -This will run: -- An instance of Postgres -- A Plane controller -- A Plane “drone” -- A Plane proxy - -See the [architecture overview](concepts/architecture.mdx) for background on each Plane component. - -### 3. Connect to a backend - -```bash -docker/cli.sh \ - connect \ - --cluster 'localhost:9090' \ - --key 'my-first-backend' \ - --image 'ghcr.io/drifting-in-space/demo-image-drop-four' \ - --wait -``` - -You can think of Plane as a big key-value store that associates “keys” (arbitrary strings) with running processes -([session backends](concepts/session-backends.mdx)). - -When you issue a “connect” request, Plane will return a URL that routes to the process associated with the key you provide. -If no process is running, Plane will start one (provided that you supply an image name in the connect request). - -Here’s a breakdown of the command above: - -- `docker/cli.sh` is a shell script that runs the Plane CLI in a Docker container, pre-configuring it to point to - a Plane instance started in the Docker Compose file. -- `connect` is the Plane CLI subcommand for issuing a “connect request”, which will return a URL that routes to - a backend process. -- `--cluster 'localhost:9090'` tells the CLI to start the backend on the `localhost:9090` cluster. Since the cluster name - includes a port (`:9090`), Plane will treat it as a “development” cluster and not enable HTTPS on it. See [clusters](concepts/clusters.mdx) - for more details. -- `--key 'my-first-backend'` tells the CLI to associate the backend with the key `my-first-backend`. The first time we run - this command, no backend with the key `my-first-backend` will exist, so Plane will start a new backend process. If we - subsequently run this command again while that backend process is still running, Plane will return a URL that routes - to that backend process. -- `--image '...'` tells the CLI that if it needs to start a new backend process, - it should use the given Docker image. In this case, the image serves a simple turn-based multiplayer game. -- `--wait` tells the CLI to display the backend’s status as it starts up, and to wait until the backend is `Ready` before - returning. - - - At least one of `--key` or `--image` must be provided, but it is not neccesary to include both. - - If you **only** want to connect to an existing backend, you can provide `--key` only. If a - backend with the given key is not running, Plane will return an error. - - If you **don’t** want to connect to an existing backend process, you can provide `--image` only. - Plane will use a unique key to start a new backend, and return a URL that routes to it. - - -When you run the command above, you should see output like this: - -``` -Created backend: ba-xt8nmtlgti18qx -URL: http://localhost:9090/tYVHfS4PKgufdhwGCnn6LLfAaCo_iAHitbw4Bg8ETjA/ -Status URL: http://0.0.0.0:8080/pub/c/localhost:9090/b/ba-xt8nmtlgti18qx/status -``` - -The first line tells you that Plane has created a backend with the key `ba-xt8nmtlgti18qx`. The second line tells you -a URL that routes to that backend. The third line tells you a URL that you can use to check the status of the backend -as JSON. - -Below that output, it will show a running list of states and the time they were entered. The first -time you run a particular image, it may spend some time in the `Loading` state, because it will download the container -image if it does not have it cached. - -Once the image is in the `Ready` state, you can open the URL provided in your browser to open the game. - -## Docker Compose Configuration - -The Docker Compose file defines four services: - -| Container | Image | Port | -|------------------|----------------------|------| -| plane-postgres | `postgres:16` | 5432 | -| plane-controller | `plane/plane:latest` | 8080 | -| plane-drone | `plane/plane:latest` | | -| plane-proxy | `plane/plane:latest` | 9090 | - -Note that the name of the cluster (`localhost:9090`) refers to the port of the proxy, not the controller. -This is because the name of the cluster refers to the address that end-user clients will use to access the -backends, and clients access backends through the cluster. See [the documentation on clusters](concepts/clusters.mdx) -for more information. diff --git a/plane/.sqlx/query-bb4d67150e3808a0b443ad0e1fd66b3b5f713518a772007b6caa85a86d12acf6.json b/plane/.sqlx/query-1c405b09216fa50560aebba1f44dd83b2d541f09303c3e1175596983abdf0d45.json similarity index 73% rename from plane/.sqlx/query-bb4d67150e3808a0b443ad0e1fd66b3b5f713518a772007b6caa85a86d12acf6.json rename to plane/.sqlx/query-1c405b09216fa50560aebba1f44dd83b2d541f09303c3e1175596983abdf0d45.json index 4427dcf29..b431b0210 100644 --- a/plane/.sqlx/query-bb4d67150e3808a0b443ad0e1fd66b3b5f713518a772007b6caa85a86d12acf6.json +++ b/plane/.sqlx/query-1c405b09216fa50560aebba1f44dd83b2d541f09303c3e1175596983abdf0d45.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n select\n backend_key.id as id,\n backend_key.tag as tag,\n backend_key.expires_at as expires_at,\n backend_key.fencing_token as token,\n backend_key.key_name as name,\n backend.last_status as status,\n now() as \"as_of!\"\n from backend_key\n left join backend on backend_key.id = backend.id\n where backend_key.key_name = $1\n and backend_key.namespace = $2\n ", + "query": "\n select\n backend_key.id as id,\n backend_key.tag as tag,\n backend_key.expires_at as expires_at,\n backend_key.fencing_token as token,\n backend_key.key_name as name,\n backend.last_status as status,\n backend.cluster as cluster,\n now() as \"as_of!\"\n from backend_key\n left join backend on backend_key.id = backend.id\n where backend_key.key_name = $1\n and backend_key.namespace = $2\n ", "describe": { "columns": [ { @@ -35,6 +35,11 @@ }, { "ordinal": 6, + "name": "cluster", + "type_info": "Varchar" + }, + { + "ordinal": 7, "name": "as_of!", "type_info": "Timestamptz" } @@ -52,8 +57,9 @@ false, false, false, + false, null ] }, - "hash": "bb4d67150e3808a0b443ad0e1fd66b3b5f713518a772007b6caa85a86d12acf6" + "hash": "1c405b09216fa50560aebba1f44dd83b2d541f09303c3e1175596983abdf0d45" } diff --git a/plane/.sqlx/query-ef75301e191cc12843a0b58ec445eef615345f0d3266767cb8e00b29a8413ecc.json b/plane/.sqlx/query-a42e8f2278997853dbef5b514de8c7404084d3fe383d5cfe4016f63e0705fa3e.json similarity index 89% rename from plane/.sqlx/query-ef75301e191cc12843a0b58ec445eef615345f0d3266767cb8e00b29a8413ecc.json rename to plane/.sqlx/query-a42e8f2278997853dbef5b514de8c7404084d3fe383d5cfe4016f63e0705fa3e.json index d6ebef1dd..937f959ab 100644 --- a/plane/.sqlx/query-ef75301e191cc12843a0b58ec445eef615345f0d3266767cb8e00b29a8413ecc.json +++ b/plane/.sqlx/query-a42e8f2278997853dbef5b514de8c7404084d3fe383d5cfe4016f63e0705fa3e.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n select\n id,\n cluster,\n last_status,\n last_status_time,\n drone_id,\n expiration_time,\n allowed_idle_seconds,\n last_keepalive,\n now() as \"as_of!\"\n from backend\n where id = $1\n and cluster = $2\n ", + "query": "\n select\n id,\n cluster,\n last_status,\n last_status_time,\n drone_id,\n expiration_time,\n allowed_idle_seconds,\n last_keepalive,\n now() as \"as_of!\"\n from backend\n where id = $1\n ", "describe": { "columns": [ { @@ -51,7 +51,6 @@ ], "parameters": { "Left": [ - "Text", "Text" ] }, @@ -67,5 +66,5 @@ null ] }, - "hash": "ef75301e191cc12843a0b58ec445eef615345f0d3266767cb8e00b29a8413ecc" + "hash": "a42e8f2278997853dbef5b514de8c7404084d3fe383d5cfe4016f63e0705fa3e" } diff --git a/plane/Cargo.toml b/plane/Cargo.toml index c6fd1215c..ccf5097b0 100644 --- a/plane/Cargo.toml +++ b/plane/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "plane" -version = "0.4.0" +version = "0.4.1" edition = "2021" default-run = "plane" description = "Session backend orchestrator for ambitious browser-based apps." diff --git a/plane/plane-tests/tests/backend_actions.rs b/plane/plane-tests/tests/backend_actions.rs index 5bcdfd8df..e8e5d52f7 100644 --- a/plane/plane-tests/tests/backend_actions.rs +++ b/plane/plane-tests/tests/backend_actions.rs @@ -6,7 +6,7 @@ use plane::{ database::backend::BackendActionMessage, names::{DroneName, Name}, protocol::{BackendAction, Heartbeat, MessageFromDrone, MessageToDrone}, - types::{ConnectRequest, ConnectResponse, ExecutorConfig, SpawnConfig}, + types::{ClusterName, ConnectRequest, ConnectResponse, ExecutorConfig, SpawnConfig}, }; use plane_test_macro::plane_test; use std::time::Duration; @@ -14,9 +14,10 @@ use std::time::Duration; mod common; /// Return a dummy connect request, which does not use a key. -fn connect_request() -> ConnectRequest { +fn connect_request(cluster: &ClusterName) -> ConnectRequest { ConnectRequest { spawn_config: Some(SpawnConfig { + cluster: Some(cluster.clone()), executable: ExecutorConfig::from_image_with_defaults("alpine"), lifetime_limit_seconds: None, max_idle_seconds: None, @@ -31,12 +32,9 @@ async fn no_drone_available(env: TestEnvironment) { let controller = env.controller().await; let client = controller.client(); - let connect_request = connect_request(); + let connect_request = connect_request(&env.cluster); - let result = client - .connect(&env.cluster, &connect_request) - .await - .unwrap_err(); + let result = client.connect(&connect_request).await.unwrap_err(); assert!(matches!( result, @@ -71,11 +69,8 @@ async fn backend_action_resent_if_not_acked(env: TestEnvironment) { tracing::info!("Issuing the connect request."); let client = controller.client(); - let connect_request = connect_request(); - let result = client - .connect(&env.cluster, &connect_request) - .await - .unwrap(); + let connect_request = connect_request(&env.cluster); + let result = client.connect(&connect_request).await.unwrap(); let ConnectResponse { spawned: true, @@ -149,7 +144,7 @@ async fn backend_action_resent_if_not_acked(env: TestEnvironment) { controller .client() - .soft_terminate(&env.cluster, &backend_id) + .soft_terminate(&backend_id) .await .unwrap(); diff --git a/plane/plane-tests/tests/backend_lifecycle.rs b/plane/plane-tests/tests/backend_lifecycle.rs index 35e18c397..14d70eec0 100644 --- a/plane/plane-tests/tests/backend_lifecycle.rs +++ b/plane/plane-tests/tests/backend_lifecycle.rs @@ -25,6 +25,7 @@ async fn backend_lifecycle(env: TestEnvironment) { tracing::info!("Requesting backend."); let connect_request = ConnectRequest { spawn_config: Some(SpawnConfig { + cluster: Some(env.cluster.clone()), executable: ExecutorConfig { image: "ghcr.io/drifting-in-space/demo-image-drop-four".to_string(), pull_policy: Some(PullPolicy::IfNotPresent), @@ -39,10 +40,7 @@ async fn backend_lifecycle(env: TestEnvironment) { user: None, auth: Map::default(), }; - let response = client - .connect(&env.cluster, &connect_request) - .await - .unwrap(); + let response = client.connect(&connect_request).await.unwrap(); tracing::info!("Got response."); assert!(response.spawned); @@ -51,7 +49,7 @@ async fn backend_lifecycle(env: TestEnvironment) { tracing::info!("Streaming status."); let mut backend_status_stream = client - .backend_status_stream(&env.cluster, &backend_id) + .backend_status_stream(&backend_id) .with_timeout(10) .await .unwrap() @@ -119,7 +117,7 @@ async fn backend_lifecycle(env: TestEnvironment) { // Test non-streaming status endpoint. let status = client - .backend_status(&env.cluster, &response.backend_id) + .backend_status(&response.backend_id) .with_timeout(10) .await .unwrap() @@ -156,7 +154,7 @@ async fn backend_lifecycle(env: TestEnvironment) { let initial_keepalive = { let backend = db .backend() - .backend(&env.cluster, &response.backend_id) + .backend(&response.backend_id) .with_timeout(10) .await .unwrap() @@ -178,7 +176,7 @@ async fn backend_lifecycle(env: TestEnvironment) { { let backend = db .backend() - .backend(&env.cluster, &response.backend_id) + .backend(&response.backend_id) .with_timeout(10) .await .unwrap() @@ -190,7 +188,7 @@ async fn backend_lifecycle(env: TestEnvironment) { tracing::info!("Terminating backend."); client - .soft_terminate(&env.cluster, &response.backend_id) + .soft_terminate(&response.backend_id) .with_timeout(10) .await .unwrap() diff --git a/plane/plane-tests/tests/backend_status_in_response.rs b/plane/plane-tests/tests/backend_status_in_response.rs index 6211efc03..8a290614a 100644 --- a/plane/plane-tests/tests/backend_status_in_response.rs +++ b/plane/plane-tests/tests/backend_status_in_response.rs @@ -22,6 +22,7 @@ async fn backend_status_in_response(env: TestEnvironment) { tracing::info!("Requesting backend."); let connect_request = ConnectRequest { spawn_config: Some(SpawnConfig { + cluster: Some(env.cluster.clone()), executable: ExecutorConfig { image: "ghcr.io/drifting-in-space/demo-image-drop-four".to_string(), pull_policy: Some(PullPolicy::IfNotPresent), @@ -41,10 +42,7 @@ async fn backend_status_in_response(env: TestEnvironment) { auth: Map::default(), }; - let response = client - .connect(&env.cluster, &connect_request) - .await - .unwrap(); + let response = client.connect(&connect_request).await.unwrap(); tracing::info!("Got response."); assert!(response.spawned); @@ -53,7 +51,7 @@ async fn backend_status_in_response(env: TestEnvironment) { let backend_id = response.backend_id.clone(); let mut backend_status_stream = client - .backend_status_stream(&env.cluster, &backend_id) + .backend_status_stream(&backend_id) .with_timeout(10) .await .unwrap() @@ -71,10 +69,7 @@ async fn backend_status_in_response(env: TestEnvironment) { tracing::info!(status=?message, "Got status"); } - let response2 = client - .connect(&env.cluster, &connect_request) - .await - .unwrap(); + let response2 = client.connect(&connect_request).await.unwrap(); assert!(!response2.spawned); assert_eq!(response2.backend_id, backend_id); diff --git a/plane/plane-tests/tests/common/resources/database.rs b/plane/plane-tests/tests/common/resources/database.rs index 5049ccf81..7564c3ede 100644 --- a/plane/plane-tests/tests/common/resources/database.rs +++ b/plane/plane-tests/tests/common/resources/database.rs @@ -91,7 +91,7 @@ impl DevDatabase { let port = container.get_port(5432).await?; let connection_string = Self::get_connection_string(port); - println!("Connection_string: {}", connection_string); + tracing::info!("Connection string: {}", connection_string); let db = attempt_to_connect(&connection_string).await?; diff --git a/plane/plane-tests/tests/common/test_env.rs b/plane/plane-tests/tests/common/test_env.rs index d6b42f63e..f983cff91 100644 --- a/plane/plane-tests/tests/common/test_env.rs +++ b/plane/plane-tests/tests/common/test_env.rs @@ -95,6 +95,7 @@ impl TestEnvironment { listener, ControllerName::new_random(), url, + None, ) .await .expect("Unable to construct controller."); diff --git a/plane/plane-tests/tests/reuse_key.rs b/plane/plane-tests/tests/reuse_key.rs index 3ca004c5d..6d8120c82 100644 --- a/plane/plane-tests/tests/reuse_key.rs +++ b/plane/plane-tests/tests/reuse_key.rs @@ -22,6 +22,7 @@ async fn reuse_key(env: TestEnvironment) { tracing::info!("Requesting backend."); let connect_request = ConnectRequest { spawn_config: Some(SpawnConfig { + cluster: Some(env.cluster.clone()), executable: ExecutorConfig { image: "ghcr.io/drifting-in-space/demo-image-drop-four".to_string(), pull_policy: Some(PullPolicy::IfNotPresent), @@ -41,10 +42,7 @@ async fn reuse_key(env: TestEnvironment) { auth: Map::default(), }; - let response = client - .connect(&env.cluster, &connect_request) - .await - .unwrap(); + let response = client.connect(&connect_request).await.unwrap(); tracing::info!("Got response."); assert!(response.spawned); @@ -52,16 +50,13 @@ async fn reuse_key(env: TestEnvironment) { let backend_id = response.backend_id.clone(); let mut backend_status_stream = client - .backend_status_stream(&env.cluster, &backend_id) + .backend_status_stream(&backend_id) .with_timeout(10) .await .unwrap() .unwrap(); - let response2 = client - .connect(&env.cluster, &connect_request) - .await - .unwrap(); + let response2 = client.connect(&connect_request).await.unwrap(); assert!(!response2.spawned); assert_eq!(response2.backend_id, backend_id); diff --git a/plane/src/admin.rs b/plane/src/admin.rs index b3cf34e8c..24d0b6514 100644 --- a/plane/src/admin.rs +++ b/plane/src/admin.rs @@ -88,7 +88,7 @@ pub struct AdminOpts { enum AdminCommand { Connect { #[clap(long)] - cluster: ClusterName, + cluster: Option, #[clap(long)] image: String, @@ -100,9 +100,6 @@ enum AdminCommand { wait: bool, }, Terminate { - #[clap(long)] - cluster: ClusterName, - #[clap(long)] backend: BackendName, @@ -145,6 +142,7 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE } => { let executor_config = ExecutorConfig::from_image_with_defaults(image); let spawn_config = SpawnConfig { + cluster: cluster.clone(), executable: executor_config.clone(), lifetime_limit_seconds: None, max_idle_seconds: Some(500), @@ -159,7 +157,7 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE ..Default::default() }; - let response = client.connect(&cluster, &spawn_request).await?; + let response = client.connect(&spawn_request).await?; println!( "Created backend: {}", @@ -170,9 +168,7 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE println!("Status URL: {}", response.status_url.bright_white()); if wait { - let mut stream = client - .backend_status_stream(&cluster, &response.backend_id) - .await?; + let mut stream = client.backend_status_stream(&response.backend_id).await?; while let Some(status) = stream.next().await { println!( @@ -189,14 +185,13 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE } AdminCommand::Terminate { backend, - cluster, hard, wait, } => { if hard { - client.hard_terminate(&cluster, &backend).await? + client.hard_terminate(&backend).await? } else { - client.soft_terminate(&cluster, &backend).await? + client.soft_terminate(&backend).await? }; println!( @@ -205,7 +200,7 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE ); if wait { - let mut stream = client.backend_status_stream(&cluster, &backend).await?; + let mut stream = client.backend_status_stream(&backend).await?; while let Some(status) = stream.next().await { println!( diff --git a/plane/src/client/mod.rs b/plane/src/client/mod.rs index 6f38c2410..16d424d99 100644 --- a/plane/src/client/mod.rs +++ b/plane/src/client/mod.rs @@ -97,12 +97,9 @@ impl PlaneClient { pub async fn connect( &self, - cluster: &ClusterName, connect_request: &ConnectRequest, ) -> Result { - let addr = self - .controller_address - .join(&format!("/ctrl/c/{}/connect", cluster)); + let addr = self.controller_address.join("/ctrl/connect"); let response = authed_post(&self.client, &addr, connect_request).await?; Ok(response) @@ -121,71 +118,52 @@ impl PlaneClient { Ok(()) } - pub async fn soft_terminate( - &self, - cluster: &ClusterName, - backend_id: &BackendName, - ) -> Result<(), PlaneClientError> { - let addr = self.controller_address.join(&format!( - "/ctrl/c/{}/b/{}/soft-terminate", - cluster, backend_id - )); + pub async fn soft_terminate(&self, backend_id: &BackendName) -> Result<(), PlaneClientError> { + let addr = self + .controller_address + .join(&format!("/ctrl/b/{}/soft-terminate", backend_id)); authed_post(&self.client, &addr, &()).await?; Ok(()) } - pub async fn hard_terminate( - &self, - cluster: &ClusterName, - backend_id: &BackendName, - ) -> Result<(), PlaneClientError> { - let addr = self.controller_address.join(&format!( - "/ctrl/c/{}/b/{}/hard-terminate", - cluster, backend_id - )); + pub async fn hard_terminate(&self, backend_id: &BackendName) -> Result<(), PlaneClientError> { + let addr = self + .controller_address + .join(&format!("/ctrl/b/{}/hard-terminate", backend_id)); authed_post(&self.client, &addr, &()).await?; Ok(()) } - pub fn backend_status_url(&self, cluster: &ClusterName, backend_id: &BackendName) -> Url { + pub fn backend_status_url(&self, backend_id: &BackendName) -> Url { self.controller_address - .join(&format!("/pub/c/{}/b/{}/status", cluster, backend_id)) + .join(&format!("/pub/b/{}/status", backend_id)) .url } pub async fn backend_status( &self, - cluster: &ClusterName, backend_id: &BackendName, ) -> Result { - let url = self.backend_status_url(cluster, backend_id); + let url = self.backend_status_url(backend_id); let response = self.client.get(url).send().await?; let status: TimestampedBackendStatus = get_response(response).await?; Ok(status) } - pub fn backend_status_stream_url( - &self, - cluster: &ClusterName, - backend_id: &BackendName, - ) -> Url { + pub fn backend_status_stream_url(&self, backend_id: &BackendName) -> Url { self.controller_address - .join(&format!( - "/pub/c/{}/b/{}/status-stream", - cluster, backend_id - )) + .join(&format!("/pub/b/{}/status-stream", backend_id)) .url } pub async fn backend_status_stream( &self, - cluster: &ClusterName, backend_id: &BackendName, ) -> Result, PlaneClientError> { - let url = self.backend_status_stream_url(cluster, backend_id); + let url = self.backend_status_stream_url(backend_id); let stream = sse::sse_request(url, self.client.clone()).await?; Ok(stream) diff --git a/plane/src/controller/backend_status.rs b/plane/src/controller/backend_status.rs index 1269f036b..eb4c81c56 100644 --- a/plane/src/controller/backend_status.rs +++ b/plane/src/controller/backend_status.rs @@ -1,7 +1,7 @@ use super::{core::Controller, error::IntoApiError}; use crate::{ names::BackendName, - types::{BackendStatus, ClusterName, TimestampedBackendStatus}, + types::{BackendStatus, TimestampedBackendStatus}, }; use axum::{ extract::{Path, State}, @@ -17,13 +17,12 @@ use std::convert::Infallible; async fn backend_status( controller: &Controller, - cluster: &ClusterName, backend_id: &BackendName, ) -> Result { let backend = controller .db .backend() - .backend(cluster, backend_id) + .backend(backend_id) .await .or_internal_error("Database error")? .or_not_found("Backend does not exist")?; @@ -37,15 +36,15 @@ async fn backend_status( } pub async fn handle_backend_status( - Path((cluster, backend_id)): Path<(ClusterName, BackendName)>, + Path(backend_id): Path, State(controller): State, ) -> Result, Response> { - let status = backend_status(&controller, &cluster, &backend_id).await?; + let status = backend_status(&controller, &backend_id).await?; Ok(Json(status)) } pub async fn handle_backend_status_stream( - Path((_cluster, backend_id)): Path<(ClusterName, BackendName)>, // TODO: check cluster id + Path(backend_id): Path, State(controller): State, headers: HeaderMap, ) -> Result>>, Response> { diff --git a/plane/src/controller/connect.rs b/plane/src/controller/connect.rs index aa02a1f18..cdf85d946 100644 --- a/plane/src/controller/connect.rs +++ b/plane/src/controller/connect.rs @@ -1,12 +1,8 @@ use super::error::err_to_response; use super::Controller; use crate::database::connect::ConnectError; -use crate::types::{ClusterName, ConnectRequest, ConnectResponse}; -use axum::{ - extract::{Path, State}, - response::Response, - Json, -}; +use crate::types::{ConnectRequest, ConnectResponse}; +use axum::{extract::State, response::Response, Json}; use reqwest::StatusCode; fn connect_error_to_response(connect_error: &ConnectError) -> Response { @@ -46,6 +42,11 @@ fn connect_error_to_response(connect_error: &ConnectError) -> Response { StatusCode::INTERNAL_SERVER_ERROR, "Internal error.", ), + ConnectError::NoClusterProvided => err_to_response( + connect_error, + StatusCode::BAD_REQUEST, + "No cluster provided, and no default cluster for this controller.", + ), ConnectError::Other(_) => err_to_response( connect_error, StatusCode::INTERNAL_SERVER_ERROR, @@ -55,13 +56,11 @@ fn connect_error_to_response(connect_error: &ConnectError) -> Response { } pub async fn handle_connect( - Path(cluster): Path, State(controller): State, Json(request): Json, ) -> Result, Response> { let response = controller - .db - .connect(&cluster, &request, &controller.client) + .connect(&request) .await .map_err(|e| connect_error_to_response(&e))?; Ok(Json(response)) diff --git a/plane/src/controller/core.rs b/plane/src/controller/core.rs index 90422b239..ede7139de 100644 --- a/plane/src/controller/core.rs +++ b/plane/src/controller/core.rs @@ -1,19 +1,19 @@ -use url::Url; - use crate::{ client::PlaneClient, - database::PlaneDatabase, + database::{connect::ConnectError, PlaneDatabase}, names::{AnyNodeName, ControllerName}, typed_socket::Handshake, - types::{ClusterName, NodeId}, + types::{ClusterName, ConnectRequest, ConnectResponse, NodeId}, }; use std::net::IpAddr; +use url::Url; #[derive(Clone)] pub struct Controller { pub db: PlaneDatabase, pub id: ControllerName, pub client: PlaneClient, + pub default_cluster: Option, } pub struct NodeHandle { @@ -61,9 +61,31 @@ impl Controller { }) } - pub async fn new(db: PlaneDatabase, id: ControllerName, controller_url: Url) -> Self { + pub async fn new( + db: PlaneDatabase, + id: ControllerName, + controller_url: Url, + default_cluster: Option, + ) -> Self { let client = PlaneClient::new(controller_url); - Self { db, id, client } + Self { + db, + id, + client, + default_cluster, + } + } + + pub async fn connect( + &self, + connect_request: &ConnectRequest, + ) -> Result { + let response = self + .db + .connect(self.default_cluster.as_ref(), connect_request, &self.client) + .await?; + + Ok(response) } } diff --git a/plane/src/controller/mod.rs b/plane/src/controller/mod.rs index 4b9497c4c..4dd24398a 100644 --- a/plane/src/controller/mod.rs +++ b/plane/src/controller/mod.rs @@ -11,6 +11,7 @@ use crate::{ heartbeat_consts::HEARTBEAT_INTERVAL, names::ControllerName, signals::wait_for_shutdown_signal, + types::ClusterName, PLANE_GIT_HASH, PLANE_VERSION, }; use anyhow::Result; @@ -114,10 +115,11 @@ impl ControllerServer { bind_addr: SocketAddr, id: ControllerName, controller_url: Url, + default_cluster: Option, ) -> Result { let listener = TcpListener::bind(bind_addr)?; - Self::run_with_listener(db, listener, id, controller_url).await + Self::run_with_listener(db, listener, id, controller_url, default_cluster).await } pub async fn run_with_listener( @@ -125,13 +127,15 @@ impl ControllerServer { listener: TcpListener, id: ControllerName, controller_url: Url, + default_cluster: Option, ) -> Result { let bind_addr = listener.local_addr()?; let (graceful_terminate_sender, graceful_terminate_receiver) = tokio::sync::oneshot::channel::<()>(); - let controller = Controller::new(db.clone(), id.clone(), controller_url).await; + let controller = + Controller::new(db.clone(), id.clone(), controller_url, default_cluster).await; let trace_layer = TraceLayer::new_for_http() .make_span_with(DefaultMakeSpan::new().level(Level::INFO)) @@ -150,14 +154,14 @@ impl ControllerServer { .route("/c/:cluster/drone-socket", get(handle_drone_socket)) .route("/c/:cluster/proxy-socket", get(handle_proxy_socket)) .route("/dns-socket", get(handle_dns_socket)) - .route("/c/:cluster/connect", post(handle_connect)) + .route("/connect", post(handle_connect)) .route("/c/:cluster/d/:drone/drain", post(handle_drain)) .route( - "/c/:cluster/b/:backend/soft-terminate", + "/b/:backend/soft-terminate", post(terminate::handle_soft_terminate), ) .route( - "/c/:cluster/b/:backend/hard-terminate", + "/b/:backend/hard-terminate", post(terminate::handle_hard_terminate), ); @@ -170,9 +174,9 @@ impl ControllerServer { // under the /pub/ top-level route to make it easier to expose only these routes, // using a reverse proxy configuration. let public_routes = Router::new() - .route("/c/:cluster/b/:backend/status", get(handle_backend_status)) + .route("/b/:backend/status", get(handle_backend_status)) .route( - "/c/:cluster/b/:backend/status-stream", + "/b/:backend/status-stream", get(handle_backend_status_stream), ) .layer(cors_public.clone()); @@ -248,8 +252,10 @@ pub async fn run_controller( bind_addr: SocketAddr, id: ControllerName, controller_url: Url, + default_cluster: Option, ) -> Result<()> { - let mut server = ControllerServer::run(db, bind_addr, id, controller_url).await?; + let mut server = + ControllerServer::run(db, bind_addr, id, controller_url, default_cluster).await?; wait_for_shutdown_signal().await; diff --git a/plane/src/controller/terminate.rs b/plane/src/controller/terminate.rs index b067f3825..0841488a1 100644 --- a/plane/src/controller/terminate.rs +++ b/plane/src/controller/terminate.rs @@ -1,9 +1,5 @@ use super::{core::Controller, error::IntoApiError}; -use crate::{ - names::BackendName, - protocol::BackendAction, - types::{ClusterName, TerminationKind}, -}; +use crate::{names::BackendName, protocol::BackendAction, types::TerminationKind}; use axum::{ extract::{Path, State}, response::Response, @@ -12,14 +8,13 @@ use axum::{ async fn terminate( controller: &Controller, - cluster: &ClusterName, backend_id: &BackendName, hard: bool, ) -> Result<(), Response> { let backend = controller .db .backend() - .backend(cluster, backend_id) + .backend(backend_id) .await .or_internal_error("Database error")? .or_not_found("Backend does not exist")?; @@ -44,17 +39,17 @@ async fn terminate( } pub async fn handle_soft_terminate( - Path((cluster, backend_id)): Path<(ClusterName, BackendName)>, + Path(backend_id): Path, State(controller): State, ) -> Result, Response> { - terminate(&controller, &cluster, &backend_id, false).await?; + terminate(&controller, &backend_id, false).await?; Ok(Json(())) } pub async fn handle_hard_terminate( - Path((cluster, backend_id)): Path<(ClusterName, BackendName)>, + Path(backend_id): Path, State(controller): State, ) -> Result, Response> { - terminate(&controller, &cluster, &backend_id, true).await?; + terminate(&controller, &backend_id, true).await?; Ok(Json(())) } diff --git a/plane/src/database/backend.rs b/plane/src/database/backend.rs index c8bcfbd40..9169d8bcd 100644 --- a/plane/src/database/backend.rs +++ b/plane/src/database/backend.rs @@ -2,9 +2,7 @@ use super::{subscribe::emit_with_key, util::MapSqlxError, PlaneDatabase}; use crate::{ names::{BackendActionName, BackendName}, protocol::{BackendAction, RouteInfo}, - types::{ - BackendStatus, BearerToken, ClusterName, NodeId, SecretToken, TimestampedBackendStatus, - }, + types::{BackendStatus, BearerToken, NodeId, SecretToken, TimestampedBackendStatus}, }; use chrono::{DateTime, Utc}; use futures_util::Stream; @@ -107,11 +105,7 @@ impl<'a> BackendDatabase<'a> { Ok(stream) } - pub async fn backend( - &self, - cluster: &ClusterName, - backend_id: &BackendName, - ) -> sqlx::Result> { + pub async fn backend(&self, backend_id: &BackendName) -> sqlx::Result> { let result = sqlx::query!( r#" select @@ -126,10 +120,8 @@ impl<'a> BackendDatabase<'a> { now() as "as_of!" from backend where id = $1 - and cluster = $2 "#, backend_id.to_string(), - cluster.to_string(), ) .fetch_optional(&self.db.pool) .await?; diff --git a/plane/src/database/backend_key.rs b/plane/src/database/backend_key.rs index fa97b0686..e045e842f 100644 --- a/plane/src/database/backend_key.rs +++ b/plane/src/database/backend_key.rs @@ -9,11 +9,11 @@ use crate::{ names::BackendName, - types::{BackendStatus, KeyConfig}, + types::{BackendStatus, ClusterName, KeyConfig}, }; use chrono::{DateTime, Utc}; use sqlx::{postgres::types::PgInterval, PgPool}; -use std::time::Duration; +use std::{str::FromStr, time::Duration}; pub const KEY_LEASE_RENEW_AFTER: Duration = Duration::from_secs(30); pub const KEY_LEASE_SOFT_TERMINATE_AFTER: Duration = Duration::from_secs(40); @@ -99,6 +99,7 @@ impl<'a> KeysDatabase<'a> { backend_key.fencing_token as token, backend_key.key_name as name, backend.last_status as status, + backend.cluster as cluster, now() as "as_of!" from backend_key left join backend on backend_key.id = backend.id @@ -118,6 +119,8 @@ impl<'a> KeysDatabase<'a> { token: lock_result.token, status: BackendStatus::try_from(lock_result.status) .map_err(|_| sqlx::Error::Decode("Invalid backend status.".into()))?, + cluster: ClusterName::from_str(&lock_result.cluster) + .map_err(|_| sqlx::Error::Decode("Invalid cluster name.".into()))?, key: lock_result.name, tag: lock_result.tag, expires_at: lock_result.expires_at, @@ -135,6 +138,7 @@ pub struct BackendKeyResult { pub token: i64, pub key: String, pub status: BackendStatus, + pub cluster: ClusterName, expires_at: DateTime, as_of: DateTime, } diff --git a/plane/src/database/connect.rs b/plane/src/database/connect.rs index 1ae8d1a51..ff8bfd5ef 100644 --- a/plane/src/database/connect.rs +++ b/plane/src/database/connect.rs @@ -54,6 +54,9 @@ pub enum ConnectError { #[error("Serialization error: {0}")] Serialization(#[from] serde_json::Error), + #[error("No cluster provided, and no default cluster for this controller.")] + NoClusterProvided, + #[error("Other internal error. {0}")] Other(String), } @@ -179,7 +182,7 @@ async fn create_token( async fn attempt_connect( pool: &PgPool, - cluster: &ClusterName, + default_cluster: Option<&ClusterName>, request: &ConnectRequest, client: &PlaneClient, ) -> Result { @@ -208,7 +211,7 @@ async fn attempt_connect( let connect_response = ConnectResponse::new( key_result.id, - cluster, + &key_result.cluster, false, key_result.status, token, @@ -240,6 +243,12 @@ async fn attempt_connect( return Err(ConnectError::KeyUnheldNoSpawnConfig); }; + let cluster = spawn_config + .cluster + .as_ref() + .or(default_cluster) + .ok_or(ConnectError::NoClusterProvided)?; + let drone = DroneDatabase::new(pool) .pick_drone_for_spawn(cluster) .await? @@ -275,13 +284,13 @@ async fn attempt_connect( pub async fn connect( pool: &PgPool, - cluster: &ClusterName, + default_cluster: Option<&ClusterName>, request: &ConnectRequest, client: &PlaneClient, ) -> Result { let mut attempt = 1; loop { - match attempt_connect(pool, cluster, request, client).await { + match attempt_connect(pool, default_cluster, request, client).await { Ok(response) => return Ok(response), Err(error) => { if !error.retryable() || attempt >= 3 { diff --git a/plane/src/database/mod.rs b/plane/src/database/mod.rs index a38c6d79c..e8b1d44e8 100644 --- a/plane/src/database/mod.rs +++ b/plane/src/database/mod.rs @@ -84,11 +84,11 @@ impl PlaneDatabase { pub async fn connect( &self, - cluster: &ClusterName, + default_cluster: Option<&ClusterName>, request: &ConnectRequest, client: &PlaneClient, ) -> Result { - connect::connect(&self.pool, cluster, request, client).await + connect::connect(&self.pool, default_cluster, request, client).await } fn subscription_manager(&self) -> &EventSubscriptionManager { diff --git a/plane/src/main.rs b/plane/src/main.rs index b76e9ff65..ed7436ba2 100644 --- a/plane/src/main.rs +++ b/plane/src/main.rs @@ -41,6 +41,9 @@ enum Command { #[clap(long)] controller_url: Option, + + #[clap(long)] + default_cluster: Option, }, Drone { #[clap(long)] @@ -125,6 +128,7 @@ async fn run(opts: Opts) -> Result<()> { port, db, controller_url, + default_cluster, } => { let name = ControllerName::new_random(); @@ -141,7 +145,7 @@ async fn run(opts: Opts) -> Result<()> { let addr = (host, port).into(); - run_controller(db, addr, name, controller_url).await? + run_controller(db, addr, name, controller_url, default_cluster).await? } Command::Migrate { db } => { let _ = connect_and_migrate(&db).await?; diff --git a/plane/src/types.rs b/plane/src/types.rs index fe8e36f89..ead43bb11 100644 --- a/plane/src/types.rs +++ b/plane/src/types.rs @@ -285,6 +285,9 @@ impl ExecutorConfig { #[derive(Clone, Serialize, Deserialize, Debug)] pub struct SpawnConfig { + /// Cluster to spawn to. Uses the controller default if not provided. + pub cluster: Option, + /// Config to use to spawn the backend process. pub executable: ExecutorConfig, @@ -409,7 +412,7 @@ impl ConnectResponse { format!("http://{}/{}/", cluster, token) }; - let status_url = client.backend_status_url(cluster, &backend_id).to_string(); + let status_url = client.backend_status_url(&backend_id).to_string(); Self { backend_id,