Skip to content

Commit

Permalink
Clean max scalar (ydb-platform#10826)
Browse files Browse the repository at this point in the history
Conflicts:
	.github/config/muted_ya.txt
	ydb/core/tx/schemeshard/olap/schema/schema.cpp
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 5, 2025
1 parent fa5af2c commit 2032ce6
Show file tree
Hide file tree
Showing 21 changed files with 282 additions and 195 deletions.
13 changes: 13 additions & 0 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ ydb/core/kqp/ut/service KqpQueryService.ExecuteQueryPgTableSelect
ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession
ydb/core/kqp/ut/service KqpQueryService.TableSink_OltpUpdate
ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
<<<<<<< HEAD
ydb/core/kqp/ut/service [38/50]*
ydb/core/kqp/ut/service KqpQueryService.TableSink_OltpReplace+HasSecondaryIndex
ydb/core/kqp/ut/spilling KqpScanSpilling.HandleErrorsCorrectly
Expand All @@ -53,6 +54,18 @@ ydb/core/persqueue/ut/ut_with_sdk [7/10] chunk chunk
ydb/core/persqueue/ut/ut_with_sdk [8/10] chunk chunk
ydb/core/tx/coordinator/ut Coordinator.RestoreTenantConfiguration
ydb/core/tx/datashard/ut_change_exchange Cdc.InitialScanDebezium
=======
ydb/core/kqp/ut/service [*/*] chunk chunk
ydb/core/kqp/ut/service [*/*]+chunk+chunk
ydb/services/ydb/ut YdbLogStore.AlterLogTable
ydb/core/mind/hive/ut THiveTest.DrainWithHiveRestart
ydb/core/persqueue/ut [*/*] chunk chunk
ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient
ydb/core/tx/columnshard/ut_rw Normalizers.CleanEmptyPortionsNormalizer
ydb/core/tx/schemeshard/ut_move_reboots TSchemeShardMoveRebootsTest.WithData
ydb/core/tx/schemeshard/ut_move_reboots TSchemeShardMoveRebootsTest.WithDataAndPersistentPartitionStats
ydb/core/tx/schemeshard/ut_pq_reboots TPqGroupTestReboots.AlterWithReboots-PQConfigTransactionsAtSchemeShard-false
>>>>>>> e2ee1b2b1a (Clean max scalar (#10826))
ydb/core/tx/schemeshard/ut_restore TImportTests.ShouldSucceedOnManyTables
ydb/core/tx/schemeshard/ut_split_merge TSchemeShardSplitBySizeTest.Merge1KShards
ydb/core/tx/tiering/ut ColumnShardTiers.TTLUsage
Expand Down
57 changes: 51 additions & 6 deletions ydb/core/kqp/ut/olap/statistics_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "helpers/typed_local.h"

#include <ydb/core/tx/columnshard/hooks/testing/controller.h>

namespace NKikimr::NKqp {
Expand All @@ -14,32 +15,75 @@ Y_UNIT_TEST_SUITE(KqpOlapStatistics) {
helper.CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
{
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=max_pk_int, TYPE=MAX, FEATURES=`{\"column_name\": \"pk_int\"}`))";
auto alterQuery =
TStringBuilder()
<< R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=max_pk_int, TYPE=MAX, FEATURES=`{\"column_name\": \"pk_int\"}`))";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
{
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=max_pk_int, TYPE=MAX, FEATURES=`{\"column_name\": \"field\"}`);";
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, "
"NAME=max_pk_int, TYPE=MAX, FEATURES=`{\"column_name\": \"field\"}`);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
{
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=max_pk_int, TYPE=MAX, FEATURES=`{\"column_name\": \"pk_int\"}`);";
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, "
"NAME=max_pk_int, TYPE=MAX, FEATURES=`{\"column_name\": \"pk_int\"}`);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
{
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=DROP_INDEX, NAME=max_pk_int);";
auto alterQuery = TStringBuilder()
<< "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=DROP_INDEX, NAME=max_pk_int);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
}
}

Y_UNIT_TEST(StatsUsageNotPK) {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
{
auto settings = TKikimrSettings().SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTypedLocalHelper helper("Utf8", kikimr);
helper.CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
{
auto alterQuery = TStringBuilder() << "ALTER TABLE `/Root/olapStore/olapTable` SET (TTL = Interval(\"P1D\") ON ts);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL(alterResult.GetStatus(), NYdb::EStatus::SUCCESS);
}
{
auto alterQuery =
TStringBuilder()
<< R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=max_ts, TYPE=MAX, FEATURES=`{\"column_name\": \"ts\"}`))";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
{
auto alterQuery = TStringBuilder() << "ALTER TABLE `/Root/olapStore/olapTable` SET (TTL = Interval(\"P1D\") ON ts);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(alterResult.GetStatus(), NYdb::EStatus::SUCCESS);
}
{
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=DROP_INDEX, NAME=max_ts);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL(alterResult.GetStatus(), NYdb::EStatus::SUCCESS);
}
}
}

Y_UNIT_TEST(StatsUsageWithTTL) {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
{
Expand All @@ -50,7 +94,8 @@ Y_UNIT_TEST_SUITE(KqpOlapStatistics) {
helper.CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
{
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, TYPE=MAX, NAME=max_ts, FEATURES=`{\"column_name\": \"ts\"}`);";
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, TYPE=MAX, "
"NAME=max_ts, FEATURES=`{\"column_name\": \"ts\"}`);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
Expand All @@ -71,4 +116,4 @@ Y_UNIT_TEST_SUITE(KqpOlapStatistics) {
}
}

}
} // namespace NKikimr::NKqp
40 changes: 38 additions & 2 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4679,6 +4679,24 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

{
auto query2 = TStringBuilder() << R"(
--!syntax_v1
ALTER OBJECT `)" << tableName << R"(` (TYPE TABLE) SET (ACTION=UPSERT_INDEX,
NAME=max_value1, TYPE=MAX, FEATURES=`{\"column_name\": \"Value1\"}`))";
result = session.ExecuteSchemeQuery(query2).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

{
auto query2 = TStringBuilder() << R"(
--!syntax_v1
ALTER OBJECT `)" << tableName << R"(` (TYPE TABLE) SET (ACTION=UPSERT_INDEX,
NAME=max_value2, TYPE=MAX, FEATURES=`{\"column_name\": \"Value2\"}`))";
result = session.ExecuteSchemeQuery(query2).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

auto query2 = TStringBuilder() << R"(
--!syntax_v1
ALTER TABLE `)" << tableName << R"(` SET(TTL = Interval("P1D") ON Key);)";
Expand Down Expand Up @@ -7234,7 +7252,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
};

TTestHelper::TColumnTable testTable;
testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id", "id_second"}).SetSharding({"id"}).SetSchema(schema).SetTTL("created_at", "Interval(\"PT1H\")");
testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"created_at", "id_second"}).SetSharding({"created_at"}).SetSchema(schema).SetTTL("created_at", "Interval(\"PT1H\")");
testHelper.CreateTable(testTable);

{
Expand Down Expand Up @@ -7296,7 +7314,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
};

TTestHelper::TColumnTable testTable;
testTable.SetName(tableName).SetPrimaryKey({"id", "id_second"}).SetSharding({"id"}).SetSchema(schema).SetTTL("created_at", "Interval(\"PT1H\")");
testTable.SetName(tableName).SetPrimaryKey({"created_at", "id_second"}).SetSharding({"created_at"}).SetSchema(schema).SetTTL("created_at", "Interval(\"PT1H\")");
testHelper.CreateTable(testTable);
testHelper.CreateTier("tier1");

Expand Down Expand Up @@ -7335,6 +7353,16 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id", "id_second"}).SetSharding({"id"}).SetSchema(schema);
testHelper.CreateTable(testTable);

{
auto alterQuery = TStringBuilder() << R"(
--!syntax_v1
ALTER OBJECT `)" << testTable.GetName() << R"(` (TYPE TABLE) SET (ACTION=UPSERT_INDEX,
NAME=max_value1, TYPE=MAX, FEATURES=`{\"column_name\": \"created_at\"}`))";
auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());

}

{
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "`SET (TTL = Interval(\"PT1H\") ON created_at);";
auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
Expand Down Expand Up @@ -7992,6 +8020,14 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
TTestHelper::TColumnTable testTable;
testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema);
testHelper.CreateTable(testTable);
{
auto alterQuery = TStringBuilder() << R"(
--!syntax_v1
ALTER OBJECT `)" << testTable.GetName() << R"(` (TYPE TABLE) SET (ACTION=UPSERT_INDEX,
NAME=max_pk_int, TYPE=MAX, FEATURES=`{\"column_name\": \"created_at\"}`))";
auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
}

{
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "`SET (TTL = Interval(\"PT1H\") ON created_at);";
Expand Down
46 changes: 16 additions & 30 deletions ydb/core/tx/columnshard/engines/portions/column_record.cpp
Original file line number Diff line number Diff line change
@@ -1,63 +1,49 @@
#include "column_record.h"

#include <ydb/core/formats/arrow/arrow_helpers.h>

#include <ydb/core/tx/columnshard/data_sharing/protos/data.pb.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>
#include <ydb/core/tx/columnshard/common/scalars.h>
#include <ydb/core/tx/columnshard/data_sharing/protos/data.pb.h>
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>

namespace NKikimr::NOlap {

TConclusionStatus TChunkMeta::DeserializeFromProto(const TChunkAddress& address, const NKikimrTxColumnShard::TIndexColumnMeta& proto, const TSimpleColumnInfo& columnInfo) {
auto field = columnInfo.GetArrowField();
TConclusionStatus TChunkMeta::DeserializeFromProto(const NKikimrTxColumnShard::TIndexColumnMeta& proto) {
if (proto.HasNumRows()) {
NumRows = proto.GetNumRows();
}
if (proto.HasRawBytes()) {
RawBytes = proto.GetRawBytes();
}
if (proto.HasMaxValue()) {
AFL_VERIFY(field)("field_id", address.GetColumnId())("field_name", columnInfo.GetColumnName());
Max = ConstantToScalar(proto.GetMaxValue(), field->type());
}
return TConclusionStatus::Success();
}

TChunkMeta::TChunkMeta(const TColumnChunkLoadContext& context, const TSimpleColumnInfo& columnInfo) {
DeserializeFromProto(context.GetAddress(), context.GetMetaProto(), columnInfo).Validate();
TChunkMeta::TChunkMeta(const TColumnChunkLoadContext& context) {
DeserializeFromProto(context.GetMetaProto()).Validate();
}

TChunkMeta::TChunkMeta(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& column, const TSimpleColumnInfo& columnInfo)
: TBase(column, columnInfo.GetNeedMinMax(), columnInfo.GetIsSorted())
{
TChunkMeta::TChunkMeta(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& column)
: TBase(column) {
}

NKikimrTxColumnShard::TIndexColumnMeta TChunkMeta::SerializeToProto() const {
NKikimrTxColumnShard::TIndexColumnMeta meta;
meta.SetNumRows(NumRows);
meta.SetRawBytes(RawBytes);
if (HasMax()) {
ScalarToConstant(*Max, *meta.MutableMaxValue());
ScalarToConstant(*Max, *meta.MutableMinValue());
}
return meta;
}

TColumnRecord::TColumnRecord(const TBlobRangeLink16::TLinkId blobLinkId, const TColumnChunkLoadContext& loadContext, const TSimpleColumnInfo& columnInfo)
: Meta(loadContext, columnInfo)
TColumnRecord::TColumnRecord(const TBlobRangeLink16::TLinkId blobLinkId, const TColumnChunkLoadContext& loadContext)
: Meta(loadContext)
, ColumnId(loadContext.GetAddress().GetColumnId())
, Chunk(loadContext.GetAddress().GetChunk())
, BlobRange(loadContext.GetBlobRange().BuildLink(blobLinkId))
{
, BlobRange(loadContext.GetBlobRange().BuildLink(blobLinkId)) {
}

TColumnRecord::TColumnRecord(
const TChunkAddress& address, const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& column, const TSimpleColumnInfo& columnInfo)
: Meta(column, columnInfo)
TColumnRecord::TColumnRecord(const TChunkAddress& address, const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& column)
: Meta(column)
, ColumnId(address.GetColumnId())
, Chunk(address.GetChunk())
{
, Chunk(address.GetChunk()) {
}

NKikimrColumnShardDataSharingProto::TColumnRecord TColumnRecord::SerializeToProto() const {
Expand All @@ -69,11 +55,11 @@ NKikimrColumnShardDataSharingProto::TColumnRecord TColumnRecord::SerializeToProt
return result;
}

NKikimr::TConclusionStatus TColumnRecord::DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TColumnRecord& proto, const TSimpleColumnInfo& columnInfo) {
NKikimr::TConclusionStatus TColumnRecord::DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TColumnRecord& proto) {
ColumnId = proto.GetColumnId();
Chunk = proto.GetChunkIdx();
{
auto parse = Meta.DeserializeFromProto(GetAddress(), proto.GetMeta(), columnInfo);
auto parse = Meta.DeserializeFromProto(proto.GetMeta());
if (!parse) {
return parse;
}
Expand All @@ -88,4 +74,4 @@ NKikimr::TConclusionStatus TColumnRecord::DeserializeFromProto(const NKikimrColu
return TConclusionStatus::Success();
}

}
} // namespace NKikimr::NOlap
Loading

0 comments on commit 2032ce6

Please sign in to comment.