Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed Sep 19, 2024
1 parent 0c9cbd6 commit 743ec1a
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 47 deletions.
1 change: 1 addition & 0 deletions ydb/core/protos/data_events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ message TEvWrite {
repeated uint32 ColumnIds = 3 [packed = true];
optional uint64 PayloadIndex = 4;
optional EDataFormat PayloadFormat = 5;
optional string PayloadSchema = 6;
}

// Transaction operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ void TTxWrite::Complete(const TActorContext& ctx) {
Self->Counters.GetCSCounters().OnSuccessWriteResponse();
}
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED);
Self->SetupIndexation();
}

} // namespace NKikimr::NColumnShard
31 changes: 24 additions & 7 deletions ydb/core/tx/columnshard/operations/write_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,43 @@ bool TArrowData::Parse(const NKikimrDataEvents::TEvWrite_TOperation& proto, cons
}
IncomingData = payload.GetDataFromPayload(proto.GetPayloadIndex());
if (proto.HasType()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_modification_type")("proto", proto.DebugString());
auto type = TEnumOperator<NEvWrite::EModificationType>::DeserializeFromProto(proto.GetType());
if (!type) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_modification_type")("proto", proto.DebugString());
return false;
}
ModificationType = *type;
}

std::vector<ui32> columns;
for (auto&& columnId : proto.GetColumnIds()) {
columns.emplace_back(columnId);
if (proto.HasPayloadSchema()) {
PayloadSchema = NArrow::DeserializeSchema(proto.GetPayloadSchema());
} else {
std::vector<ui32> columns;
for (auto&& columnId : proto.GetColumnIds()) {
columns.emplace_back(columnId);
}
if (columns.empty()) {
BatchSchema = IndexSchema;
} else {
BatchSchema = std::make_shared<NOlap::TFilteredSnapshotSchema>(IndexSchema, columns);
}
if (BatchSchema->GetColumnsCount() != columns.size()) {
return false;
}
}
BatchSchema = std::make_shared<NOlap::TFilteredSnapshotSchema>(IndexSchema, columns);
OriginalDataSize = IncomingData.size();
return BatchSchema->GetColumnsCount() == columns.size() && !IncomingData.empty();
return !!IncomingData;
}

