Skip to content

Commit

Permalink
Default compression setting via CS config (#13203)
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-gogov authored Jan 13, 2025
1 parent cbefb3a commit fef7e41
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 16 deletions.
55 changes: 55 additions & 0 deletions ydb/core/config/validation/column_shard_config_validator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#include "validators.h"

#include <ydb/core/formats/arrow/serializer/parsing.h>
#include <ydb/core/formats/arrow/serializer/utils.h>
#include <ydb/core/protos/config.pb.h>

#include <util/generic/string.h>
#include <util/string/builder.h>

#include <vector>

namespace NKikimr::NConfig {
namespace {

EValidationResult ValidateDefaultCompression(const NKikimrConfig::TColumnShardConfig& columnShardConfig, std::vector<TString>& msg) {
if (!columnShardConfig.HasDefaultCompression() && !columnShardConfig.HasDefaultCompressionLevel()) {
return EValidationResult::Ok;
}
if (!columnShardConfig.HasDefaultCompression() && columnShardConfig.HasDefaultCompressionLevel()) {
msg.push_back("ColumnShardConfig: compression level is set without compression type");
return EValidationResult::Error;
}
std::optional<arrow::Compression::type> codec = NArrow::CompressionFromProto(columnShardConfig.GetDefaultCompression());
if (!codec.has_value()) {
msg.push_back("ColumnShardConfig: Unknown compression");
return EValidationResult::Error;
}
if (columnShardConfig.HasDefaultCompressionLevel()) {
if (!NArrow::SupportsCompressionLevel(codec.value())) {
TString messageErr = TStringBuilder() << "ColumnShardConfig: compression `" << NArrow::CompressionToString(codec.value())
<< "` does not support compression level";
msg.push_back(messageErr);
return EValidationResult::Error;
} else if (!NArrow::SupportsCompressionLevel(codec.value(), columnShardConfig.GetDefaultCompressionLevel())) {
TString messageErr = TStringBuilder()
<< "ColumnShardConfig: compression `" << NArrow::CompressionToString(codec.value())
<< "` does not support compression level = " << std::to_string(columnShardConfig.GetDefaultCompressionLevel());
msg.push_back(messageErr);
return EValidationResult::Error;
}
}
return EValidationResult::Ok;
}

} // namespace

EValidationResult ValidateColumnShardConfig(const NKikimrConfig::TColumnShardConfig& columnShardConfig, std::vector<TString>& msg) {
EValidationResult validateDefaultCompressionResult = ValidateDefaultCompression(columnShardConfig, msg);
if (validateDefaultCompressionResult == EValidationResult::Error) {
return EValidationResult::Error;
}
return EValidationResult::Ok;
}

} // namespace NKikimr::NConfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#include <ydb/core/config/validation/validators.h>
#include <ydb/core/protos/config.pb.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>

#include <library/cpp/testing/unittest/registar.h>

#include <vector>

using namespace NKikimr::NConfig;

Y_UNIT_TEST_SUITE(ColumnShardConfigValidation) {
Y_UNIT_TEST(AcceptDefaultCompression) {
NKikimrConfig::TColumnShardConfig CSConfig;
std::vector<TString> error;
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Ok);
UNIT_ASSERT_C(error.empty(), "Should not be errors");
}

Y_UNIT_TEST(NotAcceptDefaultCompression) {
NKikimrConfig::TColumnShardConfig CSConfig;
std::vector<TString> error;
CSConfig.SetDefaultCompressionLevel(2);
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Error);
UNIT_ASSERT_VALUES_EQUAL(error.size(), 1);
UNIT_ASSERT_STRINGS_EQUAL(error.front(), "ColumnShardConfig: compression level is set without compression type");
}

Y_UNIT_TEST(CorrectPlainCompression) {
NKikimrConfig::TColumnShardConfig CSConfig;
std::vector<TString> error;
CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain);
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Ok);
UNIT_ASSERT_C(error.empty(), "Should not be errors");
}

Y_UNIT_TEST(NotCorrectPlainCompression) {
NKikimrConfig::TColumnShardConfig CSConfig;
std::vector<TString> error;
CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain);
CSConfig.SetDefaultCompressionLevel(1);
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Error);
UNIT_ASSERT_VALUES_EQUAL(error.size(), 1);
UNIT_ASSERT_STRINGS_EQUAL(error.front(), "ColumnShardConfig: compression `uncompressed` does not support compression level");
}

Y_UNIT_TEST(CorrectLZ4Compression) {
NKikimrConfig::TColumnShardConfig CSConfig;
std::vector<TString> error;
CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4);
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Ok);
UNIT_ASSERT_C(error.empty(), "Should not be errors");
}

Y_UNIT_TEST(NotCorrectLZ4Compression) {
NKikimrConfig::TColumnShardConfig CSConfig;
std::vector<TString> error;
CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4);
CSConfig.SetDefaultCompressionLevel(1);
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Error);
UNIT_ASSERT_VALUES_EQUAL(error.size(), 1);
UNIT_ASSERT_STRINGS_EQUAL(error.front(), "ColumnShardConfig: compression `lz4` does not support compression level");
}

Y_UNIT_TEST(CorrectZSTDCompression) {
NKikimrConfig::TColumnShardConfig CSConfig;
std::vector<TString> error;
CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD);
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Ok);
UNIT_ASSERT_C(error.empty(), "Should not be errors");
CSConfig.SetDefaultCompressionLevel(0);
result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Ok);
UNIT_ASSERT_C(error.empty(), "Should not be errors");
CSConfig.SetDefaultCompressionLevel(-100);
result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Ok);
UNIT_ASSERT_C(error.empty(), "Should not be errors");
}

