diff --git a/cpp/arcticdb/entity/protobufs.hpp b/cpp/arcticdb/entity/protobufs.hpp index cbdf9dea4e..23794048da 100644 --- a/cpp/arcticdb/entity/protobufs.hpp +++ b/cpp/arcticdb/entity/protobufs.hpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -24,6 +25,7 @@ namespace arcticdb::proto { namespace encoding = arcticc::pb2::encoding_pb2; namespace storage = arcticc::pb2::storage_pb2; namespace s3_storage = arcticc::pb2::s3_storage_pb2; + namespace gcp_storage = arcticc::pb2::gcp_storage_pb2; namespace lmdb_storage = arcticc::pb2::lmdb_storage_pb2; namespace mapped_file_storage = arcticc::pb2::mapped_file_storage_pb2; namespace mongo_storage = arcticc::pb2::mongo_storage_pb2; diff --git a/cpp/arcticdb/storage/common.hpp b/cpp/arcticdb/storage/common.hpp index 63d04d1662..ca54b73977 100644 --- a/cpp/arcticdb/storage/common.hpp +++ b/cpp/arcticdb/storage/common.hpp @@ -75,14 +75,26 @@ inline std::vector stream_to_vector(std::iostream& src) { class NativeVariantStorage { public: - using VariantStorageConfig = std::variant; + using VariantStorageConfig = std::variant; explicit NativeVariantStorage(VariantStorageConfig config = std::monostate()) : config_(std::move(config)) {}; const VariantStorageConfig& variant() const { return config_; } + void update(const s3::S3Settings& config) { config_ = config; } + + std::string to_string() { + return util::variant_match(config_, [](std::monostate) { + return "empty"; + }, [](s3::S3Settings) { + return "s3"; + }, [](s3::GCPXMLSettings) { + return "gcpxml"; + }); + } + private: VariantStorageConfig config_; }; diff --git a/cpp/arcticdb/storage/config_cache.hpp b/cpp/arcticdb/storage/config_cache.hpp index 42c08f71ae..59c5eebf5f 100644 --- a/cpp/arcticdb/storage/config_cache.hpp +++ b/cpp/arcticdb/storage/config_cache.hpp @@ -89,6 +89,12 @@ class ConfigCache { storage_conf.config().UnpackTo(&s3_storage); storages.emplace_back(create_storage(path, mode, s3::S3Settings(settings).update(s3_storage))); }, + [&storage_conf, &storages, &path, mode] (const s3::GCPXMLSettings& settings) { + util::check(storage_conf.config().Is(), "Only support GCP native settings"); + arcticdb::proto::gcp_storage::Config gcp_storage; + storage_conf.config().UnpackTo(&gcp_storage); + storages.emplace_back(create_storage(path, mode, s3::GCPXMLSettings(settings).update(gcp_storage))); + }, [&storage_conf, &storages, &path, mode](const auto &) { storages.emplace_back(create_storage(path, mode, storage_conf)); } diff --git a/cpp/arcticdb/storage/library_manager.cpp b/cpp/arcticdb/storage/library_manager.cpp index 403283dc14..19738fce10 100644 --- a/cpp/arcticdb/storage/library_manager.cpp +++ b/cpp/arcticdb/storage/library_manager.cpp @@ -46,6 +46,7 @@ void apply_storage_override( util::variant_match( storage_override.variant(), StorageVisitor{lib_cfg_proto, override_https}, + StorageVisitor{lib_cfg_proto, override_https}, StorageVisitor{lib_cfg_proto, override_https}, StorageVisitor{lib_cfg_proto, override_https}, [] (const std::monostate&) {}); diff --git a/cpp/arcticdb/storage/mock/s3_mock_client.cpp b/cpp/arcticdb/storage/mock/s3_mock_client.cpp index a16a1ec801..d14a05d05b 100644 --- a/cpp/arcticdb/storage/mock/s3_mock_client.cpp +++ b/cpp/arcticdb/storage/mock/s3_mock_client.cpp @@ -126,7 +126,7 @@ S3Result MockS3Client::put_object( return {std::monostate()}; } -S3Result MockS3Client::delete_objects( +S3Result MockS3Client::delete_objects( const std::vector& s3_object_names, const std::string& bucket_name) { std::scoped_lock lock(mutex_); @@ 
-137,7 +137,7 @@ S3Result MockS3Client::delete_objects( } } - DeleteOutput output; + DeleteObjectsOutput output; for (auto& s3_object_name : s3_object_names) { auto maybe_error = has_failure_trigger(s3_object_name, StorageOperation::DELETE_LOCAL); if (maybe_error.has_value()) { @@ -154,6 +154,29 @@ S3Result MockS3Client::delete_objects( return {output}; } +folly::Future> MockS3Client::delete_object( + const std::string& s3_object_name, + const std::string& bucket_name) { + std::scoped_lock lock(mutex_); + if (auto maybe_error = has_failure_trigger(s3_object_name, StorageOperation::DELETE); maybe_error) { + S3Result res{*maybe_error}; + return folly::makeFuture(res); + } else if (auto maybe_local_error = has_failure_trigger(s3_object_name, StorageOperation::DELETE_LOCAL); maybe_local_error) { + S3Result res{*maybe_local_error}; + return folly::makeFuture(res); + } + + auto pos = s3_contents_.find({bucket_name, s3_object_name}); + if (pos == s3_contents_.end() || !pos->second.has_value()) { + S3Result res{not_found_error}; + return folly::makeFuture(res); + } else { + pos->second = std::nullopt; + S3Result res{}; + return folly::makeFuture(res); + } +} + // Using a fixed page size since it's only being used for simple tests. // If we ever need to configure it we should move it to the s3 proto config instead. constexpr auto page_size = 10; diff --git a/cpp/arcticdb/storage/mock/s3_mock_client.hpp b/cpp/arcticdb/storage/mock/s3_mock_client.hpp index 6e2ed18647..8f5311ae8c 100644 --- a/cpp/arcticdb/storage/mock/s3_mock_client.hpp +++ b/cpp/arcticdb/storage/mock/s3_mock_client.hpp @@ -72,10 +72,14 @@ class MockS3Client : public S3ClientInterface { const std::string& bucket_name, PutHeader header = PutHeader::NONE) override; - S3Result delete_objects( + S3Result delete_objects( const std::vector& s3_object_names, const std::string& bucket_name) override; + folly::Future> delete_object( + const std::string& s3_object_name, + const std::string& bucket_name) override; + S3Result list_objects( const std::string& prefix, const std::string& bucket_name, diff --git a/cpp/arcticdb/storage/python_bindings.cpp b/cpp/arcticdb/storage/python_bindings.cpp index e641646499..3e034ee362 100644 --- a/cpp/arcticdb/storage/python_bindings.cpp +++ b/cpp/arcticdb/storage/python_bindings.cpp @@ -35,6 +35,77 @@ std::shared_ptr create_library_index(const std::string &environmen return std::make_shared(EnvironmentName{environment_name}, mem_resolver); } +enum class S3SettingsPickleOrder : uint32_t { + TYPE = 0, + AWS_AUTH = 1, + AWS_PROFILE = 2, + USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING = 3 +}; + +enum class GCPXMLSettingsPickleOrder : uint32_t { + TYPE = 0, + AWS_AUTH = 1, + CA_CERT_PATH = 2, + CA_CERT_DIR = 3, + SSL = 4, + HTTPS = 5, + PREFIX = 6, + ENDPOINT = 7, + SECRET = 8, + ACCESS = 9, + BUCKET = 10, +}; + +s3::GCPXMLSettings gcp_settings(const py::tuple& t) { + util::check(t.size() == 11, "Invalid GCPXMLSettings pickle objects, expected 11 attributes but was {}", t.size()); + return s3::GCPXMLSettings{ + t[static_cast(GCPXMLSettingsPickleOrder::AWS_AUTH)].cast(), + t[static_cast(GCPXMLSettingsPickleOrder::CA_CERT_PATH)].cast(), + t[static_cast(GCPXMLSettingsPickleOrder::CA_CERT_DIR)].cast(), + t[static_cast(GCPXMLSettingsPickleOrder::SSL)].cast(), + t[static_cast(GCPXMLSettingsPickleOrder::HTTPS)].cast(), + t[static_cast(GCPXMLSettingsPickleOrder::PREFIX)].cast(), + t[static_cast(GCPXMLSettingsPickleOrder::ENDPOINT)].cast(), + t[static_cast(GCPXMLSettingsPickleOrder::SECRET)].cast(), + 
t[static_cast(GCPXMLSettingsPickleOrder::ACCESS)].cast(), + t[static_cast(GCPXMLSettingsPickleOrder::BUCKET)].cast() + }; +} + +s3::S3Settings s3_settings(const py::tuple& t) { + util::check(t.size() == 4, "Invalid S3Settings pickle objects"); + return s3::S3Settings{ + t[static_cast(S3SettingsPickleOrder::AWS_AUTH)].cast(), + t[static_cast(S3SettingsPickleOrder::AWS_PROFILE)].cast(), + t[static_cast(S3SettingsPickleOrder::USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING)].cast() + }; +} + +py::tuple to_tuple(const s3::GCPXMLSettings& settings) { + return py::make_tuple( + s3::NativeSettingsType::GCPXML, + settings.aws_auth(), + settings.ca_cert_path(), + settings.ca_cert_dir(), + settings.ssl(), + settings.https(), + settings.prefix(), + settings.endpoint(), + settings.secret(), + settings.access(), + settings.bucket() + ); +} + +py::tuple to_tuple(const s3::S3Settings& settings) { + return py::make_tuple( + s3::NativeSettingsType::S3, + settings.aws_auth(), + settings.aws_profile(), + settings.use_internal_client_wrapper_for_testing() + ); +} + void register_bindings(py::module& storage, py::exception& base_exception) { storage.attr("CONFIG_LIBRARY_NAME") = py::str(arcticdb::storage::CONFIG_LIBRARY_NAME); @@ -97,31 +168,23 @@ void register_bindings(py::module& storage, py::exception(storage, "AWSAuthMethod") .value("DISABLED", s3::AWSAuthMethod::DISABLED) .value("DEFAULT_CREDENTIALS_PROVIDER_CHAIN", s3::AWSAuthMethod::DEFAULT_CREDENTIALS_PROVIDER_CHAIN) .value("STS_PROFILE_CREDENTIALS_PROVIDER", s3::AWSAuthMethod::STS_PROFILE_CREDENTIALS_PROVIDER); - enum class S3SettingsPickleOrder : uint32_t { - AWS_AUTH = 0, - AWS_PROFILE = 1, - USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING = 2 - }; + py::enum_(storage, "NativeSettingsType") + .value("S3", s3::NativeSettingsType::S3) + .value("GCPXML", s3::NativeSettingsType::GCPXML); py::class_(storage, "S3Settings") .def(py::init()) .def(py::pickle( [](const s3::S3Settings &settings) { - return py::make_tuple(settings.aws_auth(), settings.aws_profile(), settings.use_internal_client_wrapper_for_testing()); + return to_tuple(settings); }, [](py::tuple t) { - util::check(t.size() == 3, "Invalid S3Settings pickle objects"); - s3::S3Settings settings(t[static_cast(S3SettingsPickleOrder::AWS_AUTH)].cast(), - t[static_cast(S3SettingsPickleOrder::AWS_PROFILE)].cast(), - t[static_cast(S3SettingsPickleOrder::USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING)].cast() - ); - return settings; + return s3_settings(t); } )) .def_property_readonly("aws_profile", [](const s3::S3Settings &settings) { return settings.aws_profile(); }) @@ -130,6 +193,28 @@ void register_bindings(py::module& storage, py::exception(storage, "GCPXMLSettings") + .def(py::init<>()) + .def(py::pickle( + [](const s3::GCPXMLSettings &settings) { + return to_tuple(settings); + }, + [](py::tuple t) { + return gcp_settings(t); + } + )) + .def_property("bucket", &s3::GCPXMLSettings::bucket, &s3::GCPXMLSettings::set_bucket) + .def_property("endpoint", &s3::GCPXMLSettings::endpoint, &s3::GCPXMLSettings::set_endpoint) + .def_property("access", &s3::GCPXMLSettings::access, &s3::GCPXMLSettings::set_access) + .def_property("secret", &s3::GCPXMLSettings::secret, &s3::GCPXMLSettings::set_secret) + .def_property("prefix", &s3::GCPXMLSettings::prefix, &s3::GCPXMLSettings::set_prefix) + .def_property("aws_auth", &s3::GCPXMLSettings::aws_auth, &s3::GCPXMLSettings::set_aws_auth) + .def_property("https", &s3::GCPXMLSettings::https, &s3::GCPXMLSettings::set_https) + .def_property("ssl", &s3::GCPXMLSettings::ssl, 
&s3::GCPXMLSettings::set_ssl) + .def_property("ca_cert_path", &s3::GCPXMLSettings::ca_cert_path, &s3::GCPXMLSettings::set_cert_path) + .def_property("ca_cert_dir", &s3::GCPXMLSettings::ca_cert_dir, &s3::GCPXMLSettings::set_cert_dir) + ; + py::class_(storage, "NativeVariantStorage") .def(py::init<>()) .def(py::init()) @@ -137,24 +222,31 @@ void register_bindings(py::module& storage, py::exception py::tuple { util::raise_rte("Invalid native storage setting type"); - return py::make_tuple(); } ); }, [](py::tuple t) { - util::check(t.size() == 3, "Invalid S3Settings pickle objects"); - s3::S3Settings settings(t[static_cast(S3SettingsPickleOrder::AWS_AUTH)].cast(), - t[static_cast(S3SettingsPickleOrder::AWS_PROFILE)].cast(), - t[static_cast(S3SettingsPickleOrder::USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING)].cast() - ); - return NativeVariantStorage(std::move(settings)); + util::check(t.size() >= 1, "Expected at least one attribute in Native Settings pickle"); + auto type = t[static_cast(S3SettingsPickleOrder::TYPE)].cast(); + switch(type) { + case s3::NativeSettingsType::S3: + return NativeVariantStorage(s3_settings(t)); + case s3::NativeSettingsType::GCPXML: + return NativeVariantStorage(gcp_settings(t)); + } + util::raise_rte("Inaccessible"); } )) - .def("update", &NativeVariantStorage::update); + .def("update", &NativeVariantStorage::update) + .def("__repr__", &NativeVariantStorage::to_string); + py::implicitly_convertible(); storage.def("create_mem_config_resolver", [](const py::object & env_config_map_py) -> std::shared_ptr { @@ -205,6 +297,9 @@ void register_bindings(py::module& storage, py::exception(storage, "GCPXMLOverride") + .def(py::init<>()); + py::class_(storage, "AzureOverride") .def(py::init<>()) .def_property("container_name", &AzureOverride::container_name, &AzureOverride::set_container_name) @@ -221,7 +316,8 @@ void register_bindings(py::module& storage, py::exception()) .def("set_s3_override", &StorageOverride::set_s3_override) .def("set_azure_override", &StorageOverride::set_azure_override) - .def("set_lmdb_override", &StorageOverride::set_lmdb_override); + .def("set_lmdb_override", &StorageOverride::set_lmdb_override) + .def("set_gcpxml_override", &StorageOverride::set_gcpxml_override); py::class_>(storage, "LibraryManager") .def(py::init>()) @@ -262,7 +358,7 @@ void register_bindings(py::module& storage, py::exception #include #include +#include #include @@ -46,6 +47,10 @@ static const size_t DELETE_OBJECTS_LIMIT = 1000; template using Range = folly::Range; +inline bool is_not_found_error(const Aws::S3::S3Errors& error) { + return error == Aws::S3::S3Errors::NO_SUCH_KEY || error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND; +} + [[noreturn]] inline void raise_s3_exception(const Aws::S3::S3Error& err, const std::string& object_name) { std::string error_message; auto type = err.GetErrorType(); @@ -57,7 +62,7 @@ using Range = folly::Range; object_name); // s3_client.HeadObject returns RESOURCE_NOT_FOUND if a key is not found. 
- if (type == Aws::S3::S3Errors::NO_SUCH_KEY || type == Aws::S3::S3Errors::RESOURCE_NOT_FOUND) { + if (is_not_found_error(type)) { throw KeyNotFoundException(fmt::format("Key Not Found Error: {}", error_message_suffix)); } @@ -110,8 +115,7 @@ using Range = folly::Range; } inline bool is_expected_error_type(Aws::S3::S3Errors err) { - return err == Aws::S3::S3Errors::NO_SUCH_KEY || err == Aws::S3::S3Errors::RESOURCE_NOT_FOUND - || err == Aws::S3::S3Errors::NO_SUCH_BUCKET; + return is_not_found_error(err) || err == Aws::S3::S3Errors::NO_SUCH_BUCKET; } inline void raise_if_unexpected_error(const Aws::S3::S3Error& err, const std::string& object_name) { @@ -237,6 +241,21 @@ struct FailedDelete { error_message(error_message) {} }; +inline void raise_if_failed_deletes(const boost::container::small_vector& failed_deletes) { + if (!failed_deletes.empty()) { + auto failed_deletes_message = std::ostringstream(); + for (auto i = 0u; i < failed_deletes.size(); ++i) { + auto& failed = failed_deletes[i]; + failed_deletes_message << fmt::format("'{}' failed with '{}'", to_serialized_key(failed.failed_key), failed.error_message); + if (i != failed_deletes.size()) { + failed_deletes_message << ", "; + } + } + auto error_message = fmt::format("Failed to delete some of the objects: {}.", failed_deletes_message.str()); + raise(error_message); + } +} + template void do_remove_impl( std::span ks, @@ -288,18 +307,7 @@ void do_remove_impl( }); util::check(to_delete.empty(), "Have {} segment that have not been removed", to_delete.size()); - if (!failed_deletes.empty()) { - auto failed_deletes_message = std::ostringstream(); - for (auto i = 0u; i < failed_deletes.size(); ++i) { - auto& failed = failed_deletes[i]; - failed_deletes_message << fmt::format("'{}' failed with '{}'", to_serialized_key(failed.failed_key), failed.error_message); - if (i != failed_deletes.size()) { - failed_deletes_message << ", "; - } - } - auto error_message = fmt::format("Failed to delete some of the objects: {}.", failed_deletes_message.str()); - raise(error_message); - } + raise_if_failed_deletes(failed_deletes); } template @@ -313,6 +321,58 @@ void do_remove_impl( do_remove_impl(std::span(arr), root_folder, bucket_name, s3_client, std::forward(bucketizer)); } +template +void do_remove_no_batching_impl( + std::span ks, + const std::string& root_folder, + const std::string& bucket_name, + S3ClientInterface& s3_client, + KeyBucketizer&& bucketizer) { + ARCTICDB_SUBSAMPLE(S3StorageDeleteNoBatching, 0) + + std::vector>> delete_object_results; + for (const auto& k : ks) { + auto key_type_dir = key_type_folder(root_folder, variant_key_type(k)); + auto s3_object_name = object_path(bucketizer.bucketize(key_type_dir, k), k); + auto delete_fut = s3_client.delete_object(s3_object_name, bucket_name); + delete_object_results.push_back(std::move(delete_fut)); + } + + folly::QueuedImmediateExecutor inline_executor; + auto delete_results = folly::collect(std::move(delete_object_results)).via(&inline_executor).get(); + + boost::container::small_vector failed_deletes; + auto keys_and_delete_results = folly::gen::from(ks) | folly::gen::move | folly::gen::zip(std::move(delete_results)) | folly::gen::as(); + for (auto&& [k, delete_object_result] : std::move(keys_and_delete_results)) { + if (delete_object_result.is_success()) { + ARCTICDB_RUNTIME_DEBUG(log::storage(), "Deleted object with key '{}'", variant_key_view(k)); + } else if (const auto& error = delete_object_result.get_error(); !is_not_found_error(error.GetErrorType())) { + auto key_type_dir = 
key_type_folder(root_folder, variant_key_type(k)); + auto s3_object_name = object_path(bucketizer.bucketize(key_type_dir, k), k); + auto bad_key_name = s3_object_name.substr(key_type_dir.size(), std::string::npos); + auto error_message = error.GetMessage(); + failed_deletes.push_back(FailedDelete{ + variant_key_from_bytes(reinterpret_cast(bad_key_name.data()), bad_key_name.size(), variant_key_type(k)), + std::move(error_message)}); + } else { + ARCTICDB_RUNTIME_DEBUG(log::storage(), "Acceptable error when deleting object with key '{}'", variant_key_view(k)); + } + } + + raise_if_failed_deletes(failed_deletes); +} + +template +void do_remove_no_batching_impl( + VariantKey&& variant_key, + const std::string& root_folder, + const std::string& bucket_name, + S3ClientInterface& s3_client, + KeyBucketizer&& bucketizer) { + std::array arr{std::move(variant_key)}; + do_remove_no_batching_impl(std::span(arr), root_folder, bucket_name, s3_client, std::forward(bucketizer)); +} + template void do_write_if_none_impl( KeySegmentPair &kv, diff --git a/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp b/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp index 7c53d64459..9de3709286 100644 --- a/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp +++ b/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp @@ -184,7 +184,8 @@ KeySegmentPair NfsBackedStorage::do_read(VariantKey&& variant_key, ReadKeyOpts o void NfsBackedStorage::do_remove(VariantKey&& variant_key, RemoveOpts) { auto enc = encode_object_id(variant_key); - s3::detail::do_remove_impl(std::move(enc), root_folder_, bucket_name_, *s3_client_, NfsBucketizer{}); + std::array arr{std::move(variant_key)}; + s3::detail::do_remove_impl(std::span(arr), root_folder_, bucket_name_, *s3_client_, NfsBucketizer{}); } void NfsBackedStorage::do_remove(std::span variant_keys, RemoveOpts) { diff --git a/cpp/arcticdb/storage/s3/s3_client_impl.cpp b/cpp/arcticdb/storage/s3/s3_client_impl.cpp index a476a92edb..76fa6c0bce 100644 --- a/cpp/arcticdb/storage/s3/s3_client_impl.cpp +++ b/cpp/arcticdb/storage/s3/s3_client_impl.cpp @@ -204,7 +204,7 @@ S3Result S3ClientImpl::put_object( return {std::monostate()}; } -S3Result S3ClientImpl::delete_objects( +S3Result S3ClientImpl::delete_objects( const std::vector& s3_object_names, const std::string& bucket_name) { Aws::S3::Model::DeleteObjectsRequest request; @@ -230,10 +230,48 @@ S3Result S3ClientImpl::delete_objects( failed_deletes.emplace_back(failed_key.GetKey(), failed_key.GetMessage()); } - DeleteOutput result = {failed_deletes}; + DeleteObjectsOutput result = {failed_deletes}; return {result}; } +struct DeleteObjectAsyncHandler { + std::shared_ptr>> promise_; + timestamp start_; + + DeleteObjectAsyncHandler(std::shared_ptr>>&& promise) : + promise_(std::move(promise)), + start_(util::SysClock::coarse_nanos_since_epoch()){ + } + + ARCTICDB_MOVE_COPY_DEFAULT(DeleteObjectAsyncHandler) + + void operator()( + const Aws::S3::S3Client*, + const Aws::S3::Model::DeleteObjectRequest&, + const Aws::S3::Model::DeleteObjectOutcome& outcome, + const std::shared_ptr&) { + if (outcome.IsSuccess()) { + promise_->setValue>({}); + } else { + promise_->setValue>({outcome.GetError()}); + } + } +}; + +folly::Future> S3ClientImpl::delete_object( + const std::string& s3_object_name, + const std::string& bucket_name) { + ARCTICDB_RUNTIME_DEBUG(log::storage(), "Removing s3 object with key {} (async)", s3_object_name); + auto promise = std::make_shared>>(); + auto future = promise->getFuture(); + Aws::S3::Model::DeleteObjectRequest request; + 
request.WithBucket(bucket_name.c_str()); + request.WithKey(s3_object_name); + + s3_client.DeleteObjectAsync(request, DeleteObjectAsyncHandler{std::move(promise)}); + return future; +} + S3Result S3ClientImpl::list_objects( const std::string& name_prefix, const std::string& bucket_name, diff --git a/cpp/arcticdb/storage/s3/s3_client_impl.hpp b/cpp/arcticdb/storage/s3/s3_client_impl.hpp index 6141785dc0..44a3f17530 100644 --- a/cpp/arcticdb/storage/s3/s3_client_impl.hpp +++ b/cpp/arcticdb/storage/s3/s3_client_impl.hpp @@ -49,10 +49,14 @@ class S3ClientImpl : public S3ClientInterface { const std::string& bucket_name, PutHeader header = PutHeader::NONE) override; - S3Result delete_objects( + S3Result delete_objects( const std::vector& s3_object_names, const std::string& bucket_name) override; + folly::Future> delete_object( + const std::string& s3_object_name, + const std::string& bucket_name) override; + S3Result list_objects( const std::string& prefix, const std::string& bucket_name, diff --git a/cpp/arcticdb/storage/s3/s3_client_interface.hpp b/cpp/arcticdb/storage/s3/s3_client_interface.hpp index 9cbe38764f..6c90b785ca 100644 --- a/cpp/arcticdb/storage/s3/s3_client_interface.hpp +++ b/cpp/arcticdb/storage/s3/s3_client_interface.hpp @@ -58,7 +58,8 @@ struct FailedDelete{ std::string s3_object_name; std::string error_message; }; -struct DeleteOutput{ + +struct DeleteObjectsOutput{ std::vector failed_deletes; }; @@ -89,10 +90,14 @@ class S3ClientInterface { const std::string& bucket_name, PutHeader header = PutHeader::NONE) = 0; - virtual S3Result delete_objects( + virtual S3Result delete_objects( const std::vector& s3_object_names, const std::string& bucket_name) = 0; + [[nodiscard]] virtual folly::Future> delete_object( + const std::string& s3_object_name, + const std::string& bucket_name) = 0; + [[nodiscard]] virtual S3Result list_objects( const std::string& prefix, const std::string& bucket_name, diff --git a/cpp/arcticdb/storage/s3/s3_client_wrapper.cpp b/cpp/arcticdb/storage/s3/s3_client_wrapper.cpp index 5b270fd035..a7e98427d2 100644 --- a/cpp/arcticdb/storage/s3/s3_client_wrapper.cpp +++ b/cpp/arcticdb/storage/s3/s3_client_wrapper.cpp @@ -113,7 +113,7 @@ S3Result S3ClientTestWrapper::put_object( return actual_client_->put_object(s3_object_name, segment, bucket_name, header); } -S3Result S3ClientTestWrapper::delete_objects( +S3Result S3ClientTestWrapper::delete_objects( const std::vector& s3_object_names, const std::string& bucket_name) { auto maybe_error = has_failure_trigger(bucket_name); @@ -125,6 +125,17 @@ S3Result S3ClientTestWrapper::delete_objects( return actual_client_->delete_objects(s3_object_names, bucket_name); } +folly::Future> S3ClientTestWrapper::delete_object( + const std::string& s3_object_name, + const std::string& bucket_name) { + auto maybe_error = has_failure_trigger(bucket_name); + if (maybe_error.has_value()) { + return folly::makeFuture>({*maybe_error}); + } + + return actual_client_->delete_object(s3_object_name, bucket_name); +} + // Using a fixed page size since it's only being used for simple tests. // If we ever need to configure it we should move it to the s3 proto config instead. 
constexpr auto page_size = 10; diff --git a/cpp/arcticdb/storage/s3/s3_client_wrapper.hpp b/cpp/arcticdb/storage/s3/s3_client_wrapper.hpp index 4639036d85..c9119d5769 100644 --- a/cpp/arcticdb/storage/s3/s3_client_wrapper.hpp +++ b/cpp/arcticdb/storage/s3/s3_client_wrapper.hpp @@ -54,10 +54,14 @@ class S3ClientTestWrapper : public S3ClientInterface { const std::string& bucket_name, PutHeader header = PutHeader::NONE) override; - S3Result delete_objects( + S3Result delete_objects( const std::vector& s3_object_names, const std::string& bucket_name) override; + folly::Future> delete_object( + const std::string& s3_object_names, + const std::string& bucket_name) override; + S3Result list_objects( const std::string& prefix, const std::string& bucket_name, diff --git a/cpp/arcticdb/storage/s3/s3_settings.hpp b/cpp/arcticdb/storage/s3/s3_settings.hpp index e4bd6762ae..b71ab44dcd 100644 --- a/cpp/arcticdb/storage/s3/s3_settings.hpp +++ b/cpp/arcticdb/storage/s3/s3_settings.hpp @@ -12,12 +12,136 @@ namespace arcticdb::storage::s3 { +enum class NativeSettingsType : uint32_t { + S3 = 0, + GCPXML = 1 +}; + enum class AWSAuthMethod : uint32_t { DISABLED = 0, DEFAULT_CREDENTIALS_PROVIDER_CHAIN = 1, STS_PROFILE_CREDENTIALS_PROVIDER = 2 }; +class GCPXMLSettings { + std::string bucket_; + std::string endpoint_; + std::string access_; + std::string secret_; + std::string prefix_; + AWSAuthMethod aws_auth_; + bool https_; + std::string ca_cert_path_; + std::string ca_cert_dir_; + bool ssl_; + +public: + GCPXMLSettings() { } + + explicit GCPXMLSettings( + AWSAuthMethod aws_auth, + std::string ca_cert_path, + std::string ca_cert_dir, + bool ssl, + bool https, + std::string prefix, + std::string endpoint, + std::string secret, + std::string access, + std::string bucket + ) : bucket_(std::move(bucket)), endpoint_(std::move(endpoint)), access_(std::move(access)), secret_(std::move(secret)), + prefix_(std::move(prefix)), aws_auth_(aws_auth), https_(https), ca_cert_path_(std::move(ca_cert_path)), ca_cert_dir_(std::move(ca_cert_dir)), + ssl_(ssl) { + + } + + GCPXMLSettings update(const arcticc::pb2::gcp_storage_pb2::Config& config){ + prefix_ = config.prefix(); + return *this; + } + + [[nodiscard]] std::string endpoint() const { + return endpoint_; + } + + void set_endpoint(std::string_view endpoint) { + endpoint_ = endpoint; + } + + [[nodiscard]] std::string access() const { + return access_; + } + + void set_access(const std::string_view access) { + access_ = access; + } + + [[nodiscard]] std::string secret() const { + return secret_; + } + + void set_secret(const std::string_view secret) { + secret_ = secret; + } + + [[nodiscard]] AWSAuthMethod aws_auth() const { + return aws_auth_; + } + + void set_aws_auth(const AWSAuthMethod aws_auth) { + aws_auth_ = aws_auth; + } + + [[nodiscard]] std::string bucket() const { + return bucket_; + } + + void set_bucket(const std::string_view bucket) { + bucket_ = bucket; + }; + + void set_prefix(const std::string_view prefix) { + prefix_ = prefix; + } + + [[nodiscard]] std::string prefix() const { + return prefix_; + } + + [[nodiscard]] bool https() const { + return https_; + } + + void set_https(bool https) { + https_ = https; + } + + [[nodiscard]] bool ssl() const { + return ssl_; + } + + void set_ssl(bool ssl) { + ssl_ = ssl; + } + + [[nodiscard]] std::string ca_cert_path() const { + return ca_cert_path_; + } + + void set_cert_path(const std::string_view ca_cert_path) { + ca_cert_path_ = ca_cert_path; + }; + + [[nodiscard]] std::string ca_cert_dir() const { + return 
ca_cert_dir_; + } + + void set_cert_dir(const std::string_view ca_cert_dir) { + ca_cert_dir_ = ca_cert_dir; + }; + +}; + class S3Settings { private: std::string bucket_name_; @@ -75,6 +199,31 @@ class S3Settings { return *this; } + S3Settings update(const GCPXMLSettings& gcp_xml_settings) { + bucket_name_ = gcp_xml_settings.bucket(); + credential_name_ = gcp_xml_settings.access(); + credential_key_ = gcp_xml_settings.secret(); + endpoint_ = gcp_xml_settings.endpoint(); + prefix_ = gcp_xml_settings.prefix(); + aws_auth_ = gcp_xml_settings.aws_auth(); + https_ = gcp_xml_settings.https(); + ssl_ = gcp_xml_settings.ssl(); + ca_cert_path_ = gcp_xml_settings.ca_cert_path(); + ca_cert_dir_ = gcp_xml_settings.ca_cert_dir(); + // The below are all controlled by config options + max_connections_ = 0; + connect_timeout_ = 0; + request_timeout_ = 0; + // Only used for STS, not supported for GCPXML + aws_profile_ = ""; + // Testing options, not used for GCPXML + use_raw_prefix_ = false; + use_virtual_addressing_ = false; + use_mock_storage_for_testing_ = false; + use_internal_client_wrapper_for_testing_ = false; + return *this; + } + std::string bucket_name() const { return bucket_name_; } @@ -151,4 +300,5 @@ class S3Settings { return use_raw_prefix_; } }; + } \ No newline at end of file diff --git a/cpp/arcticdb/storage/s3/s3_storage.cpp b/cpp/arcticdb/storage/s3/s3_storage.cpp index b471e1b1e3..40b2e83d65 100644 --- a/cpp/arcticdb/storage/s3/s3_storage.cpp +++ b/cpp/arcticdb/storage/s3/s3_storage.cpp @@ -89,6 +89,17 @@ void S3Storage::do_remove(VariantKey&& variant_key, RemoveOpts) { detail::do_remove_impl(std::move(variant_key), root_folder_, bucket_name_, client(), FlatBucketizer{}); } +void GCPXMLStorage::do_remove(std::span variant_keys, RemoveOpts) { + // GCP does not support batch deletes + detail::do_remove_no_batching_impl(variant_keys, root_folder_, bucket_name_, client(), FlatBucketizer{}); +} + +void GCPXMLStorage::do_remove(VariantKey&& variant_key, RemoveOpts) { + // GCP does not support batch deletes + std::span keys{&variant_key, 1}; + detail::do_remove_no_batching_impl(keys, root_folder_, bucket_name_, client(), FlatBucketizer{}); +} + bool S3Storage::do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string& prefix) { auto prefix_handler = [] (const std::string& prefix, const std::string& key_type_dir, const KeyDescriptor& key_descriptor, KeyType) { return !prefix.empty() ? 
fmt::format("{}/{}*{}", key_type_dir, key_descriptor, prefix) : key_type_dir; @@ -172,4 +183,11 @@ S3Storage::S3Storage(const LibraryPath &library_path, OpenMode mode, const S3Set ARCTICDB_DEBUG(log::storage(), "Opened S3 backed storage at {}", root_folder_); } +GCPXMLStorage::GCPXMLStorage(const arcticdb::storage::LibraryPath& lib, + arcticdb::storage::OpenMode mode, + const arcticdb::storage::s3::GCPXMLSettings& conf) : + S3Storage(lib, mode, S3Settings{AWSAuthMethod::DISABLED, "", false}.update(conf)) { + +} + } // namespace arcticdb::storage::s3 diff --git a/cpp/arcticdb/storage/s3/s3_storage.hpp b/cpp/arcticdb/storage/s3/s3_storage.hpp index a87b831e71..0c830552dd 100644 --- a/cpp/arcticdb/storage/s3/s3_storage.hpp +++ b/cpp/arcticdb/storage/s3/s3_storage.hpp @@ -31,7 +31,7 @@ namespace arcticdb::storage::s3 { const std::string USE_AWS_CRED_PROVIDERS_TOKEN = "_RBAC_"; -class S3Storage final : public Storage, AsyncStorage { +class S3Storage : public Storage, AsyncStorage { public: S3Storage(const LibraryPath &lib, OpenMode mode, const S3Settings &conf); @@ -48,7 +48,7 @@ class S3Storage final : public Storage, AsyncStorage { return dynamic_cast(this); } - private: + protected: void do_write(KeySegmentPair& key_seg) final; void do_write_if_none(KeySegmentPair& kv) final; @@ -63,9 +63,9 @@ class S3Storage final : public Storage, AsyncStorage { folly::Future do_async_read(entity::VariantKey&& variant_key, ReadKeyOpts opts) final; - void do_remove(VariantKey&& variant_key, RemoveOpts opts) final; + void do_remove(VariantKey&& variant_key, RemoveOpts opts); - void do_remove(std::span variant_keys, RemoveOpts opts) final; + void do_remove(std::span variant_keys, RemoveOpts opts); bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string &prefix) final; @@ -101,6 +101,14 @@ class S3Storage final : public Storage, AsyncStorage { std::string region_; }; +class GCPXMLStorage : public S3Storage { +public: + GCPXMLStorage(const LibraryPath &lib, OpenMode mode, const GCPXMLSettings &conf); +protected: + void do_remove(std::span variant_keys, RemoveOpts opts) override; + void do_remove(VariantKey&& variant_key, RemoveOpts opts) override; +}; + inline arcticdb::proto::storage::VariantStorage pack_config(const std::string &bucket_name) { arcticdb::proto::storage::VariantStorage output; arcticdb::proto::s3_storage::Config cfg; diff --git a/cpp/arcticdb/storage/storage_factory.cpp b/cpp/arcticdb/storage/storage_factory.cpp index 15bcc6fc76..f06ad3fc59 100644 --- a/cpp/arcticdb/storage/storage_factory.cpp +++ b/cpp/arcticdb/storage/storage_factory.cpp @@ -24,6 +24,13 @@ std::shared_ptr create_storage( return std::make_shared(library_path, mode, storage_config); } +std::shared_ptr create_storage( + const LibraryPath &library_path, + OpenMode mode, + const s3::GCPXMLSettings& storage_config) { + return std::make_shared(library_path, mode, storage_config); +} + std::shared_ptr create_storage( const LibraryPath &library_path, OpenMode mode, diff --git a/cpp/arcticdb/storage/storage_factory.hpp b/cpp/arcticdb/storage/storage_factory.hpp index b80dd8003c..1e6f83e42b 100644 --- a/cpp/arcticdb/storage/storage_factory.hpp +++ b/cpp/arcticdb/storage/storage_factory.hpp @@ -13,16 +13,21 @@ #include -namespace arcticdb { - namespace storage { - std::shared_ptr create_storage( - const LibraryPath &library_path, - OpenMode mode, - const s3::S3Settings& storage_descriptor); - std::shared_ptr create_storage( - const LibraryPath& library_path, - OpenMode mode, - const 
arcticdb::proto::storage::VariantStorage &storage_config); - - } // namespace storage -} // namespace arcticdb +namespace arcticdb::storage { + +std::shared_ptr create_storage( + const LibraryPath &library_path, + OpenMode mode, + const s3::S3Settings& storage_descriptor); + +std::shared_ptr create_storage( + const LibraryPath &library_path, + OpenMode mode, + const s3::GCPXMLSettings& storage_descriptor); + +std::shared_ptr create_storage( + const LibraryPath& library_path, + OpenMode mode, + const arcticdb::proto::storage::VariantStorage &storage_config); + +} // namespace arcticdb::storage diff --git a/cpp/arcticdb/storage/storage_override.hpp b/cpp/arcticdb/storage/storage_override.hpp index 5fe0f7c643..f4dff125bb 100644 --- a/cpp/arcticdb/storage/storage_override.hpp +++ b/cpp/arcticdb/storage/storage_override.hpp @@ -127,6 +127,13 @@ class S3Override { } }; +class GCPXMLOverride { +public: + void modify_storage_config(arcticdb::proto::storage::VariantStorage&, bool) const { + // Nothing is serialized in the GCPXML proto that shouldn't be, so nothing to override. + } +}; + class AzureOverride { std::string container_name_; std::string endpoint_; @@ -218,13 +225,13 @@ class LmdbOverride { } }; -using VariantStorageOverride = std::variant; +using VariantStorageOverride = std::variant; class StorageOverride { VariantStorageOverride override_; public: - const VariantStorageOverride& variant() const { + [[nodiscard]] const VariantStorageOverride& variant() const { return override_; } @@ -239,6 +246,11 @@ class StorageOverride { void set_lmdb_override(const LmdbOverride& storage_override) { override_ = storage_override; } + + void set_gcpxml_override(const GCPXMLOverride& storage_override) { + override_ = storage_override; + } + }; } //namespace arcticdb::storage diff --git a/cpp/arcticdb/storage/storages.hpp b/cpp/arcticdb/storage/storages.hpp index 1c2d018e91..8ef9c3138c 100644 --- a/cpp/arcticdb/storage/storages.hpp +++ b/cpp/arcticdb/storage/storages.hpp @@ -275,6 +275,15 @@ inline std::shared_ptr create_storages(const LibraryPath& library_path mode, s3::S3Settings(settings).update(s3_storage))); }, + [&storage_config, &storages, &library_path, mode](const s3::GCPXMLSettings& settings) { + util::check(storage_config.config().Is(), + "Only support GCP native settings"); + arcticdb::proto::gcp_storage::Config gcp_storage; + storage_config.config().UnpackTo(&gcp_storage); + storages.push_back(create_storage(library_path, + mode, + s3::GCPXMLSettings(settings).update(gcp_storage))); + }, [&storage_config, &storages, &library_path, mode](const auto&) { storages.push_back(create_storage(library_path, mode, storage_config)); } diff --git a/cpp/proto/arcticc/pb2/gcp_storage.proto b/cpp/proto/arcticc/pb2/gcp_storage.proto new file mode 100644 index 0000000000..44523199d3 --- /dev/null +++ b/cpp/proto/arcticc/pb2/gcp_storage.proto @@ -0,0 +1,14 @@ +/* +Copyright 2025 Man Group Operations Limited + +Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + +As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. 
+*/ +syntax = "proto3"; + +package arcticc.pb2.gcp_storage_pb2; + +message Config { + string prefix = 1; +} diff --git a/cpp/proto/arcticc/pb2/proto/CMakeLists.txt b/cpp/proto/arcticc/pb2/proto/CMakeLists.txt index 1f43accc7e..3bf5c0f2fa 100755 --- a/cpp/proto/arcticc/pb2/proto/CMakeLists.txt +++ b/cpp/proto/arcticc/pb2/proto/CMakeLists.txt @@ -11,6 +11,7 @@ SET(PROTO_IN_FILES mongo_storage.proto in_memory_storage.proto s3_storage.proto + gcp_storage.proto azure_storage.proto nfs_backed_storage.proto mapped_file_storage.proto diff --git a/python/arcticdb/adapters/arctic_library_adapter.py b/python/arcticdb/adapters/arctic_library_adapter.py index 642496b9b3..a4152d5d7c 100644 --- a/python/arcticdb/adapters/arctic_library_adapter.py +++ b/python/arcticdb/adapters/arctic_library_adapter.py @@ -49,7 +49,7 @@ def set_library_options(lib_desc: "LibraryConfig", options: LibraryOptions, class ArcticLibraryAdapter(ABC): @abstractmethod def __init__(self, uri: str, encoding_version: EncodingVersion): - self._native_cfg = None + ... @abstractmethod def __repr__(self): @@ -65,6 +65,9 @@ def supports_uri(uri: str) -> bool: def config_library(self) -> Library: raise NotImplementedError + def native_config(self): + return None + def get_library_config(self, name: str, library_options: LibraryOptions, enterprise_library_options: EnterpriseLibraryOptions): env_cfg = EnvironmentConfigsMap() diff --git a/python/arcticdb/adapters/gcpxml_library_adapter.py b/python/arcticdb/adapters/gcpxml_library_adapter.py new file mode 100644 index 0000000000..7d6024b6ef --- /dev/null +++ b/python/arcticdb/adapters/gcpxml_library_adapter.py @@ -0,0 +1,219 @@ +""" +Copyright 2023 Man Group Operations Limited + +Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + +As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. 
+""" +import re +import ssl +from dataclasses import dataclass, fields +import time +import platform +from typing import Optional + +from arcticc.pb2.storage_pb2 import EnvironmentConfigsMap, LibraryDescriptor +from arcticdb.adapters.s3_library_adapter import strtobool, USE_AWS_CRED_PROVIDERS_TOKEN +from arcticdb.encoding_version import EncodingVersion +from arcticdb.version_store import NativeVersionStore +from arcticdb_ext.exceptions import UserInputException +from arcticdb_ext.storage import ( + StorageOverride, + GCPXMLOverride, + AWSAuthMethod, + NativeVariantStorage, + GCPXMLSettings as NativeGCPXMLSettings, + CONFIG_LIBRARY_NAME +) + +from arcticdb.adapters.arctic_library_adapter import ArcticLibraryAdapter + +from arcticdb.version_store.helper import add_gcp_library_to_env + + +@dataclass +class ParsedQuery: + port: Optional[int] = None + access: Optional[str] = None + secret: Optional[str] = None + aws_auth: Optional[AWSAuthMethod] = AWSAuthMethod.DISABLED + path_prefix: Optional[str] = None + CA_cert_path: Optional[str] = "" # CURLOPT_CAINFO in curl + CA_cert_dir: Optional[str] = "" # CURLOPT_CAPATH in curl + ssl: Optional[bool] = False + + +def _parse_query(query: str) -> ParsedQuery: + if query and query.startswith("?"): + query = query.strip("?") + elif not query: + return ParsedQuery() + + parsed_query = re.split("[;&]", query) + parsed_query = {t.split("=", 1)[0]: t.split("=", 1)[1] for t in parsed_query} + + field_dict = {field.name: field for field in fields(ParsedQuery)} + for key in parsed_query.keys(): + if key not in field_dict.keys(): + raise ValueError( + "Invalid GCPXML URI. " + f"Invalid query parameter '{key}' passed in. " + "Value query parameters: " + f"{list(field_dict.keys())}" + ) + + if field_dict[key].type == bool: + parsed_query[key] = bool(strtobool(parsed_query[key][0])) + + if field_dict[key].type == Optional[bool] and field_dict[key] is not None: + parsed_query[key] = bool(strtobool(parsed_query[key][0])) + + if field_dict[key].type == Optional[AWSAuthMethod]: + value = parsed_query[key] + if strtobool(value) or value.lower() == "default": + parsed_query[key] = AWSAuthMethod.DEFAULT_CREDENTIALS_PROVIDER_CHAIN + else: + raise ValueError(f"Invalid setting for awsauth {value} should be absent or have value 'true'") + + if parsed_query.get("path_prefix"): + parsed_query["path_prefix"] = parsed_query["path_prefix"].strip("/") + + _kwargs = {k: v for k, v in parsed_query.items()} + return ParsedQuery(**_kwargs) + + +class GCPXMLLibraryAdapter(ArcticLibraryAdapter): + REGEX = r"gcpxml(s)?://(?P.*):(?P[-_a-zA-Z0-9.]+)(?P\?.*)?" 
+ + @staticmethod + def supports_uri(uri: str) -> bool: + return uri.startswith("gcpxml://") or uri.startswith("gcpxmls://") + + def __init__(self, uri: str, encoding_version: EncodingVersion, *args, **kwargs): + match = re.match(self.REGEX, uri) + match_groups = match.groupdict() + + self._endpoint = match_groups["endpoint"] + self._bucket = match_groups["bucket"] + + query_params: ParsedQuery = _parse_query(match["query"]) + + if query_params.port: + self._endpoint += f":{query_params.port}" + + if query_params.aws_auth is None: + self._aws_auth = AWSAuthMethod.DISABLED + else: + self._aws_auth = query_params.aws_auth + + if query_params.access: + if self._aws_auth == AWSAuthMethod.DEFAULT_CREDENTIALS_PROVIDER_CHAIN: + raise UserInputException(f"Specified both access and awsauth=true in the GCPXML Arctic URI - only one can be set endpoint={self._endpoint} bucket={self._bucket}") + self._access = query_params.access + elif self._aws_auth == AWSAuthMethod.DISABLED: + raise UserInputException(f"Access token or awsauth=true must be specified in GCPXML Arctic URI endpoint={self._endpoint} bucket={self._bucket}") + else: + self._access = USE_AWS_CRED_PROVIDERS_TOKEN + + if query_params.secret: + if self._aws_auth == AWSAuthMethod.DEFAULT_CREDENTIALS_PROVIDER_CHAIN: + raise UserInputException(f"Specified both secret and awsauth=true in the GCPXML Arctic URI - only one can be set endpoint={self._endpoint} bucket={self._bucket}") + self._secret = query_params.secret + elif self._aws_auth == AWSAuthMethod.DISABLED: + raise UserInputException(f"Secret or awsauth=true must be specified in GCPXML Arctic URI endpoint={self._endpoint} bucket={self._bucket}") + else: + self._secret = USE_AWS_CRED_PROVIDERS_TOKEN + + self._path_prefix = query_params.path_prefix + self._https = uri.startswith("gcpxmls") + + if platform.system() != "Linux" and (query_params.CA_cert_path or query_params.CA_cert_dir): + raise ValueError( + "You have provided `ca_cert_path` or `ca_cert_dir` in the URI which is only supported on Linux. " + "Remove the setting in the connection URI and use your operating system defaults." 
+ ) + self._ca_cert_path = query_params.CA_cert_path + self._ca_cert_dir = query_params.CA_cert_dir + if not self._ca_cert_path and not self._ca_cert_dir and platform.system() == "Linux": + if ssl.get_default_verify_paths().cafile is not None: + self._ca_cert_path = ssl.get_default_verify_paths().cafile + if ssl.get_default_verify_paths().capath is not None: + self._ca_cert_dir = ssl.get_default_verify_paths().capath + + self._ssl = query_params.ssl + self._encoding_version = encoding_version + + super().__init__(uri, self._encoding_version) + + def native_config(self): + native_settings = NativeGCPXMLSettings() + native_settings.bucket = self._bucket + native_settings.endpoint = self._endpoint + native_settings.access = self._access + native_settings.secret = self._secret + native_settings.aws_auth = self._aws_auth + native_settings.prefix = "" # Set on the returned value later when needed + native_settings.https = self._https + native_settings.ssl = self._ssl + native_settings.ca_cert_path = self._ca_cert_path + native_settings.ca_cert_dir = self._ca_cert_dir + return NativeVariantStorage(native_settings) + + def __repr__(self): + return "GCPXML(endpoint=%s, bucket=%s)" % (self._endpoint, self._bucket) + + @property + def config_library(self): + env_cfg = EnvironmentConfigsMap() + _name = ( + self._access + if self._aws_auth == AWSAuthMethod.DISABLED + else USE_AWS_CRED_PROVIDERS_TOKEN + ) + _key = ( + self._secret + if self._aws_auth == AWSAuthMethod.DISABLED + else USE_AWS_CRED_PROVIDERS_TOKEN + ) + with_prefix = ( + f"{self._path_prefix}/{CONFIG_LIBRARY_NAME}" if self._path_prefix else False + ) + + add_gcp_library_to_env( + cfg=env_cfg, + lib_name=CONFIG_LIBRARY_NAME, + env_name="local", + with_prefix=with_prefix, + ) + + lib = NativeVersionStore.create_store_from_config( + (env_cfg, self.native_config()), "local", CONFIG_LIBRARY_NAME, encoding_version=self._encoding_version + ) + + return lib._library + + def get_storage_override(self) -> StorageOverride: + storage_override = StorageOverride() + gcpxml_override = GCPXMLOverride() + storage_override.set_gcpxml_override(gcpxml_override) + return storage_override + + def get_masking_override(self) -> StorageOverride: + storage_override = StorageOverride() + gcpxml_override = GCPXMLOverride() + storage_override.set_gcpxml_override(gcpxml_override) + return storage_override + + def add_library_to_env(self, env_cfg: EnvironmentConfigsMap, name: str): + if self._path_prefix: + # add time to prefix - so that the s3 root folder is unique and we can delete and recreate fast + with_prefix = f"{self._path_prefix}/{name}{time.time() * 1e9:.0f}" + else: + with_prefix = True + + add_gcp_library_to_env( + cfg=env_cfg, + lib_name=name, + env_name="local", + with_prefix=with_prefix, + ) diff --git a/python/arcticdb/adapters/s3_library_adapter.py b/python/arcticdb/adapters/s3_library_adapter.py index 8ba57abcb5..d792effff0 100644 --- a/python/arcticdb/adapters/s3_library_adapter.py +++ b/python/arcticdb/adapters/s3_library_adapter.py @@ -12,7 +12,6 @@ import ssl import platform -from arcticdb.options import LibraryOptions from arcticc.pb2.storage_pb2 import EnvironmentConfigsMap, LibraryDescriptor from arcticdb.version_store.helper import add_s3_library_to_env from arcticdb.config import _DEFAULT_ENV @@ -111,7 +110,8 @@ def __init__(self, uri: str, encoding_version: EncodingVersion, *args, **kwargs) super().__init__(uri, self._encoding_version) - self._native_cfg = NativeVariantStorage(NativeS3Settings(AWSAuthMethod.DISABLED, "", False)) + def 
native_config(self): + return NativeVariantStorage(NativeS3Settings(AWSAuthMethod.DISABLED, "", False)) def __repr__(self): return "S3(endpoint=%s, bucket=%s)" % (self._endpoint, self._bucket) @@ -150,11 +150,11 @@ def config_library(self): ssl=self._ssl, aws_auth=self._query_params.aws_auth, aws_profile=self._query_params.aws_profile, - native_cfg=self._native_cfg, + native_cfg=self.native_config(), ) lib = NativeVersionStore.create_store_from_config( - (env_cfg, self._native_cfg), _DEFAULT_ENV, CONFIG_LIBRARY_NAME, encoding_version=self._encoding_version + (env_cfg, self.native_config()), _DEFAULT_ENV, CONFIG_LIBRARY_NAME, encoding_version=self._encoding_version ) return lib._library @@ -198,7 +198,7 @@ def _parse_query(self, query: str) -> ParsedQuery: _kwargs = {k: v for k, v in parsed_query.items()} return ParsedQuery(**_kwargs) - def get_storage_override(self) -> StorageOverride: + def _get_s3_override(self) -> S3Override: s3_override = S3Override() # storage_override will overwrite access and key while reading config from storage # access and secret whether equals to _RBAC_ are used for determining aws_auth is true on C++ layer @@ -226,10 +226,11 @@ def get_storage_override(self) -> StorageOverride: s3_override.ssl = self._ssl s3_override.use_virtual_addressing = self._query_params.use_virtual_addressing + return s3_override + def get_storage_override(self) -> StorageOverride: storage_override = StorageOverride() - storage_override.set_s3_override(s3_override) - + storage_override.set_s3_override(self._get_s3_override()) return storage_override def get_masking_override(self) -> StorageOverride: @@ -273,7 +274,7 @@ def add_library_to_env(self, env_cfg: EnvironmentConfigsMap, name: str): ssl=self._ssl, aws_auth=self._query_params.aws_auth, aws_profile=self._query_params.aws_profile, - native_cfg=self._native_cfg, + native_cfg=self.native_config(), ) def _configure_aws(self): diff --git a/python/arcticdb/arctic.py b/python/arcticdb/arctic.py index c22e3d5411..4a6e7fc039 100644 --- a/python/arcticdb/arctic.py +++ b/python/arcticdb/arctic.py @@ -19,8 +19,8 @@ from arcticdb.adapters.azure_library_adapter import AzureLibraryAdapter from arcticdb.adapters.mongo_library_adapter import MongoLibraryAdapter from arcticdb.adapters.in_memory_library_adapter import InMemoryLibraryAdapter +from arcticdb.adapters.gcpxml_library_adapter import GCPXMLLibraryAdapter from arcticdb.encoding_version import EncodingVersion -from arcticdb.exceptions import UnsupportedLibraryOptionValue, UnknownLibraryOption from arcticdb.options import ModifiableEnterpriseLibraryOption, ModifiableLibraryOption @@ -35,6 +35,7 @@ class Arctic: _LIBRARY_ADAPTERS = [ S3LibraryAdapter, + GCPXMLLibraryAdapter, LMDBLibraryAdapter, AzureLibraryAdapter, MongoLibraryAdapter, @@ -96,9 +97,10 @@ def __getitem__(self, name: str) -> Library: storage_override = self._library_adapter.get_storage_override() lib = NativeVersionStore( - self._library_manager.get_library(lib_mgr_name, storage_override, native_storage_map=self._library_adapter._native_cfg), + self._library_manager.get_library(lib_mgr_name, storage_override, native_storage_config=self._library_adapter.native_config()), repr(self._library_adapter), lib_cfg=self._library_manager.get_library_config(lib_mgr_name, storage_override), + native_cfg=self._library_adapter.native_config() ) if self._accessed_libs is not None: self._accessed_libs.append(lib) @@ -317,7 +319,7 @@ def modify_library_option(self, storage_override = self._library_adapter.get_storage_override() new_cfg = 
self._library_manager.get_library_config(lib_mgr_name, storage_override) library._nvs._initialize( - self._library_manager.get_library(lib_mgr_name, storage_override, ignore_cache=True, native_storage_map=self._library_adapter._native_cfg), + self._library_manager.get_library(lib_mgr_name, storage_override, ignore_cache=True, native_storage_config=self._library_adapter.native_config()), library._nvs.env, new_cfg, library._nvs._custom_normalizer, diff --git a/python/arcticdb/scripts/update_storage.py b/python/arcticdb/scripts/update_storage.py index 5524e06bfb..82e2c9718f 100644 --- a/python/arcticdb/scripts/update_storage.py +++ b/python/arcticdb/scripts/update_storage.py @@ -19,7 +19,7 @@ def repair_library_if_necessary(ac, lib_name: str, run: bool) -> bool: """Returns True if library required repair.""" storage_override = ac._library_adapter.get_storage_override() lib = NativeVersionStore( - ac._library_manager.get_library(lib_name, storage_override, native_storage_map=ac._library_adapter._native_cfg), + ac._library_manager.get_library(lib_name, storage_override, native_storage_config=ac._library_adapter.native_config()), repr(ac._library_adapter), lib_cfg=ac._library_manager.get_library_config(lib_name, storage_override), ) diff --git a/python/arcticdb/storage_fixtures/s3.py b/python/arcticdb/storage_fixtures/s3.py index 2f5e8faa3c..bb1ef64a3e 100644 --- a/python/arcticdb/storage_fixtures/s3.py +++ b/python/arcticdb/storage_fixtures/s3.py @@ -23,6 +23,9 @@ import requests from typing import Optional, Any, Type +import werkzeug +from moto.moto_server.werkzeug_app import DomainDispatcherApplication, create_backend_app + from .api import * from .utils import ( get_ephemeral_port, @@ -198,6 +201,18 @@ def create_test_cfg(self, lib_name: str) -> EnvironmentConfigsMap: return cfg +class GcpS3Bucket(S3Bucket): + + def __init__( + self, + factory: "BaseS3StorageFixtureFactory", + bucket: str, + native_config: Optional[NativeVariantStorage] = None, + ): + super().__init__(factory, bucket, native_config=native_config) + self.arctic_uri = self.arctic_uri.replace("s3", "gcpxml", 1) + + class BaseS3StorageFixtureFactory(StorageFixtureFactory): """Logic and fields common to real and mock S3""" @@ -235,7 +250,7 @@ def _boto(self, service: str, key: Key, api="client"): aws_access_key_id=key.id, aws_secret_access_key=key.secret, verify=self.client_cert_file if self.client_cert_file else False, - ) # verify=False cannot skip verification on buggy boto3 in py3.6 + ) def create_fixture(self) -> S3Bucket: return S3Bucket(self, self.default_bucket, self.native_config) @@ -490,6 +505,97 @@ def mock_s3_with_error_simulation(): return out +class HostDispatcherApplication(DomainDispatcherApplication): + _reqs_till_rate_limit = -1 + + def get_backend_for_host(self, host): + """The stand-alone server needs a way to distinguish between S3 and IAM. 
We use the host for that""" + if host is None: + return None + if "s3" in host or host == "localhost": + return "s3" + elif host == "127.0.0.1": + return "iam" + elif host == "moto_api": + return "moto_api" + else: + raise RuntimeError(f"Unknown host {host}") + + def __call__(self, environ, start_response): + path_info: bytes = environ.get("PATH_INFO", "") + + with self.lock: + # Mock ec2 imds responses for testing + if path_info in ( + "/latest/dynamic/instance-identity/document", + b"/latest/dynamic/instance-identity/document", + ): + start_response("200 OK", [("Content-Type", "text/plain")]) + return [b"Something to prove imds is reachable"] + + # Allow setting up a rate limit + if path_info in ("/rate_limit", b"/rate_limit"): + length = int(environ["CONTENT_LENGTH"]) + body = environ["wsgi.input"].read(length).decode("ascii") + self._reqs_till_rate_limit = int(body) + start_response("200 OK", [("Content-Type", "text/plain")]) + return [b"Limit accepted"] + + if self._reqs_till_rate_limit == 0: + response_body = ( + b'SlowDownPlease reduce your request rate.' + b"176C22715A856A299Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=" + ) + start_response( + "503 Slow Down", [("Content-Type", "text/xml"), ("Content-Length", str(len(response_body)))] + ) + return [response_body] + else: + self._reqs_till_rate_limit -= 1 + + return super().__call__(environ, start_response) + + +class GcpHostDispatcherApplication(HostDispatcherApplication): + """GCP's S3 implementation does not have batch delete.""" + + def __call__(self, environ, start_response): + if environ["REQUEST_METHOD"] == "POST" and environ["QUERY_STRING"] == "delete": + response_body = ( + b'' + b'' + b'NotImplemented' + b'A header or query you provided requested a function that is not implemented.' + b'
<Details>POST ?delete is not implemented for objects.</Details>' + b'</Error>
' + ) + start_response( + "501 Not Implemented", [("Content-Type", "text/xml"), ("Content-Length", str(len(response_body)))] + ) + return [response_body] + return super().__call__(environ, start_response) + + +def run_s3_server(port, key_file, cert_file): + werkzeug.run_simple( + "0.0.0.0", + port, + HostDispatcherApplication(create_backend_app), + threaded=True, + ssl_context=(cert_file, key_file) if cert_file and key_file else None, + ) + + +def run_gcp_server(port, key_file, cert_file): + werkzeug.run_simple( + "0.0.0.0", + port, + GcpHostDispatcherApplication(create_backend_app), + threaded=True, + ssl_context=(cert_file, key_file) if cert_file and key_file else None, + ) + + class MotoS3StorageFixtureFactory(BaseS3StorageFixtureFactory): default_key = Key(id="awd", secret="awd", user_name="dummy") _RO_POLICY: str @@ -529,69 +635,6 @@ def __init__( # and we need to make sure the bucket names are unique self.unique_id = "".join(random.choices(string.ascii_letters + string.digits, k=5)) - @staticmethod - def run_server(port, key_file, cert_file): - import werkzeug - from moto.server import DomainDispatcherApplication, create_backend_app - - class _HostDispatcherApplication(DomainDispatcherApplication): - _reqs_till_rate_limit = -1 - - def get_backend_for_host(self, host): - """The stand-alone server needs a way to distinguish between S3 and IAM. We use the host for that""" - if host is None: - return None - if "s3" in host or host == "localhost": - return "s3" - elif host == "127.0.0.1": - return "iam" - elif host == "moto_api": - return "moto_api" - else: - raise RuntimeError(f"Unknown host {host}") - - def __call__(self, environ, start_response): - path_info: bytes = environ.get("PATH_INFO", "") - - with self.lock: - # Mock ec2 imds responses for testing - if path_info in ( - "/latest/dynamic/instance-identity/document", - b"/latest/dynamic/instance-identity/document", - ): - start_response("200 OK", [("Content-Type", "text/plain")]) - return [b"Something to prove imds is reachable"] - - # Allow setting up a rate limit - if path_info in ("/rate_limit", b"/rate_limit"): - length = int(environ["CONTENT_LENGTH"]) - body = environ["wsgi.input"].read(length).decode("ascii") - self._reqs_till_rate_limit = int(body) - start_response("200 OK", [("Content-Type", "text/plain")]) - return [b"Limit accepted"] - - if self._reqs_till_rate_limit == 0: - response_body = ( - b'SlowDownPlease reduce your request rate.' 
-                            b"<RequestId>176C22715A856A299</RequestId><HostId>Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=</HostId></Error>"
-                        )
-                        start_response(
-                            "503 Slow Down", [("Content-Type", "text/xml"), ("Content-Length", str(len(response_body)))]
-                        )
-                        return [response_body]
-                    else:
-                        self._reqs_till_rate_limit -= 1
-
-                    return super().__call__(environ, start_response)
-
-        werkzeug.run_simple(
-            "0.0.0.0",
-            port,
-            _HostDispatcherApplication(create_backend_app),
-            threaded=True,
-            ssl_context=(cert_file, key_file) if cert_file and key_file else None,
-        )
-
     def _start_server(self):
         port = self.port = get_ephemeral_port(2)
         self.endpoint = f"{self.http_protocol}://{self.host}:{port}"
@@ -614,7 +657,7 @@ def _start_server(self):
             "spawn"
         )  # In py3.7, multiprocess with forking will lead to seg fault in moto, possibly due to the handling of file descriptors
         self._p = spawn_context.Process(
-            target=self.run_server,
+            target=run_s3_server,
             args=(
                 port,
                 self.key_file if self.http_protocol == "https" else None,
@@ -713,3 +756,45 @@ def create_fixture(self) -> NfsS3Bucket:
         out = NfsS3Bucket(self, bucket)
         self._live_buckets.append(out)
         return out
+
+
+class MotoGcpS3StorageFixtureFactory(MotoS3StorageFixtureFactory):
+    def _start_server(self):
+        port = self.port = get_ephemeral_port(3)
+        self.endpoint = f"{self.http_protocol}://{self.host}:{port}"
+        self.working_dir = mkdtemp(suffix="MotoGcpS3StorageFixtureFactory")
+        self._iam_endpoint = f"{self.http_protocol}://localhost:{port}"
+
+        self.ssl = (
+            self.http_protocol == "https"
+        )  # In real world, using https protocol doesn't necessarily mean ssl will be verified
+        if self.ssl_test_support:
+            self.ca, self.key_file, self.cert_file, self.client_cert_file = get_ca_cert_for_testing(self.working_dir)
+        else:
+            self.ca = ""
+            self.key_file = ""
+            self.cert_file = ""
+            self.client_cert_file = ""
+        self.client_cert_dir = self.working_dir
+
+        spawn_context = multiprocessing.get_context(
+            "spawn"
+        )  # In py3.7, multiprocess with forking will lead to seg fault in moto, possibly due to the handling of file descriptors
+        self._p = spawn_context.Process(
+            target=run_gcp_server,
+            args=(
+                port,
+                self.key_file if self.http_protocol == "https" else None,
+                self.cert_file if self.http_protocol == "https" else None,
+            ),
+        )
+        self._p.start()
+        wait_for_server_to_come_up(self.endpoint, "moto", self._p)
+
+    def create_fixture(self) -> GcpS3Bucket:
+        bucket = f"test_bucket_{self._bucket_id}"
+        self._s3_admin.create_bucket(Bucket=bucket)
+        self._bucket_id += 1
+        out = GcpS3Bucket(self, bucket)
+        self._live_buckets.append(out)
+        return out
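Reviewer note: for anyone who wants to poke at the new simulator outside pytest, here is a minimal sketch. It assumes only what the fixtures below already use (the factory and bucket fixture are context managers, `get_boto_bucket()` returns a boto3 bucket resource), and it should surface the 501 that `GcpHostDispatcherApplication` returns for batch deletes.

```python
# Sketch only: drive the GCP-flavoured moto simulator directly and observe
# that batch delete is rejected, mirroring GCP's S3-compatible XML API.
import botocore.exceptions

from arcticdb.storage_fixtures.s3 import MotoGcpS3StorageFixtureFactory

with MotoGcpS3StorageFixtureFactory(use_ssl=False, ssl_test_support=False, bucket_versioning=False) as factory:
    with factory.create_fixture() as bucket:
        boto_bucket = bucket.get_boto_bucket()
        boto_bucket.put_object(Key="key1", Body=b"contents1")
        try:
            # The simulator answers POST ?delete with "501 Not Implemented"
            boto_bucket.delete_objects(Delete={"Objects": [{"Key": "key1"}]})
        except botocore.exceptions.ClientError as e:
            print("batch delete rejected:", e.response["Error"]["Code"])
```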
""" -import os.path as osp import re import time -from typing import Iterable, Dict, Any, Union +from typing import Iterable, Any, Union from arcticc.pb2.lmdb_storage_pb2 import Config as LmdbConfig from arcticc.pb2.s3_storage_pb2 import Config as S3Config +from arcticc.pb2.gcp_storage_pb2 import Config as GcpConfig from arcticc.pb2.azure_storage_pb2 import Config as AzureConfig from arcticc.pb2.in_memory_storage_pb2 import Config as MemoryConfig from arcticc.pb2.mongo_storage_pb2 import Config as MongoConfig @@ -364,6 +364,54 @@ def add_s3_library_to_env( _add_lib_desc_to_env(env, lib_name, sid, description) +def get_gcp_proto( + *, + cfg, + lib_name, + env_name, + with_prefix, +): + env = cfg.env_by_id[env_name] + proto = GcpConfig() + + # adding time to prefix - so that the s3 root folder is unique and we can delete and recreate fast + if with_prefix: + if isinstance(with_prefix, str): + proto.prefix = with_prefix + else: + proto.prefix = f"{lib_name}{time.time() * 1e9:.0f}" + else: + proto.prefix = lib_name + + sid, storage = get_storage_for_lib_name(proto.prefix, env) + storage.config.Pack(proto, type_url_prefix="cxx.arctic.org") + return sid + + +def add_gcp_library_to_env( + *, + cfg, + lib_name, + env_name, + with_prefix, +): + env = cfg.env_by_id[env_name] + if with_prefix and isinstance(with_prefix, str) and (with_prefix.endswith("/") or "//" in with_prefix): + raise UserInputException( + "path_prefix cannot contain // or end with a / because this breaks some S3 API calls, path_prefix was" + f" [{with_prefix}]" + ) + + sid = get_gcp_proto( + cfg=cfg, + lib_name=lib_name, + env_name=env_name, + with_prefix=with_prefix, + ) + + _add_lib_desc_to_env(env, lib_name, sid) + + def get_azure_proto( cfg, lib_name, diff --git a/python/tests/conftest.py b/python/tests/conftest.py index f533f16855..45cb0cae72 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -7,7 +7,7 @@ """ import enum -from typing import Callable, Generator, Iterator +from typing import Callable, Generator from arcticdb.version_store._store import NativeVersionStore from arcticdb.version_store.library import Library import hypothesis @@ -30,6 +30,7 @@ from arcticdb.storage_fixtures.s3 import ( BaseS3StorageFixtureFactory, MotoS3StorageFixtureFactory, + MotoGcpS3StorageFixtureFactory, MotoNfsBackedS3StorageFixtureFactory, NfsS3Bucket, S3Bucket, @@ -55,6 +56,7 @@ ) from arcticdb.storage_fixtures.utils import safer_rmtree + # region =================================== Misc. 
diff --git a/python/tests/conftest.py b/python/tests/conftest.py
index f533f16855..45cb0cae72 100644
--- a/python/tests/conftest.py
+++ b/python/tests/conftest.py
@@ -7,7 +7,7 @@
 """
 import enum
-from typing import Callable, Generator, Iterator
+from typing import Callable, Generator
 from arcticdb.version_store._store import NativeVersionStore
 from arcticdb.version_store.library import Library
 import hypothesis
@@ -30,6 +30,7 @@
 from arcticdb.storage_fixtures.s3 import (
     BaseS3StorageFixtureFactory,
     MotoS3StorageFixtureFactory,
+    MotoGcpS3StorageFixtureFactory,
     MotoNfsBackedS3StorageFixtureFactory,
     NfsS3Bucket,
     S3Bucket,
@@ -55,6 +56,7 @@
 )
 from arcticdb.storage_fixtures.utils import safer_rmtree
 
+
 # region =================================== Misc. Constants & Setup ====================================
 hypothesis.settings.register_profile("ci_linux", max_examples=100)
 hypothesis.settings.register_profile("ci_windows", max_examples=100)
@@ -149,6 +151,14 @@ def s3_storage_factory() -> Generator[MotoS3StorageFixtureFactory, None, None]:
         yield f
 
 
+@pytest.fixture(scope="session")
+def gcp_storage_factory() -> Generator[MotoGcpS3StorageFixtureFactory, None, None]:
+    with MotoGcpS3StorageFixtureFactory(
+        use_ssl=SSL_TEST_SUPPORTED, ssl_test_support=SSL_TEST_SUPPORTED, bucket_versioning=False
+    ) as f:
+        yield f
+
+
 @pytest.fixture(scope="session")
 def wrapped_s3_storage_factory() -> Generator[MotoS3StorageFixtureFactory, None, None]:
     with MotoS3StorageFixtureFactory(
@@ -191,6 +201,12 @@ def s3_storage(s3_storage_factory) -> Generator[S3Bucket, None, None]:
         yield f
 
 
+@pytest.fixture
+def gcp_storage(gcp_storage_factory) -> Generator[S3Bucket, None, None]:
+    with gcp_storage_factory.create_fixture() as f:
+        yield f
+
+
 @pytest.fixture
 def nfs_backed_s3_storage(nfs_backed_s3_storage_factory) -> Generator[NfsS3Bucket, None, None]:
     with nfs_backed_s3_storage_factory.create_fixture() as f:
         yield f
@@ -369,6 +385,8 @@ def mem_storage() -> Generator[InMemoryStorageFixture, None, None]:
     scope="function",
     params=[
         "s3",
+        "nfs_backed_s3",
+        "gcp",
         "lmdb",
         "mem",
         pytest.param("azurite", marks=AZURE_TESTS_MARK),
diff --git a/python/tests/integration/arcticdb/test_arctic.py b/python/tests/integration/arcticdb/test_arctic.py
index fdaf51508d..f904792129 100644
--- a/python/tests/integration/arcticdb/test_arctic.py
+++ b/python/tests/integration/arcticdb/test_arctic.py
@@ -83,8 +83,9 @@ def edit_connection_string(uri, delimiter, storage, ssl_setting, client_cert_fil
 @pytest.mark.parametrize('client_cert_file', parameter_display_status if SSL_TEST_SUPPORTED else no_ssl_parameter_display_status)
 @pytest.mark.parametrize('client_cert_dir', parameter_display_status if SSL_TEST_SUPPORTED else no_ssl_parameter_display_status)
 @pytest.mark.parametrize('ssl_setting', parameter_display_status if SSL_TEST_SUPPORTED else no_ssl_parameter_display_status)
-def test_s3_verification(monkeypatch, s3_storage, client_cert_file, client_cert_dir, ssl_setting):
-    storage = s3_storage
+@pytest.mark.parametrize('storage_fixture', ["s3_storage", "gcp_storage"])
+def test_s3_verification(monkeypatch, storage_fixture, client_cert_file, client_cert_dir, ssl_setting, request):
+    storage = request.getfixturevalue(storage_fixture)
     # Leaving ca file and ca dir unset will fallback to using os default setting,
     # which is different from the test environment
     default_setting = DefaultSetting(storage.factory)
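Reviewer note: the parametrisation above resolves a fixture by name at runtime via `request.getfixturevalue`, which is how one test body gets to run against both the S3 and GCP simulators. A minimal sketch of the same pattern follows; the test name and the plain HTTP health check are illustrative only.

```python
# Sketch only: resolve storage fixtures by name so one check exercises both
# moto simulators. Assumes the bucket fixture keeps a handle on its factory,
# as test_s3_verification above does via storage.factory.
import pytest
import requests


@pytest.mark.parametrize("storage_fixture", ["s3_storage", "gcp_storage"])
def test_moto_endpoint_reachable(storage_fixture, request):
    storage = request.getfixturevalue(storage_fixture)
    factory = storage.factory
    # Fall back to verify=False when no client cert is configured (non-SSL runs)
    requests.head(factory.endpoint, verify=factory.client_cert_file or False).raise_for_status()
```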
diff --git a/python/tests/integration/storage_fixtures/test_s3.py b/python/tests/integration/storage_fixtures/test_s3.py
index 705761c57e..8667c36b8a 100644
--- a/python/tests/integration/storage_fixtures/test_s3.py
+++ b/python/tests/integration/storage_fixtures/test_s3.py
@@ -1,5 +1,7 @@
+import botocore.exceptions
+import pytest
 import requests
-from arcticdb.storage_fixtures.s3 import MotoS3StorageFixtureFactory
+from arcticdb.storage_fixtures.s3 import MotoS3StorageFixtureFactory, MotoGcpS3StorageFixtureFactory
 
 
 def test_rate_limit(s3_storage_factory: MotoS3StorageFixtureFactory):  # Don't need to create buckets
@@ -27,3 +29,44 @@ def test_rate_limit(s3_storage_factory: MotoS3StorageFixtureFactory):  # Don't n
     # Then working again
     requests.head(s3.endpoint, verify=s3.client_cert_file).raise_for_status()
+
+
+def test_gcp_no_batch_delete(gcp_storage_factory: MotoGcpS3StorageFixtureFactory):
+    # Given
+    gcp = gcp_storage_factory
+    bucket = gcp.create_fixture()
+    boto_bucket = bucket.get_boto_bucket()
+    boto_bucket.put_object(Key="key1", Body=b"contents1")
+    boto_bucket.put_object(Key="key2", Body=b"contents2")
+    assert [k.key for k in boto_bucket.objects.all()] == ["key1", "key2"]
+
+    # When
+    with pytest.raises(botocore.exceptions.ClientError):
+        boto_bucket.delete_objects(Delete={"Objects": [
+            {"Key": "key1"},
+            {"Key": "key2"},
+        ]})
+
+    # Then
+    # We're checking that our simulator doesn't handle batch deletes (like GCP does not)
+    assert [k.key for k in boto_bucket.objects.all()] == ["key1", "key2"]
+
+
+def test_s3_has_batch_delete(s3_storage_factory: MotoS3StorageFixtureFactory):
+    # Given
+    s3 = s3_storage_factory
+    bucket = s3.create_fixture()
+    boto_bucket = bucket.get_boto_bucket()
+    boto_bucket.put_object(Key="key1", Body=b"contents1")
+    boto_bucket.put_object(Key="key2", Body=b"contents2")
+    assert [k.key for k in boto_bucket.objects.all()] == ["key1", "key2"]
+
+    # When
+    boto_bucket.delete_objects(Delete={"Objects": [
+        {"Key": "key1"},
+        {"Key": "key2"},
+    ]})
+
+    # Then
+    # We're checking that our simulator does handle batch deletes (like AWS does)
+    assert [k.key for k in boto_bucket.objects.all()] == []
diff --git a/python/tests/unit/arcticdb/version_store/test_fork.py b/python/tests/unit/arcticdb/version_store/test_fork.py
index e758f9885d..4141de39dc 100644
--- a/python/tests/unit/arcticdb/version_store/test_fork.py
+++ b/python/tests/unit/arcticdb/version_store/test_fork.py
@@ -7,9 +7,10 @@
 """
 import time
 from multiprocessing import Pool
-from pickle import loads, dumps
 import numpy as np
 import pandas as pd
+import pytest
+from arcticdb import Arctic
 
 from arcticdb.util.test import assert_frame_equal
@@ -26,25 +27,6 @@ def write_symbol(args):
     return symbol
 
 
-def check_lib_config(lib):
-    assert lib.env == "test"
-    found_test_normalizer = False
-    for normalizer in lib._custom_normalizer._normalizers:
-        if normalizer.__class__.__name__ == "TestCustomNormalizer":
-            found_test_normalizer = True
-
-    assert found_test_normalizer
-
-
-def get_pickle_store(lmdb_version_store):
-    d = {"a": "b"}
-    lmdb_version_store.write("xxx", d)
-    ser = dumps(lmdb_version_store)
-    nvs = loads(ser)
-    out = nvs.read("xxx")
-    assert d == out.data
-
-
 def test_map(lmdb_version_store):
     symbols = ["XXX", "YYY"]
     p = Pool(1)
@@ -78,3 +60,20 @@ def test_parallel_reads(local_object_version_store):
     p.map(_read_and_assert_symbol, [(local_object_version_store, s, idx) for idx, s in enumerate(symbols)])
     p.close()
     p.join()
+
+
+@pytest.mark.parametrize("storage_name", [
+    "s3_storage",
+    "gcp_storage"
+])
+def test_parallel_reads_arctic(storage_name, request):
+    storage = request.getfixturevalue(storage_name)
+    ac = Arctic(storage.arctic_uri)
+    lib = ac.create_library("test")
+    symbols = [f"{i}" for i in range(1)]
+    for s in symbols:
+        lib.write(s, df("test1"))
+    p = Pool(10)
+    p.map(_read_and_assert_symbol, [(lib, s, idx) for idx, s in enumerate(symbols)])
+    p.close()
+    p.join()
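Reviewer note: as a rough end-to-end smoke check of the new fixtures, something along these lines should round-trip a DataFrame through both simulators via the fixture's `arctic_uri` (the same attribute `test_parallel_reads_arctic` uses). The test and library names are illustrative, not part of this change.

```python
# Sketch only: write and read a frame against both storage backends using the
# URI exposed by the bucket fixtures defined in conftest.py above.
import pandas as pd
import pytest
from arcticdb import Arctic


@pytest.mark.parametrize("storage_name", ["s3_storage", "gcp_storage"])
def test_roundtrip_smoke(storage_name, request):
    storage = request.getfixturevalue(storage_name)
    ac = Arctic(storage.arctic_uri)
    lib = ac.create_library("roundtrip_smoke")
    df_in = pd.DataFrame({"a": [1, 2, 3]})
    lib.write("sym", df_in)
    pd.testing.assert_frame_equal(lib.read("sym").data, df_in)
```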