Skip to content

Commit

Permalink
Merge b5b4f35 into 5b9ad59
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Feb 28, 2024
2 parents 5b9ad59 + b5b4f35 commit 2936ca7
Show file tree
Hide file tree
Showing 131 changed files with 1,767 additions and 1,184 deletions.
4 changes: 3 additions & 1 deletion ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1013,14 +1013,16 @@ struct TEvBlobStorage {
bool WrittenBeyondBarrier = false; // was this blob written beyond the barrier?
mutable NLWTrace::TOrbit Orbit;
std::shared_ptr<TExecutionRelay> ExecutionRelay;
const TString StorageId;

TEvPutResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, const TStorageStatusFlags statusFlags,
ui32 groupId, float approximateFreeSpaceShare)
ui32 groupId, float approximateFreeSpaceShare, const TString& storageId = Default<TString>())
: Status(status)
, Id(id)
, StatusFlags(statusFlags)
, GroupId(groupId)
, ApproximateFreeSpaceShare(approximateFreeSpaceShare)
, StorageId(storageId)
{}

TString Print(bool isFull) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ TConclusionStatus TAddColumnOperation::DoDeserialize(NYql::TObjectSettingsImpl::
}
ColumnName = *fValue;
}
StorageId = features.Extract("STORAGE_ID");
if (StorageId && !*StorageId) {
return TConclusionStatus::Fail("STORAGE_ID cannot be empty string");
}
{
auto fValue = features.Extract("TYPE");
if (!fValue) {
Expand All @@ -31,6 +35,9 @@ void TAddColumnOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSc
auto column = schemaData.AddAddColumns();
column->SetName(ColumnName);
column->SetType(ColumnType);
if (StorageId) {
column->SetStorageId(*StorageId);
}
column->SetNotNull(NotNull);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class TAddColumnOperation : public ITableStoreOperation {
private:
TString ColumnName;
TString ColumnType;
std::optional<TString> StorageId;
bool NotNull = false;
public:
TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ TConclusionStatus TAlterColumnOperation::DoDeserialize(NYql::TObjectSettingsImpl
}
ColumnName = *fValue;
}
StorageId = features.Extract("STORAGE_ID");
if (StorageId && !*StorageId) {
return TConclusionStatus::Fail("STORAGE_ID cannot be empty string");
}
{
auto result = DictionaryEncodingDiff.DeserializeFromRequestFeatures(features);
if (!result) {
Expand All @@ -28,6 +32,9 @@ TConclusionStatus TAlterColumnOperation::DoDeserialize(NYql::TObjectSettingsImpl
void TAlterColumnOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const {
auto* column = schemaData.AddAlterColumns();
column->SetName(ColumnName);
if (StorageId && !!*StorageId) {
column->SetStorageId(*StorageId);
}
if (!!Serializer) {
Serializer.SerializeToProto(*column->MutableSerializer());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class TAlterColumnOperation : public ITableStoreOperation {
static inline auto Registrator = TFactory::TRegistrator<TAlterColumnOperation>(GetTypeName());

TString ColumnName;
std::optional<TString> StorageId;

NArrow::NSerialization::TSerializerContainer Serializer;
NArrow::NDictionary::TEncodingDiff DictionaryEncodingDiff;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ TConclusionStatus TUpsertIndexOperation::DoDeserialize(NYql::TObjectSettingsImpl
}
IndexName = *fValue;
}
StorageId = features.Extract("STORAGE_ID");
if (StorageId && !*StorageId) {
return TConclusionStatus::Fail("STORAGE_ID cannot be empty string");
}
TString indexType;
{
auto fValue = features.Extract("TYPE");
Expand Down Expand Up @@ -42,6 +46,9 @@ TConclusionStatus TUpsertIndexOperation::DoDeserialize(NYql::TObjectSettingsImpl

void TUpsertIndexOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const {
auto* indexProto = schemaData.AddUpsertIndexes();
if (StorageId) {
indexProto->SetStorageId(*StorageId);
}
indexProto->SetName(IndexName);
IndexMetaConstructor.SerializeToProto(*indexProto);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class TUpsertIndexOperation : public ITableStoreOperation {
static inline auto Registrator = TFactory::TRegistrator<TUpsertIndexOperation>(GetTypeName());
private:
TString IndexName;
std::optional<TString> StorageId;
NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMetaConstructor> IndexMetaConstructor;
public:
TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override;
Expand Down
11 changes: 9 additions & 2 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <ydb/core/testlib/tablet_helpers.h>
#include <ydb/core/testlib/test_client.h>
#include <ydb/core/testlib/cs_helper.h>
#include <ydb/core/wrappers/fake_storage.h>
#include <util/system/sanitizers.h>

#include <fmt/format.h>
Expand Down Expand Up @@ -2127,6 +2128,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {

TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();

{
WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000);
Expand All @@ -2137,6 +2139,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
WriteTestData(kikimr, "/Root/olapStore/olapTable", 20000, 2000000, 7000);
WriteTestData(kikimr, "/Root/olapStore/olapTable", 30000, 1000000, 11000);
}
while (csController->GetIndexations().Val() == 0) {
Cout << "Wait indexation..." << Endl;
Sleep(TDuration::Seconds(2));
}
AFL_VERIFY(Singleton<NWrappers::NExternalStorage::TFakeExternalStorage>()->GetSize());

{
TString query = R"(
Expand Down Expand Up @@ -3637,12 +3644,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
static ui32 numKinds = 2;

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
TLocalHelper(kikimr).CreateTestOlapTable();
for (ui64 i = 0; i < 100; ++i) {
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i*10000, 1000);
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i * 10000, 1000);
}

auto tableClient = kikimr.GetTableClient();
Expand Down
11 changes: 8 additions & 3 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ message TOlapColumnDiff {
optional string Name = 1;
optional TDictionaryEncodingSettings DictionaryEncoding = 4;
optional TOlapColumn.TSerializer Serializer = 5;
optional string StorageId = 6;
}

message TOlapColumnDescription {
Expand All @@ -435,6 +436,7 @@ message TOlapColumnDescription {
optional TCompressionOptions Compression = 8[deprecated = true];
optional TDictionaryEncodingSettings DictionaryEncoding = 9;
optional TOlapColumn.TSerializer Serializer = 10;
optional string StorageId = 11;
}

message TRequestedBloomFilter {
Expand All @@ -445,8 +447,9 @@ message TRequestedBloomFilter {
message TOlapIndexRequested {
optional string Name = 1;
optional TCompressionOptions Compression = 3;
optional string StorageId = 4;

optional string ClassName = 2;
optional string ClassName = 39;
oneof Implementation {
TRequestedBloomFilter BloomFilter = 40;
}
Expand All @@ -465,9 +468,11 @@ message TOlapIndexDescription {
optional string Name = 2;
optional TCompressionOptions Compression = 3;

optional string ClassName = 4;
optional string StorageId = 4;

optional string ClassName = 40;
oneof Implementation {
TBloomFilter BloomFilter = 40;
TBloomFilter BloomFilter = 41;
}
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/testlib/cs_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ TString THelper::GetTestTableSchema() const {
TStringBuilder sb;
sb << R"(Columns{ Name: "timestamp" Type : "Timestamp" NotNull : true })";
sb << R"(Columns{ Name: "resource_id" Type : "Utf8" })";
sb << R"(Columns{ Name: "uid" Type : "Utf8" })";
sb << R"(Columns{ Name: "uid" Type : "Utf8" StorageId : "__MEMORY" })";
sb << R"(Columns{ Name: "level" Type : "Int32" })";
sb << R"(Columns{ Name: "message" Type : "Utf8" })";
sb << R"(Columns{ Name: "message" Type : "Utf8" StorageId : "__MEMORY" })";
if (GetWithJsonDocument()) {
sb << R"(Columns{ Name: "json_payload" Type : "JsonDocument" })";
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/blob_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@ struct TEvBlobCache {
TString Data;
const bool FromCache = false;
const TInstant ConstructTime = Now();
const TString DataSourceId;

TEvReadBlobRangeResult(const TBlobRange& blobRange, NKikimrProto::EReplyStatus status, const TString& data, const bool fromCache = false)
TEvReadBlobRangeResult(const TBlobRange& blobRange, NKikimrProto::EReplyStatus status, const TString& data, const bool fromCache = false, const TString& dataSourceId = Default<TString>())
: BlobRange(blobRange)
, Status(status)
, Data(data)
, FromCache(fromCache)
, DataSourceId(dataSourceId)
{}
};

Expand Down
12 changes: 0 additions & 12 deletions ydb/core/tx/columnshard/blobs_action/abstract/action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,4 @@

namespace NKikimr::NOlap {

std::shared_ptr<NKikimr::NOlap::IBlobsWritingAction> TBlobsAction::GetWriting(const TPortionInfo& portionInfo) {
return GetStorageAction(portionInfo.GetBlobsStorage()->GetStorageId()).GetWriting(ConsumerId);
}

std::shared_ptr<NKikimr::NOlap::IBlobsReadingAction> TBlobsAction::GetReading(const TPortionInfo& portionInfo) {
return GetStorageAction(portionInfo.GetBlobsStorage()->GetStorageId()).GetReading(ConsumerId);
}

std::shared_ptr<NKikimr::NOlap::IBlobsDeclareRemovingAction> TBlobsAction::GetRemoving(const TPortionInfo& portionInfo) {
return GetStorageAction(portionInfo.GetBlobsStorage()->GetStorageId()).GetRemoving(ConsumerId);
}

}
14 changes: 8 additions & 6 deletions ydb/core/tx/columnshard/blobs_action/abstract/action.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ class TBlobsAction {

}

TString GetStorageIds() const {
TStringBuilder sb;
for (auto&& i : StorageActions) {
sb << i.first << ",";
}
return sb;
}

ui32 GetWritingBlobsCount() const {
ui32 result = 0;
for (auto&& [_, action] : StorageActions) {
Expand Down Expand Up @@ -160,20 +168,14 @@ class TBlobsAction {
return GetStorageAction(storageId).GetRemoving(ConsumerId);
}

std::shared_ptr<IBlobsDeclareRemovingAction> GetRemoving(const TPortionInfo& portionInfo);

std::shared_ptr<IBlobsWritingAction> GetWriting(const TString& storageId) {
return GetStorageAction(storageId).GetWriting(ConsumerId);
}

std::shared_ptr<IBlobsWritingAction> GetWriting(const TPortionInfo& portionInfo);

std::shared_ptr<IBlobsReadingAction> GetReading(const TString& storageId) {
return GetStorageAction(storageId).GetReading(ConsumerId);
}

std::shared_ptr<IBlobsReadingAction> GetReading(const TPortionInfo& portionInfo);

};

}
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/abstract/blob_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class TTabletsByBlob {
private:
THashMap<TUnifiedBlobId, THashSet<TTabletId>> Data;
public:
void Clear() {
Data.clear();
}

NKikimrColumnShardBlobOperationsProto::TTabletsByBlob SerializeToProto() const;

TConclusionStatus DeserializeFromProto(const NKikimrColumnShardBlobOperationsProto::TTabletsByBlob& proto);
Expand Down
58 changes: 26 additions & 32 deletions ydb/core/tx/columnshard/blobs_action/abstract/read.cpp
Original file line number Diff line number Diff line change
@@ -1,57 +1,40 @@
#include "read.h"
#include <ydb/library/actors/core/log.h>
#include <util/string/join.h>

namespace NKikimr::NOlap {

void IBlobsReadingAction::StartReading(THashMap<TUnifiedBlobId, THashSet<TBlobRange>>&& ranges) {
void IBlobsReadingAction::StartReading(THashSet<TBlobRange>&& ranges) {
AFL_VERIFY(ranges.size());
AFL_VERIFY(Counters);
for (auto&& i : ranges) {
AFL_VERIFY(i.second.size());
for (auto&& br : i.second) {
Counters->OnRequest(br.Size);
}
Counters->OnRequest(i.Size);
}
return DoStartReading(ranges);
}

void IBlobsReadingAction::ExtractBlobsDataTo(THashMap<TBlobRange, TString>& result) {
AFL_VERIFY(Started);
if (result.empty()) {
std::swap(result, Replies);
} else {
for (auto&& i : Replies) {
AFL_VERIFY(result.emplace(i.first, std::move(i.second)).second);
}
Replies.clear();
}
RangesForResult.clear();
return DoStartReading(std::move(ranges));
}

void IBlobsReadingAction::Start(const THashSet<TBlobRange>& rangesInProgress) {
Y_ABORT_UNLESS(!Started);
Started = true;

Y_ABORT_UNLESS(RangesForRead.size() + RangesForResult.size());
StartWaitingRanges = TMonotonic::Now();
for (auto&& i : RangesForRead) {
WaitingRangesCount += i.second.size();
}
THashMap<TUnifiedBlobId, THashSet<TBlobRange>> rangesFiltered;
WaitingRangesCount = RangesForRead.size();
THashSet<TBlobRange> rangesFiltered;
if (rangesInProgress.empty()) {
rangesFiltered = RangesForRead;
} else {
for (auto&& i : RangesForRead) {
for (auto&& r : i.second) {
if (!rangesInProgress.contains(r)) {
rangesFiltered[r.BlobId].emplace(r);
}
for (auto&& r : RangesForRead) {
if (!rangesInProgress.contains(r)) {
rangesFiltered.emplace(r);
}
}
}
if (rangesFiltered.size()) {
StartReading(std::move(rangesFiltered));
}
Started = true;
for (auto&& i : RangesForResult) {
AFL_VERIFY(i.second.size() == i.first.Size);
AFL_VERIFY(Replies.emplace(i.first, i.second).second);
}
}
Expand All @@ -60,6 +43,7 @@ void IBlobsReadingAction::OnReadResult(const TBlobRange& range, const TString& d
AFL_VERIFY(Counters);
AFL_VERIFY(--WaitingRangesCount >= 0);
Counters->OnReply(range.Size, TMonotonic::Now() - StartWaitingRanges);
AFL_VERIFY(data.size() == range.Size);
Replies.emplace(range, data);
}

Expand All @@ -70,13 +54,23 @@ void IBlobsReadingAction::OnReadError(const TBlobRange& range, const TErrorStatu
Fails.emplace(range, replyStatus);
}

void IBlobsReadingAction::AddRange(const TBlobRange& range, const TString& result /*= Default<TString>()*/) {
void IBlobsReadingAction::AddRange(const TBlobRange& range, const std::optional<TString>& result /*= {}*/) {
Y_ABORT_UNLESS(!Started);
if (!result) {
AFL_VERIFY(RangesForRead[range.BlobId].emplace(range).second)("range", range.ToString());
AFL_VERIFY(!RangesForResult.contains(range));
AFL_VERIFY(RangesForRead.emplace(range).second)("range", range.ToString());
} else {
AFL_VERIFY(RangesForResult.emplace(range, result).second)("range", range.ToString());
AFL_VERIFY(result->size() == range.Size);
AFL_VERIFY(RangesForResult.emplace(range, *result).second)("range", range.ToString());
}
}

TString TActionReadBlobs::DebugString() const {
THashSet<TBlobRange> ranges;
for (auto&& i : Blobs) {
ranges.emplace(i.first);
}
return JoinSeq(",", ranges);
}

}
Loading

0 comments on commit 2936ca7

Please sign in to comment.