Skip to content

Commit

Permalink
Merge f8903b2 into 855f39a
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 28, 2024
2 parents 855f39a + f8903b2 commit 8499e32
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 12 deletions.
18 changes: 18 additions & 0 deletions ydb/core/formats/arrow/accessor/abstract/constructor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@ class IConstructor {
virtual TString DoDebugString() const {
return "";
}
virtual bool DoIsEqualWithSameTypeTo(const IConstructor& item) const = 0;

public:
virtual ~IConstructor() = default;

bool IsEqualWithSameTypeTo(const IConstructor& item) const {
return DoIsEqualWithSameTypeTo(item);
}

TString DebugString() const {
return TStringBuilder() << GetClassName() << ":" << DoDebugString();
}
Expand Down Expand Up @@ -69,6 +74,19 @@ class TConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<I
public:
using TBase::TBase;

bool IsEqualTo(const TConstructorContainer& item) const {
if (!GetObjectPtr() && !item.GetObjectPtr()) {
return true;
} else if (!!GetObjectPtr() && !!item.GetObjectPtr()) {
if (GetObjectPtr()->GetClassName() != item.GetObjectPtr()->GetClassName()) {
return false;
}
return GetObjectPtr()->IsEqualWithSameTypeTo(*item.GetObjectPtr());
} else {
return false;
}
}

static TConstructorContainer GetDefaultConstructor();
};

Expand Down
5 changes: 5 additions & 0 deletions ydb/core/formats/arrow/accessor/plain/constructor.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ class TConstructor: public IConstructor {

private:
static inline auto Registrator = TFactory::TRegistrator<TConstructor>(GetClassNameStatic());

virtual bool DoIsEqualWithSameTypeTo(const IConstructor& /*item*/) const override {
return true;
}

virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstruct(
const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const override;
virtual NKikimrArrowAccessorProto::TConstructor DoSerializeToProto() const override;
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/formats/arrow/accessor/sparsed/constructor.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ class TConstructor: public IConstructor {

private:
static inline auto Registrator = TFactory::TRegistrator<TConstructor>(GetClassNameStatic());

virtual bool DoIsEqualWithSameTypeTo(const IConstructor& /*item*/) const override {
return true;
}

virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstruct(
const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const override;
virtual NKikimrArrowAccessorProto::TConstructor DoSerializeToProto() const override;
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/formats/arrow/save_load/loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,19 @@ std::shared_ptr<NKikimr::NArrow::NAccessor::IChunkedArray> TColumnLoader::BuildD
return AccessorConstructor->ConstructDefault(TChunkConstructionData(recordsCount, DefaultValue, ResultField->type())).DetachResult();
}

bool TColumnLoader::IsEqualTo(const TColumnLoader& item) const {
if (!!Transformer != !!item.Transformer) {
return false;
} else if (!!Transformer && !Transformer->IsEqualTo(*item.Transformer)) {
return false;
}
if (!Serializer.IsEqualTo(item.Serializer)) {
return false;
}
if (!AccessorConstructor.IsEqualTo(item.AccessorConstructor)) {
return false;
}
return true;
}

} // namespace NKikimr::NArrow::NAccessor
12 changes: 1 addition & 11 deletions ydb/core/formats/arrow/save_load/loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,7 @@ class TColumnLoader {
public:
std::shared_ptr<IChunkedArray> BuildDefaultAccessor(const ui32 recordsCount) const;

bool IsEqualTo(const TColumnLoader& item) const {
if (!!Transformer != !!item.Transformer) {
return false;
} else if (!!Transformer && !Transformer->IsEqualTo(*item.Transformer)) {
return false;
}
if (!Serializer.IsEqualTo(item.Serializer)) {
return false;
}
return true;
}
bool IsEqualTo(const TColumnLoader& item) const;

TString DebugString() const;

Expand Down
44 changes: 43 additions & 1 deletion ydb/core/kqp/ut/olap/sparsed_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
const TString StoreName;
ui32 MultiColumnRepCount = 100;
static const ui32 SKIP_GROUPS = 7;
const TVector<TString> FIELD_NAMES{"utf", "int", "uint", "float", "double"};
const TVector<TString> FIELD_NAMES{ "utf", "int", "uint", "float", "double" };
public:
TSparsedDataTest(const TString& storeName)
: Kikimr(Settings)
Expand Down Expand Up @@ -302,6 +302,48 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
TSparsedDataTest test("");
test.Execute();
}

Y_UNIT_TEST(AccessorActualization) {
auto settings = TKikimrSettings().SetWithSampleTables(false);
TKikimrRunner kikimr(settings);

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);

TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();

auto session = tableClient.CreateSession().GetValueSync().GetSession();
Tests::NCommon::TLoggerInit(kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize();

for (ui32 i = 0; i < 10; ++i) {
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000 + i * 1000, 10000);
}
csController->WaitIndexation(TDuration::Seconds(3));
csController->WaitCompactions(TDuration::Seconds(3));

{
auto result = session.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=uid, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SPARSED`)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}

{
auto result = session.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}
csController->WaitActualization(TDuration::Seconds(5));
{
auto it = tableClient.StreamExecuteScanQuery(R"(
--!syntax_v1
SELECT count(uid) FROM `/Root/olapStore/olapTable`
)").GetValueSync();
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
Cerr << StreamResultToYson(it) << Endl;
}
}

}

} // namespace

0 comments on commit 8499e32

Please sign in to comment.