Skip to content

Commit

Permalink
immediate writing for bulk upsert
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed Sep 19, 2024
1 parent 5321b21 commit 0c9cbd6
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 28 deletions.
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,5 @@ message TFeatureFlags {
optional bool EnableOlapCompression = 142 [default = false];
optional bool EnableExternalDataSourcesOnServerless = 143 [default = true];
optional bool EnableSparsedColumns = 144 [default = false];
optional bool EnableImmediateWritingOnBulkUpsert = 145 [default = false];
}
12 changes: 8 additions & 4 deletions ydb/core/tx/data_events/columnshard_splitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,26 @@ class TColumnShardShardsSplitter : public IShardsSplitter {
, GranuleShardingVersion(granuleShardingVersion)
{}

// Returns the size of the Data string held for this shard (Data.size()).
ui64 GetBytes() const override {
virtual ui64 GetBytes() const override {
return Data.size();
}

// Returns the stored RowsCount value for this shard's data.
ui32 GetRowsCount() const override {
virtual ui32 GetRowsCount() const override {
return RowsCount;
}

// Returns a const reference to the Data member (no copy is made).
const TString& GetData() const override {
virtual const TString& GetData() const override {
return Data;
}

// Fills a column-shard write event: passes SchemaData and Data to SetArrowData
// and records GranuleShardingVersion in the event's Record.
void Serialize(TEvWrite& evWrite) const override {
virtual void Serialize(TEvColumnShard::TEvWrite& evWrite) const override {
evWrite.SetArrowData(SchemaData, Data);
evWrite.Record.SetGranuleShardingVersion(GranuleShardingVersion);
}
// Fills a data-events write event: wraps evWrite in a TPayloadWriter and adds Data to its payload.
// NOTE(review): unlike the TEvColumnShard overload, SchemaData and GranuleShardingVersion are not
// written here — confirm this is intended for the immediate-write path.
virtual void Serialize(NEvents::TDataEvents::TEvWrite& evWrite) const override {
TPayloadWriter<NEvents::TDataEvents::TEvWrite> writer(evWrite);
writer.AddDataToPayload(Data);
}
};

private:
Expand Down
39 changes: 34 additions & 5 deletions ydb/core/tx/data_events/shard_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace NKikimr::NEvWrite {
}

TShardWriter::TShardWriter(const ui64 shardId, const ui64 tableId, const TString& dedupId, const IShardInfo::TPtr& data,
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType)
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType, const bool immediateWrite)
: ShardId(shardId)
, WritePartIdx(writePartIdx)
, TableId(tableId)
Expand All @@ -50,17 +50,46 @@ namespace NKikimr::NEvWrite {
, LeaderPipeCache(MakePipePerNodeCacheID(false))
, ActorSpan(parentSpan.BuildChildrenSpan("ShardWriter"))
, ModificationType(mType)
, ImmediateWrite(immediateWrite)
{
}

void TShardWriter::Bootstrap() {
auto ev = MakeHolder<TEvWrite>(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, "", WritePartIdx, ModificationType);
DataForShard->Serialize(*ev);
SendToTablet(std::move(ev));
if (ImmediateWriting) {
auto ev = MakeHolder<NEvents::TDataEvents::TEvWrite>(NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
DataForShard->Serialize(*ev);
SendToTablet(std::move(ev));
} else {
auto ev = MakeHolder<TEvColumnShard::TEvWrite>(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, "", WritePartIdx, ModificationType);
DataForShard->Serialize(*ev);
SendToTablet(std::move(ev));
}
Become(&TShardWriter::StateMain);
}

void TShardWriter::Handle(TEvWriteResult::TPtr& ev) {
// Handles the shard's reply to a NEvents::TDataEvents::TEvWrite request.
// The parameter is the *result* event, matching the overload declared in shard_writer.h.
//  - Aborts if the reply's origin is not this writer's ShardId.
//  - On OVERLOADED, tries RetryWriteRequest(true); if a retry was scheduled, returns and keeps the actor alive.
//  - Otherwise reports failure or success to ExternalController; the PassAwayGuard object is created
//    before reporting and destroyed on return.
// NOTE(review): confirm that TEvWriteResult exposes GetYdbStatus() and Record.GetWriteId(); neither is
// visible from this file.
void TShardWriter::Handle(NEvents::TDataEvents::TEvWriteResult::TPtr& ev) {
    const auto* msg = ev->Get();
    Y_ABORT_UNLESS(msg->Record.GetOrigin() == ShardId);

    const auto ydbStatus = msg->GetYdbStatus();
    if (ydbStatus == Ydb::StatusIds::OVERLOADED) {
        if (RetryWriteRequest(true)) {
            return;
        }
    }

    auto gPassAway = PassAwayGuard();
    if (ydbStatus != Ydb::StatusIds::SUCCESS) {
        ExternalController->OnFail(ydbStatus,
            TStringBuilder() << "Cannot write data into shard " << ShardId << " in longTx " <<
            ExternalController->GetLongTxId().ToString());
        return;
    }

    ExternalController->OnSuccess(ShardId, msg->Record.GetWriteId(), WritePartIdx);
}

void TShardWriter::Handle(TEvColumnShard::TEvWriteResult::TPtr& ev) {
const auto* msg = ev->Get();
Y_ABORT_UNLESS(msg->Record.GetOrigin() == ShardId);

Expand Down
7 changes: 5 additions & 2 deletions ydb/core/tx/data_events/shard_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ class TShardWriter: public NActors::TActorBootstrapped<TShardWriter> {
const TActorId LeaderPipeCache;
NWilson::TProfileSpan ActorSpan;
EModificationType ModificationType;
const bool ImmediateWrite = false;

static TDuration OverloadTimeout() {
return TDuration::MilliSeconds(OverloadedDelayMs);
Expand All @@ -169,16 +170,18 @@ class TShardWriter: public NActors::TActorBootstrapped<TShardWriter> {

// Event dispatch for StateMain, the state TShardWriter::Bootstrap switches to via Become().
// Each registered event type must have a matching Handle overload declared in this class.
STFUNC(StateMain) {
    switch (ev->GetTypeRewrite()) {
        hFunc(TEvColumnShard::TEvWriteResult, Handle);
        hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
        // Dispatch on the result event, not the TEvWrite request: the declared overload is
        // Handle(NEvents::TDataEvents::TEvWriteResult::TPtr&).
        hFunc(NEvents::TDataEvents::TEvWriteResult, Handle);
        CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
    }
}

void Bootstrap();

void Handle(TEvWriteResult::TPtr& ev);
void Handle(TEvColumnShard::TEvWriteResult::TPtr& ev);
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev);
void Handle(NEvents::TDataEvents::TEvWriteResult::TPtr& ev);
void HandleTimeout(const TActorContext& ctx);
private:
bool RetryWriteRequest(const bool delayed = true);
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/tx/data_events/shards_splitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@

namespace NKikimr::NEvWrite {

using TEvWrite = TEvColumnShard::TEvWrite;
using TEvWriteResult = TEvColumnShard::TEvWriteResult;

class IShardsSplitter {
public:
using TPtr = std::shared_ptr<IShardsSplitter>;
Expand Down Expand Up @@ -43,7 +40,8 @@ class IShardsSplitter {
using TPtr = std::shared_ptr<IShardInfo>;
virtual ~IShardInfo() {}

virtual void Serialize(TEvWrite& evWrite) const = 0;
virtual void Serialize(TEvColumnShard::TEvWrite& evWrite) const = 0;
virtual void Serialize(NEvents::TDataEvents::TEvWrite& evWrite) const = 0;
virtual ui64 GetBytes() const = 0;
virtual ui32 GetRowsCount() const = 0;
virtual const TString& GetData() const = 0;
Expand Down
16 changes: 10 additions & 6 deletions ydb/core/tx/tx_proxy/rpc_long_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,14 @@ class TLongTxWriteInternal: public TLongTxWriteBase<TLongTxWriteInternal> {
public:
// Constructs the internal long-tx writer.
// Forwards databaseName, path, an empty TString, longTxId and dedupId to TBase, and stores the reply
// target, navigation result, record batch and issues collection.
// noTxWrite is kept in NoTxWrite and passed along in the TEvCompleted replies this class sends.
// Aborts if issues is null; Batch is wrapped into a TParsedBatchData stored as DataAccessor.
explicit TLongTxWriteInternal(const TActorId& replyTo, const TLongTxId& longTxId, const TString& dedupId, const TString& databaseName,
const TString& path, std::shared_ptr<const NSchemeCache::TSchemeCacheNavigate> navigateResult, std::shared_ptr<arrow::RecordBatch> batch,
std::shared_ptr<NYql::TIssues> issues)
std::shared_ptr<NYql::TIssues> issues, const bool noTxWrite)
: TBase(databaseName, path, TString(), longTxId, dedupId)
, ReplyTo(replyTo)
, NavigateResult(navigateResult)
, Batch(batch)
, Issues(issues) {
, Issues(issues)
, NoTxWrite(noTxWrite)
{
Y_ABORT_UNLESS(Issues);
DataAccessor = std::make_unique<TParsedBatchData>(Batch);
}
Expand All @@ -246,12 +248,12 @@ class TLongTxWriteInternal: public TLongTxWriteBase<TLongTxWriteInternal> {
if (!message.empty()) {
Issues->AddIssue(NYql::TIssue(message));
}
this->Send(ReplyTo, new TEvents::TEvCompleted(0, status));
this->Send(ReplyTo, new TEvents::TEvCompleted(0, status, NoTxWrite));
PassAway();
}

// Sends a TEvCompleted with id 0 and Ydb::StatusIds::SUCCESS to ReplyTo, then calls PassAway().
// The three-argument form additionally carries the NoTxWrite flag to the receiver.
void ReplySuccess() override {
this->Send(ReplyTo, new TEvents::TEvCompleted(0, Ydb::StatusIds::SUCCESS));
this->Send(ReplyTo, new TEvents::TEvCompleted(0, Ydb::StatusIds::SUCCESS, NoTxWrite));
PassAway();
}

Expand All @@ -260,13 +262,15 @@ class TLongTxWriteInternal: public TLongTxWriteBase<TLongTxWriteInternal> {
std::shared_ptr<const NSchemeCache::TSchemeCacheNavigate> NavigateResult;
std::shared_ptr<arrow::RecordBatch> Batch;
std::shared_ptr<NYql::TIssues> Issues;
const bool NoTxWrite = false;
};

// Creates a TLongTxWriteInternal actor from the given arguments and registers it through
// ctx.RegisterWithSameMailbox, returning the TActorId that call produces.
// noTxWrite is forwarded unchanged to the TLongTxWriteInternal constructor.
TActorId DoLongTxWriteSameMailbox(const TActorContext& ctx, const TActorId& replyTo, const NLongTxService::TLongTxId& longTxId,
const TString& dedupId, const TString& databaseName, const TString& path,
std::shared_ptr<const NSchemeCache::TSchemeCacheNavigate> navigateResult, std::shared_ptr<arrow::RecordBatch> batch,
std::shared_ptr<NYql::TIssues> issues) {
return ctx.RegisterWithSameMailbox(new TLongTxWriteInternal(replyTo, longTxId, dedupId, databaseName, path, navigateResult, batch, issues));
std::shared_ptr<NYql::TIssues> issues, const bool noTxWrite) {
return ctx.RegisterWithSameMailbox(
new TLongTxWriteInternal(replyTo, longTxId, dedupId, databaseName, path, navigateResult, batch, issues, noTxWrite));
}

//
Expand Down
14 changes: 8 additions & 6 deletions ydb/core/tx/tx_proxy/upload_rows_common_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ namespace NTxProxy {
TActorId DoLongTxWriteSameMailbox(const TActorContext& ctx, const TActorId& replyTo,
const NLongTxService::TLongTxId& longTxId, const TString& dedupId,
const TString& databaseName, const TString& path,
std::shared_ptr<const NSchemeCache::TSchemeCacheNavigate> navigateResult,
std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NYql::TIssues> issues);
std::shared_ptr<const NSchemeCache::TSchemeCacheNavigate> navigateResult, std::shared_ptr<arrow::RecordBatch> batch,
std::shared_ptr<NYql::TIssues> issues, const bool noTxWrite);

template <NKikimrServices::TActivity::EType DerivedActivityType>
class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivityType>> {
Expand Down Expand Up @@ -921,8 +921,8 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
TBase::Become(&TThis::StateWaitWriteBatchResult);
ui32 batchNo = 0;
TString dedupId = ToString(batchNo);
DoLongTxWriteSameMailbox(ctx, ctx.SelfID, LongTxId, dedupId,
GetDatabase(), GetTable(), ResolveNamesResult, Batch, Issues);
DoLongTxWriteSameMailbox(ctx, ctx.SelfID, LongTxId, dedupId, GetDatabase(), GetTable(), ResolveNamesResult, Batch, Issues,
AppData(ctx)->FeatureFlags.GetEnableImmediateWritingOnBulkUpsert());
}

void RollbackLongTx(const TActorContext& ctx) {
Expand All @@ -949,9 +949,11 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
RaiseIssue(issue);
}
return ReplyWithResult(status, ctx);
} else if (ev->Get()->NoTxWritten) {
return ReplyWithResult(status, ctx);
} else {
CommitLongTx(ctx);
}

CommitLongTx(ctx);
}

void CommitLongTx(const TActorContext& ctx) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/library/actors/core/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,11 @@ namespace NActors {
// Local completion event carrying an id, a status code and a NoTxWritten flag.
// All constructor arguments are defaulted (0, 0, false), so existing one- and two-argument
// call sites keep compiling.
// NOTE(review): Id and Status are const while NoTxWritten is mutable — confirm this is intended.
struct TEvCompleted: public TEventLocal<TEvCompleted, TSystem::Completed> {
const ui32 Id;
const ui32 Status;
TEvCompleted(ui32 id = 0, ui32 status = 0)
// Set by the sender through the third constructor argument; defaults to false.
bool NoTxWritten = false;
TEvCompleted(ui32 id = 0, ui32 status = 0, const bool noTxWritten = false)
: Id(id)
, Status(status)
, NoTxWritten(noTxWritten)
{
}
};
Expand Down

0 comments on commit 0c9cbd6

Please sign in to comment.