Skip to content

Commit

Permalink
Add support for GCP via the S3 compatible XML API
Browse files Browse the repository at this point in the history
  • Loading branch information
poodlewars committed Feb 19, 2025
1 parent ad0b1cf commit d189bdd
Show file tree
Hide file tree
Showing 34 changed files with 1,085 additions and 175 deletions.
2 changes: 2 additions & 0 deletions cpp/arcticdb/entity/protobufs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <storage.pb.h>
#include <encoding.pb.h>
#include <s3_storage.pb.h>
#include <gcp_storage.pb.h>
#include <lmdb_storage.pb.h>
#include <mongo_storage.pb.h>
#include <in_memory_storage.pb.h>
Expand All @@ -24,6 +25,7 @@ namespace arcticdb::proto {
namespace encoding = arcticc::pb2::encoding_pb2;
namespace storage = arcticc::pb2::storage_pb2;
namespace s3_storage = arcticc::pb2::s3_storage_pb2;
namespace gcp_storage = arcticc::pb2::gcp_storage_pb2;
namespace lmdb_storage = arcticc::pb2::lmdb_storage_pb2;
namespace mapped_file_storage = arcticc::pb2::mapped_file_storage_pb2;
namespace mongo_storage = arcticc::pb2::mongo_storage_pb2;
Expand Down
14 changes: 13 additions & 1 deletion cpp/arcticdb/storage/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,26 @@ inline std::vector<char> stream_to_vector(std::iostream& src) {

class NativeVariantStorage {
public:
using VariantStorageConfig = std::variant<std::monostate, s3::S3Settings>;
using VariantStorageConfig = std::variant<std::monostate, s3::S3Settings, s3::GCPXMLSettings>;
explicit NativeVariantStorage(VariantStorageConfig config = std::monostate()) : config_(std::move(config)) {};
// Read-only access to the stored configuration variant (config_).
const VariantStorageConfig& variant() const {
return config_;
}

// Replaces the stored configuration with a copy of the given S3 settings.
// NOTE(review): only an S3Settings overload is visible in this view; there is no
// GCPXMLSettings counterpart here - confirm whether one is needed.
void update(const s3::S3Settings& config) {
config_ = config;
}

std::string to_string() {
return util::variant_match(config_, [](std::monostate) {
return "empty";
}, [](s3::S3Settings) {
return "s3";
}, [](s3::GCPXMLSettings) {
return "gcpxml";
});
}

private:
VariantStorageConfig config_;
};
Expand Down
6 changes: 6 additions & 0 deletions cpp/arcticdb/storage/config_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ class ConfigCache {
storage_conf.config().UnpackTo(&s3_storage);
storages.emplace_back(create_storage(path, mode, s3::S3Settings(settings).update(s3_storage)));
},
[&storage_conf, &storages, &path, mode] (const s3::GCPXMLSettings& settings) {
util::check(storage_conf.config().Is<arcticdb::proto::gcp_storage::Config>(), "Only support GCP native settings");
arcticdb::proto::gcp_storage::Config gcp_storage;
storage_conf.config().UnpackTo(&gcp_storage);
storages.emplace_back(create_storage(path, mode, s3::GCPXMLSettings(settings).update(gcp_storage)));
},
[&storage_conf, &storages, &path, mode](const auto &) {
storages.emplace_back(create_storage(path, mode, storage_conf));
}
Expand Down
1 change: 1 addition & 0 deletions cpp/arcticdb/storage/library_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ void apply_storage_override(
util::variant_match(
storage_override.variant(),
StorageVisitor<S3Override>{lib_cfg_proto, override_https},
StorageVisitor<GCPXMLOverride>{lib_cfg_proto, override_https},
StorageVisitor<AzureOverride>{lib_cfg_proto, override_https},
StorageVisitor<LmdbOverride>{lib_cfg_proto, override_https},
[] (const std::monostate&) {});
Expand Down
27 changes: 25 additions & 2 deletions cpp/arcticdb/storage/mock/s3_mock_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ S3Result<std::monostate> MockS3Client::put_object(
return {std::monostate()};
}

S3Result<DeleteOutput> MockS3Client::delete_objects(
S3Result<DeleteObjectsOutput> MockS3Client::delete_objects(
const std::vector<std::string>& s3_object_names,
const std::string& bucket_name) {
std::scoped_lock<std::mutex> lock(mutex_);
Expand All @@ -137,7 +137,7 @@ S3Result<DeleteOutput> MockS3Client::delete_objects(
}
}

DeleteOutput output;
DeleteObjectsOutput output;
for (auto& s3_object_name : s3_object_names) {
auto maybe_error = has_failure_trigger(s3_object_name, StorageOperation::DELETE_LOCAL);
if (maybe_error.has_value()) {
Expand All @@ -154,6 +154,29 @@ S3Result<DeleteOutput> MockS3Client::delete_objects(
return {output};
}

// Mock single-object delete. Holds mutex_ for the whole call and always returns an
// already-completed future.
//  - A failure trigger for StorageOperation::DELETE is reported first; otherwise a
//    trigger for StorageOperation::DELETE_LOCAL is reported.
//  - If the (bucket, object) entry is absent or already empty, not_found_error is returned.
//  - Otherwise the entry is reset to std::nullopt and a default-constructed result is returned.
folly::Future<S3Result<std::monostate>> MockS3Client::delete_object(
        const std::string& s3_object_name,
        const std::string& bucket_name) {
    using Result = S3Result<std::monostate>;
    std::scoped_lock<std::mutex> lock(mutex_);

    // DELETE triggers take precedence over DELETE_LOCAL triggers.
    if (auto delete_failure = has_failure_trigger(s3_object_name, StorageOperation::DELETE)) {
        Result failed{*delete_failure};
        return folly::makeFuture(failed);
    }
    if (auto local_failure = has_failure_trigger(s3_object_name, StorageOperation::DELETE_LOCAL)) {
        Result failed{*local_failure};
        return folly::makeFuture(failed);
    }

    auto entry = s3_contents_.find({bucket_name, s3_object_name});
    const bool is_present = entry != s3_contents_.end() && entry->second.has_value();
    if (!is_present) {
        Result missing{not_found_error};
        return folly::makeFuture(missing);
    }

    // Clear the stored value but keep the key, exactly as the entry is left after a delete.
    entry->second = std::nullopt;
    Result ok{};
    return folly::makeFuture(ok);
}

// Using a fixed page size since it's only being used for simple tests.
// If we ever need to configure it we should move it to the s3 proto config instead.
constexpr auto page_size = 10;
Expand Down
6 changes: 5 additions & 1 deletion cpp/arcticdb/storage/mock/s3_mock_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,14 @@ class MockS3Client : public S3ClientInterface {
const std::string& bucket_name,
PutHeader header = PutHeader::NONE) override;

S3Result<DeleteOutput> delete_objects(
S3Result<DeleteObjectsOutput> delete_objects(
const std::vector<std::string>& s3_object_names,
const std::string& bucket_name) override;

folly::Future<S3Result<std::monostate>> delete_object(
const std::string& s3_object_name,
const std::string& bucket_name) override;

S3Result<ListObjectsOutput> list_objects(
const std::string& prefix,
const std::string& bucket_name,
Expand Down
146 changes: 121 additions & 25 deletions cpp/arcticdb/storage/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,77 @@ std::shared_ptr<LibraryIndex> create_library_index(const std::string &environmen
return std::make_shared<LibraryIndex>(EnvironmentName{environment_name}, mem_resolver);
}

// Index of each attribute inside the tuple used to pickle s3::S3Settings.
// The values mirror the argument order of to_tuple(const s3::S3Settings&):
// slot 0 carries the s3::NativeSettingsType tag, slots 1-3 carry the settings themselves.
enum class S3SettingsPickleOrder : uint32_t {
TYPE = 0,
AWS_AUTH = 1,
AWS_PROFILE = 2,
USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING = 3
};

// Index of each attribute inside the tuple used to pickle s3::GCPXMLSettings.
// The values mirror the argument order of to_tuple(const s3::GCPXMLSettings&):
// slot 0 carries the s3::NativeSettingsType tag, slots 1-10 carry the settings themselves.
// gcp_settings() expects exactly 11 entries, i.e. one per enumerator below.
enum class GCPXMLSettingsPickleOrder : uint32_t {
TYPE = 0,
AWS_AUTH = 1,
CA_CERT_PATH = 2,
CA_CERT_DIR = 3,
SSL = 4,
HTTPS = 5,
PREFIX = 6,
ENDPOINT = 7,
SECRET = 8,
ACCESS = 9,
BUCKET = 10,
};

// Rebuilds an s3::GCPXMLSettings from a pickle tuple laid out per GCPXMLSettingsPickleOrder.
// The tuple must hold exactly 11 entries; the TYPE slot is not read here.
s3::GCPXMLSettings gcp_settings(const py::tuple& t) {
    using Slot = GCPXMLSettingsPickleOrder;
    // Maps a named slot to its numeric tuple index.
    constexpr auto at = [](Slot slot) { return static_cast<uint32_t>(slot); };

    util::check(t.size() == 11, "Invalid GCPXMLSettings pickle objects, expected 11 attributes but was {}", t.size());

    return s3::GCPXMLSettings{
        t[at(Slot::AWS_AUTH)].cast<s3::AWSAuthMethod>(),
        t[at(Slot::CA_CERT_PATH)].cast<std::string>(),
        t[at(Slot::CA_CERT_DIR)].cast<std::string>(),
        t[at(Slot::SSL)].cast<bool>(),
        t[at(Slot::HTTPS)].cast<bool>(),
        t[at(Slot::PREFIX)].cast<std::string>(),
        t[at(Slot::ENDPOINT)].cast<std::string>(),
        t[at(Slot::SECRET)].cast<std::string>(),
        t[at(Slot::ACCESS)].cast<std::string>(),
        t[at(Slot::BUCKET)].cast<std::string>()
    };
}

// Rebuilds an s3::S3Settings from a pickle tuple laid out per S3SettingsPickleOrder.
// The tuple must hold exactly 4 entries; the TYPE slot is not read here.
// The failure message reports the expected and actual sizes, matching the check in
// gcp_settings(), so a malformed pickle can be diagnosed from the error alone.
s3::S3Settings s3_settings(const py::tuple& t) {
    util::check(t.size() == 4, "Invalid S3Settings pickle objects, expected 4 attributes but was {}", t.size());
    return s3::S3Settings{
        t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_AUTH)].cast<s3::AWSAuthMethod>(),
        t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_PROFILE)].cast<std::string>(),
        t[static_cast<uint32_t>(S3SettingsPickleOrder::USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING)].cast<bool>()
    };
}

// Flattens an s3::GCPXMLSettings into the tuple used for pickling.
// The first element is the NativeSettingsType::GCPXML tag; the remaining ten follow the
// slot order declared in GCPXMLSettingsPickleOrder, which gcp_settings() reads back.
// NOTE(review): access() and secret() are placed in the tuple as-is, so they end up in
// any pickle produced from these settings - confirm this is acceptable for callers.
py::tuple to_tuple(const s3::GCPXMLSettings& settings) {
return py::make_tuple(
s3::NativeSettingsType::GCPXML,
settings.aws_auth(),
settings.ca_cert_path(),
settings.ca_cert_dir(),
settings.ssl(),
settings.https(),
settings.prefix(),
settings.endpoint(),
settings.secret(),
settings.access(),
settings.bucket()
);
}

// Flattens an s3::S3Settings into the tuple used for pickling.
// The first element is the NativeSettingsType::S3 tag; the remaining three follow the
// slot order declared in S3SettingsPickleOrder, which s3_settings() reads back.
py::tuple to_tuple(const s3::S3Settings& settings) {
return py::make_tuple(
s3::NativeSettingsType::S3,
settings.aws_auth(),
settings.aws_profile(),
settings.use_internal_client_wrapper_for_testing()
);
}

void register_bindings(py::module& storage, py::exception<arcticdb::ArcticException>& base_exception) {
storage.attr("CONFIG_LIBRARY_NAME") = py::str(arcticdb::storage::CONFIG_LIBRARY_NAME);

Expand Down Expand Up @@ -97,31 +168,23 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept

storage.def("create_library_index", &create_library_index);


py::enum_<s3::AWSAuthMethod>(storage, "AWSAuthMethod")
.value("DISABLED", s3::AWSAuthMethod::DISABLED)
.value("DEFAULT_CREDENTIALS_PROVIDER_CHAIN", s3::AWSAuthMethod::DEFAULT_CREDENTIALS_PROVIDER_CHAIN)
.value("STS_PROFILE_CREDENTIALS_PROVIDER", s3::AWSAuthMethod::STS_PROFILE_CREDENTIALS_PROVIDER);

enum class S3SettingsPickleOrder : uint32_t {
AWS_AUTH = 0,
AWS_PROFILE = 1,
USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING = 2
};
py::enum_<s3::NativeSettingsType>(storage, "NativeSettingsType")
.value("S3", s3::NativeSettingsType::S3)
.value("GCPXML", s3::NativeSettingsType::GCPXML);

py::class_<s3::S3Settings>(storage, "S3Settings")
.def(py::init<s3::AWSAuthMethod, const std::string&, bool>())
.def(py::pickle(
[](const s3::S3Settings &settings) {
return py::make_tuple(settings.aws_auth(), settings.aws_profile(), settings.use_internal_client_wrapper_for_testing());
return to_tuple(settings);
},
[](py::tuple t) {
util::check(t.size() == 3, "Invalid S3Settings pickle objects");
s3::S3Settings settings(t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_AUTH)].cast<s3::AWSAuthMethod>(),
t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_PROFILE)].cast<std::string>(),
t[static_cast<uint32_t>(S3SettingsPickleOrder::USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING)].cast<bool>()
);
return settings;
return s3_settings(t);
}
))
.def_property_readonly("aws_profile", [](const s3::S3Settings &settings) { return settings.aws_profile(); })
Expand All @@ -130,31 +193,60 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept
return settings.use_internal_client_wrapper_for_testing();
});