TConclusion<std::shared_ptr<arrow::RecordBatch>> TArrowData::ExtractBatch() {
Y_ABORT_UNLESS(!!IncomingData);
auto result = NArrow::DeserializeBatch(IncomingData, std::make_shared<arrow::Schema>(BatchSchema->GetSchema()->fields()));
std::shared_ptr<arrow::RecordBatch> result;
if (PayloadSchema) {
result = NArrow::DeserializeBatch(IncomingData, PayloadSchema);
} else {
result = NArrow::DeserializeBatch(IncomingData, std::make_shared<arrow::Schema>(BatchSchema->GetSchema()->fields()));
}

IncomingData = "";
return result;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/operations/write_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class TArrowData : public NEvWrite::IDataContainer {
private:
NOlap::ISnapshotSchema::TPtr IndexSchema;
NOlap::ISnapshotSchema::TPtr BatchSchema;
std::shared_ptr<arrow::Schema> PayloadSchema;
TString IncomingData;
NEvWrite::EModificationType ModificationType = NEvWrite::EModificationType::Upsert;
};
Expand Down
24 changes: 17 additions & 7 deletions ydb/core/tx/data_events/columnshard_splitter.h
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
#pragma once

#include "events.h"
#include "shards_splitter.h"
#include "payload_helper.h"

#include <ydb/core/tx/sharding/sharding.h>
#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/formats/arrow/size_calcer.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/scheme/scheme_types_proto.h>


namespace NKikimr::NEvWrite {

class TColumnShardShardsSplitter : public IShardsSplitter {
class TShardInfo : public IShardInfo {
class TColumnShardShardsSplitter: public IShardsSplitter {
class TShardInfo: public IShardInfo {
private:
const TString SchemaData;
const TString Data;
Expand All @@ -23,8 +24,8 @@ class TColumnShardShardsSplitter : public IShardsSplitter {
: SchemaData(schemaData)
, Data(data)
, RowsCount(rowsCount)
, GranuleShardingVersion(granuleShardingVersion)
{}
, GranuleShardingVersion(granuleShardingVersion) {
}

virtual ui64 GetBytes() const override {
return Data.size();
Expand All @@ -42,9 +43,18 @@ class TColumnShardShardsSplitter : public IShardsSplitter {
evWrite.SetArrowData(SchemaData, Data);
evWrite.Record.SetGranuleShardingVersion(GranuleShardingVersion);
}
virtual void Serialize(NEvents::TDataEvents::TEvWrite& evWrite) const override {
virtual void Serialize(NEvents::TDataEvents::TEvWrite& evWrite, const ui64 tableId, const ui64 schemaVersion) const override {
TPayloadWriter<NEvents::TDataEvents::TEvWrite> writer(evWrite);
writer.AddDataToPayload(Data);
TString data = Data;
writer.AddDataToPayload(std::move(data));

auto* operation = evWrite.Record.AddOperations();
operation->SetPayloadSchema(SchemaData);
operation->SetType(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE);
operation->SetPayloadFormat(NKikimrDataEvents::FORMAT_ARROW);
operation->SetPayloadIndex(0);
operation->MutableTableId()->SetTableId(tableId);
operation->MutableTableId()->SetSchemaVersion(schemaVersion);
}
};

Expand Down
34 changes: 20 additions & 14 deletions ydb/core/tx/data_events/shard_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@

namespace NKikimr::NEvWrite {

TWritersController::TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId)
TWritersController::TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId, const bool immediateWrite)
: WritesCount(writesCount)
, LongTxActorId(longTxActorId)
, ImmediateWrite(immediateWrite)
, LongTxId(longTxId)
{
Y_ABORT_UNLESS(writesCount);
Expand Down Expand Up @@ -39,11 +40,12 @@ namespace NKikimr::NEvWrite {
}
}

TShardWriter::TShardWriter(const ui64 shardId, const ui64 tableId, const TString& dedupId, const IShardInfo::TPtr& data,
TShardWriter::TShardWriter(const ui64 shardId, const ui64 tableId, const ui64 schemaVersion, const TString& dedupId, const IShardInfo::TPtr& data,
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType, const bool immediateWrite)
: ShardId(shardId)
, WritePartIdx(writePartIdx)
, TableId(tableId)
, SchemaVersion(schemaVersion)
, DedupId(dedupId)
, DataForShard(data)
, ExternalController(externalController)
Expand All @@ -54,39 +56,44 @@ namespace NKikimr::NEvWrite {
{
}

void TShardWriter::Bootstrap() {
if (ImmediateWriting) {
void TShardWriter::SendWriteRequest() {
if (ImmediateWrite) {
auto ev = MakeHolder<NEvents::TDataEvents::TEvWrite>(NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
DataForShard->Serialize(*ev);
DataForShard->Serialize(*ev, TableId, SchemaVersion);
SendToTablet(std::move(ev));
} else {
auto ev = MakeHolder<TEvColumnShard::TEvWrite>(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, "", WritePartIdx, ModificationType);
DataForShard->Serialize(*ev);
SendToTablet(std::move(ev));
}
}

void TShardWriter::Bootstrap() {
SendWriteRequest();
Become(&TShardWriter::StateMain);
Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup());
}

void TShardWriter::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev) {
void TShardWriter::Handle(NEvents::TDataEvents::TEvWriteResult::TPtr& ev) {
const auto* msg = ev->Get();
Y_ABORT_UNLESS(msg->Record.GetOrigin() == ShardId);

const auto ydbStatus = msg->GetYdbStatus();
if (ydbStatus == Ydb::StatusIds::OVERLOADED) {
const auto ydbStatus = msg->GetStatus();
if (ydbStatus == NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED) {
if (RetryWriteRequest(true)) {
return;
}
}

auto gPassAway = PassAwayGuard();
if (ydbStatus != Ydb::StatusIds::SUCCESS) {
ExternalController->OnFail(ydbStatus,
if (ydbStatus != NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED) {
ExternalController->OnFail(Ydb::StatusIds::INTERNAL_ERROR,
TStringBuilder() << "Cannot write data into shard " << ShardId << " in longTx " <<
ExternalController->GetLongTxId().ToString());
return;
}

ExternalController->OnSuccess(ShardId, msg->Record.GetWriteId(), WritePartIdx);
ExternalController->OnSuccess(ShardId, 0, WritePartIdx);
}

void TShardWriter::Handle(TEvColumnShard::TEvWriteResult::TPtr& ev) {
Expand Down Expand Up @@ -132,6 +139,7 @@ namespace NKikimr::NEvWrite {

void TShardWriter::HandleTimeout(const TActorContext& /*ctx*/) {
RetryWriteRequest(false);
Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup());
}

bool TShardWriter::RetryWriteRequest(const bool delayed) {
Expand All @@ -142,9 +150,7 @@ namespace NKikimr::NEvWrite {
Schedule(OverloadTimeout(), new TEvents::TEvWakeup());
} else {
++NumRetries;
auto ev = MakeHolder<TEvWrite>(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, "", WritePartIdx, ModificationType);
DataForShard->Serialize(*ev);
SendToTablet(std::move(ev));
SendWriteRequest();
}
return true;
}
Expand Down
18 changes: 13 additions & 5 deletions ydb/core/tx/data_events/shard_writer.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#pragma once

#include "shards_splitter.h"
#include "common/modification_type.h"
#include "events.h"
#include "shards_splitter.h"

#include <ydb/library/accessor/accessor.h>
#include <ydb/core/base/tablet_pipecache.h>
Expand Down Expand Up @@ -89,13 +90,17 @@ class TWritersController {
NActors::TActorIdentity LongTxActorId;
std::vector<TWriteIdForShard> WriteIds;
const TMonotonic StartInstant = TMonotonic::Now();
const bool ImmediateWrite = false;
YDB_READONLY_DEF(NLongTxService::TLongTxId, LongTxId);
YDB_READONLY(std::shared_ptr<TCSUploadCounters>, Counters, std::make_shared<TCSUploadCounters>());
void SendReply() {
if (FailsCount.Val()) {
Counters->OnFailedFullReply(TMonotonic::Now() - StartInstant);
AFL_VERIFY(Code);
LongTxActorId.Send(LongTxActorId, new TEvPrivate::TEvShardsWriteResult(*Code, Issues));
} else if (ImmediateWrite) {
Counters->OnSucceedFullReply(TMonotonic::Now() - StartInstant);
LongTxActorId.Send(LongTxActorId, new TEvPrivate::TEvShardsWriteResult(Ydb::StatusIds::SUCCESS));
} else {
Counters->OnSucceedFullReply(TMonotonic::Now() - StartInstant);
auto req = MakeHolder<NLongTxService::TEvLongTxService::TEvAttachColumnShardWrites>(LongTxId);
Expand Down Expand Up @@ -129,7 +134,7 @@ class TWritersController {

};

TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId);
TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId, const bool immediateWrite);
void OnSuccess(const ui64 shardId, const ui64 writeId, const ui32 writePartId);
void OnFail(const Ydb::StatusIds::StatusCode code, const TString& message);
};
Expand All @@ -144,6 +149,7 @@ class TShardWriter: public NActors::TActorBootstrapped<TShardWriter> {
const ui64 ShardId;
const ui64 WritePartIdx;
const ui64 TableId;
const ui64 SchemaVersion;
const TString DedupId;
const IShardInfo::TPtr DataForShard;
ui32 NumRetries = 0;
Expand All @@ -153,6 +159,7 @@ class TShardWriter: public NActors::TActorBootstrapped<TShardWriter> {
EModificationType ModificationType;
const bool ImmediateWrite = false;

void SendWriteRequest();
static TDuration OverloadTimeout() {
return TDuration::MilliSeconds(OverloadedDelayMs);
}
Expand All @@ -165,14 +172,15 @@ class TShardWriter: public NActors::TActorBootstrapped<TShardWriter> {
TBase::PassAway();
}
public:
TShardWriter(const ui64 shardId, const ui64 tableId, const TString& dedupId, const IShardInfo::TPtr& data,
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType);
TShardWriter(const ui64 shardId, const ui64 tableId, const ui64 schemaVersion, const TString& dedupId, const IShardInfo::TPtr& data,
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx,
const EModificationType mType, const bool immediateWrite);

STFUNC(StateMain) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvColumnShard::TEvWriteResult, Handle);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
hFunc(NEvents::TDataEvents::TEvWrite, Handle);
hFunc(NEvents::TDataEvents::TEvWriteResult, Handle);
CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
}
}
Expand Down
11 changes: 10 additions & 1 deletion ydb/core/tx/data_events/shards_splitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class IShardsSplitter {
virtual ~IShardInfo() {}

virtual void Serialize(TEvColumnShard::TEvWrite& evWrite) const = 0;
virtual void Serialize(NEvents::TDataEvents::TEvWrite& evWrite) const = 0;
virtual void Serialize(NEvents::TDataEvents::TEvWrite& evWrite, const ui64 tableId, const ui64 schemaVersion) const = 0;
virtual ui64 GetBytes() const = 0;
virtual ui32 GetRowsCount() const = 0;
virtual const TString& GetData() const = 0;
Expand All @@ -66,13 +66,21 @@ class IShardsSplitter {

TYdbConclusionStatus SplitData(const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, const IEvWriteDataAccessor& data) {
TableId = schemeEntry.TableId.PathId.LocalPathId;
AFL_VERIFY(schemeEntry.ColumnTableInfo);
AFL_VERIFY(schemeEntry.ColumnTableInfo->Description.HasSchema());
SchemaVersion = schemeEntry.ColumnTableInfo->Description.GetSchema().GetVersion();
AFL_VERIFY(SchemaVersion);
return DoSplitData(schemeEntry, data);
}

ui64 GetTableId() const {
return TableId;
}

ui64 GetSchemaVersion() const {
return SchemaVersion;
}

const TFullSplitData& GetSplitData() const {
Y_ABORT_UNLESS(FullSplitData);
return *FullSplitData;
Expand All @@ -84,6 +92,7 @@ class IShardsSplitter {
virtual TYdbConclusionStatus DoSplitData(const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, const IEvWriteDataAccessor& data) = 0;

ui64 TableId = 0;
ui64 SchemaVersion = 0;
protected:
std::optional<TFullSplitData> FullSplitData;
};
Expand Down
Loading

0 comments on commit 743ec1a

Please sign in to comment.