diff --git a/ydb/core/protos/data_events.proto b/ydb/core/protos/data_events.proto index 0887a11b969c..804665fd1211 100644 --- a/ydb/core/protos/data_events.proto +++ b/ydb/core/protos/data_events.proto @@ -95,6 +95,7 @@ message TEvWrite { repeated uint32 ColumnIds = 3 [packed = true]; optional uint64 PayloadIndex = 4; optional EDataFormat PayloadFormat = 5; + optional string PayloadSchema = 6; } // Transaction operations diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index 96a5cf794190..4aaefbb96902 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -163,6 +163,7 @@ void TTxWrite::Complete(const TActorContext& ctx) { Self->Counters.GetCSCounters().OnSuccessWriteResponse(); } Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED); + Self->SetupIndexation(); } } // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/operations/write_data.cpp b/ydb/core/tx/columnshard/operations/write_data.cpp index 56a0ad5e16cb..0f7440aaf5e4 100644 --- a/ydb/core/tx/columnshard/operations/write_data.cpp +++ b/ydb/core/tx/columnshard/operations/write_data.cpp @@ -12,26 +12,43 @@ bool TArrowData::Parse(const NKikimrDataEvents::TEvWrite_TOperation& proto, cons } IncomingData = payload.GetDataFromPayload(proto.GetPayloadIndex()); if (proto.HasType()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_modification_type")("proto", proto.DebugString()); auto type = TEnumOperator::DeserializeFromProto(proto.GetType()); if (!type) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_modification_type")("proto", proto.DebugString()); return false; } ModificationType = *type; } - std::vector columns; - for (auto&& columnId : proto.GetColumnIds()) { - columns.emplace_back(columnId); + if (proto.HasPayloadSchema()) { + PayloadSchema = NArrow::DeserializeSchema(proto.GetPayloadSchema()); + } else { + std::vector columns; + for (auto&& columnId : proto.GetColumnIds()) { + columns.emplace_back(columnId); + } + if (columns.empty()) { + BatchSchema = IndexSchema; + } else { + BatchSchema = std::make_shared(IndexSchema, columns); + } + if (BatchSchema->GetColumnsCount() != columns.size()) { + return false; + } } - BatchSchema = std::make_shared(IndexSchema, columns); OriginalDataSize = IncomingData.size(); - return BatchSchema->GetColumnsCount() == columns.size() && !IncomingData.empty(); + return !!IncomingData; } TConclusion> TArrowData::ExtractBatch() { Y_ABORT_UNLESS(!!IncomingData); - auto result = NArrow::DeserializeBatch(IncomingData, std::make_shared(BatchSchema->GetSchema()->fields())); + std::shared_ptr result; + if (PayloadSchema) { + result = NArrow::DeserializeBatch(IncomingData, PayloadSchema); + } else { + result = NArrow::DeserializeBatch(IncomingData, std::make_shared(BatchSchema->GetSchema()->fields())); + } + IncomingData = ""; return result; } diff --git a/ydb/core/tx/columnshard/operations/write_data.h b/ydb/core/tx/columnshard/operations/write_data.h index f1c95285abd8..2b0599c4e92f 100644 --- a/ydb/core/tx/columnshard/operations/write_data.h +++ b/ydb/core/tx/columnshard/operations/write_data.h @@ -30,6 +30,7 @@ class TArrowData : public NEvWrite::IDataContainer { private: NOlap::ISnapshotSchema::TPtr IndexSchema; NOlap::ISnapshotSchema::TPtr BatchSchema; + std::shared_ptr PayloadSchema; TString IncomingData; NEvWrite::EModificationType ModificationType = NEvWrite::EModificationType::Upsert; }; diff --git a/ydb/core/tx/data_events/columnshard_splitter.h b/ydb/core/tx/data_events/columnshard_splitter.h index 36adbe96f528..2d0feda6b3a7 100644 --- a/ydb/core/tx/data_events/columnshard_splitter.h +++ b/ydb/core/tx/data_events/columnshard_splitter.h @@ -1,6 +1,8 @@ #pragma once +#include "events.h" #include "shards_splitter.h" +#include "payload_helper.h" #include #include @@ -8,11 +10,10 @@ #include #include - namespace NKikimr::NEvWrite { -class TColumnShardShardsSplitter : public IShardsSplitter { - class TShardInfo : public IShardInfo { +class TColumnShardShardsSplitter: public IShardsSplitter { + class TShardInfo: public IShardInfo { private: const TString SchemaData; const TString Data; @@ -23,8 +24,8 @@ class TColumnShardShardsSplitter : public IShardsSplitter { : SchemaData(schemaData) , Data(data) , RowsCount(rowsCount) - , GranuleShardingVersion(granuleShardingVersion) - {} + , GranuleShardingVersion(granuleShardingVersion) { + } virtual ui64 GetBytes() const override { return Data.size(); @@ -42,9 +43,18 @@ class TColumnShardShardsSplitter : public IShardsSplitter { evWrite.SetArrowData(SchemaData, Data); evWrite.Record.SetGranuleShardingVersion(GranuleShardingVersion); } - virtual void Serialize(NEvents::TDataEvents::TEvWrite& evWrite) const override { + virtual void Serialize(NEvents::TDataEvents::TEvWrite& evWrite, const ui64 tableId, const ui64 schemaVersion) const override { TPayloadWriter writer(evWrite); - writer.AddDataToPayload(Data); + TString data = Data; + writer.AddDataToPayload(std::move(data)); + + auto* operation = evWrite.Record.AddOperations(); + operation->SetPayloadSchema(SchemaData); + operation->SetType(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE); + operation->SetPayloadFormat(NKikimrDataEvents::FORMAT_ARROW); + operation->SetPayloadIndex(0); + operation->MutableTableId()->SetTableId(tableId); + operation->MutableTableId()->SetSchemaVersion(schemaVersion); } }; diff --git a/ydb/core/tx/data_events/shard_writer.cpp b/ydb/core/tx/data_events/shard_writer.cpp index 15c76356d3ed..2af7667e2ed7 100644 --- a/ydb/core/tx/data_events/shard_writer.cpp +++ b/ydb/core/tx/data_events/shard_writer.cpp @@ -7,9 +7,10 @@ namespace NKikimr::NEvWrite { - TWritersController::TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId) + TWritersController::TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId, const bool immediateWrite) : WritesCount(writesCount) , LongTxActorId(longTxActorId) + , ImmediateWrite(immediateWrite) , LongTxId(longTxId) { Y_ABORT_UNLESS(writesCount); @@ -39,11 +40,12 @@ namespace NKikimr::NEvWrite { } } - TShardWriter::TShardWriter(const ui64 shardId, const ui64 tableId, const TString& dedupId, const IShardInfo::TPtr& data, + TShardWriter::TShardWriter(const ui64 shardId, const ui64 tableId, const ui64 schemaVersion, const TString& dedupId, const IShardInfo::TPtr& data, const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType, const bool immediateWrite) : ShardId(shardId) , WritePartIdx(writePartIdx) , TableId(tableId) + , SchemaVersion(schemaVersion) , DedupId(dedupId) , DataForShard(data) , ExternalController(externalController) @@ -54,39 +56,44 @@ namespace NKikimr::NEvWrite { { } - void TShardWriter::Bootstrap() { - if (ImmediateWriting) { + void TShardWriter::SendWriteRequest() { + if (ImmediateWrite) { auto ev = MakeHolder(NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); - DataForShard->Serialize(*ev); + DataForShard->Serialize(*ev, TableId, SchemaVersion); SendToTablet(std::move(ev)); } else { auto ev = MakeHolder(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, "", WritePartIdx, ModificationType); DataForShard->Serialize(*ev); SendToTablet(std::move(ev)); } + } + + void TShardWriter::Bootstrap() { + SendWriteRequest(); Become(&TShardWriter::StateMain); + Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup()); } - void TShardWriter::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev) { + void TShardWriter::Handle(NEvents::TDataEvents::TEvWriteResult::TPtr& ev) { const auto* msg = ev->Get(); Y_ABORT_UNLESS(msg->Record.GetOrigin() == ShardId); - const auto ydbStatus = msg->GetYdbStatus(); - if (ydbStatus == Ydb::StatusIds::OVERLOADED) { + const auto ydbStatus = msg->GetStatus(); + if (ydbStatus == NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED) { if (RetryWriteRequest(true)) { return; } } auto gPassAway = PassAwayGuard(); - if (ydbStatus != Ydb::StatusIds::SUCCESS) { - ExternalController->OnFail(ydbStatus, + if (ydbStatus != NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED) { + ExternalController->OnFail(Ydb::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Cannot write data into shard " << ShardId << " in longTx " << ExternalController->GetLongTxId().ToString()); return; } - ExternalController->OnSuccess(ShardId, msg->Record.GetWriteId(), WritePartIdx); + ExternalController->OnSuccess(ShardId, 0, WritePartIdx); } void TShardWriter::Handle(TEvColumnShard::TEvWriteResult::TPtr& ev) { @@ -132,6 +139,7 @@ namespace NKikimr::NEvWrite { void TShardWriter::HandleTimeout(const TActorContext& /*ctx*/) { RetryWriteRequest(false); + Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup()); } bool TShardWriter::RetryWriteRequest(const bool delayed) { @@ -142,9 +150,7 @@ namespace NKikimr::NEvWrite { Schedule(OverloadTimeout(), new TEvents::TEvWakeup()); } else { ++NumRetries; - auto ev = MakeHolder(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, "", WritePartIdx, ModificationType); - DataForShard->Serialize(*ev); - SendToTablet(std::move(ev)); + SendWriteRequest(); } return true; } diff --git a/ydb/core/tx/data_events/shard_writer.h b/ydb/core/tx/data_events/shard_writer.h index ef121acd22a5..323bffa5056f 100644 --- a/ydb/core/tx/data_events/shard_writer.h +++ b/ydb/core/tx/data_events/shard_writer.h @@ -1,7 +1,8 @@ #pragma once -#include "shards_splitter.h" #include "common/modification_type.h" +#include "events.h" +#include "shards_splitter.h" #include #include @@ -89,6 +90,7 @@ class TWritersController { NActors::TActorIdentity LongTxActorId; std::vector WriteIds; const TMonotonic StartInstant = TMonotonic::Now(); + const bool ImmediateWrite = false; YDB_READONLY_DEF(NLongTxService::TLongTxId, LongTxId); YDB_READONLY(std::shared_ptr, Counters, std::make_shared()); void SendReply() { @@ -96,6 +98,9 @@ class TWritersController { Counters->OnFailedFullReply(TMonotonic::Now() - StartInstant); AFL_VERIFY(Code); LongTxActorId.Send(LongTxActorId, new TEvPrivate::TEvShardsWriteResult(*Code, Issues)); + } else if (ImmediateWrite) { + Counters->OnSucceedFullReply(TMonotonic::Now() - StartInstant); + LongTxActorId.Send(LongTxActorId, new TEvPrivate::TEvShardsWriteResult(Ydb::StatusIds::SUCCESS)); } else { Counters->OnSucceedFullReply(TMonotonic::Now() - StartInstant); auto req = MakeHolder(LongTxId); @@ -129,7 +134,7 @@ class TWritersController { }; - TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId); + TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId, const bool immediateWrite); void OnSuccess(const ui64 shardId, const ui64 writeId, const ui32 writePartId); void OnFail(const Ydb::StatusIds::StatusCode code, const TString& message); }; @@ -144,6 +149,7 @@ class TShardWriter: public NActors::TActorBootstrapped { const ui64 ShardId; const ui64 WritePartIdx; const ui64 TableId; + const ui64 SchemaVersion; const TString DedupId; const IShardInfo::TPtr DataForShard; ui32 NumRetries = 0; @@ -153,6 +159,7 @@ class TShardWriter: public NActors::TActorBootstrapped { EModificationType ModificationType; const bool ImmediateWrite = false; + void SendWriteRequest(); static TDuration OverloadTimeout() { return TDuration::MilliSeconds(OverloadedDelayMs); } @@ -165,14 +172,15 @@ class TShardWriter: public NActors::TActorBootstrapped { TBase::PassAway(); } public: - TShardWriter(const ui64 shardId, const ui64 tableId, const TString& dedupId, const IShardInfo::TPtr& data, - const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType); + TShardWriter(const ui64 shardId, const ui64 tableId, const ui64 schemaVersion, const TString& dedupId, const IShardInfo::TPtr& data, + const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, + const EModificationType mType, const bool immediateWrite); STFUNC(StateMain) { switch (ev->GetTypeRewrite()) { hFunc(TEvColumnShard::TEvWriteResult, Handle); hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); - hFunc(NEvents::TDataEvents::TEvWrite, Handle); + hFunc(NEvents::TDataEvents::TEvWriteResult, Handle); CFunc(TEvents::TSystem::Wakeup, HandleTimeout); } } diff --git a/ydb/core/tx/data_events/shards_splitter.h b/ydb/core/tx/data_events/shards_splitter.h index 5a66b15e5a17..bde0656ccb91 100644 --- a/ydb/core/tx/data_events/shards_splitter.h +++ b/ydb/core/tx/data_events/shards_splitter.h @@ -41,7 +41,7 @@ class IShardsSplitter { virtual ~IShardInfo() {} virtual void Serialize(TEvColumnShard::TEvWrite& evWrite) const = 0; - virtual void Serialize(NEvents::TDataEvents::TEvWrite& evWrite) const = 0; + virtual void Serialize(NEvents::TDataEvents::TEvWrite& evWrite, const ui64 tableId, const ui64 schemaVersion) const = 0; virtual ui64 GetBytes() const = 0; virtual ui32 GetRowsCount() const = 0; virtual const TString& GetData() const = 0; @@ -66,6 +66,10 @@ class IShardsSplitter { TYdbConclusionStatus SplitData(const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, const IEvWriteDataAccessor& data) { TableId = schemeEntry.TableId.PathId.LocalPathId; + AFL_VERIFY(schemeEntry.ColumnTableInfo); + AFL_VERIFY(schemeEntry.ColumnTableInfo->Description.HasSchema()); + SchemaVersion = schemeEntry.ColumnTableInfo->Description.GetSchema().GetVersion(); + AFL_VERIFY(SchemaVersion); return DoSplitData(schemeEntry, data); } @@ -73,6 +77,10 @@ class IShardsSplitter { return TableId; } + ui64 GetSchemaVersion() const { + return SchemaVersion; + } + const TFullSplitData& GetSplitData() const { Y_ABORT_UNLESS(FullSplitData); return *FullSplitData; @@ -84,6 +92,7 @@ class IShardsSplitter { virtual TYdbConclusionStatus DoSplitData(const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, const IEvWriteDataAccessor& data) = 0; ui64 TableId = 0; + ui64 SchemaVersion = 0; protected: std::optional FullSplitData; }; diff --git a/ydb/core/tx/tx_proxy/rpc_long_tx.cpp b/ydb/core/tx/tx_proxy/rpc_long_tx.cpp index 6f027d5aad15..74e71c64397b 100644 --- a/ydb/core/tx/tx_proxy/rpc_long_tx.cpp +++ b/ydb/core/tx/tx_proxy/rpc_long_tx.cpp @@ -27,14 +27,18 @@ class TLongTxWriteBase: public TActorBootstrapped { protected: using TThis = typename TBase::TThis; + const bool NoTxWrite = false; public: - TLongTxWriteBase(const TString& databaseName, const TString& path, const TString& token, const TLongTxId& longTxId, const TString& dedupId) - : DatabaseName(databaseName) + TLongTxWriteBase(const TString& databaseName, const TString& path, const TString& token, const TLongTxId& longTxId, const TString& dedupId, + const bool noTxWrite) + : NoTxWrite(noTxWrite) + , DatabaseName(databaseName) , Path(path) , DedupId(dedupId) , LongTxId(longTxId) - , ActorSpan(0, NWilson::TTraceId::NewTraceId(0, Max()), "TLongTxWriteBase") { + , ActorSpan(0, NWilson::TTraceId::NewTraceId(0, Max()), "TLongTxWriteBase") + { if (token) { UserToken.emplace(token); } @@ -91,7 +95,7 @@ class TLongTxWriteBase: public TActorBootstrapped { accessor.reset(); const auto& splittedData = shardsSplitter->GetSplitData(); - InternalController = std::make_shared(splittedData.GetShardRequestsCount(), this->SelfId(), LongTxId); + InternalController = std::make_shared(splittedData.GetShardRequestsCount(), this->SelfId(), LongTxId, NoTxWrite); ui32 sumBytes = 0; ui32 rowsCount = 0; ui32 writeIdx = 0; @@ -100,8 +104,9 @@ class TLongTxWriteBase: public TActorBootstrapped { InternalController->GetCounters()->OnRequest(shardInfo->GetRowsCount(), shardInfo->GetBytes()); sumBytes += shardInfo->GetBytes(); rowsCount += shardInfo->GetRowsCount(); - this->Register(new NEvWrite::TShardWriter(shard, shardsSplitter->GetTableId(), DedupId, shardInfo, ActorSpan, InternalController, - ++writeIdx, NEvWrite::EModificationType::Replace)); + this->Register(new NEvWrite::TShardWriter(shard, shardsSplitter->GetTableId(), shardsSplitter->GetSchemaVersion(), DedupId, shardInfo, + ActorSpan, InternalController, + ++writeIdx, NEvWrite::EModificationType::Replace, NoTxWrite)); } } pSpan.Attribute("affected_shards_count", (long)splittedData.GetShardsInfo().size()); @@ -125,11 +130,19 @@ class TLongTxWriteBase: public TActorBootstrapped { void Handle(NEvWrite::TWritersController::TEvPrivate::TEvShardsWriteResult::TPtr& ev) { NWilson::TProfileSpan pSpan(0, ActorSpan.GetTraceId(), "ShardsWriteResult"); const auto* msg = ev->Get(); - Y_ABORT_UNLESS(msg->Status != Ydb::StatusIds::SUCCESS); - for (auto& issue : msg->Issues) { - RaiseIssue(issue); + if (msg->Status == Ydb::StatusIds::SUCCESS) { + if (IndexReady) { + ReplySuccess(); + } else { + ColumnShardReady = true; + } + } else { + Y_ABORT_UNLESS(msg->Status != Ydb::StatusIds::SUCCESS); + for (auto& issue : msg->Issues) { + RaiseIssue(issue); + } + ReplyError(msg->Status); } - ReplyError(msg->Status); } void Handle(TEvLongTxService::TEvAttachColumnShardWritesResult::TPtr& ev) { @@ -218,12 +231,11 @@ class TLongTxWriteInternal: public TLongTxWriteBase { explicit TLongTxWriteInternal(const TActorId& replyTo, const TLongTxId& longTxId, const TString& dedupId, const TString& databaseName, const TString& path, std::shared_ptr navigateResult, std::shared_ptr batch, std::shared_ptr issues, const bool noTxWrite) - : TBase(databaseName, path, TString(), longTxId, dedupId) + : TBase(databaseName, path, TString(), longTxId, dedupId, noTxWrite) , ReplyTo(replyTo) , NavigateResult(navigateResult) , Batch(batch) , Issues(issues) - , NoTxWrite(noTxWrite) { Y_ABORT_UNLESS(Issues); DataAccessor = std::make_unique(Batch); @@ -262,7 +274,6 @@ class TLongTxWriteInternal: public TLongTxWriteBase { std::shared_ptr NavigateResult; std::shared_ptr Batch; std::shared_ptr Issues; - const bool NoTxWrite = false; }; TActorId DoLongTxWriteSameMailbox(const TActorContext& ctx, const TActorId& replyTo, const NLongTxService::TLongTxId& longTxId,