py::class_<s3::GCPXMLSettings>(storage, "GCPXMLSettings")
.def(py::init<>())
.def(py::pickle(
[](const s3::GCPXMLSettings &settings) {
return to_tuple(settings);
},
[](py::tuple t) {
return gcp_settings(t);
}
))
.def_property("bucket", &s3::GCPXMLSettings::bucket, &s3::GCPXMLSettings::set_bucket)
.def_property("endpoint", &s3::GCPXMLSettings::endpoint, &s3::GCPXMLSettings::set_endpoint)
.def_property("access", &s3::GCPXMLSettings::access, &s3::GCPXMLSettings::set_access)
.def_property("secret", &s3::GCPXMLSettings::secret, &s3::GCPXMLSettings::set_secret)
.def_property("prefix", &s3::GCPXMLSettings::prefix, &s3::GCPXMLSettings::set_prefix)
.def_property("aws_auth", &s3::GCPXMLSettings::aws_auth, &s3::GCPXMLSettings::set_aws_auth)
.def_property("https", &s3::GCPXMLSettings::https, &s3::GCPXMLSettings::set_https)
.def_property("ssl", &s3::GCPXMLSettings::ssl, &s3::GCPXMLSettings::set_ssl)
.def_property("ca_cert_path", &s3::GCPXMLSettings::ca_cert_path, &s3::GCPXMLSettings::set_cert_path)
.def_property("ca_cert_dir", &s3::GCPXMLSettings::ca_cert_dir, &s3::GCPXMLSettings::set_cert_dir)
;

