Skip to content

Commit

Permalink
Merge 655994a into 11534d6
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jan 29, 2024
2 parents 11534d6 + 655994a commit f1f2718
Show file tree
Hide file tree
Showing 18 changed files with 185 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#include "drop_index.h"
#include <util/string/type.h>

namespace NKikimr::NKqp {

/// Reads the mandatory NAME feature and remembers it as the name of the index to drop.
/// @param features  extractor holding the settings of the ALTER OBJECT statement.
/// @return Success when NAME was present; a failure status with an explanatory message otherwise.
TConclusionStatus TDropIndexOperation::DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) {
    if (auto indexName = features.Extract("NAME")) {
        IndexName = *indexName;
        return TConclusionStatus::Success();
    }
    // NAME is the only parameter this operation understands and it cannot be defaulted.
    return TConclusionStatus::Fail("can't find parameter NAME");
}

/// Adds IndexName to the list of indexes to drop in the schema alteration request.
void TDropIndexOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const {
    auto* droppedIndex = schemaData.AddDropIndexes();
    *droppedIndex = IndexName;
}

}
19 changes: 19 additions & 0 deletions ydb/core/kqp/gateway/behaviour/tablestore/operations/drop_index.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#include "abstract.h"

namespace NKikimr::NKqp {

/// Table-store operation registered in the operations factory under the "DROP_INDEX" key.
/// It takes the index name from the NAME feature and writes it into the schema alteration.
class TDropIndexOperation : public ITableStoreOperation {
private:
    /// Key under which this operation is registered in TFactory.
    static TString GetTypeName() {
        return "DROP_INDEX";
    }

    // Must stay after GetTypeName(): the initializer calls it.
    static inline auto Registrator = TFactory::TRegistrator<TDropIndexOperation>(GetTypeName());

    /// Name of the index to drop.
    TString IndexName;

public:
    TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override;
    void DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const override;
};

}

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SRCS(
GLOBAL alter_column.cpp
GLOBAL drop_column.cpp
GLOBAL upsert_index.cpp
GLOBAL drop_index.cpp
)

PEERDIR(
Expand Down
10 changes: 8 additions & 2 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1357,8 +1357,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}

Y_UNIT_TEST(IndexesModificationError) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
auto settings = TKikimrSettings().SetWithSampleTables(false);
TKikimrRunner kikimr(settings);

TLocalHelper(kikimr).CreateTestOlapTable();
Expand Down Expand Up @@ -1408,6 +1407,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
}

{
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=DROP_INDEX, NAME=index_uid);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
}

}

