Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use different storages per columns #2274

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1013,14 +1013,16 @@ struct TEvBlobStorage {
bool WrittenBeyondBarrier = false; // was this blob written beyond the barrier?
mutable NLWTrace::TOrbit Orbit;
std::shared_ptr<TExecutionRelay> ExecutionRelay;
const TString StorageId;

TEvPutResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, const TStorageStatusFlags statusFlags,
ui32 groupId, float approximateFreeSpaceShare)
ui32 groupId, float approximateFreeSpaceShare, const TString& storageId = Default<TString>())
: Status(status)
, Id(id)
, StatusFlags(statusFlags)
, GroupId(groupId)
, ApproximateFreeSpaceShare(approximateFreeSpaceShare)
, StorageId(storageId)
{}

TString Print(bool isFull) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ TConclusionStatus TAddColumnOperation::DoDeserialize(NYql::TObjectSettingsImpl::
}
ColumnName = *fValue;
}
StorageId = features.Extract("STORAGE_ID");
if (StorageId && !*StorageId) {
return TConclusionStatus::Fail("STORAGE_ID cannot be empty string");
}
{
auto fValue = features.Extract("TYPE");
if (!fValue) {
Expand All @@ -31,6 +35,9 @@ void TAddColumnOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSc
auto column = schemaData.AddAddColumns();
column->SetName(ColumnName);
column->SetType(ColumnType);
if (StorageId) {
column->SetStorageId(*StorageId);
}
column->SetNotNull(NotNull);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class TAddColumnOperation : public ITableStoreOperation {
private:
TString ColumnName;
TString ColumnType;
std::optional<TString> StorageId;
bool NotNull = false;
public:
TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ TConclusionStatus TAlterColumnOperation::DoDeserialize(NYql::TObjectSettingsImpl
}
ColumnName = *fValue;
}
StorageId = features.Extract("STORAGE_ID");
if (StorageId && !*StorageId) {
return TConclusionStatus::Fail("STORAGE_ID cannot be empty string");
}
{
auto result = DictionaryEncodingDiff.DeserializeFromRequestFeatures(features);
if (!result) {
Expand All @@ -28,6 +32,9 @@ TConclusionStatus TAlterColumnOperation::DoDeserialize(NYql::TObjectSettingsImpl
void TAlterColumnOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const {
auto* column = schemaData.AddAlterColumns();
column->SetName(ColumnName);
if (StorageId && !!*StorageId) {
column->SetStorageId(*StorageId);
}
if (!!Serializer) {
Serializer.SerializeToProto(*column->MutableSerializer());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class TAlterColumnOperation : public ITableStoreOperation {
static inline auto Registrator = TFactory::TRegistrator<TAlterColumnOperation>(GetTypeName());

TString ColumnName;
std::optional<TString> StorageId;

NArrow::NSerialization::TSerializerContainer Serializer;
NArrow::NDictionary::TEncodingDiff DictionaryEncodingDiff;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ TConclusionStatus TUpsertIndexOperation::DoDeserialize(NYql::TObjectSettingsImpl
}
IndexName = *fValue;
}
StorageId = features.Extract("STORAGE_ID");
if (StorageId && !*StorageId) {
return TConclusionStatus::Fail("STORAGE_ID cannot be empty string");
}
TString indexType;
{
auto fValue = features.Extract("TYPE");
Expand Down Expand Up @@ -42,6 +46,9 @@ TConclusionStatus TUpsertIndexOperation::DoDeserialize(NYql::TObjectSettingsImpl

// Appends one upsert-index entry to schemaData and fills it from this operation:
// the index name, the storage id (only when one was supplied), and finally the
// payload written by IndexMetaConstructor.
void TUpsertIndexOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const {
    auto& upsertEntry = *schemaData.AddUpsertIndexes();
    upsertEntry.SetName(IndexName);
    if (StorageId.has_value()) {
        upsertEntry.SetStorageId(StorageId.value());
    }
    // Kept last, exactly as before, so the constructor serializes into an entry
    // that already carries the name and storage id.
    IndexMetaConstructor.SerializeToProto(upsertEntry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class TUpsertIndexOperation : public ITableStoreOperation {
static inline auto Registrator = TFactory::TRegistrator<TUpsertIndexOperation>(GetTypeName());
private:
TString IndexName;
std::optional<TString> StorageId;
NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMetaConstructor> IndexMetaConstructor;
public:
TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override;
Expand Down
11 changes: 9 additions & 2 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <ydb/core/testlib/tablet_helpers.h>
#include <ydb/core/testlib/test_client.h>
#include <ydb/core/testlib/cs_helper.h>
#include <ydb/core/wrappers/fake_storage.h>
#include <util/system/sanitizers.h>

#include <fmt/format.h>
Expand Down Expand Up @@ -2127,6 +2128,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {

TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();

{
WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000);
Expand All @@ -2137,6 +2139,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
WriteTestData(kikimr, "/Root/olapStore/olapTable", 20000, 2000000, 7000);
WriteTestData(kikimr, "/Root/olapStore/olapTable", 30000, 1000000, 11000);
}
while (csController->GetIndexations().Val() == 0) {
Cout << "Wait indexation..." << Endl;
Sleep(TDuration::Seconds(2));
}
AFL_VERIFY(Singleton<NWrappers::NExternalStorage::TFakeExternalStorage>()->GetSize());

{
TString query = R"(
Expand Down Expand Up @@ -3637,12 +3644,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
static ui32 numKinds = 2;

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
TLocalHelper(kikimr).CreateTestOlapTable();
for (ui64 i = 0; i < 100; ++i) {
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i*10000, 1000);
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i * 10000, 1000);
}

auto tableClient = kikimr.GetTableClient();
Expand Down
11 changes: 8 additions & 3 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ message TOlapColumnDiff {
optional string Name = 1;
optional TDictionaryEncodingSettings DictionaryEncoding = 4;
optional TOlapColumn.TSerializer Serializer = 5;
optional string StorageId = 6;
}

message TOlapColumnDescription {
Expand All @@ -435,6 +436,7 @@ message TOlapColumnDescription {
optional TCompressionOptions Compression = 8[deprecated = true];
optional TDictionaryEncodingSettings DictionaryEncoding = 9;
optional TOlapColumn.TSerializer Serializer = 10;
optional string StorageId = 11;
}

message TRequestedBloomFilter {
Expand All @@ -445,8 +447,9 @@ message TRequestedBloomFilter {
message TOlapIndexRequested {
optional string Name = 1;
optional TCompressionOptions Compression = 3;
optional string StorageId = 4;

optional string ClassName = 2;
optional string ClassName = 39;
oneof Implementation {
TRequestedBloomFilter BloomFilter = 40;
}
Expand All @@ -465,9 +468,11 @@ message TOlapIndexDescription {
optional string Name = 2;
optional TCompressionOptions Compression = 3;

optional string ClassName = 4;
optional string StorageId = 4;

optional string ClassName = 40;
oneof Implementation {
TBloomFilter BloomFilter = 40;
TBloomFilter BloomFilter = 41;
}
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/testlib/cs_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ TString THelper::GetTestTableSchema() const {
TStringBuilder sb;
sb << R"(Columns{ Name: "timestamp" Type : "Timestamp" NotNull : true })";
sb << R"(Columns{ Name: "resource_id" Type : "Utf8" })";
sb << R"(Columns{ Name: "uid" Type : "Utf8" })";
sb << "Columns{ Name: \"uid\" Type : \"Utf8\" StorageId : \"" + OptionalStorageId + "\" }";
sb << R"(Columns{ Name: "level" Type : "Int32" })";
sb << R"(Columns{ Name: "message" Type : "Utf8" })";
sb << "Columns{ Name: \"message\" Type : \"Utf8\" StorageId : \"" + OptionalStorageId + "\" }";
if (GetWithJsonDocument()) {
sb << R"(Columns{ Name: "json_payload" Type : "JsonDocument" })";
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/testlib/cs_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class THelper: public THelperSchemaless {

std::shared_ptr<arrow::Schema> GetArrowSchema() const;
YDB_FLAG_ACCESSOR(WithJsonDocument, false);
YDB_ACCESSOR(TString, OptionalStorageId, "__MEMORY");
TString ShardingMethod = "HASH_FUNCTION_CONSISTENCY_64";
bool WithSomeNulls_ = false;
protected:
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/blob_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@ struct TEvBlobCache {
TString Data;
const bool FromCache = false;
const TInstant ConstructTime = Now();
const TString DataSourceId;

TEvReadBlobRangeResult(const TBlobRange& blobRange, NKikimrProto::EReplyStatus status, const TString& data, const bool fromCache = false)
TEvReadBlobRangeResult(const TBlobRange& blobRange, NKikimrProto::EReplyStatus status, const TString& data, const bool fromCache = false, const TString& dataSourceId = Default<TString>())
: BlobRange(blobRange)
, Status(status)
, Data(data)
, FromCache(fromCache)
, DataSourceId(dataSourceId)
{}
};

Expand Down
12 changes: 0 additions & 12 deletions ydb/core/tx/columnshard/blobs_action/abstract/action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,4 @@

namespace NKikimr::NOlap {

std::shared_ptr<NKikimr::NOlap::IBlobsWritingAction> TBlobsAction::GetWriting(const TPortionInfo& portionInfo) {
return GetStorageAction(portionInfo.GetBlobsStorage()->GetStorageId()).GetWriting(ConsumerId);
}

std::shared_ptr<NKikimr::NOlap::IBlobsReadingAction> TBlobsAction::GetReading(const TPortionInfo& portionInfo) {
return GetStorageAction(portionInfo.GetBlobsStorage()->GetStorageId()).GetReading(ConsumerId);
}

std::shared_ptr<NKikimr::NOlap::IBlobsDeclareRemovingAction> TBlobsAction::GetRemoving(const TPortionInfo& portionInfo) {
return GetStorageAction(portionInfo.GetBlobsStorage()->GetStorageId()).GetRemoving(ConsumerId);
}

}
14 changes: 8 additions & 6 deletions ydb/core/tx/columnshard/blobs_action/abstract/action.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ class TBlobsAction {

}

// Builds a string from the key of every entry in StorageActions; each key is
// followed by a comma (so a non-empty result ends with a trailing comma).
TString GetStorageIds() const {
    TStringBuilder ids;
    for (const auto& [storageId, _] : StorageActions) {
        ids << storageId << ",";
    }
    return ids;
}

ui32 GetWritingBlobsCount() const {
ui32 result = 0;
for (auto&& [_, action] : StorageActions) {
Expand Down Expand Up @@ -160,20 +168,14 @@ class TBlobsAction {
return GetStorageAction(storageId).GetRemoving(ConsumerId);
}

std::shared_ptr<IBlobsDeclareRemovingAction> GetRemoving(const TPortionInfo& portionInfo);

std::shared_ptr<IBlobsWritingAction> GetWriting(const TString& storageId) {
return GetStorageAction(storageId).GetWriting(ConsumerId);
}

std::shared_ptr<IBlobsWritingAction> GetWriting(const TPortionInfo& portionInfo);

std::shared_ptr<IBlobsReadingAction> GetReading(const TString& storageId) {
return GetStorageAction(storageId).GetReading(ConsumerId);
}

std::shared_ptr<IBlobsReadingAction> GetReading(const TPortionInfo& portionInfo);

};

}
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/abstract/blob_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class TTabletsByBlob {
private:
THashMap<TUnifiedBlobId, THashSet<TTabletId>> Data;
public:
// Removes every blob-id -> tablet-set entry stored in Data.
void Clear() {
Data.clear();
}

NKikimrColumnShardBlobOperationsProto::TTabletsByBlob SerializeToProto() const;

TConclusionStatus DeserializeFromProto(const NKikimrColumnShardBlobOperationsProto::TTabletsByBlob& proto);
Expand Down
58 changes: 26 additions & 32 deletions ydb/core/tx/columnshard/blobs_action/abstract/read.cpp
Original file line number Diff line number Diff line change
@@ -1,57 +1,40 @@
#include "read.h"
#include <ydb/library/actors/core/log.h>
#include <util/string/join.h>

namespace NKikimr::NOlap {

void IBlobsReadingAction::StartReading(THashMap<TUnifiedBlobId, THashSet<TBlobRange>>&& ranges) {
void IBlobsReadingAction::StartReading(THashSet<TBlobRange>&& ranges) {
AFL_VERIFY(ranges.size());
AFL_VERIFY(Counters);
for (auto&& i : ranges) {
AFL_VERIFY(i.second.size());
for (auto&& br : i.second) {
Counters->OnRequest(br.Size);
}
Counters->OnRequest(i.Size);
}
return DoStartReading(ranges);
}

void IBlobsReadingAction::ExtractBlobsDataTo(THashMap<TBlobRange, TString>& result) {
AFL_VERIFY(Started);
if (result.empty()) {
std::swap(result, Replies);
} else {
for (auto&& i : Replies) {
AFL_VERIFY(result.emplace(i.first, std::move(i.second)).second);
}
Replies.clear();
}
RangesForResult.clear();
return DoStartReading(std::move(ranges));
}

void IBlobsReadingAction::Start(const THashSet<TBlobRange>& rangesInProgress) {
Y_ABORT_UNLESS(!Started);
Started = true;

Y_ABORT_UNLESS(RangesForRead.size() + RangesForResult.size());
StartWaitingRanges = TMonotonic::Now();
for (auto&& i : RangesForRead) {
WaitingRangesCount += i.second.size();
}
THashMap<TUnifiedBlobId, THashSet<TBlobRange>> rangesFiltered;
WaitingRangesCount = RangesForRead.size();
THashSet<TBlobRange> rangesFiltered;
if (rangesInProgress.empty()) {
rangesFiltered = RangesForRead;
} else {
for (auto&& i : RangesForRead) {
for (auto&& r : i.second) {
if (!rangesInProgress.contains(r)) {
rangesFiltered[r.BlobId].emplace(r);
}
for (auto&& r : RangesForRead) {
if (!rangesInProgress.contains(r)) {
rangesFiltered.emplace(r);
}
}
}
if (rangesFiltered.size()) {
StartReading(std::move(rangesFiltered));
}
Started = true;
for (auto&& i : RangesForResult) {
AFL_VERIFY(i.second.size() == i.first.Size);
AFL_VERIFY(Replies.emplace(i.first, i.second).second);
}
}
Expand All @@ -60,6 +43,7 @@ void IBlobsReadingAction::OnReadResult(const TBlobRange& range, const TString& d
AFL_VERIFY(Counters);
AFL_VERIFY(--WaitingRangesCount >= 0);
Counters->OnReply(range.Size, TMonotonic::Now() - StartWaitingRanges);
AFL_VERIFY(data.size() == range.Size);
Replies.emplace(range, data);
}

Expand All @@ -70,13 +54,23 @@ void IBlobsReadingAction::OnReadError(const TBlobRange& range, const TErrorStatu
Fails.emplace(range, replyStatus);
}

void IBlobsReadingAction::AddRange(const TBlobRange& range, const TString& result /*= Default<TString>()*/) {
void IBlobsReadingAction::AddRange(const TBlobRange& range, const std::optional<TString>& result /*= {}*/) {
Y_ABORT_UNLESS(!Started);
if (!result) {
AFL_VERIFY(RangesForRead[range.BlobId].emplace(range).second)("range", range.ToString());
AFL_VERIFY(!RangesForResult.contains(range));
AFL_VERIFY(RangesForRead.emplace(range).second)("range", range.ToString());
} else {
AFL_VERIFY(RangesForResult.emplace(range, result).second)("range", range.ToString());
AFL_VERIFY(result->size() == range.Size);
AFL_VERIFY(RangesForResult.emplace(range, *result).second)("range", range.ToString());
}
}

// Renders the blob ranges held in Blobs (the `first` component of each entry)
// as a comma-separated list, going through an intermediate hash set.
TString TActionReadBlobs::DebugString() const {
    THashSet<TBlobRange> collected;
    for (const auto& [range, _] : Blobs) {
        collected.emplace(range);
    }
    return JoinSeq(",", collected);
}

}
Loading
Loading