Skip to content

Commit

Permalink
Clean max scalar (#10826)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Oct 28, 2024
1 parent 4762a8a commit e2ee1b2
Show file tree
Hide file tree
Showing 21 changed files with 270 additions and 196 deletions.
1 change: 1 addition & 0 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ ydb/core/kqp/ut/scheme [*/*]+chunk+chunk
ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
ydb/core/kqp/ut/service [*/*] chunk chunk
ydb/core/kqp/ut/service [*/*]+chunk+chunk
ydb/services/ydb/ut YdbLogStore.AlterLogTable
ydb/core/mind/hive/ut THiveTest.DrainWithHiveRestart
ydb/core/persqueue/ut [*/*] chunk chunk
ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient
Expand Down
57 changes: 51 additions & 6 deletions ydb/core/kqp/ut/olap/statistics_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "helpers/typed_local.h"

#include <ydb/core/tx/columnshard/hooks/testing/controller.h>

namespace NKikimr::NKqp {
Expand All @@ -14,32 +15,75 @@ Y_UNIT_TEST_SUITE(KqpOlapStatistics) {
helper.CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
{
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=max_pk_int, TYPE=MAX, FEATURES=`{\"column_name\": \"pk_int\"}`))";
auto alterQuery =
TStringBuilder()
<< R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=max_pk_int, TYPE=MAX, FEATURES=`{\"column_name\": \"pk_int\"}`))";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
{
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=max_pk_int, TYPE=MAX, FEATURES=`{\"column_name\": \"field\"}`);";
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, "
"NAME=max_pk_int, TYPE=MAX, FEATURES=`{\"column_name\": \"field\"}`);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
{
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=max_pk_int, TYPE=MAX, FEATURES=`{\"column_name\": \"pk_int\"}`);";
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, "
"NAME=max_pk_int, TYPE=MAX, FEATURES=`{\"column_name\": \"pk_int\"}`);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
{
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=DROP_INDEX, NAME=max_pk_int);";
auto alterQuery = TStringBuilder()
<< "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=DROP_INDEX, NAME=max_pk_int);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
}
}

Y_UNIT_TEST(StatsUsageNotPK) {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
{
auto settings = TKikimrSettings().SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTypedLocalHelper helper("Utf8", kikimr);
helper.CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
{
auto alterQuery = TStringBuilder() << "ALTER TABLE `/Root/olapStore/olapTable` SET (TTL = Interval(\"P1D\") ON ts);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL(alterResult.GetStatus(), NYdb::EStatus::SUCCESS);
}
{
auto alterQuery =
TStringBuilder()
<< R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=max_ts, TYPE=MAX, FEATURES=`{\"column_name\": \"ts\"}`))";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
{
auto alterQuery = TStringBuilder() << "ALTER TABLE `/Root/olapStore/olapTable` SET (TTL = Interval(\"P1D\") ON ts);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(alterResult.GetStatus(), NYdb::EStatus::SUCCESS);
}
{
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=DROP_INDEX, NAME=max_ts);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL(alterResult.GetStatus(), NYdb::EStatus::SUCCESS);
}
}
}

Y_UNIT_TEST(StatsUsageWithTTL) {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
{
Expand All @@ -50,7 +94,8 @@ Y_UNIT_TEST_SUITE(KqpOlapStatistics) {
helper.CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
{
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, TYPE=MAX, NAME=max_ts, FEATURES=`{\"column_name\": \"ts\"}`);";
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, TYPE=MAX, "
"NAME=max_ts, FEATURES=`{\"column_name\": \"ts\"}`);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
Expand All @@ -71,4 +116,4 @@ Y_UNIT_TEST_SUITE(KqpOlapStatistics) {
}
}

}
} // namespace NKikimr::NKqp
40 changes: 38 additions & 2 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5082,6 +5082,24 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

{
auto query2 = TStringBuilder() << R"(
--!syntax_v1
ALTER OBJECT `)" << tableName << R"(` (TYPE TABLE) SET (ACTION=UPSERT_INDEX,
NAME=max_value1, TYPE=MAX, FEATURES=`{\"column_name\": \"Value1\"}`))";
result = session.ExecuteSchemeQuery(query2).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

