diff --git a/ydb/core/formats/arrow/process_columns.cpp b/ydb/core/formats/arrow/process_columns.cpp index 6f0de9accd2e..846542608304 100644 --- a/ydb/core/formats/arrow/process_columns.cpp +++ b/ydb/core/formats/arrow/process_columns.cpp @@ -9,9 +9,36 @@ namespace NKikimr::NArrow { namespace { -template + +template +class TColumnNameAccessor { +public: + static const std::string& GetFieldName(const T& val) { + return val; + } + static TString DebugString(const std::vector& items) { + return JoinSeq(",", items); + } +}; + +template <> +class TColumnNameAccessor> { +public: + static const std::string& GetFieldName(const std::shared_ptr& val) { + return val->name(); + } + static TString DebugString(const std::vector>& items) { + TStringBuilder sb; + for (auto&& i : items) { + sb << i->name() << ","; + } + return sb; + } +}; + +template std::shared_ptr ExtractColumnsValidateImpl( - const std::shared_ptr& srcBatch, const std::vector& columnNames) { + const std::shared_ptr& srcBatch, const std::vector& columnNames) { std::vector> fields; fields.reserve(columnNames.size()); std::vector::TColumn>> columns; @@ -19,7 +46,7 @@ std::shared_ptr ExtractColumnsValidateImpl( auto srcSchema = srcBatch->schema(); for (auto& name : columnNames) { - const int pos = srcSchema->GetFieldIndex(name); + const int pos = srcSchema->GetFieldIndex(TColumnNameAccessor::GetFieldName(name)); if (Y_LIKELY(pos > -1)) { fields.push_back(srcSchema->field(pos)); columns.push_back(srcBatch->column(pos)); @@ -70,16 +97,16 @@ TConclusion> AdaptColumnsImpl( return NAdapter::TDataBuilderPolicy::Build(std::make_shared(fields), std::move(columns), srcBatch->num_rows()); } -template +template std::shared_ptr ExtractImpl(const TColumnOperator::EExtractProblemsPolicy& policy, - const std::shared_ptr& incoming, const std::vector& columnNames) { + const std::shared_ptr& incoming, const std::vector& columnNames) { AFL_VERIFY(incoming); AFL_VERIFY(columnNames.size()); auto result = ExtractColumnsValidateImpl(incoming, columnNames); switch (policy) { case TColumnOperator::EExtractProblemsPolicy::Verify: AFL_VERIFY((ui32)result->num_columns() == columnNames.size())("schema", incoming->schema()->ToString())( - "required", JoinSeq(",", columnNames)); + "required", TColumnNameAccessor::DebugString(columnNames)); break; case TColumnOperator::EExtractProblemsPolicy::Null: if ((ui32)result->num_columns() != columnNames.size()) { @@ -123,6 +150,16 @@ std::shared_ptr TColumnOperator::Extract( return ExtractImpl(AbsentColumnPolicy, incoming, columnNames); } +std::shared_ptr TColumnOperator::Extract( + const std::shared_ptr& incoming, const std::vector>& columns) { + return ExtractImpl(AbsentColumnPolicy, incoming, columns); +} + +std::shared_ptr TColumnOperator::Extract( + const std::shared_ptr& incoming, const std::vector>& columns) { + return ExtractImpl(AbsentColumnPolicy, incoming, columns); +} + std::shared_ptr TColumnOperator::Extract( const std::shared_ptr& incoming, const std::vector& columnNames) { return ExtractImpl(AbsentColumnPolicy, incoming, columnNames); @@ -171,5 +208,47 @@ NKikimr::TConclusion> TColumnOperator::Reorder( const std::shared_ptr& incoming, const std::vector& columnNames) { return ReorderImpl(incoming, columnNames); } +namespace { +template +TConclusion BuildSequentialSubsetImpl( + const std::shared_ptr& srcBatch, const std::shared_ptr& dstSchema) { + AFL_VERIFY(srcBatch); + AFL_VERIFY(dstSchema); + if (dstSchema->num_fields() < srcBatch->schema()->num_fields()) { + AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "incorrect columns set: destination must been wider than source")( + "source", srcBatch->schema()->ToString())("destination", dstSchema->ToString()); + return TConclusionStatus::Fail("incorrect columns set: destination must been wider than source"); + } + std::set fieldIdx; + auto itSrc = srcBatch->schema()->fields().begin(); + auto itDst = dstSchema->fields().begin(); + while (itSrc != srcBatch->schema()->fields().end() && itDst != dstSchema->fields().end()) { + if ((*itSrc)->name() != (*itDst)->name()) { + ++itDst; + } else { + fieldIdx.emplace(itDst - dstSchema->fields().begin()); + if (!(*itDst)->Equals(*itSrc)) { + AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")( + "column_type", (*itDst)->ToString(true))("incoming_type", (*itSrc)->ToString(true)); + return TConclusionStatus::Fail("incompatible column types"); + } + + ++itDst; + ++itSrc; + } + } + if (itDst == dstSchema->fields().end() && itSrc != srcBatch->schema()->fields().end()) { + AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "incorrect columns order in source set")("source", srcBatch->schema()->ToString())( + "destination", dstSchema->ToString()); + return TConclusionStatus::Fail("incorrect columns order in source set"); + } + return TSchemaSubset(fieldIdx, dstSchema->num_fields()); +} +} // namespace + +TConclusion TColumnOperator::BuildSequentialSubset( + const std::shared_ptr& incoming, const std::shared_ptr& dstSchema) { + return BuildSequentialSubsetImpl(incoming, dstSchema); +} } // namespace NKikimr::NArrow diff --git a/ydb/core/formats/arrow/process_columns.h b/ydb/core/formats/arrow/process_columns.h index 2aaadb5c3112..ad57af9e6647 100644 --- a/ydb/core/formats/arrow/process_columns.h +++ b/ydb/core/formats/arrow/process_columns.h @@ -38,9 +38,16 @@ class TColumnOperator { std::shared_ptr Extract( const std::shared_ptr& incoming, const std::vector& columnNames); std::shared_ptr Extract(const std::shared_ptr& incoming, const std::vector& columnNames); + std::shared_ptr Extract( + const std::shared_ptr& incoming, const std::vector>& columns); + std::shared_ptr Extract( + const std::shared_ptr& incoming, const std::vector>& columns); std::shared_ptr Extract(const std::shared_ptr& incoming, const std::vector& columnNames); std::shared_ptr Extract(const std::shared_ptr& incoming, const std::vector& columnNames); + TConclusion BuildSequentialSubset( + const std::shared_ptr& incoming, const std::shared_ptr& dstSchema); + TConclusion> Adapt( const std::shared_ptr& incoming, const std::shared_ptr& dstSchema, TSchemaSubset* subset = nullptr); TConclusion> Adapt( diff --git a/ydb/core/formats/arrow/size_calcer.cpp b/ydb/core/formats/arrow/size_calcer.cpp index c718b7807410..1b14f0d66132 100644 --- a/ydb/core/formats/arrow/size_calcer.cpp +++ b/ydb/core/formats/arrow/size_calcer.cpp @@ -246,7 +246,7 @@ NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptrschema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(), + return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(), NArrow::GetBatchDataSize(batch), specialKeys); } diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 16dc43439613..58c521e31dae 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -19,7 +19,7 @@ namespace NKikimr::NColumnShard { using namespace NTabletFlatExecutor; -void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, const ui64 cookie, +void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteMeta& writeMeta, const ui64 writeSize, const ui64 cookie, std::unique_ptr&& event, const TActorContext& ctx) { Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); switch (overloadReason) { @@ -27,28 +27,28 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const Counters.OnWriteOverloadDisk(); break; case EOverloadStatus::InsertTable: - Counters.OnWriteOverloadInsertTable(writeData.GetSize()); + Counters.OnWriteOverloadInsertTable(writeSize); break; case EOverloadStatus::OverloadMetadata: - Counters.OnWriteOverloadMetadata(writeData.GetSize()); + Counters.OnWriteOverloadMetadata(writeSize); break; case EOverloadStatus::ShardTxInFly: - Counters.OnWriteOverloadShardTx(writeData.GetSize()); + Counters.OnWriteOverloadShardTx(writeSize); break; case EOverloadStatus::ShardWritesInFly: - Counters.OnWriteOverloadShardWrites(writeData.GetSize()); + Counters.OnWriteOverloadShardWrites(writeSize); break; case EOverloadStatus::ShardWritesSizeInFly: - Counters.OnWriteOverloadShardWritesSize(writeData.GetSize()); + Counters.OnWriteOverloadShardWritesSize(writeSize); break; case EOverloadStatus::None: Y_ABORT("invalid function usage"); } - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "write_overload")("size", writeData.GetSize())( - "path_id", writeData.GetWriteMeta().GetTableId())("reason", overloadReason); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "write_overload")("size", writeSize)("path_id", writeMeta.GetTableId())( + "reason", overloadReason); - ctx.Send(writeData.GetWriteMeta().GetSource(), event.release(), 0, cookie); + ctx.Send(writeMeta.GetSource(), event.release(), 0, cookie); } TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 tableId) const { @@ -240,7 +240,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex if (overloadStatus != EOverloadStatus::None) { std::unique_ptr result = std::make_unique( TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED); - OverloadWriteFail(overloadStatus, writeData, cookie, std::move(result), ctx); + OverloadWriteFail(overloadStatus, writeData.GetWriteMeta(), writeData.GetSize(), cookie, std::move(result), ctx); Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::Overload); } else { if (ui64 writeId = (ui64)HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) { @@ -538,10 +538,9 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor auto overloadStatus = CheckOverloaded(tableId); if (overloadStatus != EOverloadStatus::None) { - NEvWrite::TWriteData writeData(NEvWrite::TWriteMeta(0, tableId, source, {}), arrowData, nullptr, nullptr); std::unique_ptr result = NEvents::TDataEvents::TEvWriteResult::BuildError( TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error"); - OverloadWriteFail(overloadStatus, writeData, cookie, std::move(result), ctx); + OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, tableId, source, {}), arrowData->GetSize(), cookie, std::move(result), ctx); return; } @@ -554,11 +553,11 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor ui64 lockId = 0; if (behaviour == EOperationBehaviour::NoTxWrite) { - static TAtomicCounter Counter = 0; - const ui64 shift = (ui64)1 << 47; - lockId = shift + Counter.Inc(); + lockId = BuildEphemeralTxId(); + } else if (behaviour == EOperationBehaviour::InTxWrite) { + lockId = record.GetTxId(); } else { - lockId = (behaviour == EOperationBehaviour::InTxWrite) ? record.GetTxId() : record.GetLockTxId(); + lockId = record.GetLockTxId(); } OperationsManager->RegisterLock(lockId, Generation()); diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index d6b2937e66f7..8e303896ce75 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -286,6 +286,12 @@ class TColumnShard void OnTieringModified(const std::optional pathId = {}); public: + ui64 BuildEphemeralTxId() { + static TAtomicCounter Counter = 0; + static constexpr ui64 shift = (ui64)1 << 47; + return shift | Counter.Inc(); + } + enum class EOverloadStatus { ShardTxInFly /* "shard_tx" */, ShardWritesInFly /* "shard_writes" */, @@ -320,7 +326,7 @@ class TColumnShard } private: - void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, const ui64 cookie, std::unique_ptr&& event, const TActorContext& ctx); + void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteMeta& writeMeta, const ui64 writeSize, const ui64 cookie, std::unique_ptr&& event, const TActorContext& ctx); EOverloadStatus CheckOverloaded(const ui64 tableId) const; protected: @@ -534,6 +540,9 @@ class TColumnShard public: ui64 TabletTxCounter = 0; + bool HasLongTxWrites(const TInsertWriteId insertWriteId) const { + return LongTxWrites.contains(insertWriteId); + } void EnqueueProgressTx(const TActorContext& ctx, const std::optional continueTxId); NOlap::TSnapshot GetLastTxSnapshot() const { return NOlap::TSnapshot(LastPlannedStep, LastPlannedTxId); diff --git a/ydb/core/tx/columnshard/engines/portions/constructor.cpp b/ydb/core/tx/columnshard/engines/portions/constructor.cpp index 56575c60d54b..39cd0fe983dc 100644 --- a/ydb/core/tx/columnshard/engines/portions/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/portions/constructor.cpp @@ -67,22 +67,6 @@ void TPortionInfoConstructor::LoadIndex(const TIndexChunkLoadContext& loadContex const NKikimr::NOlap::TColumnRecord& TPortionInfoConstructor::AppendOneChunkColumn(TColumnRecord&& record) { Y_ABORT_UNLESS(record.ColumnId); - std::optional maxChunk; - for (auto&& i : Records) { - if (i.ColumnId == record.ColumnId) { - if (!maxChunk) { - maxChunk = i.Chunk; - } else { - Y_ABORT_UNLESS(*maxChunk + 1 == i.Chunk); - maxChunk = i.Chunk; - } - } - } - if (maxChunk) { - AFL_VERIFY(*maxChunk + 1 == record.Chunk)("max", *maxChunk)("record", record.Chunk); - } else { - AFL_VERIFY(0 == record.Chunk)("record", record.Chunk); - } Records.emplace_back(std::move(record)); return Records.back(); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp index 6848167e54db..c24fbe0577a7 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp @@ -35,8 +35,11 @@ TConclusionStatus TReadMetadata::Init( if (LockId) { for (auto&& i : CommittedBlobs) { if (auto writeId = i.GetWriteIdOptional()) { - auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(*writeId); - AddWriteIdToCheck(*writeId, op->GetLockId()); + if (owner->HasLongTxWrites(*writeId)) { + } else { + auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(*writeId); + AddWriteIdToCheck(*writeId, op->GetLockId()); + } } } } diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index 8e4031b81419..c6203f9142a2 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -32,22 +32,24 @@ ui32 TIndexInfo::GetColumnIdVerified(const std::string& name) const { } std::optional TIndexInfo::GetColumnIdOptional(const std::string& name) const { - const auto ni = ColumnNames.find(name); - - if (ni != ColumnNames.end()) { - return ni->second; + const auto pred = [](const TNameInfo& item, const std::string& value) { + return item.GetName() < value; + }; + auto it = std::lower_bound(ColumnNames.begin(), ColumnNames.end(), name, pred); + if (it != ColumnNames.end() && it->GetName() == name) { + return it->GetColumnId(); } return IIndexInfo::GetColumnIdOptional(name); } -TString TIndexInfo::GetColumnName(ui32 id, bool required) const { - const auto ci = ColumnFeatures.find(id); - - if (ci != ColumnFeatures.end()) { - return ci->second->GetColumnName(); +TString TIndexInfo::GetColumnName(const ui32 id, bool required) const { + const auto& f = GetColumnFeaturesOptional(id); + if (!f) { + AFL_VERIFY(!required); + return ""; + } else { + return f->GetColumnName(); } - - return IIndexInfo::GetColumnName(id, required); } const std::vector& TIndexInfo::GetColumnIds(const bool withSpecial) const { @@ -62,9 +64,7 @@ std::vector TIndexInfo::GetColumnNames(const std::vector& ids) co std::vector out; out.reserve(ids.size()); for (ui32 id : ids) { - const auto ci = ColumnFeatures.find(id); - Y_ABORT_UNLESS(ci != ColumnFeatures.end()); - out.push_back(ci->second->GetColumnName()); + out.push_back(GetColumnName(id)); } return out; } @@ -73,9 +73,7 @@ std::vector TIndexInfo::GetColumnSTLNames(const std::vector& std::vector out; out.reserve(ids.size()); for (ui32 id : ids) { - const auto ci = ColumnFeatures.find(id); - Y_ABORT_UNLESS(ci != ColumnFeatures.end()); - out.push_back(ci->second->GetColumnName()); + out.push_back(GetColumnName(id)); } return out; } @@ -123,33 +121,30 @@ void TIndexInfo::SetAllKeys(const std::shared_ptr& operators, } MinMaxIdxColumnsIds.insert(GetPKFirstColumnId()); if (!Schema) { - AFL_VERIFY(IdIntoIndex.empty()); AFL_VERIFY(!SchemaWithSpecials); InitializeCaches(operators, columns, nullptr); } } TColumnSaver TIndexInfo::GetColumnSaver(const ui32 columnId) const { - auto it = ColumnFeatures.find(columnId); - AFL_VERIFY(it != ColumnFeatures.end()); - return it->second->GetColumnSaver(); + return GetColumnFeaturesVerified(columnId).GetColumnSaver(); } std::shared_ptr TIndexInfo::GetColumnLoaderOptional(const ui32 columnId) const { - auto it = ColumnFeatures.find(columnId); - if (it == ColumnFeatures.end()) { + const auto& cFeatures = GetColumnFeaturesOptional(columnId); + if (!cFeatures) { return nullptr; } else { - return it->second->GetLoader(); + return cFeatures->GetLoader(); } } std::optional TIndexInfo::GetColumnIndexOptional(const ui32 id) const { - auto it = IdIntoIndex.find(id); - if (it == IdIntoIndex.end()) { + auto it = std::lower_bound(SchemaColumnIdsWithSpecials.begin(), SchemaColumnIdsWithSpecials.end(), id); + if (it == SchemaColumnIdsWithSpecials.end() || *it != id) { return std::nullopt; } else { - return it->second; + return it - SchemaColumnIdsWithSpecials.begin(); } } @@ -222,18 +217,19 @@ bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& THashMap columns; { TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::Columns"); + ColumnNames.clear(); for (const auto& col : schema.GetColumns()) { const ui32 id = col.GetId(); const TString& name = cache->GetStringCache(col.GetName()); const bool notNull = col.HasNotNull() ? col.GetNotNull() : false; auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(), col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr); columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, cache->GetStringCache(typeInfoMod.TypeMod), notNull); - ColumnNames[name] = id; + ColumnNames.emplace_back(name, id); } + std::sort(ColumnNames.begin(), ColumnNames.end()); } for (const auto& keyName : schema.GetKeyColumnNames()) { - Y_ABORT_UNLESS(ColumnNames.contains(keyName)); - PKColumnIds.push_back(ColumnNames[keyName]); + PKColumnIds.push_back(GetColumnIdVerified(keyName)); } InitializeCaches(operators, columns, cache, false); SetAllKeys(operators, columns); @@ -255,7 +251,7 @@ bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_column_feature")("reason", fConclusion.GetErrorMessage()); return false; } - AFL_VERIFY(ColumnFeatures.emplace(col.GetId(), fConclusion.DetachResult()).second); + ColumnFeatures.emplace_back(fConclusion.DetachResult()); } for (auto&& cId : GetSystemColumnIds()) { THashMap> it; @@ -264,8 +260,12 @@ bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& return BuildDefaultColumnFeatures(cId, {}, operators); }; auto fConclusion = cache->GetOrCreateColumnFeatures(fingerprint, createPred); - AFL_VERIFY(ColumnFeatures.emplace(cId, fConclusion.DetachResult()).second); + ColumnFeatures.emplace_back(fConclusion.DetachResult()); } + const auto pred = [](const std::shared_ptr& l, const std::shared_ptr& r) { + return l->GetColumnId() < r->GetColumnId(); + }; + std::sort(ColumnFeatures.begin(), ColumnFeatures.end(), pred); } Version = schema.GetVersion(); @@ -345,22 +345,18 @@ void TIndexInfo::InitializeCaches(const std::shared_ptr& opera { TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::InitializeCaches::SchemaFields"); SchemaColumnIdsWithSpecials = IIndexInfo::AddSpecialFieldIds(SchemaColumnIds); - ui32 idx = 0; - for (auto&& i : SchemaColumnIdsWithSpecials) { - AFL_VERIFY(IdIntoIndex.emplace(i, idx++).second); - } } if (withColumnFeatures) { { TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::InitializeCaches::Columns"); for (auto&& c : columns) { - AFL_VERIFY(ColumnFeatures.emplace(c.first, BuildDefaultColumnFeatures(c.first, columns, operators)).second); + ColumnFeatures.emplace_back(BuildDefaultColumnFeatures(c.first, columns, operators)); } } { TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::InitializeCaches::SysColumns"); for (auto&& cId : GetSystemColumnIds()) { - AFL_VERIFY(ColumnFeatures.emplace(cId, BuildDefaultColumnFeatures(cId, columns, operators)).second); + ColumnFeatures.emplace_back(BuildDefaultColumnFeatures(cId, columns, operators)); } } } @@ -390,9 +386,6 @@ std::shared_ptr TIndexInfo::GetColumnExternalDefaultValueVerified } std::shared_ptr TIndexInfo::GetColumnExternalDefaultValueVerified(const ui32 columnId) const { - if (IIndexInfo::IsSpecialColumn(columnId)) { - return IIndexInfo::DefaultColumnValue(columnId); - } return GetColumnFeaturesVerified(columnId).GetDefaultValue().GetValue(); } @@ -462,4 +455,9 @@ std::shared_ptr TIndexInfo::BuildDefaultColumnF } } +std::shared_ptr TIndexInfo::GetColumnExternalDefaultValueByIndexVerified(const ui32 colIndex) const { + AFL_VERIFY(colIndex < ColumnFeatures.size())("index", colIndex)("size", ColumnFeatures.size()); + return ColumnFeatures[colIndex]->GetDefaultValue().GetValue(); +} + } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index 9f8eab48e28f..0c04b4abd8d1 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -106,14 +106,30 @@ class TSchemaObjectsCache { struct TIndexInfo: public IIndexInfo { private: using TColumns = THashMap; - using TColumnNames = THashMap; - TColumnNames ColumnNames; + class TNameInfo { + private: + YDB_READONLY_DEF(TString, Name); + YDB_READONLY(ui32, ColumnId, 0); + + public: + TNameInfo(const TString& name, const ui32 columnId) + : Name(name) + , ColumnId(columnId) + { + + } + + bool operator<(const TNameInfo& item) const { + return Name < item.Name; + } + }; + + std::vector ColumnNames; std::vector PKColumnIds; std::vector PKColumns; - THashMap IdIntoIndex; - THashMap> ColumnFeatures; + std::vector> ColumnFeatures; THashMap Indexes; TIndexInfo(const TString& name); bool SchemeNeedActualization = false; @@ -128,31 +144,34 @@ struct TIndexInfo: public IIndexInfo { public: std::shared_ptr GetCompactionPlannerConstructor() const; - - bool IsNullableVerified(const std::string& fName) const { - return IsNullableVerified(GetColumnIdVerified(fName)); + bool IsNullableVerifiedByIndex(const ui32 colIndex) const { + AFL_VERIFY(colIndex < ColumnFeatures.size()); + return ColumnFeatures[colIndex]->GetIsNullable(); } bool IsNullableVerified(const ui32 colId) const { - auto it = ColumnFeatures.find(colId); - if (it == ColumnFeatures.end()) { - AFL_VERIFY(IIndexInfo::IsSpecialColumn(colId)); - return IIndexInfo::IsNullableVerified(colId); - } - return it->second->GetIsNullable(); + return GetColumnFeaturesVerified(colId).GetIsNullable(); } std::shared_ptr GetColumnExternalDefaultValueVerified(const std::string& colName) const; std::shared_ptr GetColumnExternalDefaultValueVerified(const ui32 colId) const; + std::shared_ptr GetColumnExternalDefaultValueByIndexVerified(const ui32 colIndex) const; + bool GetExternalGuaranteeExclusivePK() const { return ExternalGuaranteeExclusivePK; } const TColumnFeatures& GetColumnFeaturesVerified(const ui32 columnId) const { - auto it = ColumnFeatures.find(columnId); - AFL_VERIFY(it != ColumnFeatures.end()); - return *it->second; + return *ColumnFeatures[GetColumnIndexVerified(columnId)]; + } + + const std::shared_ptr& GetColumnFeaturesOptional(const ui32 columnId) const { + if (auto idx = GetColumnIndexOptional(columnId)) { + return ColumnFeatures[*idx]; + } else { + return Default>(); + } } NSplitter::TEntityGroups GetEntityGroupsByStorageId(const TString& specialTier, const IStoragesManager& storages) const; @@ -167,7 +186,7 @@ struct TIndexInfo: public IIndexInfo { result.emplace(portionTierName); } else { for (auto&& i : ColumnFeatures) { - result.emplace(i.second->GetOperator()->GetStorageId()); + result.emplace(i->GetOperator()->GetStorageId()); } } return result; @@ -187,9 +206,7 @@ struct TIndexInfo: public IIndexInfo { if (specialTier && specialTier != IStoragesManager::DefaultStorageId) { return specialTier; } else { - auto it = ColumnFeatures.find(columnId); - AFL_VERIFY(it != ColumnFeatures.end()); - return it->second->GetOperator()->GetStorageId(); + return GetColumnFeaturesVerified(columnId).GetOperator()->GetStorageId(); } } @@ -208,7 +225,7 @@ struct TIndexInfo: public IIndexInfo { << "name=" << Name << ";" << ")"; for (auto&& i : ColumnFeatures) { - sb << GetColumnName(i.first) << ":" << i.second->DebugString() << ";"; + sb << i->GetColumnName() << ":" << i->DebugString() << ";"; } return sb; } @@ -225,12 +242,12 @@ struct TIndexInfo: public IIndexInfo { const std::shared_ptr& operators, const TColumns& columns, const std::vector& pkNames) { TIndexInfo result = BuildDefault(); for (auto&& i : columns) { - result.ColumnNames[i.second.Name] = i.first; + result.ColumnNames.emplace_back(i.second.Name, i.first); } + std::sort(result.ColumnNames.begin(), result.ColumnNames.end()); for (auto&& i : pkNames) { - auto it = result.ColumnNames.find(i); - AFL_VERIFY(it != result.ColumnNames.end()); - result.PKColumnIds.emplace_back(it->second); + const ui32 columnId = result.GetColumnIdVerified(i); + result.PKColumnIds.emplace_back(columnId); } result.SetAllKeys(operators, columns); return result; @@ -238,18 +255,14 @@ struct TIndexInfo: public IIndexInfo { std::vector> ActualizeColumnData( const std::vector>& source, const TIndexInfo& sourceIndexInfo, const ui32 columnId) const { - auto itCurrent = ColumnFeatures.find(columnId); - auto itPred = sourceIndexInfo.ColumnFeatures.find(columnId); - AFL_VERIFY(itCurrent != ColumnFeatures.end()); - AFL_VERIFY(itPred != sourceIndexInfo.ColumnFeatures.end()); - return itCurrent->second->ActualizeColumnData(source, *itPred->second); + return GetColumnFeaturesVerified(columnId).ActualizeColumnData(source, sourceIndexInfo.GetColumnFeaturesVerified(columnId)); } static std::optional BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema, const std::shared_ptr& operators, const std::shared_ptr& cache); bool HasColumnId(const ui32 columnId) const { - return ColumnFeatures.contains(columnId); + return !!GetColumnIndexOptional(columnId); } bool HasColumnName(const std::string& columnName) const { @@ -344,7 +357,7 @@ struct TIndexInfo: public IIndexInfo { std::optional GetColumnIdOptional(const std::string& name) const; /// Returns a name of the column located by id. - TString GetColumnName(ui32 id, bool required = true) const; + TString GetColumnName(const ui32 id, bool required = true) const; /// Returns names of columns defined by the specific ids. std::vector GetColumnNames(const std::vector& ids) const; diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp index 51e00503aeee..c7b8e5cb6a53 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp @@ -80,7 +80,7 @@ TConclusion> ISnapshotSchema::PrepareForModi const std::shared_ptr dstSchema = GetIndexInfo().ArrowSchema(); - auto batch = NArrow::TColumnOperator().SkipIfAbsent().Extract(incomingBatch, dstSchema->field_names()); + auto batch = NArrow::TColumnOperator().SkipIfAbsent().Extract(incomingBatch, dstSchema->fields()); for (auto&& i : batch->schema()->fields()) { const ui32 columnId = GetIndexInfo().GetColumnIdVerified(i->name()); @@ -120,16 +120,17 @@ TConclusion> ISnapshotSchema::PrepareForModi case NEvWrite::EModificationType::Upsert: { AFL_VERIFY(batch->num_columns() <= dstSchema->num_fields()); if (batch->num_columns() < dstSchema->num_fields()) { - for (auto&& f : dstSchema->fields()) { - if (GetIndexInfo().IsNullableVerified(f->name())) { + for (ui32 idx = 0; idx < (ui32)dstSchema->num_fields(); ++idx) { + if (GetIndexInfo().IsNullableVerifiedByIndex(idx)) { continue; } - if (batch->GetColumnByName(f->name())) { + if (GetIndexInfo().GetColumnExternalDefaultValueByIndexVerified(idx)) { continue; } - if (!GetIndexInfo().GetColumnExternalDefaultValueVerified(f->name())) { - return TConclusionStatus::Fail("empty field for non-default column: '" + f->name() + "'"); + if (batch->GetColumnByName(dstSchema->field(idx)->name())) { + continue; } + return TConclusionStatus::Fail("empty field for non-default column: '" + dstSchema->field(idx)->name() + "'"); } } return batch; @@ -206,8 +207,9 @@ std::vector> ISnapshotSchema::GetAbsentFields(cons TConclusionStatus ISnapshotSchema::CheckColumnsDefault(const std::vector>& fields) const { for (auto&& i : fields) { - auto defaultValue = GetExternalDefaultValueVerified(i->name()); - if (!defaultValue && !GetIndexInfo().IsNullableVerified(i->name())) { + const ui32 colId = GetColumnIdVerified(i->name()); + auto defaultValue = GetExternalDefaultValueVerified(colId); + if (!defaultValue && !GetIndexInfo().IsNullableVerified(colId)) { return TConclusionStatus::Fail("not nullable field with no default: " + i->name()); } } @@ -218,8 +220,9 @@ TConclusion> ISnapshotSchema::BuildDefaultBa const std::vector>& fields, const ui32 rowsCount, const bool force) const { std::vector> columns; for (auto&& i : fields) { - auto defaultValue = GetExternalDefaultValueVerified(i->name()); - if (!defaultValue && !GetIndexInfo().IsNullableVerified(i->name())) { + const ui32 columnId = GetColumnIdVerified(i->name()); + auto defaultValue = GetExternalDefaultValueVerified(columnId); + if (!defaultValue && !GetIndexInfo().IsNullableVerified(columnId)) { if (force) { defaultValue = NArrow::DefaultScalar(i->type()); } else { diff --git a/ydb/core/tx/columnshard/operations/batch_builder/builder.cpp b/ydb/core/tx/columnshard/operations/batch_builder/builder.cpp index d211c3a00761..7dabddaf606c 100644 --- a/ydb/core/tx/columnshard/operations/batch_builder/builder.cpp +++ b/ydb/core/tx/columnshard/operations/batch_builder/builder.cpp @@ -35,10 +35,10 @@ TConclusionStatus TBuildBatchesTask::DoExecute(const std::shared_ptr& /*t return TConclusionStatus::Fail("cannot prepare incoming batch: " + preparedConclusion.GetErrorMessage()); } auto batch = preparedConclusion.DetachResult(); - const std::vector> defaultFields = ActualSchema->GetAbsentFields(batch->schema()); std::shared_ptr merger; switch (WriteData.GetWriteMeta().GetModificationType()) { case NEvWrite::EModificationType::Upsert: { + const std::vector> defaultFields = ActualSchema->GetAbsentFields(batch->schema()); if (defaultFields.empty()) { std::shared_ptr task = std::make_shared(TabletId, ParentActorId, BufferActorId, std::move(WriteData), batch, ActualSchema); diff --git a/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp index 039ef8393f1f..f7c3ac8715ab 100644 --- a/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp +++ b/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp @@ -43,16 +43,17 @@ TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr& /*ta return TConclusionStatus::Fail("no data in batch"); } const auto& indexSchema = ActualSchema->GetIndexInfo().ArrowSchema(); - NArrow::TSchemaSubset subset; - auto reorderConclusion = NArrow::TColumnOperator().Adapt(OriginalBatch, indexSchema, &subset); - if (reorderConclusion.IsFail()) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "unadaptable schemas")("index", indexSchema->ToString())("problem", reorderConclusion.GetErrorMessage()); - ReplyError("cannot reorder schema: " + reorderConclusion.GetErrorMessage(), + auto subsetConclusion = NArrow::TColumnOperator().BuildSequentialSubset(OriginalBatch, indexSchema); + if (subsetConclusion.IsFail()) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "unadaptable schemas")("index", indexSchema->ToString())( + "problem", subsetConclusion.GetErrorMessage()); + ReplyError( + "unadaptable schema: " + subsetConclusion.GetErrorMessage(), NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::Internal); - return TConclusionStatus::Fail("cannot reorder schema: " + reorderConclusion.GetErrorMessage()); - } else { - OriginalBatch = reorderConclusion.DetachResult(); + return TConclusionStatus::Fail("cannot reorder schema: " + subsetConclusion.GetErrorMessage()); } + NArrow::TSchemaSubset subset = subsetConclusion.DetachResult(); + if (OriginalBatch->num_columns() != indexSchema->num_fields()) { AFL_VERIFY(OriginalBatch->num_columns() < indexSchema->num_fields())("original", OriginalBatch->num_columns())( "index", indexSchema->num_fields()); diff --git a/ydb/core/tx/columnshard/splitter/ut/ya.make b/ydb/core/tx/columnshard/splitter/ut/ya.make index 82d16868f00f..c7a6a0be4c0c 100644 --- a/ydb/core/tx/columnshard/splitter/ut/ya.make +++ b/ydb/core/tx/columnshard/splitter/ut/ya.make @@ -18,6 +18,8 @@ PEERDIR( ydb/core/kqp/session_actor ydb/core/tx/tx_proxy ydb/core/tx/columnshard/engines/storage/chunks + ydb/core/tx/columnshard/engines/storage/indexes/max + ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch ydb/core/tx ydb/core/mind ydb/library/yql/minikql/comp_nodes/llvm14 diff --git a/ydb/core/tx/tx_proxy/rpc_long_tx.cpp b/ydb/core/tx/tx_proxy/rpc_long_tx.cpp index 449df1b6102e..557cf13c14cb 100644 --- a/ydb/core/tx/tx_proxy/rpc_long_tx.cpp +++ b/ydb/core/tx/tx_proxy/rpc_long_tx.cpp @@ -30,8 +30,7 @@ class TLongTxWriteBase: public TActorBootstrapped { public: TLongTxWriteBase(const TString& databaseName, const TString& path, const TString& token, const TLongTxId& longTxId, const TString& dedupId) - : TBase() - , DatabaseName(databaseName) + : DatabaseName(databaseName) , Path(path) , DedupId(dedupId) , LongTxId(longTxId) @@ -41,8 +40,8 @@ class TLongTxWriteBase: public TActorBootstrapped { } } - ~TLongTxWriteBase() { - MemoryInFlight.Sub(InFlightSize); + virtual ~TLongTxWriteBase() { + AFL_VERIFY(MemoryInFlight.Sub(InFlightSize) >= 0); } protected: @@ -66,6 +65,7 @@ class TLongTxWriteBase: public TActorBootstrapped { } auto accessor = ExtractDataAccessor(); + AFL_VERIFY(!InFlightSize); InFlightSize = accessor->GetSize(); const i64 sizeInFlight = MemoryInFlight.Add(InFlightSize); if (TLimits::MemoryInFlightWriting < (ui64)sizeInFlight && sizeInFlight != InFlightSize) {