Skip to content

Commit

Permalink
Propagate error (ydb-platform#5240)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Jun 10, 2024
1 parent e27ec33 commit 36bfe84
Show file tree
Hide file tree
Showing 11 changed files with 35 additions and 14 deletions.
1 change: 1 addition & 0 deletions ydb/core/grpc_services/rpc_replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
}

static void ConvertItem(const NKikimrReplication::TReplicationConfig::TTargetSpecific::TTarget& from, Ydb::Replication::DescribeReplicationResult::Item& to) {
to.set_id(from.GetId());
to.set_source_path(from.GetSrcPath());
to.set_destination_path(from.GetDstPath());
if (from.HasSrcStreamName()) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ message TReplicationConfig {
optional string SrcPath = 1;
optional string DstPath = 2;
optional string SrcStreamName = 3;
optional uint64 Id = 4;
}

repeated TTarget Targets = 1;
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/tx/replication/controller/tx_alter_dst_result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,14 @@ class TController::TTxAlterDstResult: public TTxBase {
replication->SetState(TReplication::EState::Done);
}
} else {
replication->SetState(TReplication::EState::Error);
target->SetDstState(TReplication::EDstState::Error);
target->SetIssue(TStringBuilder() << "Alter dst error"
<< ": " << NKikimrScheme::EStatus_Name(Ev->Get()->Status)
<< ", " << Ev->Get()->Error);

replication->SetState(TReplication::EState::Error, TStringBuilder() << "Error in target #" << target->GetId()
<< ": " << target->GetIssue());

CLOG_E(ctx, "Alter dst error"
<< ": rid# " << rid
<< ", tid# " << tid
Expand All @@ -73,7 +75,8 @@ class TController::TTxAlterDstResult: public TTxBase {

NIceDb::TNiceDb db(txc.DB);
db.Table<Schema::Replications>().Key(rid).Update(
NIceDb::TUpdate<Schema::Replications::State>(replication->GetState())
NIceDb::TUpdate<Schema::Replications::State>(replication->GetState()),
NIceDb::TUpdate<Schema::Replications::Issue>(replication->GetIssue())
);
db.Table<Schema::Targets>().Key(rid, tid).Update(
NIceDb::TUpdate<Schema::Targets::DstState>(target->GetDstState()),
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/tx/replication/controller/tx_create_dst_result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,14 @@ class TController::TTxCreateDstResult: public TTxBase {
<< ", tid# " << tid
<< ", pathId# " << Ev->Get()->DstPathId);
} else {
Replication->SetState(TReplication::EState::Error);
target->SetDstState(TReplication::EDstState::Error);
target->SetIssue(TStringBuilder() << "Create dst error"
<< ": " << NKikimrScheme::EStatus_Name(Ev->Get()->Status)
<< ", " << Ev->Get()->Error);

Replication->SetState(TReplication::EState::Error, TStringBuilder() << "Error in target #" << target->GetId()
<< ": " << target->GetIssue());

CLOG_E(ctx, "Create dst error"
<< ": rid# " << rid
<< ", tid# " << tid
Expand All @@ -70,7 +72,8 @@ class TController::TTxCreateDstResult: public TTxBase {

NIceDb::TNiceDb db(txc.DB);
db.Table<Schema::Replications>().Key(rid).Update(
NIceDb::TUpdate<Schema::Replications::State>(Replication->GetState())
NIceDb::TUpdate<Schema::Replications::State>(Replication->GetState()),
NIceDb::TUpdate<Schema::Replications::Issue>(Replication->GetIssue())
);
db.Table<Schema::Targets>().Key(rid, tid).Update(
NIceDb::TUpdate<Schema::Targets::DstPathOwnerId>(target->GetDstPathId().OwnerId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,14 @@ class TController::TTxCreateStreamResult: public TTxBase {
} else {
const auto& status = Ev->Get()->Status;

Replication->SetState(TReplication::EState::Error);
target->SetStreamState(TReplication::EStreamState::Error);
target->SetIssue(TStringBuilder() << "Create stream error"
<< ": " << status.GetStatus()
<< ", " << status.GetIssues().ToOneLineString());

Replication->SetState(TReplication::EState::Error, TStringBuilder() << "Error in target #" << target->GetId()
<< ": " << target->GetIssue());

CLOG_E(ctx, "Create stream error"
<< ": rid# " << rid
<< ", tid# " << tid
Expand All @@ -71,7 +73,10 @@ class TController::TTxCreateStreamResult: public TTxBase {
NIceDb::TNiceDb db(txc.DB);
db.Table<Schema::SrcStreams>().Key(rid, tid).Update<Schema::SrcStreams::State>(target->GetStreamState());
db.Table<Schema::Targets>().Key(rid, tid).Update<Schema::Targets::Issue>(target->GetIssue());
db.Table<Schema::Replications>().Key(rid).Update<Schema::Replications::State>(Replication->GetState());
db.Table<Schema::Replications>().Key(rid).Update(
NIceDb::TUpdate<Schema::Replications::State>(Replication->GetState()),
NIceDb::TUpdate<Schema::Replications::Issue>(Replication->GetIssue())
);

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class TController::TTxDescribeReplication: public TTxBase {
}

auto& item = *Result->Record.AddTargets();
item.SetId(target->GetId());
item.SetSrcPath(target->GetSrcPath());
item.SetDstPath(target->GetDstPath());
if (target->GetStreamName()) {
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/tx/replication/controller/tx_worker_error.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,16 @@ class TController::TTxWorkerError: public TTxBase {
<< ", tid# " << WorkerId.TargetId()
<< ", error# " << Error);

replication->SetState(TReplication::EState::Error);
target->SetDstState(TReplication::EDstState::Error);
target->SetIssue(Error);

replication->SetState(TReplication::EState::Error, TStringBuilder() << "Error in target #" << target->GetId()
<< ": " << target->GetIssue());

NIceDb::TNiceDb db(txc.DB);
db.Table<Schema::Replications>().Key(WorkerId.ReplicationId()).Update(
NIceDb::TUpdate<Schema::Replications::State>(replication->GetState())
NIceDb::TUpdate<Schema::Replications::State>(replication->GetState()),
NIceDb::TUpdate<Schema::Replications::Issue>(replication->GetIssue())
);
db.Table<Schema::Targets>().Key(WorkerId.ReplicationId(), WorkerId.TargetId()).Update(
NIceDb::TUpdate<Schema::Targets::DstState>(target->GetDstState()),
Expand Down
1 change: 1 addition & 0 deletions ydb/public/api/protos/draft/ydb_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ message DescribeReplicationResult {
string source_path = 1;
string destination_path = 2;
optional string source_changefeed_name = 3;
uint64 id = 4;
}

message RunningState {
Expand Down
13 changes: 7 additions & 6 deletions ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,14 +429,15 @@ int TCommandDescribe::PrintReplicationResponsePretty(const NYdb::NReplication::T
}

if (const auto& items = desc.GetItems()) {
Cout << Endl << "Items (source [changefeed] => destination):";
TPrettyTable table({ "#", "Source", "Changefeed", "Destination" }, TPrettyTableConfig().WithoutRowDelimiters());
for (const auto& item : items) {
Cout << Endl << " " << item.SrcPath;
if (item.SrcChangefeedName) {
Cout << " [" << *item.SrcChangefeedName << "]";
}
Cout << " => " << item.DstPath;
table.AddRow()
.Column(0, item.Id)
.Column(1, item.SrcPath)
.Column(2, item.SrcChangefeedName.value_or("n/a"))
.Column(3, item.DstPath);
}
Cout << Endl << "Items:" << Endl << table;
}

Cout << Endl;
Expand Down
1 change: 1 addition & 0 deletions ydb/public/sdk/cpp/client/draft/ydb_replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ TReplicationDescription::TReplicationDescription(const Ydb::Replication::Describ
Items_.reserve(desc.items_size());
for (const auto& item : desc.items()) {
Items_.push_back(TItem{
.Id = item.id(),
.SrcPath = item.source_path(),
.DstPath = item.destination_path(),
.SrcChangefeedName = item.has_source_changefeed_name()
Expand Down
1 change: 1 addition & 0 deletions ydb/public/sdk/cpp/client/draft/ydb_replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class TErrorState {
class TReplicationDescription {
public:
struct TItem {
ui64 Id;
TString SrcPath;
TString DstPath;
std::optional<TString> SrcChangefeedName;
Expand Down

0 comments on commit 36bfe84

Please sign in to comment.