Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-36346: [C++][S3] Shutdown aws-sdk-cpp related resources on finalize #36437

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 73 additions & 6 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -398,12 +398,19 @@ namespace {
// Returns Status::Invalid unless InitializeS3() has been called (and
// FinalizeS3() has not yet flipped the initialized flag off);
// otherwise returns Status::OK().
Status CheckS3Initialized() {
  if (!IsS3Initialized()) {
    return Status::Invalid(
        "S3 subsystem is not initialized; please call InitializeS3() "
        "before carrying out any S3-related operation");
  }
  return Status::OK();
}

Status CheckS3Finalized() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this actually useful? is_initialized_ is set to false when finalizing, so CheckS3Initialized will already fail.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In any case, please call this CheckS3NotFinalized as the current name is misleading.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you're right.
We don't need this and I should have added Not...

if (IsS3Finalized()) {
return Status::Invalid("S3 subsystem is finalized");
}
return Status::OK();
}

// XXX Sanitize paths by removing leading slash?

struct S3Path {
Expand Down Expand Up @@ -1008,6 +1015,8 @@ class ObjectInputFile final : public io::RandomAccessFile {
content_length_(size) {}

Status Init() {
RETURN_NOT_OK(CheckS3Finalized());

// Issue a HEAD Object to get the content-length and ensure any
// errors (e.g. file not found) don't wait until the first Read() call.
if (content_length_ != kNoSize) {
Expand Down Expand Up @@ -1099,6 +1108,8 @@ class ObjectInputFile final : public io::RandomAccessFile {
return 0;
}

RETURN_NOT_OK(CheckS3Finalized());

// Read the desired range of bytes
ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result,
GetObjectRange(client_.get(), path_, position, nbytes, out));
Expand Down Expand Up @@ -1182,6 +1193,8 @@ class ObjectOutputStream final : public io::OutputStream {
}

Status Init() {
RETURN_NOT_OK(CheckS3Finalized());

// Initiate the multi-part upload
S3Model::CreateMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
Expand Down Expand Up @@ -1217,6 +1230,8 @@ class ObjectOutputStream final : public io::OutputStream {
return Status::OK();
}

RETURN_NOT_OK(CheckS3Finalized());

S3Model::AbortMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
Expand Down Expand Up @@ -1245,6 +1260,8 @@ class ObjectOutputStream final : public io::OutputStream {
Future<> CloseAsync() override {
if (closed_) return Status::OK();

RETURN_NOT_OK(CheckS3Finalized());

if (current_part_) {
// Upload last part
RETURN_NOT_OK(CommitCurrentPart());
Expand Down Expand Up @@ -1307,6 +1324,8 @@ class ObjectOutputStream final : public io::OutputStream {
return Status::Invalid("Operation on closed stream");
}

RETURN_NOT_OK(CheckS3Finalized());

const int8_t* data_ptr = reinterpret_cast<const int8_t*>(data);
auto advance_ptr = [&data_ptr, &nbytes](const int64_t offset) {
data_ptr += offset;
Expand Down Expand Up @@ -1359,6 +1378,7 @@ class ObjectOutputStream final : public io::OutputStream {
if (closed_) {
return Status::Invalid("Operation on closed stream");
}
RETURN_NOT_OK(CheckS3Finalized());
// Wait for background writes to finish
std::unique_lock<std::mutex> lock(upload_state_->mutex);
return upload_state_->pending_parts_completed;
Expand All @@ -1367,6 +1387,7 @@ class ObjectOutputStream final : public io::OutputStream {
// Upload-related helpers

Status CommitCurrentPart() {
RETURN_NOT_OK(CheckS3Finalized());
ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish());
current_part_.reset();
current_part_size_ = 0;
Expand All @@ -1379,6 +1400,8 @@ class ObjectOutputStream final : public io::OutputStream {

Status UploadPart(const void* data, int64_t nbytes,
std::shared_ptr<Buffer> owned_buffer = nullptr) {
RETURN_NOT_OK(CheckS3Finalized());

S3Model::UploadPartRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
Expand Down Expand Up @@ -1574,6 +1597,8 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
S3Model::ListObjectsV2Request req;

Status operator()(const Result<S3Model::ListObjectsV2Outcome>& result) {
RETURN_NOT_OK(CheckS3Finalized());

// Serialize calls to operation-specific handlers
if (!walker->ok()) {
// Early exit: avoid executing handlers if DoWalk() returned
Expand Down Expand Up @@ -1692,6 +1717,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

// Tests to see if a bucket exists
Result<bool> BucketExists(const std::string& bucket) {
RETURN_NOT_OK(CheckS3Finalized());

S3Model::HeadBucketRequest req;
req.SetBucket(ToAwsString(bucket));

Expand All @@ -1709,6 +1736,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

// Create a bucket. Successful if bucket already exists.
Status CreateBucket(const std::string& bucket) {
RETURN_NOT_OK(CheckS3Finalized());

// Check bucket exists first.
{
S3Model::HeadBucketRequest req;
Expand Down Expand Up @@ -1753,6 +1782,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

// Create an object with empty contents. Successful if object already exists.
Status CreateEmptyObject(const std::string& bucket, const std::string& key) {
RETURN_NOT_OK(CheckS3Finalized());

S3Model::PutObjectRequest req;
req.SetBucket(ToAwsString(bucket));
req.SetKey(ToAwsString(key));
Expand All @@ -1768,6 +1799,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}

Status DeleteObject(const std::string& bucket, const std::string& key) {
RETURN_NOT_OK(CheckS3Finalized());
S3Model::DeleteObjectRequest req;
req.SetBucket(ToAwsString(bucket));
req.SetKey(ToAwsString(key));
Expand All @@ -1777,6 +1809,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}

Status CopyObject(const S3Path& src_path, const S3Path& dest_path) {
RETURN_NOT_OK(CheckS3Finalized());
S3Model::CopyObjectRequest req;
req.SetBucket(ToAwsString(dest_path.bucket));
req.SetKey(ToAwsString(dest_path.key));
Expand All @@ -1799,6 +1832,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
Result<bool> IsEmptyDirectory(
const std::string& bucket, const std::string& key,
const S3Model::HeadObjectOutcome* previous_outcome = nullptr) {
RETURN_NOT_OK(CheckS3Finalized());

if (previous_outcome) {
// Fetch the backend from the previous error
DCHECK(!previous_outcome->IsSuccess());
Expand Down Expand Up @@ -1850,6 +1885,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}

Result<bool> IsNonEmptyDirectory(const S3Path& path) {
RETURN_NOT_OK(CheckS3Finalized());
S3Model::ListObjectsV2Request req;
req.SetBucket(ToAwsString(path.bucket));
req.SetPrefix(ToAwsString(path.key) + kSep);
Expand Down Expand Up @@ -1939,6 +1975,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
// Workhorse for GetFileInfo(FileSelector...)
Status Walk(const FileSelector& select, const std::string& bucket,
const std::string& key, std::vector<FileInfo>* out) {
RETURN_NOT_OK(CheckS3Finalized());

FileInfoCollector collector(bucket, key, select);

auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
Expand Down Expand Up @@ -2027,6 +2065,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
};
Future<std::shared_ptr<WalkResult>> WalkForDeleteDirAsync(const std::string& bucket,
const std::string& key) {
RETURN_NOT_OK(CheckS3Finalized());

auto state = std::make_shared<WalkResult>();

auto handle_results = [state](const std::string& prefix,
Expand Down Expand Up @@ -2064,6 +2104,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
// Delete multiple objects at once
Future<> DeleteObjectsAsync(const std::string& bucket,
const std::vector<std::string>& keys) {
RETURN_NOT_OK(CheckS3Finalized());

struct DeleteCallback {
const std::string bucket;

Expand Down Expand Up @@ -2156,6 +2198,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

static Result<std::vector<std::string>> ProcessListBuckets(
const Aws::S3::Model::ListBucketsOutcome& outcome) {
RETURN_NOT_OK(CheckS3Finalized());
if (!outcome.IsSuccess()) {
return ErrorToStatus(std::forward_as_tuple("When listing buckets: "), "ListBuckets",
outcome.GetError());
Expand All @@ -2169,11 +2212,13 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}

Result<std::vector<std::string>> ListBuckets() {
RETURN_NOT_OK(CheckS3Finalized());
auto outcome = client_->ListBuckets();
return ProcessListBuckets(outcome);
}

Future<std::vector<std::string>> ListBucketsAsync(io::IOContext ctx) {
RETURN_NOT_OK(CheckS3Finalized());
auto self = shared_from_this();
return DeferNotOk(SubmitIO(ctx, [self]() { return self->client_->ListBuckets(); }))
// TODO(ARROW-12655) Change to Then(Impl::ProcessListBuckets)
Expand All @@ -2187,6 +2232,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(s));
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));
RETURN_NOT_OK(CheckS3Finalized());

auto ptr = std::make_shared<ObjectInputFile>(client_, fs->io_context(), path);
RETURN_NOT_OK(ptr->Init());
Expand All @@ -2205,6 +2251,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
RETURN_NOT_OK(ValidateFilePath(path));
RETURN_NOT_OK(CheckS3Finalized());

auto ptr =
std::make_shared<ObjectInputFile>(client_, fs->io_context(), path, info.size());
Expand All @@ -2223,6 +2270,7 @@ S3FileSystem::~S3FileSystem() {}
Result<std::shared_ptr<S3FileSystem>> S3FileSystem::Make(
const S3Options& options, const io::IOContext& io_context) {
RETURN_NOT_OK(CheckS3Initialized());
RETURN_NOT_OK(CheckS3Finalized());

std::shared_ptr<S3FileSystem> ptr(new S3FileSystem(options, io_context));
RETURN_NOT_OK(ptr->impl_->Init());
Expand Down Expand Up @@ -2250,6 +2298,8 @@ S3Options S3FileSystem::options() const { return impl_->options(); }
std::string S3FileSystem::region() const { return impl_->region(); }

Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
RETURN_NOT_OK(CheckS3Finalized());

ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
FileInfo info;
info.set_path(s);
Expand Down Expand Up @@ -2313,6 +2363,8 @@ Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
}

Result<FileInfoVector> S3FileSystem::GetFileInfo(const FileSelector& select) {
RETURN_NOT_OK(CheckS3Finalized());

ARROW_ASSIGN_OR_RAISE(auto base_path, S3Path::FromString(select.base_dir));

FileInfoVector results;
Expand Down Expand Up @@ -2383,6 +2435,8 @@ FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select)
}

Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
RETURN_NOT_OK(CheckS3Finalized());

ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));

if (path.key.empty()) {
Expand Down Expand Up @@ -2426,6 +2480,8 @@ Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
}

Status S3FileSystem::DeleteDir(const std::string& s) {
RETURN_NOT_OK(CheckS3Finalized());

ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));

if (path.empty()) {
Expand Down Expand Up @@ -2455,6 +2511,8 @@ Status S3FileSystem::DeleteDirContents(const std::string& s, bool missing_dir_ok
}

Future<> S3FileSystem::DeleteDirContentsAsync(const std::string& s, bool missing_dir_ok) {
RETURN_NOT_OK(CheckS3Finalized());

ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));

if (path.empty()) {
Expand All @@ -2480,6 +2538,8 @@ Status S3FileSystem::DeleteRootDirContents() {
}

Status S3FileSystem::DeleteFile(const std::string& s) {
RETURN_NOT_OK(CheckS3Finalized());

ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));

Expand All @@ -2506,6 +2566,8 @@ Status S3FileSystem::DeleteFile(const std::string& s) {
}

Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
RETURN_NOT_OK(CheckS3Finalized());

// XXX We don't implement moving directories as it would be too expensive:
// one must copy all directory contents one by one (including object data),
// then delete the original contents.
Expand All @@ -2525,6 +2587,8 @@ Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
}

Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) {
RETURN_NOT_OK(CheckS3Finalized());

ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src));
RETURN_NOT_OK(ValidateFilePath(src_path));
ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest));
Expand Down Expand Up @@ -2562,6 +2626,8 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));

RETURN_NOT_OK(CheckS3Finalized());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you put this consistently at the start of the method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.
(I chose this position so that code not related to aws-sdk-cpp can still run as far as possible even after arrow::fs::FinalizeS3(). But that may not matter in most cases.)


auto ptr = std::make_shared<ObjectOutputStream>(impl_->client_, io_context(), path,
impl_->options(), metadata);
RETURN_NOT_OK(ptr->Init());
Expand Down Expand Up @@ -2600,6 +2666,8 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {

bool IsInitialized() { return !is_finalized_ && is_initialized_; }

// Whether Finalize() has been called on this instance (is_finalized_ is
// stored true at the top of Finalize()); once set it is never cleared.
bool IsFinalized() { return is_finalized_; }

void Finalize(bool from_destructor = false) {
bool expected = true;
is_finalized_.store(true);
Expand All @@ -2608,9 +2676,9 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {
ARROW_LOG(WARNING)
<< " arrow::fs::FinalizeS3 was not called even though S3 was initialized. "
"This could lead to a segmentation fault at exit";
RegionResolver::ResetDefaultInstance();
Aws::ShutdownAPI(aws_options_);
}
RegionResolver::ResetDefaultInstance();
Aws::ShutdownAPI(aws_options_);
}
}

Expand Down Expand Up @@ -2672,9 +2740,6 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {

std::shared_ptr<AwsInstance> CreateAwsInstance() {
auto instance = std::make_shared<AwsInstance>();
// Don't let S3 be shutdown until all Arrow threads are done using it
arrow::internal::GetCpuThreadPool()->KeepAlive(instance);
io::internal::GetIOThreadPool()->KeepAlive(instance);
return instance;
}

Expand Down Expand Up @@ -2713,6 +2778,8 @@ Status EnsureS3Finalized() { return FinalizeS3(); }

// Reports whether the process-wide AWS instance is currently initialized
// (i.e. InitializeS3() ran and FinalizeS3() has not).
bool IsS3Initialized() {
  auto&& instance = GetAwsInstance();
  return instance.IsInitialized();
}

// Whether FinalizeS3() has already been called on the process-wide AWS
// instance; S3-related operations must not be started after this is true.
bool IsS3Finalized() { return GetAwsInstance().IsFinalized(); }

// -----------------------------------------------------------------------
// Top-level utility functions

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/filesystem/s3fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,10 @@ Status EnsureS3Initialized();
ARROW_EXPORT
bool IsS3Initialized();

/// Whether the S3 APIs have been finalized (i.e. FinalizeS3() has been called).
ARROW_EXPORT
bool IsS3Finalized();

/// Shutdown the S3 APIs.
ARROW_EXPORT
Status FinalizeS3();
Expand Down
Loading