Y_UNIT_TEST(NotCorrectZSTDCompression) {
NKikimrConfig::TColumnShardConfig CSConfig;
std::vector<TString> error;
CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD);
CSConfig.SetDefaultCompressionLevel(100);
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Error);
UNIT_ASSERT_VALUES_EQUAL(error.size(), 1);
UNIT_ASSERT_STRINGS_EQUAL(error.front(), "ColumnShardConfig: compression `zstd` does not support compression level = 100");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
UNITTEST_FOR(ydb/core/config/validation)

SRC(
column_shard_config_validator_ut.cpp
)

END()
6 changes: 6 additions & 0 deletions ydb/core/config/validation/validators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ EValidationResult ValidateConfig(const NKikimrConfig::TAppConfig& config, std::v
return EValidationResult::Error;
}
}
if (config.HasColumnShardConfig()) {
NKikimr::NConfig::EValidationResult result = NKikimr::NConfig::ValidateColumnShardConfig(config.GetColumnShardConfig(), msg);
if (result == NKikimr::NConfig::EValidationResult::Error) {
return EValidationResult::Error;
}
}
if (msg.size() > 0) {
return EValidationResult::Warn;
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/config/validation/validators.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ EValidationResult ValidateAuthConfig(
const NKikimrProto::TAuthConfig& authConfig,
std::vector<TString>& msg);

EValidationResult ValidateColumnShardConfig(
const NKikimrConfig::TColumnShardConfig& columnShardConfig,
std::vector<TString>& msg);

EValidationResult ValidateConfig(
const NKikimrConfig::TAppConfig& config,
std::vector<TString>& msg);
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/config/validation/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ SRCS(
validators.h
validators.cpp
auth_config_validator.cpp
column_shard_config_validator.cpp
)

PEERDIR(
ydb/core/protos
ydb/core/formats/arrow/serializer
)

END()

RECURSE_FOR_TESTS(
ut
auth_config_validator_ut
column_shard_config_validator_ut
)
18 changes: 12 additions & 6 deletions ydb/core/formats/arrow/serializer/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@
#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h>

namespace NKikimr::NArrow {
bool SupportsCompressionLevel(const arrow::Compression::type compression) {
return arrow::util::Codec::SupportsCompressionLevel(compression);
}

bool SupportsCompressionLevel(const NKikimrSchemeOp::EColumnCodec compression) {
return SupportsCompressionLevel(CompressionFromProto(compression).value());
bool SupportsCompressionLevel(const arrow::Compression::type compression, const std::optional<i32>& compressionLevel) {
if (!arrow::util::Codec::SupportsCompressionLevel(compression)) {
return false;
}
if (compressionLevel.has_value()) {
int minLevel = MinimumCompressionLevel(compression).value();
int maxLevel = MaximumCompressionLevel(compression).value();
if (compressionLevel < minLevel || compressionLevel > maxLevel) {
return false;
}
}
return true;
}

std::optional<int> MinimumCompressionLevel(const arrow::Compression::type compression) {
Expand Down
7 changes: 2 additions & 5 deletions ydb/core/formats/arrow/serializer/utils.h
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
#pragma once

#include <ydb/core/protos/flat_scheme_op.pb.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/util/type_fwd.h>
#include <util/system/yassert.h>
#include <util/system/types.h>

#include <optional>

namespace NKikimr::NArrow {
bool SupportsCompressionLevel(const arrow::Compression::type compression);
bool SupportsCompressionLevel(const NKikimrSchemeOp::EColumnCodec compression);
bool SupportsCompressionLevel(const arrow::Compression::type compression, const std::optional<i32>& compressionLevel = {});

std::optional<int> MinimumCompressionLevel(const arrow::Compression::type compression);
std::optional<int> MaximumCompressionLevel(const arrow::Compression::type compression);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1792,6 +1792,8 @@ message TColumnShardConfig {
optional uint64 WritingBufferVolumeMb = 33 [default = 32];
optional uint64 WritingInFlightRequestsCountLimit = 34 [default = 1000];
optional uint64 WritingInFlightRequestBytesLimit = 35 [default = 128000000];
optional NKikimrSchemeOp.EColumnCodec DefaultCompression = 36;
optional int32 DefaultCompressionLevel = 37;
}

message TSchemeShardConfig {
Expand Down
8 changes: 3 additions & 5 deletions ydb/core/tx/schemeshard/olap/column_families/update.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ NKikimr::TConclusion<NKikimrSchemeOp::TOlapColumn::TSerializer> ConvertFamilyDes
<< "` is not support compression level");
}
if (familyDescription.HasColumnCodecLevel()) {
int level = familyDescription.GetColumnCodecLevel();
int minLevel = NArrow::MinimumCompressionLevel(codec.value()).value();
int maxLevel = NArrow::MaximumCompressionLevel(codec.value()).value();
if (level < minLevel || level > maxLevel) {
if (!NArrow::SupportsCompressionLevel(codec.value(), familyDescription.GetColumnCodecLevel())) {
return NKikimr::TConclusionStatus::Fail(TStringBuilder()
<< "family `" << familyDescription.GetName() << "`: incorrect level for codec `"
<< NArrow::CompressionToString(familyDescription.GetColumnCodec()) << "`. expected: ["
<< minLevel << ":" << maxLevel << "]");
<< NArrow::MinimumCompressionLevel(codec.value()).value() << ":"
<< NArrow::MaximumCompressionLevel(codec.value()).value() << "]");
}
}

Expand Down

0 comments on commit fef7e41

Please sign in to comment.