{
auto query2 = TStringBuilder() << R"(
--!syntax_v1
ALTER OBJECT `)" << tableName << R"(` (TYPE TABLE) SET (ACTION=UPSERT_INDEX,
NAME=max_value2, TYPE=MAX, FEATURES=`{\"column_name\": \"Value2\"}`))";
result = session.ExecuteSchemeQuery(query2).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

auto query2 = TStringBuilder() << R"(
--!syntax_v1
ALTER TABLE `)" << tableName << R"(` SET(TTL = Interval("P1D") ON Key);)";
Expand Down Expand Up @@ -7654,7 +7672,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
};

TTestHelper::TColumnTable testTable;
testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id", "id_second"}).SetSharding({"id"}).SetSchema(schema).SetTTL("created_at", "Interval(\"PT1H\")");
testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"created_at", "id_second"}).SetSharding({"created_at"}).SetSchema(schema).SetTTL("created_at", "Interval(\"PT1H\")");
testHelper.CreateTable(testTable);

{
Expand Down Expand Up @@ -7716,7 +7734,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
};

TTestHelper::TColumnTable testTable;
testTable.SetName(tableName).SetPrimaryKey({"id", "id_second"}).SetSharding({"id"}).SetSchema(schema).SetTTL("created_at", "Interval(\"PT1H\")");
testTable.SetName(tableName).SetPrimaryKey({"created_at", "id_second"}).SetSharding({"created_at"}).SetSchema(schema).SetTTL("created_at", "Interval(\"PT1H\")");
testHelper.CreateTable(testTable);
testHelper.CreateTier("tier1");

Expand Down Expand Up @@ -7755,6 +7773,16 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id", "id_second"}).SetSharding({"id"}).SetSchema(schema);
testHelper.CreateTable(testTable);

{
auto alterQuery = TStringBuilder() << R"(
--!syntax_v1
ALTER OBJECT `)" << testTable.GetName() << R"(` (TYPE TABLE) SET (ACTION=UPSERT_INDEX,
NAME=max_value1, TYPE=MAX, FEATURES=`{\"column_name\": \"created_at\"}`))";
auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());

}

{
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "`SET (TTL = Interval(\"PT1H\") ON created_at);";
auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
Expand Down Expand Up @@ -8412,6 +8440,14 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
TTestHelper::TColumnTable testTable;
testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema);
testHelper.CreateTable(testTable);
{
auto alterQuery = TStringBuilder() << R"(
--!syntax_v1
ALTER OBJECT `)" << testTable.GetName() << R"(` (TYPE TABLE) SET (ACTION=UPSERT_INDEX,
NAME=max_pk_int, TYPE=MAX, FEATURES=`{\"column_name\": \"created_at\"}`))";
auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
}