py::class_<NativeVariantStorage>(storage, "NativeVariantStorage")
.def(py::init<>())
.def(py::init<NativeVariantStorage::VariantStorageConfig>())
.def(py::pickle(
[](const NativeVariantStorage &settings) {
return util::variant_match(settings.variant(),
[] (const s3::S3Settings& settings) {
return py::make_tuple(settings.aws_auth(), settings.aws_profile(), settings.use_internal_client_wrapper_for_testing());
return to_tuple(settings);
},
[] (const s3::GCPXMLSettings& settings) {
return to_tuple(settings);
},
[](const auto &) {
[](const auto &) -> py::tuple {
util::raise_rte("Invalid native storage setting type");
return py::make_tuple();
}
);
},
[](py::tuple t) {
util::check(t.size() == 3, "Invalid S3Settings pickle objects");
s3::S3Settings settings(t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_AUTH)].cast<s3::AWSAuthMethod>(),
t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_PROFILE)].cast<std::string>(),
t[static_cast<uint32_t>(S3SettingsPickleOrder::USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING)].cast<bool>()
);
return NativeVariantStorage(std::move(settings));
util::check(t.size() >= 1, "Expected at least one attribute in Native Settings pickle");
auto type = t[static_cast<uint32_t>(S3SettingsPickleOrder::TYPE)].cast<s3::NativeSettingsType>();
switch(type) {
case s3::NativeSettingsType::S3:
return NativeVariantStorage(s3_settings(t));
case s3::NativeSettingsType::GCPXML:
return NativeVariantStorage(gcp_settings(t));
}
util::raise_rte("Inaccessible");
}
))
.def("update", &NativeVariantStorage::update);
.def("update", &NativeVariantStorage::update)
.def("__repr__", &NativeVariantStorage::to_string);

