diff --git a/Cargo.lock b/Cargo.lock index dbeeb539d44f..4d59f400de17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -740,6 +740,9 @@ name = "cc" version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" +dependencies = [ + "jobserver", +] [[package]] name = "cexpr" @@ -907,12 +910,14 @@ dependencies = [ "opentelemetry", "postgres", "regex", + "remote_storage", "reqwest", "serde", "serde_json", "tar", "tokio", "tokio-postgres", + "toml_edit", "tracing", "tracing-opentelemetry", "tracing-subscriber", @@ -920,6 +925,7 @@ dependencies = [ "url", "utils", "workspace_hack", + "zstd", ] [[package]] @@ -980,6 +986,7 @@ dependencies = [ "tar", "thiserror", "toml", + "tracing", "url", "utils", "workspace_hack", @@ -1972,6 +1979,15 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" +[[package]] +name = "jobserver" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.63" @@ -5296,6 +5312,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bytes", + "cc", "chrono", "clap", "clap_builder", @@ -5396,3 +5413,33 @@ name = "zeroize" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" + +[[package]] +name = "zstd" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "6.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.8+zstd.1.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5556e6ee25d32df2586c098bbfa278803692a20d0ab9565e049480d52707ec8c" +dependencies = [ + "cc", + "libc", + "pkg-config", +] diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index f8f8f729ce6c..08dcc21c7afd 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -32,3 +32,6 @@ url.workspace = true compute_api.workspace = true utils.workspace = true workspace_hack.workspace = true +toml_edit.workspace = true +remote_storage = { version = "0.1", path = "../libs/remote_storage/" } +zstd = "0.12.4" diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 9d15a203e5b3..977a62eed2b3 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -5,6 +5,8 @@ //! - `compute_ctl` accepts cluster (compute node) specification as a JSON file. //! - Every start is a fresh start, so the data directory is removed and //! initialized again on each run. +//! - If remote_extension_config is provided, it will be used to fetch extensions list +//! and download `shared_preload_libraries` from the remote storage. //! - Next it will put configuration files into the `PGDATA` directory. //! - Sync safekeepers and get commit LSN. //! 
- Get `basebackup` from pageserver using the returned on the previous step LSN. @@ -27,7 +29,8 @@ //! compute_ctl -D /var/db/postgres/compute \ //! -C 'postgresql://cloud_admin@localhost/postgres' \ //! -S /var/db/postgres/specs/current.json \ -//! -b /usr/local/bin/postgres +//! -b /usr/local/bin/postgres \ +//! -r {"bucket": "neon-dev-extensions-eu-central-1", "region": "eu-central-1"} //! ``` //! use std::collections::HashMap; @@ -35,7 +38,7 @@ use std::fs::File; use std::panic; use std::path::Path; use std::process::exit; -use std::sync::{mpsc, Arc, Condvar, Mutex}; +use std::sync::{mpsc, Arc, Condvar, Mutex, OnceLock, RwLock}; use std::{thread, time::Duration}; use anyhow::{Context, Result}; @@ -48,22 +51,33 @@ use compute_api::responses::ComputeStatus; use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec}; use compute_tools::configurator::launch_configurator; +use compute_tools::extension_server::{get_pg_version, init_remote_storage}; use compute_tools::http::api::launch_http_server; use compute_tools::logger::*; use compute_tools::monitor::launch_monitor; use compute_tools::params::*; use compute_tools::spec::*; -const BUILD_TAG_DEFAULT: &str = "local"; +// this is an arbitrary build tag. Fine as a default / for testing purposes +// in-case of not-set environment var +const BUILD_TAG_DEFAULT: &str = "5670669815"; fn main() -> Result<()> { init_tracing_and_logging(DEFAULT_LOG_LEVEL)?; - let build_tag = option_env!("BUILD_TAG").unwrap_or(BUILD_TAG_DEFAULT); - + let build_tag = option_env!("BUILD_TAG") + .unwrap_or(BUILD_TAG_DEFAULT) + .to_string(); info!("build_tag: {build_tag}"); let matches = cli().get_matches(); + let pgbin_default = String::from("postgres"); + let pgbin = matches.get_one::("pgbin").unwrap_or(&pgbin_default); + + let remote_ext_config = matches.get_one::("remote-ext-config"); + let ext_remote_storage = remote_ext_config.map(|x| { + init_remote_storage(x).expect("cannot initialize remote extension storage from config") + }); let http_port = *matches .get_one::("http-port") @@ -128,9 +142,6 @@ fn main() -> Result<()> { let compute_id = matches.get_one::("compute-id"); let control_plane_uri = matches.get_one::("control-plane-uri"); - // Try to use just 'postgres' if no path is provided - let pgbin = matches.get_one::("pgbin").unwrap(); - let spec; let mut live_config_allowed = false; match spec_json { @@ -168,6 +179,7 @@ fn main() -> Result<()> { let mut new_state = ComputeState::new(); let spec_set; + if let Some(spec) = spec { let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?; new_state.pspec = Some(pspec); @@ -179,9 +191,15 @@ fn main() -> Result<()> { connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?, pgdata: pgdata.to_string(), pgbin: pgbin.to_string(), + pgversion: get_pg_version(pgbin), live_config_allowed, state: Mutex::new(new_state), state_changed: Condvar::new(), + ext_remote_storage, + ext_remote_paths: OnceLock::new(), + ext_download_progress: RwLock::new(HashMap::new()), + library_index: OnceLock::new(), + build_tag, }; let compute = Arc::new(compute_node); @@ -190,6 +208,8 @@ fn main() -> Result<()> { let _http_handle = launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread"); + let extension_server_port: u16 = http_port; + if !spec_set { // No spec provided, hang waiting for it. 
info!("no compute spec provided, waiting"); @@ -236,7 +256,7 @@ fn main() -> Result<()> { // Start Postgres let mut delay_exit = false; let mut exit_code = None; - let pg = match compute.start_compute() { + let pg = match compute.start_compute(extension_server_port) { Ok(pg) => Some(pg), Err(err) => { error!("could not start the compute node: {:?}", err); @@ -365,6 +385,12 @@ fn cli() -> clap::Command { .long("control-plane-uri") .value_name("CONTROL_PLANE_API_BASE_URI"), ) + .arg( + Arg::new("remote-ext-config") + .short('r') + .long("remote-ext-config") + .value_name("REMOTE_EXT_CONFIG"), + ) } #[test] diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 254d367a7167..ec2eb02237c9 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -1,16 +1,20 @@ +use std::collections::HashMap; use std::fs; use std::io::BufRead; use std::os::unix::fs::PermissionsExt; use std::path::Path; use std::process::{Command, Stdio}; use std::str::FromStr; -use std::sync::{Condvar, Mutex}; +use std::sync::{Condvar, Mutex, OnceLock, RwLock}; use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; +use futures::future::join_all; use futures::stream::FuturesUnordered; use futures::StreamExt; use postgres::{Client, NoTls}; +use regex::Regex; +use tokio; use tokio_postgres; use tracing::{error, info, instrument, warn}; use utils::id::{TenantId, TimelineId}; @@ -20,10 +24,12 @@ use compute_api::responses::{ComputeMetrics, ComputeStatus}; use compute_api::spec::{ComputeMode, ComputeSpec}; use utils::measured_stream::MeasuredReader; -use crate::config; +use remote_storage::{GenericRemoteStorage, RemotePath}; + use crate::pg_helpers::*; use crate::spec::*; use crate::sync_sk::{check_if_synced, ping_safekeeper}; +use crate::{config, extension_server}; /// Compute node info shared across several `compute_ctl` threads. pub struct ComputeNode { @@ -31,6 +37,7 @@ pub struct ComputeNode { pub connstr: url::Url, pub pgdata: String, pub pgbin: String, + pub pgversion: String, /// We should only allow live re- / configuration of the compute node if /// it uses 'pull model', i.e. it can go to control-plane and fetch /// the latest configuration. Otherwise, there could be a case: @@ -50,6 +57,24 @@ pub struct ComputeNode { pub state: Mutex, /// `Condvar` to allow notifying waiters about state changes. pub state_changed: Condvar, + /// the S3 bucket that we search for extensions in + pub ext_remote_storage: Option, + // (key: extension name, value: path to extension archive in remote storage) + pub ext_remote_paths: OnceLock>, + // (key: library name, value: name of extension containing this library) + pub library_index: OnceLock>, + // key: ext_archive_name, value: started download time, download_completed? + pub ext_download_progress: RwLock, bool)>>, + pub build_tag: String, +} + +// store some metrics about download size that might impact startup time +#[derive(Clone, Debug)] +pub struct RemoteExtensionMetrics { + num_ext_downloaded: u64, + largest_ext_size: u64, + total_ext_download_size: u64, + prep_extensions_ms: u64, } #[derive(Clone, Debug)] @@ -473,14 +498,22 @@ impl ComputeNode { /// Do all the preparations like PGDATA directory creation, configuration, /// safekeepers sync, basebackup, etc. 
#[instrument(skip_all)] - pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> { + pub fn prepare_pgdata( + &self, + compute_state: &ComputeState, + extension_server_port: u16, + ) -> Result<()> { let pspec = compute_state.pspec.as_ref().expect("spec must be set"); let spec = &pspec.spec; let pgdata_path = Path::new(&self.pgdata); // Remove/create an empty pgdata directory and put configuration there. self.create_pgdata()?; - config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?; + config::write_postgres_conf( + &pgdata_path.join("postgresql.conf"), + &pspec.spec, + Some(extension_server_port), + )?; // Syncing safekeepers is only safe with primary nodes: if a primary // is already connected it will be kicked out, so a secondary (standby) @@ -670,7 +703,7 @@ impl ComputeNode { // Write new config let pgdata_path = Path::new(&self.pgdata); - config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?; + config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?; let mut client = Client::connect(self.connstr.as_str(), NoTls)?; self.pg_reload_conf(&mut client)?; @@ -700,7 +733,7 @@ impl ComputeNode { } #[instrument(skip_all)] - pub fn start_compute(&self) -> Result { + pub fn start_compute(&self, extension_server_port: u16) -> Result { let compute_state = self.state.lock().unwrap().clone(); let pspec = compute_state.pspec.as_ref().expect("spec must be set"); info!( @@ -711,7 +744,31 @@ impl ComputeNode { pspec.timeline_id, ); - self.prepare_pgdata(&compute_state)?; + // This part is sync, because we need to download + // remote shared_preload_libraries before postgres start (if any) + { + let library_load_start_time = Utc::now(); + let remote_ext_metrics = self.prepare_preload_libraries(&compute_state)?; + + let library_load_time = Utc::now() + .signed_duration_since(library_load_start_time) + .to_std() + .unwrap() + .as_millis() as u64; + let mut state = self.state.lock().unwrap(); + state.metrics.load_ext_ms = library_load_time; + state.metrics.num_ext_downloaded = remote_ext_metrics.num_ext_downloaded; + state.metrics.largest_ext_size = remote_ext_metrics.largest_ext_size; + state.metrics.total_ext_download_size = remote_ext_metrics.total_ext_download_size; + state.metrics.prep_extensions_ms = remote_ext_metrics.prep_extensions_ms; + info!( + "Loading shared_preload_libraries took {:?}ms", + library_load_time + ); + info!("{:?}", remote_ext_metrics); + } + + self.prepare_pgdata(&compute_state, extension_server_port)?; let start_time = Utc::now(); let pg = self.start_postgres(pspec.storage_auth_token.clone())?; @@ -859,4 +916,200 @@ LIMIT 100", "{{\"pg_stat_statements\": []}}".to_string() } } + + // If remote extension storage is configured, + // download extension control files + pub async fn prepare_external_extensions(&self, compute_state: &ComputeState) -> Result<()> { + if let Some(ref ext_remote_storage) = self.ext_remote_storage { + let pspec = compute_state.pspec.as_ref().expect("spec must be set"); + let spec = &pspec.spec; + let custom_ext = spec.custom_extensions.clone().unwrap_or(Vec::new()); + info!("custom extensions: {:?}", &custom_ext); + let (ext_remote_paths, library_index) = extension_server::get_available_extensions( + ext_remote_storage, + &self.pgbin, + &self.pgversion, + &custom_ext, + &self.build_tag, + ) + .await?; + self.ext_remote_paths + .set(ext_remote_paths) + .expect("this is the only time we set ext_remote_paths"); + self.library_index + .set(library_index) + .expect("this is the 
only time we set library_index"); + } + Ok(()) + } + + // download an archive, unzip and place files in correct locations + pub async fn download_extension(&self, ext_name: &str, is_library: bool) -> Result { + match &self.ext_remote_storage { + None => anyhow::bail!("No remote extension storage"), + Some(remote_storage) => { + let mut real_ext_name = ext_name.to_string(); + if is_library { + // sometimes library names might have a suffix like + // library.so or library.so.3. We strip this off + // because library_index is based on the name without the file extension + let strip_lib_suffix = Regex::new(r"\.so.*").unwrap(); + let lib_raw_name = strip_lib_suffix.replace(&real_ext_name, "").to_string(); + real_ext_name = self + .library_index + .get() + .expect("must have already downloaded the library_index")[&lib_raw_name] + .clone(); + } + + let ext_path = &self + .ext_remote_paths + .get() + .expect("error accessing ext_remote_paths")[&real_ext_name]; + let ext_archive_name = ext_path.object_name().expect("bad path"); + + let mut first_try = false; + if !self + .ext_download_progress + .read() + .expect("lock err") + .contains_key(ext_archive_name) + { + self.ext_download_progress + .write() + .expect("lock err") + .insert(ext_archive_name.to_string(), (Utc::now(), false)); + first_try = true; + } + let (download_start, download_completed) = + self.ext_download_progress.read().expect("lock err")[ext_archive_name]; + let start_time_delta = Utc::now() + .signed_duration_since(download_start) + .to_std() + .unwrap() + .as_millis() as u64; + + // how long to wait for extension download if it was started by another process + const HANG_TIMEOUT: u64 = 3000; // milliseconds + + if download_completed { + info!("extension already downloaded, skipping re-download"); + return Ok(0); + } else if start_time_delta < HANG_TIMEOUT && !first_try { + info!("download {ext_archive_name} already started by another process, hanging untill completion or timeout"); + let mut interval = + tokio::time::interval(tokio::time::Duration::from_millis(500)); + loop { + info!("waiting for download"); + interval.tick().await; + let (_, download_completed_now) = + self.ext_download_progress.read().expect("lock")[ext_archive_name]; + if download_completed_now { + info!("download finished by whoever else downloaded it"); + return Ok(0); + } + } + // NOTE: the above loop will get terminated + // based on the timeout of the download function + } + + // if extension hasn't been downloaded before or the previous + // attempt to download was at least HANG_TIMEOUT ms ago + // then we try to download it here + info!("downloading new extension {ext_archive_name}"); + + let download_size = extension_server::download_extension( + &real_ext_name, + ext_path, + remote_storage, + &self.pgbin, + ) + .await; + self.ext_download_progress + .write() + .expect("bad lock") + .insert(ext_archive_name.to_string(), (download_start, true)); + download_size + } + } + } + + #[tokio::main] + pub async fn prepare_preload_libraries( + &self, + compute_state: &ComputeState, + ) -> Result { + if self.ext_remote_storage.is_none() { + return Ok(RemoteExtensionMetrics { + num_ext_downloaded: 0, + largest_ext_size: 0, + total_ext_download_size: 0, + prep_extensions_ms: 0, + }); + } + let pspec = compute_state.pspec.as_ref().expect("spec must be set"); + let spec = &pspec.spec; + + info!("parse shared_preload_libraries from spec.cluster.settings"); + let mut libs_vec = Vec::new(); + if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") { 
+ libs_vec = libs + .split(&[',', '\'', ' ']) + .filter(|s| *s != "neon" && !s.is_empty()) + .map(str::to_string) + .collect(); + } + info!("parse shared_preload_libraries from provided postgresql.conf"); + // that is used in neon_local and python tests + if let Some(conf) = &spec.cluster.postgresql_conf { + let conf_lines = conf.split('\n').collect::>(); + let mut shared_preload_libraries_line = ""; + for line in conf_lines { + if line.starts_with("shared_preload_libraries") { + shared_preload_libraries_line = line; + } + } + let mut preload_libs_vec = Vec::new(); + if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) { + preload_libs_vec = libs + .split(&[',', '\'', ' ']) + .filter(|s| *s != "neon" && !s.is_empty()) + .map(str::to_string) + .collect(); + } + libs_vec.extend(preload_libs_vec); + } + + info!("Download ext_index.json, find the extension paths"); + let prep_ext_start_time = Utc::now(); + self.prepare_external_extensions(compute_state).await?; + let prep_ext_time_delta = Utc::now() + .signed_duration_since(prep_ext_start_time) + .to_std() + .unwrap() + .as_millis() as u64; + info!("Prepare extensions took {prep_ext_time_delta}ms"); + + info!("Downloading to shared preload libraries: {:?}", &libs_vec); + let mut download_tasks = Vec::new(); + for library in &libs_vec { + download_tasks.push(self.download_extension(library, true)); + } + let results = join_all(download_tasks).await; + + let mut remote_ext_metrics = RemoteExtensionMetrics { + num_ext_downloaded: 0, + largest_ext_size: 0, + total_ext_download_size: 0, + prep_extensions_ms: prep_ext_time_delta, + }; + for result in results { + let download_size = result?; + remote_ext_metrics.num_ext_downloaded += 1; + remote_ext_metrics.largest_ext_size = + std::cmp::max(remote_ext_metrics.largest_ext_size, download_size); + remote_ext_metrics.total_ext_download_size += download_size; + } + Ok(remote_ext_metrics) + } } diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs index 68b943eec8a5..2da671a149bf 100644 --- a/compute_tools/src/config.rs +++ b/compute_tools/src/config.rs @@ -33,7 +33,11 @@ pub fn line_in_file(path: &Path, line: &str) -> Result { } /// Create or completely rewrite configuration file specified by `path` -pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> { +pub fn write_postgres_conf( + path: &Path, + spec: &ComputeSpec, + extension_server_port: Option, +) -> Result<()> { // File::create() destroys the file content if it exists. 
let mut file = File::create(path)?; @@ -87,5 +91,9 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> { writeln!(file, "# Managed by compute_ctl: end")?; } + if let Some(port) = extension_server_port { + writeln!(file, "neon.extension_server_port={}", port)?; + } + Ok(()) } diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs new file mode 100644 index 000000000000..c8710b057705 --- /dev/null +++ b/compute_tools/src/extension_server.rs @@ -0,0 +1,275 @@ +// Download extension files from the extension store +// and put them in the right place in the postgres directory (share / lib) +/* +The layout of the S3 bucket is as follows: +5615610098 // this is an extension build number +├── v14 +│   ├── extensions +│   │   ├── anon.tar.zst +│   │   └── embedding.tar.zst +│   └── ext_index.json +└── v15 + ├── extensions + │   ├── anon.tar.zst + │   └── embedding.tar.zst + └── ext_index.json +5615261079 +├── v14 +│   ├── extensions +│   │   └── anon.tar.zst +│   └── ext_index.json +└── v15 + ├── extensions + │   └── anon.tar.zst + └── ext_index.json +5623261088 +├── v14 +│   ├── extensions +│   │   └── embedding.tar.zst +│   └── ext_index.json +└── v15 + ├── extensions + │   └── embedding.tar.zst + └── ext_index.json + +Note that build number cannot be part of prefix because we might need extensions +from other build numbers. + +ext_index.json stores the control files and location of extension archives +It also stores a list of public extensions and a library_index + +We don't need to duplicate extension.tar.zst files. +We only need to upload a new one if it is updated. +(Although currently we just upload every time anyways, hopefully will change +this sometime) + +*access* is controlled by spec + +More specifically, here is an example ext_index.json +{ + "public_extensions": [ + "anon", + "pg_buffercache" + ], + "library_index": { + "anon": "anon", + "pg_buffercache": "pg_buffercache" + }, + "extension_data": { + "pg_buffercache": { + "control_data": { + "pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true" + }, + "archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst" + }, + "anon": { + "control_data": { + "anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n" + }, + "archive_path": "5670669815/v14/extensions/anon.tar.zst" + } + } +} +*/ +use anyhow::Context; +use anyhow::{self, Result}; +use futures::future::join_all; +use remote_storage::*; +use serde_json; +use std::collections::HashMap; +use std::io::Read; +use std::num::{NonZeroU32, NonZeroUsize}; +use std::path::Path; +use std::str; +use tar::Archive; +use tokio::io::AsyncReadExt; +use tracing::info; +use tracing::log::warn; +use zstd::stream::read::Decoder; + +fn get_pg_config(argument: &str, pgbin: &str) -> String { + // gives the result of `pg_config [argument]` + // where argument is a flag like `--version` or `--sharedir` + let pgconfig = pgbin + .strip_suffix("postgres") + .expect("bad pgbin") + .to_owned() + + "/pg_config"; + let config_output = std::process::Command::new(pgconfig) + .arg(argument) + .output() + .expect("pg_config error"); + std::str::from_utf8(&config_output.stdout) + 
.expect("pg_config error") + .trim() + .to_string() +} + +pub fn get_pg_version(pgbin: &str) -> String { + // pg_config --version returns a (platform specific) human readable string + // such as "PostgreSQL 15.4". We parse this to v14/v15 + let human_version = get_pg_config("--version", pgbin); + if human_version.contains("15") { + return "v15".to_string(); + } else if human_version.contains("14") { + return "v14".to_string(); + } + panic!("Unsuported postgres version {human_version}"); +} + +// download control files for enabled_extensions +// return Hashmaps converting library names to extension names (library_index) +// and specifying the remote path to the archive for each extension name +pub async fn get_available_extensions( + remote_storage: &GenericRemoteStorage, + pgbin: &str, + pg_version: &str, + custom_extensions: &[String], + build_tag: &str, +) -> Result<(HashMap, HashMap)> { + let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"); + let index_path = format!("{build_tag}/{pg_version}/ext_index.json"); + let index_path = RemotePath::new(Path::new(&index_path)).context("error forming path")?; + info!("download ext_index.json from: {:?}", &index_path); + + let mut download = remote_storage.download(&index_path).await?; + let mut ext_idx_buffer = Vec::new(); + download + .download_stream + .read_to_end(&mut ext_idx_buffer) + .await?; + info!("ext_index downloaded"); + + #[derive(Debug, serde::Deserialize)] + struct Index { + public_extensions: Vec, + library_index: HashMap, + extension_data: HashMap, + } + + #[derive(Debug, serde::Deserialize)] + struct ExtensionData { + control_data: HashMap, + archive_path: String, + } + + let ext_index_full = serde_json::from_slice::(&ext_idx_buffer)?; + let mut enabled_extensions = ext_index_full.public_extensions; + enabled_extensions.extend_from_slice(custom_extensions); + let library_index = ext_index_full.library_index; + let all_extension_data = ext_index_full.extension_data; + info!("library_index: {:?}", library_index); + + info!("enabled_extensions: {:?}", enabled_extensions); + let mut ext_remote_paths = HashMap::new(); + let mut file_create_tasks = Vec::new(); + for extension in enabled_extensions { + let ext_data = &all_extension_data[&extension]; + for (control_file, control_contents) in &ext_data.control_data { + let extension_name = control_file + .strip_suffix(".control") + .expect("control files must end in .control"); + ext_remote_paths.insert( + extension_name.to_string(), + RemotePath::from_string(&ext_data.archive_path)?, + ); + let control_path = local_sharedir.join(control_file); + info!("writing file {:?}{:?}", control_path, control_contents); + file_create_tasks.push(tokio::fs::write(control_path, control_contents)); + } + } + let results = join_all(file_create_tasks).await; + for result in results { + result?; + } + info!("ext_remote_paths {:?}", ext_remote_paths); + Ok((ext_remote_paths, library_index)) +} + +// download the archive for a given extension, +// unzip it, and place files in the appropriate locations (share/lib) +pub async fn download_extension( + ext_name: &str, + ext_path: &RemotePath, + remote_storage: &GenericRemoteStorage, + pgbin: &str, +) -> Result { + info!("Download extension {:?} from {:?}", ext_name, ext_path); + let mut download = remote_storage.download(ext_path).await?; + let mut download_buffer = Vec::new(); + download + .download_stream + .read_to_end(&mut download_buffer) + .await?; + let download_size = download_buffer.len() as u64; + // it's unclear 
whether it is more performant to decompress into memory or not + // TODO: decompressing into memory can be avoided + let mut decoder = Decoder::new(download_buffer.as_slice())?; + let mut decompress_buffer = Vec::new(); + decoder.read_to_end(&mut decompress_buffer)?; + let mut archive = Archive::new(decompress_buffer.as_slice()); + let unzip_dest = pgbin + .strip_suffix("/bin/postgres") + .expect("bad pgbin") + .to_string() + + "/download_extensions"; + archive.unpack(&unzip_dest)?; + info!("Download + unzip {:?} completed successfully", &ext_path); + + let sharedir_paths = ( + unzip_dest.to_string() + "/share/extension", + Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"), + ); + let libdir_paths = ( + unzip_dest.to_string() + "/lib", + Path::new(&get_pg_config("--libdir", pgbin)).join("postgresql"), + ); + // move contents of the libdir / sharedir in unzipped archive to the correct local paths + for paths in [sharedir_paths, libdir_paths] { + let (zip_dir, real_dir) = paths; + info!("mv {zip_dir:?}/* {real_dir:?}"); + for file in std::fs::read_dir(zip_dir)? { + let old_file = file?.path(); + let new_file = + Path::new(&real_dir).join(old_file.file_name().context("error parsing file")?); + info!("moving {old_file:?} to {new_file:?}"); + + // extension download failed: Directory not empty (os error 39) + match std::fs::rename(old_file, new_file) { + Ok(()) => info!("move succeeded"), + Err(e) => { + warn!("move failed, probably because the extension already exists: {e}") + } + } + } + } + info!("done moving extension {ext_name}"); + Ok(download_size) +} + +// This function initializes the necessary structs to use remote storage +pub fn init_remote_storage(remote_ext_config: &str) -> anyhow::Result { + #[derive(Debug, serde::Deserialize)] + struct RemoteExtJson { + bucket: String, + region: String, + endpoint: Option, + prefix: Option, + } + let remote_ext_json = serde_json::from_str::(remote_ext_config)?; + + let config = S3Config { + bucket_name: remote_ext_json.bucket, + bucket_region: remote_ext_json.region, + prefix_in_bucket: remote_ext_json.prefix, + endpoint: remote_ext_json.endpoint, + concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"), + max_keys_per_list_response: None, + }; + let config = RemoteStorageConfig { + max_concurrent_syncs: NonZeroUsize::new(100).expect("100 != 0"), + max_sync_errors: NonZeroU32::new(100).expect("100 != 0"), + storage: RemoteStorageKind::AwsS3(config), + }; + GenericRemoteStorage::from_config(&config) +} diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index afd9c2fb5479..af07412b5250 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -121,6 +121,37 @@ async fn routes(req: Request, compute: &Arc) -> Response { + info!("serving {:?} POST request", route); + info!("req.uri {:?}", req.uri()); + + let mut is_library = false; + if let Some(params) = req.uri().query() { + info!("serving {:?} POST request with params: {}", route, params); + if params == "is_library=true" { + is_library = true; + } else { + let mut resp = Response::new(Body::from("Wrong request parameters")); + *resp.status_mut() = StatusCode::BAD_REQUEST; + return resp; + } + } + + let filename = route.split('/').last().unwrap().to_string(); + info!("serving /extension_server POST request, filename: {filename:?} is_library: {is_library}"); + + match compute.download_extension(&filename, is_library).await { + Ok(_) => Response::new(Body::from("OK")), + Err(e) => { + error!("extension download failed: {}", e); + 
let mut resp = Response::new(Body::from(e.to_string())); + *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + resp + } + } + } + // Return the `404 Not Found` for any other routes. _ => { let mut not_found = Response::new(Body::from("404 Not Found")); diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml index 268026975630..dc26cc63eb8d 100644 --- a/compute_tools/src/http/openapi_spec.yaml +++ b/compute_tools/src/http/openapi_spec.yaml @@ -139,6 +139,34 @@ paths: application/json: schema: $ref: "#/components/schemas/GenericError" + /extension_server: + post: + tags: + - Extension + summary: Download extension from S3 to local folder. + description: "" + operationId: downloadExtension + responses: + 200: + description: Extension downloaded + content: + text/plain: + schema: + type: string + description: Error text or 'OK' if download succeeded. + example: "OK" + 400: + description: Request is invalid. + content: + application/json: + schema: + $ref: "#/components/schemas/GenericError" + 500: + description: Extension download request failed. + content: + application/json: + schema: + $ref: "#/components/schemas/GenericError" components: securitySchemes: diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index 1d7b09f095c1..1cd960208910 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -9,6 +9,7 @@ pub mod http; #[macro_use] pub mod logger; pub mod compute; +pub mod extension_server; pub mod monitor; pub mod params; pub mod pg_helpers; diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 575a5332a82d..eff7c93b461a 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -124,7 +124,7 @@ pub fn get_spec_from_control_plane( pub fn handle_configuration(spec: &ComputeSpec, pgdata_path: &Path) -> Result<()> { // File `postgresql.conf` is no longer included into `basebackup`, so just // always write all config into it creating new file. - config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?; + config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec, None)?; update_pg_hba(pgdata_path)?; diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index a341ff0263bb..d2c99c5f364e 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -32,3 +32,4 @@ utils.workspace = true compute_api.workspace = true workspace_hack.workspace = true +tracing.workspace = true diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 8995a1856468..8f71cb65e2b2 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -658,6 +658,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( .get_one::("endpoint_id") .ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?; + let remote_ext_config = sub_args.get_one::("remote-ext-config"); + // If --safekeepers argument is given, use only the listed safekeeper nodes. 
let safekeepers = if let Some(safekeepers_str) = sub_args.get_one::("safekeepers") { @@ -699,7 +701,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( _ => {} } println!("Starting existing endpoint {endpoint_id}..."); - endpoint.start(&auth_token, safekeepers)?; + endpoint.start(&auth_token, safekeepers, remote_ext_config)?; } else { let branch_name = sub_args .get_one::("branch-name") @@ -743,7 +745,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( pg_version, mode, )?; - ep.start(&auth_token, safekeepers)?; + ep.start(&auth_token, safekeepers, remote_ext_config)?; } } "stop" => { @@ -1003,6 +1005,12 @@ fn cli() -> Command { .help("Additional pageserver's configuration options or overrides, refer to pageserver's 'config-override' CLI parameter docs for more") .required(false); + let remote_ext_config_args = Arg::new("remote-ext-config") + .long("remote-ext-config") + .num_args(1) + .help("Configure the S3 bucket that we search for extensions in.") + .required(false); + let lsn_arg = Arg::new("lsn") .long("lsn") .help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.") @@ -1161,6 +1169,7 @@ fn cli() -> Command { .arg(pg_version_arg) .arg(hot_standby_arg) .arg(safekeepers_arg) + .arg(remote_ext_config_args) ) .subcommand( Command::new("stop") diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 8b5c88bf01e5..60607994588b 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -313,7 +313,7 @@ impl Endpoint { // TODO: use future host field from safekeeper spec // Pass the list of safekeepers to the replica so that it can connect to any of them, - // whichever is availiable. + // whichever is available. let sk_ports = self .env .safekeepers @@ -420,7 +420,12 @@ impl Endpoint { Ok(()) } - pub fn start(&self, auth_token: &Option, safekeepers: Vec) -> Result<()> { + pub fn start( + &self, + auth_token: &Option, + safekeepers: Vec, + remote_ext_config: Option<&String>, + ) -> Result<()> { if self.status() == "running" { anyhow::bail!("The endpoint is already running"); } @@ -488,6 +493,7 @@ impl Endpoint { pageserver_connstring: Some(pageserver_connstring), safekeeper_connstrings, storage_auth_token: auth_token.clone(), + custom_extensions: Some(vec![]), }; let spec_path = self.endpoint_path().join("spec.json"); std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?; @@ -519,6 +525,11 @@ impl Endpoint { .stdin(std::process::Stdio::null()) .stderr(logfile.try_clone()?) .stdout(logfile); + + if let Some(remote_ext_config) = remote_ext_config { + cmd.args(["--remote-ext-config", remote_ext_config]); + } + let child = cmd.spawn()?; // Write down the pid so we can wait for it when we want to stop diff --git a/docs/rfcs/024-extension-loading.md b/docs/rfcs/024-extension-loading.md new file mode 100644 index 000000000000..26ba4f792700 --- /dev/null +++ b/docs/rfcs/024-extension-loading.md @@ -0,0 +1,236 @@ +# Supporting custom user Extensions (Dynamic Extension Loading) +Created 2023-05-03 + +## Motivation + +There are many extensions in the PostgreSQL ecosystem, and not all extensions +are of a quality that we can confidently support them. 
Additionally, our +current extension inclusion mechanism has several problems because we build all +extensions into the primary Compute image: We build the extensions every time +we build the compute image regardless of whether we actually need to rebuild +the image, and the inclusion of these extensions in the image adds a hard +dependency on all supported extensions - thus increasing the image size, and +with it the time it takes to download that image - increasing first start +latency. + +This RFC proposes a dynamic loading mechanism that solves most of these +problems. + +## Summary + +`compute_ctl` is made responsible for loading extensions on-demand into +the container's file system for dynamically loaded extensions, and will also +make sure that the extensions in `shared_preload_libraries` are downloaded +before the compute node starts. + +## Components + +compute_ctl, PostgreSQL, neon (extension), Compute Host Node, Extension Store + +## Requirements + +Compute nodes with no extra extensions should not be negatively impacted by +the existence of support for many extensions. + +Installing an extension into PostgreSQL should be easy. + +Non-preloaded extensions shouldn't impact startup latency. + +Uninstalled extensions shouldn't impact query latency. + +A small latency penalty for dynamically loaded extensions is acceptable in +the first seconds of compute startup, but not in steady-state operations. + +## Proposed implementation + +### On-demand, JIT-loading of extensions + +Before postgres starts we download +- control files for all extensions available to that compute node; +- all `shared_preload_libraries`; + +After postgres is running, `compute_ctl` listens for requests to load files. +When PostgreSQL requests a file, `compute_ctl` downloads it. + +PostgreSQL requests files in the following cases: +- When loading a preload library set in `local_preload_libraries` +- When explicitly loading a library with `LOAD` +- Wnen creating extension with `CREATE EXTENSION` (download sql scripts, (optional) extension data files and (optional) library files))) + + +#### Summary + +Pros: + - Startup is only as slow as it takes to load all (shared_)preload_libraries + - Supports BYO Extension + +Cons: + - O(sizeof(extensions)) IO requirement for loading all extensions. + +### Alternative solutions + +1. Allow users to add their extensions to the base image + + Pros: + - Easy to deploy + + Cons: + - Doesn't scale - first start size is dependent on image size; + - All extensions are shared across all users: It doesn't allow users to + bring their own restrictive-licensed extensions + +2. Bring Your Own compute image + + Pros: + - Still easy to deploy + - User can bring own patched version of PostgreSQL + + Cons: + - First start latency is O(sizeof(extensions image)) + - Warm instance pool for skipping pod schedule latency is not feasible with + O(n) custom images + - Support channels are difficult to manage + +3. Download all user extensions in bulk on compute start + + Pros: + - Easy to deploy + - No startup latency issues for "clean" users. + - Warm instance pool for skipping pod schedule latency is possible + + Cons: + - Downloading all extensions in advance takes a lot of time, thus startup + latency issues + +4. 
Store user's extensions in persistent storage + + Pros: + - Easy to deploy + - No startup latency issues + - Warm instance pool for skipping pod schedule latency is possible + + Cons: + - EC2 instances have only limited number of attachments shared between EBS + volumes, direct-attached NVMe drives, and ENIs. + - Compute instance migration isn't trivially solved for EBS mounts (e.g. + the device is unavailable whilst moving the mount between instances). + - EBS can only mount on one instance at a time (except the expensive IO2 + device type). + +5. Store user's extensions in network drive + + Pros: + - Easy to deploy + - Few startup latency issues + - Warm instance pool for skipping pod schedule latency is possible + + Cons: + - We'd need networked drives, and a lot of them, which would store many + duplicate extensions. + - **UNCHECKED:** Compute instance migration may not work nicely with + networked IOs + + +### Idea extensions + +The extension store does not have to be S3 directly, but could be a Node-local +caching service on top of S3. This would reduce the load on the network for +popular extensions. + +## Extension Storage implementation + +The layout of the S3 bucket is as follows: +``` +5615610098 // this is an extension build number +├── v14 +│   ├── extensions +│   │   ├── anon.tar.zst +│   │   └── embedding.tar.zst +│   └── ext_index.json +└── v15 + ├── extensions + │   ├── anon.tar.zst + │   └── embedding.tar.zst + └── ext_index.json +5615261079 +├── v14 +│   ├── extensions +│   │   └── anon.tar.zst +│   └── ext_index.json +└── v15 + ├── extensions + │   └── anon.tar.zst + └── ext_index.json +5623261088 +├── v14 +│   ├── extensions +│   │   └── embedding.tar.zst +│   └── ext_index.json +└── v15 + ├── extensions + │   └── embedding.tar.zst + └── ext_index.json +``` + +Note that build number cannot be part of prefix because we might need extensions +from other build numbers. + +`ext_index.json` stores the control files and location of extension archives. +It also stores a list of public extensions and a library_index + +We don't need to duplicate `extension.tar.zst`` files. +We only need to upload a new one if it is updated. +(Although currently we just upload every time anyways, hopefully will change +this sometime) + +*access* is controlled by spec + +More specifically, here is an example ext_index.json +``` +{ + "public_extensions": [ + "anon", + "pg_buffercache" + ], + "library_index": { + "anon": "anon", + "pg_buffercache": "pg_buffercache" + // for more complex extensions like postgis + // we might have something like: + // address_standardizer: postgis + // postgis_tiger: postgis + }, + "extension_data": { + "pg_buffercache": { + "control_data": { + "pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true" + }, + "archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst" + }, + "anon": { + "control_data": { + "anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n" + }, + "archive_path": "5670669815/v14/extensions/anon.tar.zst" + } + } +} +``` + +### How to add new extension to the Extension Storage? + +Simply upload build artifacts to the S3 bucket. +Implement a CI step for that. 
Splitting it from compute-node-image build. + +### How do we deal with extension versions and updates? + +Currently, we rebuild extensions on every compute-node-image build and store them in the prefix. +This is needed to ensure that `/share` and `/lib` files are in sync. + +For extension updates, we rely on the PostgreSQL extension versioning mechanism (sql update scripts) and extension authors to not break backwards compatibility within one major version of PostgreSQL. + +### Alternatives + +For extensions written on trusted languages we can also adopt +`dbdev` PostgreSQL Package Manager based on `pg_tle` by Supabase. +This will increase the amount supported extensions and decrease the amount of work required to support them. diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index f2865b71ec6d..9522d7138f79 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -76,6 +76,11 @@ pub struct ComputeMetrics { pub start_postgres_ms: u64, pub config_ms: u64, pub total_startup_ms: u64, + pub load_ext_ms: u64, + pub num_ext_downloaded: u64, + pub largest_ext_size: u64, // these are measured in bytes + pub total_ext_download_size: u64, + pub prep_extensions_ms: u64, } /// Response of the `/computes/{compute_id}/spec` control-plane API. diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index b3f0e9ba4350..293f6dc29479 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -60,6 +60,9 @@ pub struct ComputeSpec { /// If set, 'storage_auth_token' is used as the password to authenticate to /// the pageserver and safekeepers. pub storage_auth_token: Option, + + // list of prefixes to search for custom extensions in remote extension storage + pub custom_extensions: Option>, } #[serde_as] diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 92ef793a345e..1ddd156a087e 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -65,6 +65,10 @@ impl RemotePath { Ok(Self(relative_path.to_path_buf())) } + pub fn from_string(relative_path: &str) -> anyhow::Result { + Self::new(Path::new(relative_path)) + } + pub fn with_base(&self, base_path: &Path) -> PathBuf { base_path.join(&self.0) } @@ -190,6 +194,20 @@ pub enum GenericRemoteStorage { } impl GenericRemoteStorage { + // A function for listing all the files in a "directory" + // Example: + // list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"] + pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result> { + match self { + Self::LocalFs(s) => s.list_files(folder).await, + Self::AwsS3(s) => s.list_files(folder).await, + Self::Unreliable(s) => s.list_files(folder).await, + } + } + + // lists common *prefixes*, if any of files + // Example: + // list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"] pub async fn list_prefixes( &self, prefix: Option<&RemotePath>, @@ -201,14 +219,6 @@ impl GenericRemoteStorage { } } - pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result> { - match self { - Self::LocalFs(s) => s.list_files(folder).await, - Self::AwsS3(s) => s.list_files(folder).await, - Self::Unreliable(s) => s.list_files(folder).await, - } - } - pub async fn upload( &self, from: impl io::AsyncRead + Unpin + Send + Sync + 'static, diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index 194802347233..53917d8bc48b 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -4,6 +4,7 @@ MODULE_big = neon OBJS = \ $(WIN32RES) \ + 
extension_server.o \ file_cache.o \ libpagestore.o \ libpqwalproposer.o \ diff --git a/pgxn/neon/extension_server.c b/pgxn/neon/extension_server.c new file mode 100644 index 000000000000..6053425de0e5 --- /dev/null +++ b/pgxn/neon/extension_server.c @@ -0,0 +1,103 @@ + +/*------------------------------------------------------------------------- + * + * extension_server.c + * Request compute_ctl to download extension files. + * + * IDENTIFICATION + * contrib/neon/extension_server.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" +#include "tcop/pquery.h" +#include "tcop/utility.h" +#include "access/xact.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" +#include "commands/defrem.h" +#include "miscadmin.h" +#include "utils/acl.h" +#include "fmgr.h" +#include "utils/guc.h" +#include "port.h" +#include "fmgr.h" + +#include + +static int extension_server_port = 0; + +static download_extension_file_hook_type prev_download_extension_file_hook = NULL; + +// to download all SQL (and data) files for an extension: +// curl -X POST http://localhost:8080/extension_server/postgis +// it covers two possible extension files layouts: +// 1. extension_name--version--platform.sql +// 2. extension_name/extension_name--version.sql +// extension_name/extra_files.csv +// +// to download specific library file: +// curl -X POST http://localhost:8080/extension_server/postgis-3.so?is_library=true +static bool +neon_download_extension_file_http(const char *filename, bool is_library) +{ + CURL *curl; + CURLcode res; + char *compute_ctl_url; + char *postdata; + bool ret = false; + + if ((curl = curl_easy_init()) == NULL) + { + elog(ERROR, "Failed to initialize curl handle"); + } + + compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s", + extension_server_port, filename, is_library ? "?is_library=true" : ""); + + elog(LOG, "Sending request to compute_ctl: %s", compute_ctl_url); + + curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST"); + curl_easy_setopt(curl, CURLOPT_URL, compute_ctl_url); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L /* seconds */); + + if (curl) + { + /* Perform the request, res will get the return code */ + res = curl_easy_perform(curl); + /* Check for errors */ + if (res == CURLE_OK) + { + ret = true; + } + else + { + // Don't error here because postgres will try to find the file + // and will fail with some proper error message if it's not found. + elog(WARNING, "neon_download_extension_file_http failed: %s\n", curl_easy_strerror(res)); + } + + /* always cleanup */ + curl_easy_cleanup(curl); + } + + return ret; +} + +void pg_init_extension_server() +{ + // Port to connect to compute_ctl on localhost + // to request extension files. 
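+    // compute_ctl writes `neon.extension_server_port = <its own HTTP port>` into
+    // postgresql.conf before starting postgres (see write_postgres_conf in
+    // compute_tools), so this GUC tells the hook below where to send requests.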
+ DefineCustomIntVariable("neon.extension_server_port", + "connection string to the compute_ctl", + NULL, + &extension_server_port, + 0, 0, INT_MAX, + PGC_POSTMASTER, + 0, /* no flags required */ + NULL, NULL, NULL); + + // set download_extension_file_hook + prev_download_extension_file_hook = download_extension_file_hook; + download_extension_file_hook = neon_download_extension_file_http; +} diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index b45d7cfc32f4..c7211ea05ac9 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -35,8 +35,11 @@ _PG_init(void) { pg_init_libpagestore(); pg_init_walproposer(); + InitControlPlaneConnector(); + pg_init_extension_server(); + // Important: This must happen after other parts of the extension // are loaded, otherwise any settings to GUCs that were set before // the extension was loaded will be removed. diff --git a/pgxn/neon/neon.h b/pgxn/neon/neon.h index 60d321a9451f..2610da431128 100644 --- a/pgxn/neon/neon.h +++ b/pgxn/neon/neon.h @@ -21,6 +21,8 @@ extern char *neon_tenant; extern void pg_init_libpagestore(void); extern void pg_init_walproposer(void); +extern void pg_init_extension_server(void); + /* * Returns true if we shouldn't do REDO on that block in record indicated by * block_id; false otherwise. diff --git a/scripts/combine_control_files.py b/scripts/combine_control_files.py index 0350e4721d6f..b1fe73881e66 100644 --- a/scripts/combine_control_files.py +++ b/scripts/combine_control_files.py @@ -15,16 +15,12 @@ ], "library_index": { "anon": "anon", - "kq_imcx": "kq_imcx" - // would be more complicated for something like postgis where multiple library names all map to postgis + // for more complex extensions like postgis + // we might have something like: + // address_standardizer: postgis + // postgis_tiger: postgis }, "extension_data": { - "kq_imcx": { - "control_data": { - "kq_imcx.control": "# This file is generated content from add_postgresql_extension.\n# No point in modifying it, it will be overwritten anyway.\n\n# Default version, always set\ndefault_version = '0.1'\n\n# Module pathname generated from target shared library name. Use\n# MODULE_PATHNAME in script file.\nmodule_pathname = '$libdir/kq_imcx.so'\n\n# Comment for extension. Set using COMMENT option. Can be set in\n# script file as well.\ncomment = 'ketteQ In-Memory Calendar Extension (IMCX)'\n\n# Encoding for script file. Set using ENCODING option.\n#encoding = ''\n\n# Required extensions. 
Set using REQUIRES option (multi-valued).\n#requires = ''\ntrusted = true\n" - }, - "archive_path": "5648391853/v15/extensions/kq_imcx.tar.zst" - }, "anon": { "control_data": { "anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n" diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index fb78d69d6762..0a9735ce9e56 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -530,6 +530,16 @@ def available_remote_storages() -> List[RemoteStorageKind]: return remote_storages +def available_s3_storages() -> List[RemoteStorageKind]: + remote_storages = [RemoteStorageKind.MOCK_S3] + if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None: + remote_storages.append(RemoteStorageKind.REAL_S3) + log.info("Enabling real s3 storage for tests") + else: + log.info("Using mock implementations to test remote storage") + return remote_storages + + @dataclass class LocalFsStorage: root: Path @@ -550,6 +560,16 @@ def access_env_vars(self) -> Dict[str, str]: "AWS_SECRET_ACCESS_KEY": self.secret_key, } + def to_string(self) -> str: + return json.dumps( + { + "bucket": self.bucket_name, + "region": self.bucket_region, + "endpoint": self.endpoint, + "prefix": self.prefix_in_bucket, + } + ) + RemoteStorage = Union[LocalFsStorage, S3Storage] @@ -616,10 +636,12 @@ def __init__( self.rust_log_override = rust_log_override self.port_distributor = port_distributor self.remote_storage = remote_storage + self.ext_remote_storage: Optional[S3Storage] = None + self.remote_storage_client: Optional[Any] = None self.remote_storage_users = remote_storage_users self.broker = broker self.run_id = run_id - self.mock_s3_server = mock_s3_server + self.mock_s3_server: MockS3Server = mock_s3_server self.pageserver_config_override = pageserver_config_override self.num_safekeepers = num_safekeepers self.safekeepers_id_start = safekeepers_id_start @@ -667,15 +689,24 @@ def enable_remote_storage( remote_storage_kind: RemoteStorageKind, test_name: str, force_enable: bool = True, + enable_remote_extensions: bool = False, ): if remote_storage_kind == RemoteStorageKind.NOOP: return elif remote_storage_kind == RemoteStorageKind.LOCAL_FS: self.enable_local_fs_remote_storage(force_enable=force_enable) elif remote_storage_kind == RemoteStorageKind.MOCK_S3: - self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable) + self.enable_mock_s3_remote_storage( + bucket_name=test_name, + force_enable=force_enable, + enable_remote_extensions=enable_remote_extensions, + ) elif remote_storage_kind == RemoteStorageKind.REAL_S3: - self.enable_real_s3_remote_storage(test_name=test_name, force_enable=force_enable) + self.enable_real_s3_remote_storage( + test_name=test_name, + force_enable=force_enable, + enable_remote_extensions=enable_remote_extensions, + ) else: raise RuntimeError(f"Unknown storage type: {remote_storage_kind}") @@ -689,11 +720,18 @@ def enable_local_fs_remote_storage(self, force_enable: bool = True): assert force_enable or self.remote_storage is None, "remote storage is enabled already" self.remote_storage = LocalFsStorage(Path(self.repo_dir / "local_fs_remote_storage")) - def enable_mock_s3_remote_storage(self, bucket_name: str, force_enable: bool = True): + def enable_mock_s3_remote_storage( + self, + bucket_name: str, 
+ force_enable: bool = True, + enable_remote_extensions: bool = False, + ): """ Sets up the pageserver to use the S3 mock server, creates the bucket, if it's not present already. Starts up the mock server, if that does not run yet. Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`. + + Also creates the bucket for extensions, self.ext_remote_storage bucket """ assert force_enable or self.remote_storage is None, "remote storage is enabled already" mock_endpoint = self.mock_s3_server.endpoint() @@ -714,9 +752,25 @@ def enable_mock_s3_remote_storage(self, bucket_name: str, force_enable: bool = T bucket_region=mock_region, access_key=self.mock_s3_server.access_key(), secret_key=self.mock_s3_server.secret_key(), + prefix_in_bucket="pageserver", ) - def enable_real_s3_remote_storage(self, test_name: str, force_enable: bool = True): + if enable_remote_extensions: + self.ext_remote_storage = S3Storage( + bucket_name=bucket_name, + endpoint=mock_endpoint, + bucket_region=mock_region, + access_key=self.mock_s3_server.access_key(), + secret_key=self.mock_s3_server.secret_key(), + prefix_in_bucket="ext", + ) + + def enable_real_s3_remote_storage( + self, + test_name: str, + force_enable: bool = True, + enable_remote_extensions: bool = False, + ): """ Sets up configuration to use real s3 endpoint without mock server """ @@ -756,6 +810,15 @@ def enable_real_s3_remote_storage(self, test_name: str, force_enable: bool = Tru prefix_in_bucket=self.remote_storage_prefix, ) + if enable_remote_extensions: + self.ext_remote_storage = S3Storage( + bucket_name="neon-dev-extensions-eu-central-1", + bucket_region="eu-central-1", + access_key=access_key, + secret_key=secret_key, + prefix_in_bucket=None, + ) + def cleanup_local_storage(self): if self.preserve_database_files: return @@ -789,6 +852,7 @@ def cleanup_remote_storage(self): # `self.remote_storage_prefix` is coupled with `S3Storage` storage type, # so this line effectively a no-op assert isinstance(self.remote_storage, S3Storage) + assert self.remote_storage_client is not None if self.keep_remote_storage_contents: log.info("keep_remote_storage_contents skipping remote storage cleanup") @@ -918,6 +982,8 @@ def __init__(self, config: NeonEnvBuilder): self.neon_binpath = config.neon_binpath self.pg_distrib_dir = config.pg_distrib_dir self.endpoint_counter = 0 + self.remote_storage_client = config.remote_storage_client + self.ext_remote_storage = config.ext_remote_storage # generate initial tenant ID here instead of letting 'neon init' generate it, # so that we don't need to dig it out of the config file afterwards. @@ -1505,6 +1571,7 @@ def endpoint_start( tenant_id: Optional[TenantId] = None, lsn: Optional[Lsn] = None, branch_name: Optional[str] = None, + remote_ext_config: Optional[str] = None, ) -> "subprocess.CompletedProcess[str]": args = [ "endpoint", @@ -1514,6 +1581,8 @@ def endpoint_start( "--pg-version", self.env.pg_version, ] + if remote_ext_config is not None: + args.extend(["--remote-ext-config", remote_ext_config]) if lsn is not None: args.append(f"--lsn={lsn}") args.extend(["--pg-port", str(pg_port)]) @@ -2375,7 +2444,7 @@ def create( return self - def start(self) -> "Endpoint": + def start(self, remote_ext_config: Optional[str] = None) -> "Endpoint": """ Start the Postgres instance. Returns self. 
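As an illustrative aside (not part of this patch): the JSON produced by `S3Storage.to_string()` is what `compute_ctl` deserializes into `RemoteExtJson` in `init_remote_storage`, so a test only has to enable the extension bucket and pass that string through. A minimal sketch, assuming the mock-S3 fixtures above; the test name is a placeholder:

```python
# Hypothetical sketch: enable the extension bucket alongside pageserver remote storage.
neon_env_builder.enable_remote_storage(
    remote_storage_kind=RemoteStorageKind.MOCK_S3,
    test_name="test_remote_extensions",  # placeholder
    enable_remote_extensions=True,       # also fills in ext_remote_storage
)
env = neon_env_builder.init_start()

# Serializes to something like
# {"bucket": "test_remote_extensions", "region": "...", "endpoint": "http://...", "prefix": "ext"},
# matching compute_ctl's RemoteExtJson {bucket, region, endpoint, prefix}.
remote_ext_config = env.ext_remote_storage.to_string()
```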
@@ -2391,6 +2460,7 @@ def start(self) -> "Endpoint": http_port=self.http_port, tenant_id=self.tenant_id, safekeepers=self.active_safekeepers, + remote_ext_config=remote_ext_config, ) self.running = True @@ -2480,6 +2550,7 @@ def create_start( hot_standby: bool = False, lsn: Optional[Lsn] = None, config_lines: Optional[List[str]] = None, + remote_ext_config: Optional[str] = None, ) -> "Endpoint": """ Create an endpoint, apply config, and start Postgres. @@ -2494,7 +2565,7 @@ def create_start( config_lines=config_lines, hot_standby=hot_standby, lsn=lsn, - ).start() + ).start(remote_ext_config=remote_ext_config) log.info(f"Postgres startup took {time.time() - started_at} seconds") @@ -2528,6 +2599,7 @@ def create_start( lsn: Optional[Lsn] = None, hot_standby: bool = False, config_lines: Optional[List[str]] = None, + remote_ext_config: Optional[str] = None, ) -> Endpoint: ep = Endpoint( self.env, @@ -2544,6 +2616,7 @@ def create_start( hot_standby=hot_standby, config_lines=config_lines, lsn=lsn, + remote_ext_config=remote_ext_config, ) def create( diff --git a/test_runner/fixtures/types.py b/test_runner/fixtures/types.py index 7d179cc7fb38..ef88e09de4ba 100644 --- a/test_runner/fixtures/types.py +++ b/test_runner/fixtures/types.py @@ -89,6 +89,9 @@ class TenantId(Id): def __repr__(self) -> str: return f'`TenantId("{self.id.hex()}")' + def __str__(self) -> str: + return self.id.hex() + class TimelineId(Id): def __repr__(self) -> str: diff --git a/test_runner/regress/data/extension_test/5670669815/v14/ext_index.json b/test_runner/regress/data/extension_test/5670669815/v14/ext_index.json new file mode 100644 index 000000000000..af49dfa0c006 --- /dev/null +++ b/test_runner/regress/data/extension_test/5670669815/v14/ext_index.json @@ -0,0 +1,24 @@ +{ + "public_extensions": [ + "anon", + "pg_buffercache" + ], + "library_index": { + "anon": "anon", + "pg_buffercache": "pg_buffercache" + }, + "extension_data": { + "pg_buffercache": { + "control_data": { + "pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true" + }, + "archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst" + }, + "anon": { + "control_data": { + "anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n" + }, + "archive_path": "5670669815/v14/extensions/anon.tar.zst" + } + } +} \ No newline at end of file diff --git a/test_runner/regress/data/extension_test/5670669815/v14/extensions/anon.tar.zst b/test_runner/regress/data/extension_test/5670669815/v14/extensions/anon.tar.zst new file mode 100644 index 000000000000..5c17630109e6 Binary files /dev/null and b/test_runner/regress/data/extension_test/5670669815/v14/extensions/anon.tar.zst differ diff --git a/test_runner/regress/data/extension_test/5670669815/v14/extensions/pg_buffercache.tar.zst b/test_runner/regress/data/extension_test/5670669815/v14/extensions/pg_buffercache.tar.zst new file mode 100644 index 000000000000..69648a2f1a03 Binary files /dev/null and b/test_runner/regress/data/extension_test/5670669815/v14/extensions/pg_buffercache.tar.zst differ diff --git a/test_runner/regress/data/extension_test/5670669815/v15/ext_index.json b/test_runner/regress/data/extension_test/5670669815/v15/ext_index.json new 
file mode 100644 index 000000000000..fd0d1edc3cb8 --- /dev/null +++ b/test_runner/regress/data/extension_test/5670669815/v15/ext_index.json @@ -0,0 +1,17 @@ +{ + "public_extensions": [ + "anon" + ], + "library_index": { + "anon": "anon" + }, + "extension_data": { + "anon": { + "control_data": { + "anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n" + }, + "archive_path": "5670669815/v15/extensions/anon.tar.zst" + } + } +} + diff --git a/test_runner/regress/data/extension_test/5670669815/v15/extensions/anon.tar.zst b/test_runner/regress/data/extension_test/5670669815/v15/extensions/anon.tar.zst new file mode 100644 index 000000000000..ea7034578f06 Binary files /dev/null and b/test_runner/regress/data/extension_test/5670669815/v15/extensions/anon.tar.zst differ diff --git a/test_runner/regress/test_download_extensions.py b/test_runner/regress/test_download_extensions.py new file mode 100644 index 000000000000..b7692b71c6c0 --- /dev/null +++ b/test_runner/regress/test_download_extensions.py @@ -0,0 +1,324 @@ +import os +import shutil +import threading +from contextlib import closing +from pathlib import Path + +import pytest +from fixtures.log_helper import log +from fixtures.neon_fixtures import ( + NeonEnvBuilder, + RemoteStorageKind, + available_s3_storages, +) +from fixtures.pg_version import PgVersion + + +# Cleaning up downloaded files is important for local tests +# or else one test could reuse the files from another test or another test run +def cleanup(pg_version): + PGDIR = Path(f"pg_install/v{pg_version}") + + LIB_DIR = PGDIR / Path("lib/postgresql") + cleanup_lib_globs = ["anon*", "postgis*", "pg_buffercache*"] + cleanup_lib_glob_paths = [LIB_DIR.glob(x) for x in cleanup_lib_globs] + + SHARE_DIR = PGDIR / Path("share/postgresql/extension") + cleanup_ext_globs = [ + "anon*", + "address_standardizer*", + "postgis*", + "pageinspect*", + "pg_buffercache*", + "pgrouting*", + ] + cleanup_ext_glob_paths = [SHARE_DIR.glob(x) for x in cleanup_ext_globs] + + all_glob_paths = cleanup_lib_glob_paths + cleanup_ext_glob_paths + all_cleanup_files = [] + for file_glob in all_glob_paths: + for file in file_glob: + all_cleanup_files.append(file) + + for file in all_cleanup_files: + try: + os.remove(file) + log.info(f"removed file {file}") + except Exception as err: + log.info( + f"skipping remove of file {file} because it doesn't exist.\ + this may be expected or unexpected depending on the test {err}" + ) + + cleanup_folders = [SHARE_DIR / Path("anon"), PGDIR / Path("download_extensions")] + for folder in cleanup_folders: + try: + shutil.rmtree(folder) + log.info(f"removed folder {folder}") + except Exception as err: + log.info( + f"skipping remove of folder {folder} because it doesn't exist.\ + this may be expected or unexpected depending on the test {err}" + ) + + +def upload_files(env): + log.info("Uploading test files to mock bucket") + os.chdir("test_runner/regress/data/extension_test") + for path in os.walk("."): + prefix, _, files = path + for file in files: + # the [2:] is to remove the leading "./" + full_path = os.path.join(prefix, file)[2:] + + with open(full_path, "rb") as f: + log.info(f"UPLOAD {full_path} to ext/{full_path}") + env.remote_storage_client.upload_fileobj( + f, + env.ext_remote_storage.bucket_name, + f"ext/{full_path}", + ) + os.chdir("../../../..") + 
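The ext_index.json files added above define the contract between the bucket layout and the compute node: "public_extensions" lists the extensions exposed to users, "library_index" maps shared-library names to extension names, and each "extension_data" entry pairs the control-file contents with the "archive_path" of its .tar.zst archive. The sketch below is illustrative only (test-fixture Python, not the Rust used by compute_ctl); the build tag "5670669815" and the ext/ object prefix are taken from the test data and upload_files above.

```python
import json
from pathlib import Path


def describe_ext_index(build_tag: str = "5670669815", pg_version: str = "v14") -> None:
    # Illustrative only: read the ext_index.json checked in under the test data
    # and show what a compute node could fetch from the "ext/" prefix.
    index_path = (
        Path("test_runner/regress/data/extension_test") / build_tag / pg_version / "ext_index.json"
    )
    index = json.loads(index_path.read_text())

    print("public extensions:", index["public_extensions"])
    print("library -> extension:", index["library_index"])
    for name, data in index["extension_data"].items():
        for control_file, contents in data["control_data"].items():
            # control_data holds the control file verbatim; archive_path points
            # at the .tar.zst object that upload_files() uploads under "ext/".
            print(f"{name}: {control_file} ({len(contents)} bytes), ext/{data['archive_path']}")


if __name__ == "__main__":
    describe_ext_index()
```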
+ +# Test downloading remote extension. +@pytest.mark.parametrize("remote_storage_kind", available_s3_storages()) +def test_remote_extensions( + neon_env_builder: NeonEnvBuilder, + remote_storage_kind: RemoteStorageKind, + pg_version: PgVersion, +): + neon_env_builder.enable_remote_storage( + remote_storage_kind=remote_storage_kind, + test_name="test_remote_extensions", + enable_remote_extensions=True, + ) + env = neon_env_builder.init_start() + tenant_id, _ = env.neon_cli.create_tenant() + env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id) + + assert env.ext_remote_storage is not None # satisfy mypy + assert env.remote_storage_client is not None # satisfy mypy + + # For MOCK_S3 we upload test files. + # For REAL_S3 we use the files already in the bucket + if remote_storage_kind == RemoteStorageKind.MOCK_S3: + upload_files(env) + + # Start a compute node and check that it can download the extensions + # and use them to CREATE EXTENSION and LOAD + endpoint = env.endpoints.create_start( + "test_remote_extensions", + tenant_id=tenant_id, + remote_ext_config=env.ext_remote_storage.to_string(), + # config_lines=["log_min_messages=debug3"], + ) + try: + with closing(endpoint.connect()) as conn: + with conn.cursor() as cur: + # Check that the appropriate control files were downloaded + cur.execute("SELECT * FROM pg_available_extensions") + all_extensions = [x[0] for x in cur.fetchall()] + log.info(all_extensions) + assert "anon" in all_extensions + + # postgis is on real s3 but not mock s3. + # it's a large file, so we'd rather not upload it to github + if remote_storage_kind == RemoteStorageKind.REAL_S3: + assert "postgis" in all_extensions + # this may fail locally if a dependency is missing + # we don't really care about the error, + # we just want to make sure it downloaded + try: + cur.execute("CREATE EXTENSION postgis") + except Exception as err: + log.info(f"(expected) error creating postgis extension: {err}") + # we do not check the error, so this is effectively a no-op; + # the log still shows whether it worked + # and how long loading the extension took + + # this is expected to fail locally when the pgcrypto dependency is not installed + try: + cur.execute("CREATE EXTENSION anon") + except Exception as err: + log.info("error creating anon extension") + assert "pgcrypto" in str(err), "unexpected error creating anon extension" + finally: + cleanup(pg_version) + + +# Test downloading remote library. +@pytest.mark.parametrize("remote_storage_kind", available_s3_storages()) +def test_remote_library( + neon_env_builder: NeonEnvBuilder, + remote_storage_kind: RemoteStorageKind, + pg_version: PgVersion, +): + neon_env_builder.enable_remote_storage( + remote_storage_kind=remote_storage_kind, + test_name="test_remote_library", + enable_remote_extensions=True, + ) + env = neon_env_builder.init_start() + tenant_id, _ = env.neon_cli.create_tenant() + env.neon_cli.create_timeline("test_remote_library", tenant_id=tenant_id) + + assert env.ext_remote_storage is not None # satisfy mypy + assert env.remote_storage_client is not None # satisfy mypy + + # For MOCK_S3 we upload test files. 
+ # For REAL_S3 we use the files already in the bucket + if remote_storage_kind == RemoteStorageKind.MOCK_S3: + upload_files(env) + + # Start a compute node and check that it can download the library and load it + endpoint = env.endpoints.create_start( + "test_remote_library", + tenant_id=tenant_id, + remote_ext_config=env.ext_remote_storage.to_string(), + # config_lines=["log_min_messages=debug3"], + ) + try: + with closing(endpoint.connect()) as conn: + with conn.cursor() as cur: + # try to load the library + try: + cur.execute("LOAD 'anon'") + except Exception as err: + log.info(f"error loading anon library: {err}") + raise AssertionError("unexpected error loading anon library") from err + + # test a library whose name differs from its extension's name + # this may fail locally if a dependency is missing + # however, it does successfully download the postgis archive + if remote_storage_kind == RemoteStorageKind.REAL_S3: + try: + cur.execute("LOAD 'postgis_topology-3'") + except Exception as err: + log.info("error loading postgis_topology-3") + assert "No such file or directory" in str( + err + ), "unexpected error loading postgis_topology-3" + finally: + cleanup(pg_version) + + +# Here we test a complex extension whose archive +# contains multiple extensions, +# using postgis as an example +@pytest.mark.skipif( + RemoteStorageKind.REAL_S3 not in available_s3_storages(), + reason="skipping test because real s3 not enabled", +) +def test_multiple_extensions_one_archive( + neon_env_builder: NeonEnvBuilder, + pg_version: PgVersion, +): + neon_env_builder.enable_remote_storage( + remote_storage_kind=RemoteStorageKind.REAL_S3, + test_name="test_multiple_extensions_one_archive", + enable_remote_extensions=True, + ) + env = neon_env_builder.init_start() + tenant_id, _ = env.neon_cli.create_tenant() + env.neon_cli.create_timeline("test_multiple_extensions_one_archive", tenant_id=tenant_id) + + assert env.ext_remote_storage is not None # satisfy mypy + assert env.remote_storage_client is not None # satisfy mypy + + endpoint = env.endpoints.create_start( + "test_multiple_extensions_one_archive", + tenant_id=tenant_id, + remote_ext_config=env.ext_remote_storage.to_string(), + ) + with closing(endpoint.connect()) as conn: + with conn.cursor() as cur: + cur.execute("CREATE EXTENSION address_standardizer;") + cur.execute("CREATE EXTENSION address_standardizer_data_us;") + # execute a query to ensure that it works + cur.execute( + "SELECT house_num, name, suftype, city, country, state, unit \ + FROM standardize_address('us_lex', 'us_gaz', 'us_rules', \ + 'One Rust Place, Boston, MA 02109');" + ) + res = cur.fetchall() + log.info(res) + assert len(res) > 0 + + cleanup(pg_version) + + +# Test that the extension is downloaded after an endpoint restart, +# when the library is used in a query. +# +# Run the test with multiple simultaneous connections to an endpoint +# to ensure that the extension is downloaded only once. 
+# +def test_extension_download_after_restart( + neon_env_builder: NeonEnvBuilder, + pg_version: PgVersion, +): + if "15" in pg_version: # SKIP v15 for now because test set only has extension built for v14 + return None + + neon_env_builder.enable_remote_storage( + remote_storage_kind=RemoteStorageKind.MOCK_S3, + test_name="test_extension_download_after_restart", + enable_remote_extensions=True, + ) + env = neon_env_builder.init_start() + tenant_id, _ = env.neon_cli.create_tenant() + env.neon_cli.create_timeline("test_extension_download_after_restart", tenant_id=tenant_id) + + assert env.ext_remote_storage is not None # satisfy mypy + assert env.remote_storage_client is not None # satisfy mypy + + # For MOCK_S3 we upload test files. + upload_files(env) + + endpoint = env.endpoints.create_start( + "test_extension_download_after_restart", + tenant_id=tenant_id, + remote_ext_config=env.ext_remote_storage.to_string(), + config_lines=["log_min_messages=debug3"], + ) + with closing(endpoint.connect()) as conn: + with conn.cursor() as cur: + cur.execute("CREATE extension pg_buffercache;") + cur.execute("SELECT * from pg_buffercache;") + res = cur.fetchall() + assert len(res) > 0 + log.info(res) + + # shutdown compute node + endpoint.stop() + # remove extension files locally + cleanup(pg_version) + + # spin up compute node again (there are no extension files available, because compute is stateless) + endpoint = env.endpoints.create_start( + "test_extension_download_after_restart", + tenant_id=tenant_id, + remote_ext_config=env.ext_remote_storage.to_string(), + config_lines=["log_min_messages=debug3"], + ) + + # connect to compute node and run the query + # that will trigger the download of the extension + def run_query(endpoint, thread_id: int): + log.info("thread_id {%d} starting", thread_id) + with closing(endpoint.connect()) as conn: + with conn.cursor() as cur: + cur.execute("SELECT * from pg_buffercache;") + res = cur.fetchall() + assert len(res) > 0 + log.info("thread_id {%d}, res = %s", thread_id, res) + + threads = [threading.Thread(target=run_query, args=(endpoint, i)) for i in range(2)] + + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + cleanup(pg_version) diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index 764bfe62f96a..5ba34122bc29 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -404,6 +404,7 @@ def assert_prefix_empty(neon_env_builder: NeonEnvBuilder, prefix: Optional[str] assert isinstance(neon_env_builder.remote_storage, S3Storage) # Note that this doesnt use pagination, so list is not guaranteed to be exhaustive. + assert neon_env_builder.remote_storage_client is not None response = neon_env_builder.remote_storage_client.list_objects_v2( Bucket=neon_env_builder.remote_storage.bucket_name, Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "", @@ -758,7 +759,7 @@ def test_timeline_delete_works_for_remote_smoke( ) # for some reason the check above doesnt immediately take effect for the below. - # Assume it is mock server incosistency and check twice. + # Assume it is mock server inconsistency and check twice. 
wait_until( 2, 0.5, diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index da3885c34db3..28bf5ccfa2fd 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit da3885c34db312afd555802be2ce985fafd1d8ad +Subproject commit 28bf5ccfa2fda9677566a25abd450e714d9ed055 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 770c6dffc5ef..553f2d3618a6 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 770c6dffc5ef6aac05bf049693877fb377eea6fc +Subproject commit 553f2d3618a6d4893bde67f1c065926ee8a3a118 diff --git a/vendor/revisions.json b/vendor/revisions.json index 8579b5abaaec..80d161938eff 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,4 +1,4 @@ { - "postgres-v15": "770c6dffc5ef6aac05bf049693877fb377eea6fc", - "postgres-v14": "da3885c34db312afd555802be2ce985fafd1d8ad" + "postgres-v15": "553f2d3618a6d4893bde67f1c065926ee8a3a118", + "postgres-v14": "28bf5ccfa2fda9677566a25abd450e714d9ed055" } diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index 3f47ef062f93..d79c7a4104fc 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -60,6 +60,7 @@ url = { version = "2", features = ["serde"] } [build-dependencies] anyhow = { version = "1", features = ["backtrace"] } bytes = { version = "1", features = ["serde"] } +cc = { version = "1", default-features = false, features = ["parallel"] } either = { version = "1" } itertools = { version = "0.10" } libc = { version = "0.2", features = ["extra_traits"] }
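For completeness, the remote extension configuration these tests hand to the endpoint is simply the JSON produced by S3Storage.to_string() above, the same shape compute_ctl accepts via --remote-ext-config. A minimal sketch with hypothetical values (in the tests the endpoint and credentials come from MockS3Server, and the bucket is created by enable_mock_s3_remote_storage):

```python
import json

# Hypothetical values for illustration; the tests build this string via
# env.ext_remote_storage.to_string() and pass it as remote_ext_config.
remote_ext_config = json.dumps(
    {
        "bucket": "test_remote_extensions",   # bucket name used by the test
        "region": "us-east-1",                # assumed mock region
        "endpoint": "http://127.0.0.1:9000",  # assumed MockS3Server endpoint
        "prefix": "ext",                      # prefix where upload_files() puts objects
    }
)

# endpoint = env.endpoints.create_start(..., remote_ext_config=remote_ext_config)
print(remote_ext_config)
```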