Remote Extensions Multiple Endpoints #4834

Merged · 4 commits · Jul 31, 2023
6 changes: 6 additions & 0 deletions compute_tools/src/compute.rs
@@ -898,6 +898,12 @@ LIMIT 100",
}

{
// TODO: started_to_download_extensions should probably store archive paths
// rather than extension names: as it stands, two extensions that share one
// archive will each download it, so the same archive can be fetched
// multiple times. This is not an easy fix, though, because a request would
// then have to pause and wait if another request has already started the
// download.
let mut started_to_download_extensions = self
.started_to_download_extensions
.lock()
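The TODO above, sketched in Python for brevity (the real bookkeeping is Rust in compute.rs; should_start_download and the ARCHIVE_FOR_EXT mapping are hypothetical names): keying the in-flight set by archive path instead of extension name means two extensions that share one archive trigger only one download.

import threading

# Archive paths for which a download has been started (hypothetical sketch;
# compute.rs keeps the equivalent state behind a Mutex).
started_archives = set()
started_lock = threading.Lock()

# Hypothetical mapping; the real code would derive this from the extension index.
ARCHIVE_FOR_EXT = {
    "pg_buffercache": "5670669815/v14/extensions/pg_buffercache.tar.zst",
    "anon": "5670669815/v14/extensions/anon.tar.zst",
}

def should_start_download(ext_name):
    """Return True iff this caller should perform the download."""
    archive = ARCHIVE_FOR_EXT[ext_name]
    with started_lock:
        if archive in started_archives:
            return False  # already downloaded, or a download is in flight
        started_archives.add(archive)
        return True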
1 change: 1 addition & 0 deletions compute_tools/src/extension_server.rs
@@ -225,6 +225,7 @@ pub async fn download_extension(
}
}
}
info!("done moving extension {ext_name}");
Ok(())
}

@@ -1,12 +1,20 @@
{
"public_extensions": [
"anon"
"anon",
"pg_buffercache"
],
"library_index": {
"anon": "anon",
"kq_imcx": "kq_imcx"
"kq_imcx": "kq_imcx",
"pg_buffercache": "pg_buffercache"
},
"extension_data": {
"pg_buffercache": {
"control_data": {
"pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true"
},
"archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst"
},
"kq_imcx": {
"control_data": {
"kq_imcx.control": "# This file is generated content from add_postgresql_extension.\n# No point in modifying it, it will be overwritten anyway.\n\n# Default version, always set\ndefault_version = '0.1'\n\n# Module pathname generated from target shared library name. Use\n# MODULE_PATHNAME in script file.\nmodule_pathname = '$libdir/kq_imcx.so'\n\n# Comment for extension. Set using COMMENT option. Can be set in\n# script file as well.\ncomment = 'ketteQ In-Memory Calendar Extension (IMCX)'\n\n# Encoding for script file. Set using ENCODING option.\n#encoding = ''\n\n# Required extensions. Set using REQUIRES option (multi-valued).\n#requires = ''\ntrusted = true\n"
@@ -20,5 +28,4 @@
"archive_path": "5670669815/v14/extensions/anon.tar.zst"
}
}
}

}
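For orientation: in this index, extension_data maps each extension to its control file contents and the archive that holds its files, and library_index appears to map a shared library name back to the extension that provides it. A minimal lookup sketch in Python (resolve_archive is a hypothetical helper, not part of this PR):

import json

def resolve_archive(index_text, ext_name):
    """Return the remote archive path for an extension from the index."""
    index = json.loads(index_text)
    return index["extension_data"][ext_name]["archive_path"]

# resolve_archive(index_text, "pg_buffercache")
# -> "5670669815/v14/extensions/pg_buffercache.tar.zst"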
Binary file not shown.
166 changes: 148 additions & 18 deletions test_runner/regress/test_download_extensions.py
@@ -1,6 +1,7 @@
import os
import shutil
from contextlib import closing
from pathlib import Path

import pytest
from fixtures.log_helper import log
@@ -12,23 +13,40 @@
from fixtures.pg_version import PgVersion


def add_pgdir_prefix(pgversion, files):
return [f"pg_install/v{pgversion}/" + x for x in files]


# Cleaning up downloaded files is important for local tests
# or else one test could reuse the files from another test or another test run
def cleanup(cleanup_files, cleanup_folders, pg_version):
cleanup_files = add_pgdir_prefix(pg_version, cleanup_files)
cleanup_folders = add_pgdir_prefix(pg_version, cleanup_folders)
def cleanup(pg_version):
PGDIR = Path(f"pg_install/v{pg_version}")

LIB_DIR = PGDIR / Path("lib/postgresql")
cleanup_lib_globs = ["anon*", "postgis*", "pg_buffercache*"]
cleanup_lib_glob_paths = [LIB_DIR.glob(x) for x in cleanup_lib_globs]

SHARE_DIR = PGDIR / Path("share/postgresql/extension")
cleanup_ext_globs = [
"anon*",
"address_standardizer*",
"postgis*",
"pageinspect*",
"pg_buffercache*",
"pgrouting*",
]
cleanup_ext_glob_paths = [SHARE_DIR.glob(x) for x in cleanup_ext_globs]

for file in cleanup_files:
all_glob_paths = cleanup_lib_glob_paths + cleanup_ext_glob_paths
all_cleanup_files = []
for file_glob in all_glob_paths:
for file in file_glob:
all_cleanup_files.append(file)

for file in all_cleanup_files:
try:
os.remove(file)
log.info(f"removed file {file}")
except Exception as err:
log.info(f"error removing file {file}: {err}")

cleanup_folders = [SHARE_DIR / Path("anon"), PGDIR / Path("download_extensions")]
for folder in cleanup_folders:
try:
shutil.rmtree(folder)
@@ -37,13 +55,6 @@ def cleanup(cleanup_files, cleanup_folders, pg_version):
log.info(f"error removing folder {folder}: {err}")


cleanup_files = [
"lib/postgresql/anon.so",
"share/postgresql/extension/anon.control",
]
cleanup_folders = ["share/postgresql/extension/anon", "download_extensions"]


def upload_files(env):
log.info("Uploading test files to mock bucket")
os.chdir("test_runner/regress/data/extension_test")
@@ -123,8 +134,7 @@ def test_remote_extensions(
log.info("error creating anon extension")
assert "pgcrypto" in str(err), "unexpected error creating anon extension"
finally:
pass
# cleanup(cleanup_files, cleanup_folders, pg_version)
cleanup(pg_version)


# Test downloading remote library.
@@ -181,4 +191,124 @@ def test_remote_library(
err
), "unexpected error loading postgis_topology-3"
finally:
cleanup(cleanup_files, cleanup_folders, pg_version)
cleanup(pg_version)


# Test extension downloading with multiple connections to an endpoint.
# This test only supports real S3 because postgis is too large an extension
# to put in our GitHub repo.
def test_extension_download_after_restart(
neon_env_builder: NeonEnvBuilder,
pg_version: PgVersion,
):
if "15" in pg_version: # SKIP v15 for now because I only built the extension for v14
return None

neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
test_name="test_extension_download_after_restart",
enable_remote_extensions=True,
)
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_extension_download_after_restart", tenant_id=tenant_id)

assert env.ext_remote_storage is not None # satisfy mypy
assert env.remote_storage_client is not None # satisfy mypy

# For MOCK_S3 we upload test files.
upload_files(env)

endpoint = env.endpoints.create_start(
"test_extension_download_after_restart",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
config_lines=["log_min_messages=debug3"],
)
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE extension pg_buffercache;")
cur.execute("SELECT * from pg_buffercache;")
log.info(cur.fetchall())

# the endpoint is closed now
endpoint.stop()
# remove postgis files locally
cleanup(pg_version)

# spin up compute node again (there are no postgis files available, because compute is stateless)
endpoint = env.endpoints.create_start(
"test_extension_download_after_restart",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
config_lines=["log_min_messages=debug3"],
)
# connect to postgres and execute the query again
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SELECT * from pg_buffercache;")
log.info(cur.fetchall())

cleanup(pg_version)


# here we test a complex extension
def test_multiple_extensions_one_archive(
neon_env_builder: NeonEnvBuilder,
pg_version: PgVersion,
):
if "15" in pg_version: # SKIP v15 for now because I only built the extension for v14
return None

neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.REAL_S3,
test_name="test_multiple_extensions_one_archive",
enable_remote_extensions=True,
)
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_multiple_extensions_one_archive", tenant_id=tenant_id)

assert env.ext_remote_storage is not None # satisfy mypy
assert env.remote_storage_client is not None # satisfy mypy

endpoint = env.endpoints.create_start(
"test_multiple_extensions_one_archive",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
)
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# TODO later: figure out what was going wrong with this:
cur.execute("CREATE EXTENSION address_standardizer;")
cur.execute("CREATE EXTENSION address_standardizer_data_us;")
# execute query to ensure that it works
cur.execute(
"SELECT house_num, name, suftype, city, country, state, unit \
FROM standardize_address('us_lex', 'us_gaz', 'us_rules', \
'One Rust Place, Boston, MA 02109');"
)
log.info(cur.fetchall())

# remove postgis files locally
cleanup(pg_version)


# TODO: this complex example reveals a possible inefficiency:
# both calls will download the extension

# proposed solutions:
# A: don't worry about it
# B:
# the 1st request sets started_download = true,
# and sets download_completed = true if it succeeds;
# subsequent requests hang and repeatedly check download_completed
# until it gets set or until they time out;
# 3 seconds after started_download = true was set, some thread checks
# whether download_completed was set; if not, it sets started_download
# back to false
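
A Python sketch of proposal B (all names hypothetical; an actual fix would live in the Rust code in compute.rs): the first request flips started_download, later requests block until download_completed is set or they time out, and a timed-out wait resets started_download so a later request can retry.

import threading

class DownloadGate:
    def __init__(self, timeout=3.0):
        self.cond = threading.Condition()
        self.started = False
        self.completed = False
        self.timeout = timeout

    def run(self, download):
        with self.cond:
            if self.completed:
                return  # archive already downloaded
            if self.started:
                # Follower: wait for the first request to finish.
                if not self.cond.wait_for(lambda: self.completed, self.timeout):
                    self.started = False  # downloader presumed dead; allow retry
                    raise TimeoutError("extension download did not complete")
                return
            self.started = True  # we are the first request
        try:
            download()  # perform the actual download without holding the lock
            with self.cond:
                self.completed = True
                self.cond.notify_all()
        except Exception:
            with self.cond:
                self.started = False  # download failed; let another request retry
                self.cond.notify_all()
            raise

The condition variable replaces the polling loop from the comment; note that the reset-after-timeout path still races (two waiters can both decide the downloader is dead), which is part of why the TODO in compute.rs calls this "not such an easy fix".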


# TODO later: investigate download timeouts
# Test extension downloading with multiple connections to an endpoint.
# This test only supports real S3 because postgis is too large an extension
# to put in our GitHub repo.