{
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "`SET (TTL = Interval(\"PT1H\") ON created_at);";
Expand Down
46 changes: 16 additions & 30 deletions ydb/core/tx/columnshard/engines/portions/column_record.cpp
Original file line number Diff line number Diff line change
@@ -1,63 +1,49 @@
#include "column_record.h"

#include <ydb/core/formats/arrow/arrow_helpers.h>

#include <ydb/core/tx/columnshard/data_sharing/protos/data.pb.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>
#include <ydb/core/tx/columnshard/common/scalars.h>
#include <ydb/core/tx/columnshard/data_sharing/protos/data.pb.h>
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>

namespace NKikimr::NOlap {

TConclusionStatus TChunkMeta::DeserializeFromProto(const TChunkAddress& address, const NKikimrTxColumnShard::TIndexColumnMeta& proto, const TSimpleColumnInfo& columnInfo) {
auto field = columnInfo.GetArrowField();
TConclusionStatus TChunkMeta::DeserializeFromProto(const NKikimrTxColumnShard::TIndexColumnMeta& proto) {
if (proto.HasNumRows()) {
NumRows = proto.GetNumRows();
}
if (proto.HasRawBytes()) {
RawBytes = proto.GetRawBytes();
}
if (proto.HasMaxValue()) {
AFL_VERIFY(field)("field_id", address.GetColumnId())("field_name", columnInfo.GetColumnName());
Max = ConstantToScalar(proto.GetMaxValue(), field->type());
}
return TConclusionStatus::Success();
}

TChunkMeta::TChunkMeta(const TColumnChunkLoadContext& context, const TSimpleColumnInfo& columnInfo) {
DeserializeFromProto(context.GetAddress(), context.GetMetaProto(), columnInfo).Validate();
TChunkMeta::TChunkMeta(const TColumnChunkLoadContext& context) {
DeserializeFromProto(context.GetMetaProto()).Validate();
}

TChunkMeta::TChunkMeta(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& column, const TSimpleColumnInfo& columnInfo)
: TBase(column, columnInfo.GetNeedMinMax(), columnInfo.GetIsSorted())
{
TChunkMeta::TChunkMeta(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& column)
: TBase(column) {
}

NKikimrTxColumnShard::TIndexColumnMeta TChunkMeta::SerializeToProto() const {
NKikimrTxColumnShard::TIndexColumnMeta meta;
meta.SetNumRows(NumRows);
meta.SetRawBytes(RawBytes);
if (HasMax()) {
ScalarToConstant(*Max, *meta.MutableMaxValue());
ScalarToConstant(*Max, *meta.MutableMinValue());
}
return meta;
}

TColumnRecord::TColumnRecord(const TBlobRangeLink16::TLinkId blobLinkId, const TColumnChunkLoadContext& loadContext, const TSimpleColumnInfo& columnInfo)
: Meta(loadContext, columnInfo)
TColumnRecord::TColumnRecord(const TBlobRangeLink16::TLinkId blobLinkId, const TColumnChunkLoadContext& loadContext)
: Meta(loadContext)
, ColumnId(loadContext.GetAddress().GetColumnId())
, Chunk(loadContext.GetAddress().GetChunk())
, BlobRange(loadContext.GetBlobRange().BuildLink(blobLinkId))
{
, BlobRange(loadContext.GetBlobRange().BuildLink(blobLinkId)) {
}

TColumnRecord::TColumnRecord(
const TChunkAddress& address, const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& column, const TSimpleColumnInfo& columnInfo)
: Meta(column, columnInfo)
TColumnRecord::TColumnRecord(const TChunkAddress& address, const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& column)
: Meta(column)
, ColumnId(address.GetColumnId())
, Chunk(address.GetChunk())
{
, Chunk(address.GetChunk()) {
}

NKikimrColumnShardDataSharingProto::TColumnRecord TColumnRecord::SerializeToProto() const {
Expand All @@ -69,11 +55,11 @@ NKikimrColumnShardDataSharingProto::TColumnRecord TColumnRecord::SerializeToProt
return result;
}

NKikimr::TConclusionStatus TColumnRecord::DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TColumnRecord& proto, const TSimpleColumnInfo& columnInfo) {
NKikimr::TConclusionStatus TColumnRecord::DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TColumnRecord& proto) {
ColumnId = proto.GetColumnId();
Chunk = proto.GetChunkIdx();
{
auto parse = Meta.DeserializeFromProto(GetAddress(), proto.GetMeta(), columnInfo);
auto parse = Meta.DeserializeFromProto(proto.GetMeta());
if (!parse) {
return parse;
}
Expand All @@ -88,4 +74,4 @@ NKikimr::TConclusionStatus TColumnRecord::DeserializeFromProto(const NKikimrColu
return TConclusionStatus::Success();
}

}
} // namespace NKikimr::NOlap
29 changes: 12 additions & 17 deletions ydb/core/tx/columnshard/engines/portions/column_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

#include "common.h"

#include <ydb/library/formats/arrow/accessor/abstract/accessor.h>
#include <ydb/library/formats/arrow/splitter/stats.h>
#include <ydb/core/tx/columnshard/blob.h>
#include <ydb/core/tx/columnshard/common/snapshot.h>
#include <ydb/core/tx/columnshard/engines/protos/portion_info.pb.h>
#include <ydb/core/tx/columnshard/splitter/chunk_meta.h>
#include <ydb/core/tx/columnshard/splitter/chunks.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/formats/arrow/accessor/abstract/accessor.h>
#include <ydb/library/formats/arrow/splitter/stats.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_base.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
Expand All @@ -30,19 +30,17 @@ struct TChunkMeta: public TSimpleChunkMeta {
private:
using TBase = TSimpleChunkMeta;
TChunkMeta() = default;
[[nodiscard]] TConclusionStatus DeserializeFromProto(
const TChunkAddress& address, const NKikimrTxColumnShard::TIndexColumnMeta& proto, const TSimpleColumnInfo& columnInfo);
[[nodiscard]] TConclusionStatus DeserializeFromProto(const NKikimrTxColumnShard::TIndexColumnMeta& proto);
friend class TColumnRecord;

public:
TChunkMeta(TSimpleChunkMeta&& baseMeta)
: TBase(baseMeta) {
}

[[nodiscard]] static TConclusion<TChunkMeta> BuildFromProto(
const TChunkAddress& address, const NKikimrTxColumnShard::TIndexColumnMeta& proto, const TSimpleColumnInfo& columnInfo) {
[[nodiscard]] static TConclusion<TChunkMeta> BuildFromProto(const NKikimrTxColumnShard::TIndexColumnMeta& proto) {
TChunkMeta result;
auto parse = result.DeserializeFromProto(address, proto, columnInfo);
auto parse = result.DeserializeFromProto(proto);
if (!parse) {
return parse;
}
Expand All @@ -61,9 +59,9 @@ struct TChunkMeta: public TSimpleChunkMeta {
}
};

TChunkMeta(const TColumnChunkLoadContext& context, const TSimpleColumnInfo& columnInfo);
TChunkMeta(const TColumnChunkLoadContext& context);

TChunkMeta(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& column, const TSimpleColumnInfo& columnInfo);
TChunkMeta(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& column);
};

class TColumnRecord {
Expand All @@ -74,7 +72,7 @@ class TColumnRecord {
}

TColumnRecord() = default;
TConclusionStatus DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TColumnRecord& proto, const TSimpleColumnInfo& columnInfo);
TConclusionStatus DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TColumnRecord& proto);

public:
ui32 ColumnId = 0;
Expand Down Expand Up @@ -124,10 +122,9 @@ class TColumnRecord {
}

NKikimrColumnShardDataSharingProto::TColumnRecord SerializeToProto() const;
static TConclusion<TColumnRecord> BuildFromProto(
const NKikimrColumnShardDataSharingProto::TColumnRecord& proto, const TSimpleColumnInfo& columnInfo) {
static TConclusion<TColumnRecord> BuildFromProto(const NKikimrColumnShardDataSharingProto::TColumnRecord& proto) {
TColumnRecord result;
auto parse = result.DeserializeFromProto(proto, columnInfo);
auto parse = result.DeserializeFromProto(proto);
if (!parse) {
return parse;
}
Expand Down Expand Up @@ -166,10 +163,8 @@ class TColumnRecord {
<< "blob_range:" << BlobRange.ToString() << ";";
}

TColumnRecord(
const TChunkAddress& address, const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& column, const TSimpleColumnInfo& columnInfo);

TColumnRecord(const TBlobRangeLink16::TLinkId blobLinkId, const TColumnChunkLoadContext& loadContext, const TSimpleColumnInfo& columnInfo);
TColumnRecord(const TChunkAddress& address, const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& column);
TColumnRecord(const TBlobRangeLink16::TLinkId blobLinkId, const TColumnChunkLoadContext& loadContext);

friend IOutputStream& operator<<(IOutputStream& out, const TColumnRecord& rec) {
out << '{';
Expand Down
Loading

0 comments on commit e2ee1b2

Please sign in to comment.