py::implicitly_convertible<NativeVariantStorage::VariantStorageConfig, NativeVariantStorage>();

storage.def("create_mem_config_resolver", [](const py::object & env_config_map_py) -> std::shared_ptr<ConfigResolver> {
Expand Down Expand Up @@ -205,6 +297,9 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept
.def_property("https", &S3Override::https, &S3Override::set_https)
.def_property("ssl", &S3Override::ssl, &S3Override::set_ssl);

py::class_<GCPXMLOverride>(storage, "GCPXMLOverride")
.def(py::init<>());

py::class_<AzureOverride>(storage, "AzureOverride")
.def(py::init<>())
.def_property("container_name", &AzureOverride::container_name, &AzureOverride::set_container_name)
Expand All @@ -221,7 +316,8 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept
.def(py::init<>())
.def("set_s3_override", &StorageOverride::set_s3_override)
.def("set_azure_override", &StorageOverride::set_azure_override)
.def("set_lmdb_override", &StorageOverride::set_lmdb_override);
.def("set_lmdb_override", &StorageOverride::set_lmdb_override)
.def("set_gcpxml_override", &StorageOverride::set_gcpxml_override);

py::class_<LibraryManager, std::shared_ptr<LibraryManager>>(storage, "LibraryManager")
.def(py::init<std::shared_ptr<storage::Library>>())
Expand Down Expand Up @@ -262,7 +358,7 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept
py::arg("library_path"),
py::arg("storage_override") = StorageOverride{},
py::arg("ignore_cache") = false,
py::arg("native_storage_map") = std::nullopt
py::arg("native_storage_config") = std::nullopt
)
.def("cleanup_library_if_open", [](LibraryManager& library_manager, std::string_view library_path) {
return library_manager.cleanup_library_if_open(LibraryPath{library_path, '.'});
Expand Down
Loading

0 comments on commit d189bdd

Please sign in to comment.