Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3fs): Add Metrics #12213

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/

#ifdef VELOX_ENABLE_S3
#include "velox/common/base/StatsReporter.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h" // @manual
#include "velox/connectors/hive/storage_adapters/s3fs/S3Counters.h" // @manual
#include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" // @manual
#include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h" // @manual
#include "velox/dwio/common/FileSink.h"
Expand Down Expand Up @@ -122,4 +124,19 @@ void finalizeS3FileSystem() {
#endif
}

void registerS3Metrics() {
#ifdef VELOX_ENABLE_S3
DEFINE_METRIC(kMetricS3ActiveConnections, velox::StatType::SUM);
DEFINE_METRIC(kMetricS3StartedUploads, velox::StatType::COUNT);
DEFINE_METRIC(kMetricS3FailedUploads, velox::StatType::COUNT);
DEFINE_METRIC(kMetricS3SuccessfulUploads, velox::StatType::COUNT);
DEFINE_METRIC(kMetricS3MetadataCalls, velox::StatType::COUNT);
DEFINE_METRIC(kMetricS3GetObjectCalls, velox::StatType::COUNT);
DEFINE_METRIC(kMetricS3GetObjectErrors, velox::StatType::COUNT);
DEFINE_METRIC(kMetricS3GetMetadataErrors, velox::StatType::COUNT);
DEFINE_METRIC(kMetricS3GetObjectRetries, velox::StatType::COUNT);
DEFINE_METRIC(kMetricS3GetMetadataRetries, velox::StatType::COUNT);
#endif
}

} // namespace facebook::velox::filesystems
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ namespace facebook::velox::filesystems {
// Register the S3 filesystem.
void registerS3FileSystem();

void registerS3Metrics();

/// Teardown the AWS SDK C++.
/// Velox users need to manually invoke this before exiting an application.
/// This is because Velox uses a static object to hold the S3 FileSystem
Expand Down
56 changes: 56 additions & 0 deletions velox/connectors/hive/storage_adapters/s3fs/S3Counters.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

namespace facebook::velox::filesystems {

// The number of connections open for S3 read operations.
constexpr std::string_view kMetricS3ActiveConnections{
majetideepak marked this conversation as resolved.
Show resolved Hide resolved
"velox.s3.active_connections"};

// The number of S3 upload calls that started.
constexpr std::string_view kMetricS3StartedUploads{"velox.s3.started_uploads"};

// The number of S3 upload calls that were completed.
constexpr std::string_view kMetricS3SuccessfulUploads{
xin-zhang2 marked this conversation as resolved.
Show resolved Hide resolved
"velox.s3.successful_uploads"};

// The number of S3 upload calls that failed.
constexpr std::string_view kMetricS3FailedUploads{"velox.s3.failed_uploads"};

// The number of S3 head (metadata) calls.
constexpr std::string_view kMetricS3MetadataCalls{"velox.s3.metadata_calls"};

// The number of S3 head (metadata) calls that failed.
constexpr std::string_view kMetricS3GetMetadataErrors{
xin-zhang2 marked this conversation as resolved.
Show resolved Hide resolved
"velox.s3.get_metadata_errors"};

// The number of retries made during S3 head (metadata) calls.
constexpr std::string_view kMetricS3GetMetadataRetries{
xin-zhang2 marked this conversation as resolved.
Show resolved Hide resolved
"velox.s3.get_metadata_retries"};

// The number of S3 getObject calls.
constexpr std::string_view kMetricS3GetObjectCalls{"velox.s3.get_object_calls"};

// The number of S3 getObject calls that failed.
constexpr std::string_view kMetricS3GetObjectErrors{
"velox.s3.get_object_errors"};

// The number of retries made during S3 getObject calls.
constexpr std::string_view kMetricS3GetObjectRetries{
"velox.s3.get_object_retries"};

} // namespace facebook::velox::filesystems
26 changes: 26 additions & 0 deletions velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/

#include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/common/config/Config.h"
#include "velox/common/file/File.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Counters.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h"
#include "velox/dwio/common/DataBuffer.h"
Expand Down Expand Up @@ -96,7 +98,12 @@ class S3ReadFile final : public ReadFile {
request.SetBucket(awsString(bucket_));
request.SetKey(awsString(key_));

RECORD_METRIC_VALUE(kMetricS3MetadataCalls);
auto outcome = client_->HeadObject(request);
if (!outcome.IsSuccess()) {
RECORD_METRIC_VALUE(kMetricS3GetMetadataErrors);
}
RECORD_METRIC_VALUE(kMetricS3GetMetadataRetries, outcome.GetRetryCount());
VELOX_CHECK_AWS_OUTCOME(
outcome, "Failed to get metadata for S3 object", bucket_, key_);
length_ = outcome.GetResult().GetContentLength();
Expand Down Expand Up @@ -184,7 +191,14 @@ class S3ReadFile final : public ReadFile {
request.SetRange(awsString(ss.str()));
request.SetResponseStreamFactory(
AwsWriteableStreamFactory(position, length));
RECORD_METRIC_VALUE(kMetricS3ActiveConnections);
RECORD_METRIC_VALUE(kMetricS3GetObjectCalls);
auto outcome = client_->GetObject(request);
if (!outcome.IsSuccess()) {
RECORD_METRIC_VALUE(kMetricS3GetObjectErrors);
}
RECORD_METRIC_VALUE(kMetricS3GetObjectRetries, outcome.GetRetryCount());
RECORD_METRIC_VALUE(kMetricS3ActiveConnections, -1);
xin-zhang2 marked this conversation as resolved.
Show resolved Hide resolved
VELOX_CHECK_AWS_OUTCOME(outcome, "Failed to get S3 object", bucket_, key_);
}

Expand Down Expand Up @@ -237,7 +251,13 @@ class S3WriteFile::Impl {
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(awsString(bucket_));
request.SetKey(awsString(key_));
RECORD_METRIC_VALUE(kMetricS3MetadataCalls);
auto objectMetadata = client_->HeadObject(request);
if (!objectMetadata.IsSuccess()) {
RECORD_METRIC_VALUE(kMetricS3GetMetadataErrors);
}
RECORD_METRIC_VALUE(
kMetricS3GetObjectRetries, objectMetadata.GetRetryCount());
VELOX_CHECK(!objectMetadata.IsSuccess(), "S3 object already exists");
}

Expand Down Expand Up @@ -305,6 +325,7 @@ class S3WriteFile::Impl {
if (closed()) {
return;
}
RECORD_METRIC_VALUE(kMetricS3StartedUploads);
uploadPart({currentPart_->data(), currentPart_->size()}, true);
VELOX_CHECK_EQ(uploadState_.partNumber, uploadState_.completedParts.size());
// Complete the multipart upload.
Expand All @@ -318,6 +339,11 @@ class S3WriteFile::Impl {
request.SetMultipartUpload(std::move(completedUpload));

auto outcome = client_->CompleteMultipartUpload(request);
if (outcome.IsSuccess()) {
RECORD_METRIC_VALUE(kMetricS3SuccessfulUploads);
} else {
RECORD_METRIC_VALUE(kMetricS3FailedUploads);
}
VELOX_CHECK_AWS_OUTCOME(
outcome, "Failed to complete multiple part upload", bucket_, key_);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,12 @@ target_link_libraries(
velox_exec
GTest::gtest
GTest::gtest_main)

add_executable(velox_s3metrics_test S3FileSystemMetricsTest.cpp)
add_test(velox_s3metrics_test velox_s3metrics_test)
target_link_libraries(
velox_s3metrics_test
velox_s3fs
velox_exec_test_lib
GTest::gtest
GTest::gtest_main)
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/init/Init.h>

#include "velox/common/memory/Memory.h"
#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Counters.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h"
#include "velox/connectors/hive/storage_adapters/s3fs/tests/S3Test.h"

#include <gtest/gtest.h>

namespace facebook::velox::filesystems {
namespace {
class S3TestReporter : public BaseStatsReporter {
public:
mutable std::mutex m;
mutable std::map<std::string, size_t> counterMap;
mutable std::unordered_map<std::string, StatType> statTypeMap;
mutable std::unordered_map<std::string, std::vector<int32_t>>
histogramPercentilesMap;

void clear() {
std::lock_guard<std::mutex> l(m);
counterMap.clear();
statTypeMap.clear();
histogramPercentilesMap.clear();
}
void registerMetricExportType(const char* key, StatType statType)
const override {
statTypeMap[key] = statType;
}

void registerMetricExportType(folly::StringPiece key, StatType statType)
const override {
statTypeMap[key.str()] = statType;
}

void registerHistogramMetricExportType(
const char* key,
int64_t /* bucketWidth */,
int64_t /* min */,
int64_t /* max */,
const std::vector<int32_t>& pcts) const override {
histogramPercentilesMap[key] = pcts;
}

void registerHistogramMetricExportType(
folly::StringPiece key,
int64_t /* bucketWidth */,
int64_t /* min */,
int64_t /* max */,
const std::vector<int32_t>& pcts) const override {
histogramPercentilesMap[key.str()] = pcts;
}

void addMetricValue(const std::string& key, const size_t value)
const override {
std::lock_guard<std::mutex> l(m);
counterMap[key] += value;
}

void addMetricValue(const char* key, const size_t value) const override {
std::lock_guard<std::mutex> l(m);
counterMap[key] += value;
}

void addMetricValue(folly::StringPiece key, size_t value) const override {
std::lock_guard<std::mutex> l(m);
counterMap[key.str()] += value;
}

void addHistogramMetricValue(const std::string& key, size_t value)
const override {
counterMap[key] = std::max(counterMap[key], value);
}

void addHistogramMetricValue(const char* key, size_t value) const override {
counterMap[key] = std::max(counterMap[key], value);
}

void addHistogramMetricValue(folly::StringPiece key, size_t value)
const override {
counterMap[key.str()] = std::max(counterMap[key.str()], value);
}

std::string fetchMetrics() override {
std::stringstream ss;
ss << "[";
auto sep = "";
for (const auto& [key, value] : counterMap) {
ss << sep << key << ":" << value;
sep = ",";
}
ss << "]";
return ss.str();
}
};

folly::Singleton<BaseStatsReporter> reporter([]() {
return new S3TestReporter();
});

class S3FileSystemMetricsTest : public S3Test {
protected:
static void SetUpTestSuite() {
memory::MemoryManager::testingSetInstance({});
}

void SetUp() override {
S3Test::SetUp();
filesystems::initializeS3("Info");
s3Reporter = std::dynamic_pointer_cast<S3TestReporter>(
folly::Singleton<BaseStatsReporter>::try_get());
s3Reporter->clear();
}

static void TearDownTestSuite() {
filesystems::finalizeS3();
}
std::shared_ptr<S3TestReporter> s3Reporter;
};

} // namespace

TEST_F(S3FileSystemMetricsTest, metrics) {
registerS3Metrics();

const auto bucketName = "metrics";
const auto file = "test.txt";
const auto filename = localPath(bucketName) + "/" + file;
const auto s3File = s3URI(bucketName, file);
auto hiveConfig = minioServer_->hiveConfig();
S3FileSystem s3fs(bucketName, hiveConfig);
auto pool = memory::memoryManager()->addLeafPool("S3FileSystemMetricsTest");

auto writeFile =
s3fs.openFileForWrite(s3File, {{}, pool.get(), std::nullopt});
EXPECT_EQ(1, s3Reporter->counterMap[std::string{kMetricS3MetadataCalls}]);
EXPECT_EQ(1, s3Reporter->counterMap[std::string{kMetricS3GetMetadataErrors}]);

constexpr std::string_view kDataContent =
"Dance me to your beauty with a burning violin"
"Dance me through the panic till I'm gathered safely in"
"Lift me like an olive branch and be my homeward dove"
"Dance me to the end of love";
writeFile->append(kDataContent);
writeFile->close();
EXPECT_EQ(1, s3Reporter->counterMap[std::string{kMetricS3StartedUploads}]);
EXPECT_EQ(1, s3Reporter->counterMap[std::string{kMetricS3SuccessfulUploads}]);

auto readFile = s3fs.openFileForRead(s3File);
EXPECT_EQ(2, s3Reporter->counterMap[std::string{kMetricS3MetadataCalls}]);
readFile->pread(0, kDataContent.length());
EXPECT_EQ(1, s3Reporter->counterMap[std::string{kMetricS3GetObjectCalls}]);
}

} // namespace facebook::velox::filesystems

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::Init init{&argc, &argv, false};
BaseStatsReporter::registered = true;
return RUN_ALL_TESTS();
}
Loading