Skip to content

Commit

Permalink
Add Versioning + move Access to control plane for remote ext (#4760)
Browse files Browse the repository at this point in the history
  • Loading branch information
awestover authored Jul 20, 2023
1 parent 1b624e7 commit 3084658
Show file tree
Hide file tree
Showing 30 changed files with 377 additions and 280 deletions.
44 changes: 44 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions compute_tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ utils.workspace = true
workspace_hack.workspace = true
toml_edit.workspace = true
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
zstd = "0.12.4"
22 changes: 10 additions & 12 deletions compute_tools/src/bin/compute_ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@
//! -C 'postgresql://cloud_admin@localhost/postgres' \
//! -S /var/db/postgres/specs/current.json \
//! -b /usr/local/bin/postgres \
//! -r {"bucket": "my-bucket", "region": "eu-central-1", "endpoint": "http:://localhost:9000",
//! (optionally) "key": "AWS_SECRET_ACCESS_KEY", "id": "AWS_ACCESS_KEY_ID"}
//! -r {"bucket": "my-bucket", "region": "eu-central-1", "endpoint": "http:://localhost:9000"}
//! ```
//!
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::panic;
use std::path::Path;
Expand All @@ -52,20 +51,21 @@ use compute_api::responses::ComputeStatus;

use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::launch_download_extensions;
use compute_tools::extension_server::{get_pg_version, init_remote_storage};
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;

const BUILD_TAG_DEFAULT: &str = "local";
const BUILD_TAG_DEFAULT: &str = "111"; // TODO: change back to local; I need 111 for my test

fn main() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;

let build_tag = option_env!("BUILD_TAG").unwrap_or(BUILD_TAG_DEFAULT);
let build_tag = option_env!("BUILD_TAG")
.unwrap_or(BUILD_TAG_DEFAULT)
.to_string();
info!("build_tag: {build_tag}");

let matches = cli().get_matches();
Expand All @@ -74,8 +74,7 @@ fn main() -> Result<()> {

let remote_ext_config = matches.get_one::<String>("remote-ext-config");
let ext_remote_storage = remote_ext_config.map(|x| {
init_remote_storage(x, build_tag)
.expect("cannot initialize remote extension storage from config")
init_remote_storage(x).expect("cannot initialize remote extension storage from config")
});

let http_port = *matches
Expand Down Expand Up @@ -195,7 +194,9 @@ fn main() -> Result<()> {
state: Mutex::new(new_state),
state_changed: Condvar::new(),
ext_remote_storage,
available_extensions: OnceLock::new(),
ext_remote_paths: OnceLock::new(),
already_downloaded_extensions: Mutex::new(HashSet::new()),
build_tag,
};
let compute = Arc::new(compute_node);

Expand Down Expand Up @@ -243,9 +244,6 @@ fn main() -> Result<()> {
let _configurator_handle =
launch_configurator(&compute).expect("cannot launch configurator thread");

let _download_extensions_handle =
launch_download_extensions(&compute).expect("cannot launch download extensions thread");

// Start Postgres
let mut delay_exit = false;
let mut exit_code = None;
Expand Down
52 changes: 39 additions & 13 deletions compute_tools/src/compute.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::fs;
use std::io::BufRead;
Expand All @@ -21,7 +22,7 @@ use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeMode, ComputeSpec};
use utils::measured_stream::MeasuredReader;

use remote_storage::GenericRemoteStorage;
use remote_storage::{GenericRemoteStorage, RemotePath};

use crate::pg_helpers::*;
use crate::spec::*;
Expand Down Expand Up @@ -55,8 +56,10 @@ pub struct ComputeNode {
pub state_changed: Condvar,
/// the S3 bucket that we search for extensions in
pub ext_remote_storage: Option<GenericRemoteStorage>,
// cached lists of available extensions and libraries
pub available_extensions: OnceLock<HashSet<String>>,
// (key: extension name, value: path to extension archive in remote storage)
pub ext_remote_paths: OnceLock<HashMap<String, RemotePath>>,
pub already_downloaded_extensions: Mutex<HashSet<String>>,
pub build_tag: String,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -735,23 +738,23 @@ LIMIT 100",

// If remote extension storage is configured,
// download extension control files
#[tokio::main]
pub async fn prepare_external_extensions(&self, compute_state: &ComputeState) -> Result<()> {
if let Some(ref ext_remote_storage) = self.ext_remote_storage {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = &pspec.spec;
let custom_ext_prefixes = spec.custom_extensions.clone().unwrap_or(Vec::new());
info!("custom_ext_prefixes: {:?}", &custom_ext_prefixes);
let available_extensions = extension_server::get_available_extensions(
let custom_ext = spec.custom_extensions.clone().unwrap_or(Vec::new());
info!("custom extensions: {:?}", &custom_ext);
let ext_remote_paths = extension_server::get_available_extensions(
ext_remote_storage,
&self.pgbin,
&self.pgversion,
&custom_ext_prefixes,
&custom_ext,
&self.build_tag,
)
.await?;
self.available_extensions
.set(available_extensions)
.expect("available_extensions.set error");
self.ext_remote_paths
.set(ext_remote_paths)
.expect("ext_remote_paths.set error");
}
Ok(())
}
Expand All @@ -760,11 +763,31 @@ LIMIT 100",
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),
Some(remote_storage) => {
// TODO: eliminate useless LOAD Library calls to this function (if possible)
// not clear that we can distinguish between useful and useless
// library calls better than the below code
let ext_name = ext_name.replace(".so", "");
{
let mut already_downloaded_extensions =
self.already_downloaded_extensions.lock().expect("bad lock");
if already_downloaded_extensions.contains(&ext_name) {
info!(
"extension {:?} already exists, skipping download",
&ext_name
);
return Ok(());
} else {
already_downloaded_extensions.insert(ext_name.clone());
}
}
extension_server::download_extension(
ext_name,
&ext_name,
&self
.ext_remote_paths
.get()
.expect("error accessing ext_remote_paths")[&ext_name],
remote_storage,
&self.pgbin,
&self.pgversion,
)
.await
}
Expand Down Expand Up @@ -809,6 +832,9 @@ LIMIT 100",
libs_vec.extend(preload_libs_vec);
}

info!("Download ext_index.json, find the extension paths");
self.prepare_external_extensions(compute_state).await?;

info!("Downloading to shared preload libraries: {:?}", &libs_vec);
let mut download_tasks = Vec::new();
for library in &libs_vec {
Expand Down
Loading

0 comments on commit 3084658

Please sign in to comment.