Y_UNIT_TEST(PushdownFilter) {
Expand Down
12 changes: 7 additions & 5 deletions ydb/core/sys_view/common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,14 +394,15 @@ struct Schema : NIceDb::Schema {
struct PortionId: Column<6, NScheme::NTypeIds::Uint64> {};
struct ChunkIdx : Column<7, NScheme::NTypeIds::Uint64> {};
struct ColumnName: Column<8, NScheme::NTypeIds::Utf8> {};
struct InternalColumnId : Column<9, NScheme::NTypeIds::Uint32> {};
struct InternalEntityId : Column<9, NScheme::NTypeIds::Uint32> {};
struct BlobId : Column<10, NScheme::NTypeIds::Utf8> {};
struct BlobRangeOffset : Column<11, NScheme::NTypeIds::Uint64> {};
struct BlobRangeSize : Column<12, NScheme::NTypeIds::Uint64> {};
struct Activity : Column<13, NScheme::NTypeIds::Bool> {};
struct TierName : Column<14, NScheme::NTypeIds::Utf8> {};
struct TierName: Column<14, NScheme::NTypeIds::Utf8> {};
struct EntityType: Column<15, NScheme::NTypeIds::Utf8> {};

using TKey = TableKey<PathId, TabletId, PortionId, InternalColumnId, ChunkIdx>;
using TKey = TableKey<PathId, TabletId, PortionId, InternalEntityId, ChunkIdx>;
using TColumns = TableColumns<
PathId,
Kind,
Expand All @@ -411,12 +412,13 @@ struct Schema : NIceDb::Schema {
PortionId,
ChunkIdx,
ColumnName,
InternalColumnId,
InternalEntityId,
BlobId,
BlobRangeOffset,
BlobRangeSize,
Activity,
TierName
TierName,
EntityType
>;
};

Expand Down
80 changes: 56 additions & 24 deletions ydb/core/tx/columnshard/columnshard__stats_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,31 +59,63 @@ void TStatsIterator::ApplyRangePredicates(std::shared_ptr<arrow::RecordBatch>& b
}

/// Appends stats rows for `portion` into `builders`: one row per column chunk, then one row per index chunk.
///
/// Every row fills builders[0] .. builders[14] exactly once, so all builders keep the same length.
/// builders[14] receives the entity kind tag ("COLUMN" or "INDEX"). When Reverse is set, the chunks
/// inside each of the two groups are emitted in reverse order; the groups themselves keep their order.
///
/// @param builders  positional array builders of the stats batch (at least 15 entries are written).
/// @param portion   portion whose column records and index chunks are reported.
void TStatsIterator::AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, const NOlap::TPortionInfo& portion) {
    // Portion-level values do not depend on the chunk, so they are computed once per call.
    const std::string prod = ::ToString(portion.GetMeta().Produced);
    const std::string strTierName(portion.GetMeta().GetTierName().data(), portion.GetMeta().GetTierName().size());
    const bool activity = !portion.HasRemoveSnapshot() || ReadMetadata->GetRequestSnapshot() < portion.GetRemoveSnapshot();

    // Writes a single row. Only the chunk-specific values differ between column and index chunks,
    // so both loops below share this helper instead of duplicating the fifteen appends.
    const auto appendRow = [&](const ui64 rowsCount, const ui64 rawBytes, const ui64 chunkIdx, const std::string& entityName,
                               const ui32 entityId, const auto& blobRange, const char* entityType) {
        NArrow::Append<arrow::UInt64Type>(*builders[0], portion.GetPathId());
        NArrow::Append<arrow::StringType>(*builders[1], prod);
        NArrow::Append<arrow::UInt64Type>(*builders[2], ReadMetadata->TabletId);
        NArrow::Append<arrow::UInt64Type>(*builders[3], rowsCount);
        NArrow::Append<arrow::UInt64Type>(*builders[4], rawBytes);
        NArrow::Append<arrow::UInt64Type>(*builders[5], portion.GetPortionId());
        NArrow::Append<arrow::UInt64Type>(*builders[6], chunkIdx);
        NArrow::Append<arrow::StringType>(*builders[7], entityName);
        NArrow::Append<arrow::UInt32Type>(*builders[8], entityId);
        std::string blobIdString = blobRange.BlobId.ToStringLegacy();
        NArrow::Append<arrow::StringType>(*builders[9], blobIdString);
        NArrow::Append<arrow::UInt64Type>(*builders[10], blobRange.Offset);
        NArrow::Append<arrow::UInt64Type>(*builders[11], blobRange.Size);
        NArrow::Append<arrow::BooleanType>(*builders[12], activity);
        NArrow::Append<arrow::StringType>(*builders[13], strTierName);
        NArrow::Append<arrow::StringType>(*builders[14], entityType);
    };

    {
        // Column chunks.
        std::vector<const NOlap::TColumnRecord*> records;
        for (auto&& r : portion.Records) {
            records.emplace_back(&r);
        }
        if (Reverse) {
            std::reverse(records.begin(), records.end());
        }
        for (const auto* r : records) {
            appendRow(r->GetMeta().GetNumRowsVerified(), r->GetMeta().GetRawBytesVerified(), r->GetChunkIdx(),
                ReadMetadata->GetColumnNameDef(r->GetColumnId()).value_or("undefined"), r->GetColumnId(), r->BlobRange, "COLUMN");
        }
    }
    {
        // Index chunks; the name lookup falls back from column names to index names.
        std::vector<const NOlap::TIndexChunk*> indexes;
        for (auto&& r : portion.GetIndexes()) {
            indexes.emplace_back(&r);
        }
        if (Reverse) {
            std::reverse(indexes.begin(), indexes.end());
        }
        for (const auto* r : indexes) {
            appendRow(r->GetRecordsCount(), r->GetRawBytes(), r->GetChunkIdx(),
                ReadMetadata->GetEntityName(r->GetIndexId()).value_or("undefined"), r->GetIndexId(), r->GetBlobRange(), "INDEX");
        }
    }
}

Expand Down
7 changes: 5 additions & 2 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,10 @@ struct Schema : NIceDb::Schema {
struct Offset: Column<6, NScheme::NTypeIds::Uint32> {};
struct Size: Column<7, NScheme::NTypeIds::Uint32> {};
struct RecordsCount: Column<8, NScheme::NTypeIds::Uint32> {};
struct RawBytes: Column<9, NScheme::NTypeIds::Uint64> {};

using TKey = TableKey<PathId, PortionId, IndexId, ChunkIdx>;
using TColumns = TableColumns<PathId, PortionId, IndexId, ChunkIdx, Blob, Offset, Size, RecordsCount>;
using TColumns = TableColumns<PathId, PortionId, IndexId, ChunkIdx, Blob, Offset, Size, RecordsCount, RawBytes>;
};

using TTables = SchemaTables<
Expand Down Expand Up @@ -620,15 +621,17 @@ class TIndexChunkLoadContext {
YDB_READONLY_DEF(TBlobRange, BlobRange);
TChunkAddress Address;
const ui32 RecordsCount;
const ui32 RawBytes;
public:
TIndexChunk BuildIndexChunk() const {
return TIndexChunk(Address.GetColumnId(), Address.GetChunkIdx(), RecordsCount, BlobRange);
return TIndexChunk(Address.GetColumnId(), Address.GetChunkIdx(), RecordsCount, RawBytes, BlobRange);
}

template <class TSource>
TIndexChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector)
: Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>())
, RecordsCount(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::RecordsCount>())
, RawBytes(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::RawBytes>())
{
AFL_VERIFY(Address.GetColumnId())("event", "incorrect address")("address", Address.DebugString());
TString strBlobId = rowset.template GetValue<NColumnShard::Schema::IndexIndexes::Blob>();
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/engines/db_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ void TDbWrapper::WriteIndex(const TPortionInfo& portion, const TIndexChunk& row)
db.Table<IndexIndexes>().Key(portion.GetPathId(), portion.GetPortionId(), row.GetIndexId(), row.GetChunkIdx()).Update(
NIceDb::TUpdate<IndexIndexes::Blob>(row.GetBlobRange().BlobId.SerializeBinary()),
NIceDb::TUpdate<IndexIndexes::Offset>(row.GetBlobRange().Offset),
NIceDb::TUpdate<IndexIndexes::Size>(row.GetBlobRange().Size)
NIceDb::TUpdate<IndexIndexes::Size>(row.GetBlobRange().Size),
NIceDb::TUpdate<IndexIndexes::RecordsCount>(row.GetRecordsCount()),
NIceDb::TUpdate<IndexIndexes::RawBytes>(row.GetRawBytes())
);
}

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/engines/portions/column_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ class TIndexChunk {
YDB_READONLY(ui32, IndexId, 0);
YDB_READONLY(ui32, ChunkIdx, 0);
YDB_READONLY(ui32, RecordsCount, 0);
// 64-bit so that the ui64 rawBytes accepted by the constructor is stored without narrowing.
YDB_READONLY(ui64, RawBytes, 0);
YDB_READONLY_DEF(TBlobRange, BlobRange);

public:
/// Describes one stored chunk of an index.
/// @param indexId       identifier of the index the chunk belongs to.
/// @param chunkIdx      position of the chunk within the index.
/// @param recordsCount  number of records in the chunk.
/// @param rawBytes      raw byte size reported for the chunk.
/// @param blobRange     blob range holding the chunk data.
TIndexChunk(const ui32 indexId, const ui32 chunkIdx, const ui32 recordsCount, const ui64 rawBytes, const TBlobRange& blobRange)
    : IndexId(indexId)
    , ChunkIdx(chunkIdx)
    , RecordsCount(recordsCount)
    , RawBytes(rawBytes)
    , BlobRange(blobRange) {
}
Expand Down
13 changes: 12 additions & 1 deletion ydb/core/tx/columnshard/engines/reader/read_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ struct TReadStatsMetadata : public TReadMetadataBase, public std::enable_shared_

const TSnapshot& GetRequestSnapshot() const { return RequestSnapshot; }

std::optional<std::string> GetColumnNameDef(const ui32 columnId) const {
std::optional<std::string> GetColumnNameDef(const ui32 columnId) const {
if (!ResultIndexSchema) {
return {};
}
Expand All @@ -276,6 +276,17 @@ struct TReadStatsMetadata : public TReadMetadataBase, public std::enable_shared_
return f->name();
}

/// Resolves `entityId` to a name: the column lookup is tried first, the index lookup second.
/// @return an empty optional when no result schema is attached; otherwise the column name if the
///         column lookup succeeds, else whatever the index-name lookup yields.
std::optional<std::string> GetEntityName(const ui32 entityId) const {
    if (!ResultIndexSchema) {
        return {};
    }
    const auto& indexInfo = ResultIndexSchema->GetIndexInfo();
    if (auto columnName = indexInfo.GetColumnNameOptional(entityId)) {
        return columnName;
    }
    return indexInfo.GetIndexNameOptional(entityId);
}

explicit TReadStatsMetadata(ui64 tabletId, const ESorting sorting, const TProgramContainer& ssaProgram, const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot)
: TBase(sorting, ssaProgram)
, RequestSnapshot(requestSnapshot)
Expand Down
23 changes: 23 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/index_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,29 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema {
TColumnSaver GetColumnSaver(const ui32 columnId, const TSaverContext& context) const;
std::shared_ptr<TColumnLoader> GetColumnLoaderOptional(const ui32 columnId) const;
std::shared_ptr<TColumnLoader> GetColumnLoaderVerified(const ui32 columnId) const;
/// Returns the name of the field found for `columnId`, or an empty optional when
/// GetColumnFieldOptional finds no such field.
std::optional<std::string> GetColumnNameOptional(const ui32 columnId) const {
    if (auto field = GetColumnFieldOptional(columnId)) {
        return field->name();
    }
    return {};
}

/// Looks `indexId` up in Indexes and returns a copy of its meta container;
/// a default-constructed container is returned when the id is not registered.
NIndexes::TIndexMetaContainer GetIndexOptional(const ui32 indexId) const {
    const auto found = Indexes.find(indexId);
    return found != Indexes.end() ? found->second : NIndexes::TIndexMetaContainer();
}

/// Returns the name of the index registered under `indexId`, or an empty optional
/// when GetIndexOptional yields an empty container.
std::optional<TString> GetIndexNameOptional(const ui32 indexId) const {
    auto indexMeta = GetIndexOptional(indexId);
    return !indexMeta ? std::optional<TString>() : std::optional<TString>(indexMeta->GetIndexName());
}

void AppendIndexes(std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& originalData) const {
for (auto&& i : Indexes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace NKikimr::NOlap::NIndexes {
class IIndexMetaConstructor {
protected:
virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0;
virtual std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const ui32 indexId, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const = 0;
virtual std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const ui32 indexId, const TString& indexName, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const = 0;
virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) = 0;
virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const = 0;
public:
Expand All @@ -29,8 +29,8 @@ class IIndexMetaConstructor {
return DoDeserializeFromJson(jsonInfo);
}

std::shared_ptr<IIndexMeta> CreateIndexMeta(const ui32 indexId, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const {
return DoCreateIndexMeta(indexId, currentSchema, errors);
std::shared_ptr<IIndexMeta> CreateIndexMeta(const ui32 indexId, const TString& indexName, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const {
return DoCreateIndexMeta(indexId, indexName, currentSchema, errors);
}

TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
#include <ydb/core/formats/arrow/hash/xx_hash.h>
#include <ydb/core/formats/arrow/hash/calcer.h>
#include <ydb/core/formats/arrow/serializer/full.h>
#include <ydb/core/formats/arrow/size_calcer.h>

namespace NKikimr::NOlap::NIndexes {

/// Registers this chunk in `portionInfo` as an index chunk stored at `bRange`.
/// The chunk is added exactly once and carries its records count and raw byte size.
void TPortionIndexChunk::DoAddIntoPortion(const TBlobRange& bRange, TPortionInfo& portionInfo) const {
    portionInfo.AddIndex(TIndexChunk(GetEntityId(), GetChunkIdx(), RecordsCount, RawBytes, bRange));
}

std::shared_ptr<NKikimr::NOlap::IPortionDataChunk> TIndexByColumns::DoBuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const {
Expand All @@ -27,16 +28,16 @@ std::shared_ptr<NKikimr::NOlap::IPortionDataChunk> TIndexByColumns::DoBuildIndex
TChunkedBatchReader reader(std::move(columnReaders));
std::shared_ptr<arrow::RecordBatch> indexBatch = DoBuildIndexImpl(reader);
const TString indexData = TColumnSaver(nullptr, Serializer).Apply(indexBatch);
return std::make_shared<TPortionIndexChunk>(indexId, recordsCount, indexData);
return std::make_shared<TPortionIndexChunk>(indexId, recordsCount, NArrow::GetBatchDataSize(indexBatch), indexData);
}

/// Ignores the proto payload and (re)creates the serializer with default IPC write options.
/// @return always true.
bool TIndexByColumns::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& /*proto*/) {
    const auto writeOptions = arrow::ipc::IpcWriteOptions::Defaults();
    Serializer = std::make_shared<NArrow::NSerialization::TFullDataSerializer>(writeOptions);
    return true;
}

TIndexByColumns::TIndexByColumns(const ui32 indexId, const std::set<ui32>& columnIds)
: TBase(indexId)
TIndexByColumns::TIndexByColumns(const ui32 indexId, const TString& indexName, const std::set<ui32>& columnIds)
: TBase(indexId, indexName)
, ColumnIds(columnIds)
{
Serializer = std::make_shared<NArrow::NSerialization::TFullDataSerializer>(arrow::ipc::IpcWriteOptions::Defaults());
Expand Down
Loading

0 comments on commit f1f2718

Please sign in to comment.