Skip to content

Commit

Permalink
Catching exceptions (ydb-platform#14280)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Feb 7, 2025
1 parent 0f886a0 commit f33f619
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 48 deletions.
1 change: 1 addition & 0 deletions ydb/apps/ydb/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Fixed a bug where some errors could be ignored when restoring from a local backup.
* Added `ydb workload log import generator` command.
* Queries in `ydb workload run` command are now executed in random order.
* Include topics in local backups (`ydb tools dump` and `ydb tools restore`). In this release, only the settings of the topics are retained; messages are not included in the backup.
Expand Down
110 changes: 64 additions & 46 deletions ydb/public/lib/ydb_cli/dump/restore_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include <ydb/public/lib/ydb_cli/dump/util/view_utils.h>
#include <ydb-cpp-sdk/client/proto/accessor.h>

#include <library/cpp/threading/future/core/future.h>

#include <util/generic/hash.h>
#include <util/generic/hash_set.h>
#include <util/generic/maybe.h>
Expand All @@ -31,6 +33,7 @@ using namespace NOperation;
using namespace NScheme;
using namespace NTable;
using namespace NTopic;
using namespace NThreading;

extern const char DOC_API_TABLE_VERSION_ATTR[] = "__document_api_version";
extern const char DOC_API_REQUEST_TYPE[] = "_document_api_request";
Expand Down Expand Up @@ -124,12 +127,22 @@ TVector<TFsPath> CollectDataFiles(const TFsPath& fsPath) {
return dataFiles;
}

TRestoreResult CombineResults(const TVector<TRestoreResult>& results) {
for (auto result : results) {
if (!result.IsSuccess()) {
return result;
TRestoreResult CombineResults(const TVector<TFuture<TRestoreResult>>& results) {
try {
for (auto result : results) {
auto status = result.ExtractValueSync();
if (!status.IsSuccess()) {
return status;
}
}
} catch (NStatusHelpers::TYdbErrorException& e) {
return e.ExtractStatus();
} catch (const std::exception& e) {
return Result<TRestoreResult>(EStatus::INTERNAL_ERROR,
TStringBuilder() << "Caught exception: " << e.what()
);
}

return Result<TRestoreResult>();
}

Expand Down Expand Up @@ -265,7 +278,7 @@ TRestoreResult TRestoreClient::Restore(const TString& fsPath, const TString& dbP

if (result.GetStatus() != EStatus::SCHEME_ERROR) {
LOG_E("Error finding db base path: " << result.GetIssues().ToOneLineString());
return Result<TRestoreResult>(EStatus::SCHEME_ERROR, "Can not find existing path");
return Result<TRestoreResult>(dbBasePath, EStatus::SCHEME_ERROR, "Can not find existing path");
}

dbBasePath = dbBasePath.Parent();
Expand All @@ -276,7 +289,7 @@ TRestoreResult TRestoreClient::Restore(const TString& fsPath, const TString& dbP
auto oldDirectoryList = RecursiveList(SchemeClient, dbBasePath);
if (const auto& status = oldDirectoryList.Status; !status.IsSuccess()) {
LOG_E("Error listing db base path: " << dbBasePath.GetPath().Quote() << ": " << status.GetIssues().ToOneLineString());
return Result<TRestoreResult>(EStatus::SCHEME_ERROR, "Can not list existing directory");
return Result<TRestoreResult>(dbBasePath, EStatus::SCHEME_ERROR, "Can not list existing directory");
}

THashSet<TString> oldEntries;
Expand Down Expand Up @@ -777,66 +790,71 @@ TRestoreResult TRestoreClient::RestoreData(

THolder<NPrivate::IDataWriter> writer = CreateDataWriter(dbPath, settings, desc, accumulators);

TVector<TRestoreResult> accumulatorWorkersResults(accumulators.size(), Result<TRestoreResult>());
TVector<TFuture<TRestoreResult>> accumulatorResults(Reserve(accumulators.size()));
TThreadPool accumulatorWorkers(TThreadPool::TParams().SetBlocking(true));
accumulatorWorkers.Start(accumulators.size(), accumulators.size());

const ui32 dataFilesPerAccumulator = dataFilesCount / accumulators.size();
const ui32 dataFilesPerAccumulatorRemainder = dataFilesCount % accumulators.size();

for (ui32 i = 0; i < accumulators.size(); ++i) {
auto* accumulator = accumulators[i].Get();
auto promise = NewPromise<TRestoreResult>();
accumulatorResults.emplace_back(promise);

ui32 dataFileIdStart = dataFilesPerAccumulator * i + std::min(i, dataFilesPerAccumulatorRemainder);
ui32 dataFileIdEnd = dataFilesPerAccumulator * (i + 1) + std::min(i + 1, dataFilesPerAccumulatorRemainder);
auto func = [&, i, dataFileIdStart, dataFileIdEnd, accumulator]() {
for (size_t id = dataFileIdStart; id < dataFileIdEnd; ++id) {
const TFsPath& dataFile = dataFiles[id];
ui32 idStart = dataFilesPerAccumulator * i + std::min(i, dataFilesPerAccumulatorRemainder);
ui32 idEnd = dataFilesPerAccumulator * (i + 1) + std::min(i + 1, dataFilesPerAccumulatorRemainder);

LOG_D("Read data from " << dataFile.GetPath().Quote());
auto func = [&, idStart, idEnd, accumulator, result = std::move(promise)]() mutable {
try {
for (size_t id = idStart; id < idEnd; ++id) {
const TFsPath& dataFile = dataFiles[id];

TFileInput input(dataFile, settings.FileBufferSize_);
TString line;
ui64 lineNo = 0;
LOG_D("Read data from " << dataFile.GetPath().Quote());

while (input.ReadLine(line)) {
auto l = NPrivate::TLine(std::move(line), dataFile.GetPath(), ++lineNo);
TFileInput input(dataFile, settings.FileBufferSize_);
TString line;
ui64 lineNo = 0;

for (auto status = accumulator->Check(l); status != NPrivate::IDataAccumulator::OK; status = accumulator->Check(l)) {
if (status == NPrivate::IDataAccumulator::ERROR) {
accumulatorWorkersResults[i] = Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR,
TStringBuilder() << "Invalid data: " << l.GetLocation());
return;
}
while (input.ReadLine(line)) {
auto l = NPrivate::TLine(std::move(line), dataFile.GetPath(), ++lineNo);

if (!accumulator->Ready(true)) {
LOG_E("Error reading data from " << dataFile.GetPath().Quote());
accumulatorWorkersResults[i] = Result<TRestoreResult>(dbPath, EStatus::INTERNAL_ERROR, "Data is not ready");
return;
}
for (auto status = accumulator->Check(l); status != NPrivate::IDataAccumulator::OK; status = accumulator->Check(l)) {
if (status == NPrivate::IDataAccumulator::ERROR) {
return result.SetValue(Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR,
TStringBuilder() << "Invalid data: " << l.GetLocation()));
}

if (!writer->Push(accumulator->GetData(true))) {
LOG_E("Error writing data to " << dbPath.Quote() << ", file: " << dataFile.GetPath().Quote());
accumulatorWorkersResults[i] = Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, "Cannot write data #1");
return;
if (!accumulator->Ready(true)) {
LOG_E("Error reading data from " << dataFile.GetPath().Quote());
return result.SetValue(Result<TRestoreResult>(dbPath, EStatus::INTERNAL_ERROR, "Data is not ready"));
}

if (!writer->Push(accumulator->GetData(true))) {
LOG_E("Error writing data to " << dbPath.Quote() << ", file: " << dataFile.GetPath().Quote());
return result.SetValue(Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, "Cannot write data #1"));
}
}
}

accumulator->Feed(std::move(l));
if (accumulator->Ready()) {
if (!writer->Push(accumulator->GetData())) {
LOG_E("Error writing data to " << dbPath.Quote() << ", file: " << dataFile.GetPath().Quote());
accumulatorWorkersResults[i] = Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, "Cannot write data #2");
return;
accumulator->Feed(std::move(l));
if (accumulator->Ready()) {
if (!writer->Push(accumulator->GetData())) {
LOG_E("Error writing data to " << dbPath.Quote() << ", file: " << dataFile.GetPath().Quote());
return result.SetValue(Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, "Cannot write data #2"));
}
}
}
}
}

while (accumulator->Ready(true)) {
if (!writer->Push(accumulator->GetData(true))) {
accumulatorWorkersResults[i] = Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, "Cannot write data #3");
return;
while (accumulator->Ready(true)) {
if (!writer->Push(accumulator->GetData(true))) {
return result.SetValue(Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, "Cannot write data #3"));
}
}

result.SetValue(Result<TRestoreResult>());
} catch (...) {
result.SetException(std::current_exception());
}
};

Expand All @@ -846,7 +864,7 @@ TRestoreResult TRestoreClient::RestoreData(
}

accumulatorWorkers.Stop();
if (auto res = CombineResults(accumulatorWorkersResults); !res.IsSuccess()) {
if (auto res = CombineResults(accumulatorResults); !res.IsSuccess()) {
return res;
}

Expand Down
25 changes: 24 additions & 1 deletion ydb/public/lib/ydb_cli/dump/restore_import_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,14 @@ class TKeyBuilder {
}

TYdbDumpValueParser parser(value, GetPrimitiveType(column.Type));
values.emplace(it->second, TValueConverter<TYdbDumpValueParser>(parser).ConvertSingle());
try {
values.emplace(it->second, TValueConverter<TYdbDumpValueParser>(parser).ConvertSingle());
} catch (const TFromStringException& e) {
auto loc = TStringBuilder() << line.GetLocation();
throw NStatusHelpers::TYdbErrorException(Result<TStatus>(loc, EStatus::SCHEME_ERROR, e.what()));
} catch (...) {
std::rethrow_exception(std::current_exception());
}
}

TKey key;
Expand Down Expand Up @@ -823,6 +830,7 @@ class TDataWriter: public NPrivate::IDataWriter {

if (retryNumber == maxRetries) {
LOG_E("There is no retries left, last result: " << importResult);
SetError(std::move(importResult));
return false;
}

Expand All @@ -833,6 +841,7 @@ class TDataWriter: public NPrivate::IDataWriter {
auto descResult = DescribeTable(TableClient, Path, desc);
if (!descResult.IsSuccess()) {
LOG_E("Error describing table " << Path.Quote() << ": " << descResult.GetIssues().ToOneLineString());
SetError(std::move(descResult));
return false;
}

Expand Down Expand Up @@ -867,13 +876,21 @@ class TDataWriter: public NPrivate::IDataWriter {
LOG_E("Can't import data to " << Path.Quote()
<< " at location " << data.GetLocation()
<< ", result: " << importResult);
SetError(std::move(importResult));
return false;
}
}

return false;
}

void SetError(TStatus&& error) {
TGuard<TMutex> lock(Mutex);
if (!Error) {
Error = std::move(error);
}
}

void Stop() {
AtomicSet(Stopped, 1);
}
Expand Down Expand Up @@ -932,6 +949,9 @@ class TDataWriter: public NPrivate::IDataWriter {

void Wait() override {
TasksQueue->Stop();
if (Error) {
throw NStatusHelpers::TYdbErrorException(std::move(*Error));
}
}

private:
Expand All @@ -950,6 +970,9 @@ class TDataWriter: public NPrivate::IDataWriter {
THolder<IThreadPool> TasksQueue;
TAtomic Stopped;

TMaybe<TStatus> Error;
TMutex Mutex;

}; // TDataWriter

} // anonymous
Expand Down
2 changes: 1 addition & 1 deletion ydb/public/lib/ydb_cli/dump/util/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
namespace NYdb::NDump {

inline void AddPath(NYdb::NIssue::TIssues& issues, const TString& path) {
issues.AddIssue(NYdb::NIssue::TIssue(TStringBuilder() << "Path: " << path)
issues.AddIssue(NYdb::NIssue::TIssue(TStringBuilder() << "path: " << path)
.SetCode(NYdb::NIssue::DEFAULT_ERROR, NYdb::NIssue::ESeverity::Info));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ class TYdbErrorException : public TYdbException {
return out << e.Status_;
}

const TStatus& GetStatus() const {
return Status_;
}

TStatus&& ExtractStatus() {
return std::move(Status_);
}

private:
TStatus Status_;
};
Expand Down
82 changes: 82 additions & 0 deletions ydb/services/ydb/backup_ut/ydb_backup_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,88 @@ Y_UNIT_TEST_SUITE(BackupRestore) {
);
}

Y_UNIT_TEST(ImportDataShouldHandleErrors) {
TKikimrWithGrpcAndRootSchema server;
auto driver = TDriver(TDriverConfig().SetEndpoint(Sprintf("localhost:%u", server.GetPort())));
TTableClient tableClient(driver);
auto session = tableClient.GetSession().ExtractValueSync().GetSession();
TTempDir tempDir;
const auto& pathToBackup = tempDir.Path();

constexpr const char* dbPath = "/Root";
constexpr const char* table = "/Root/table";

ExecuteDataDefinitionQuery(session, Sprintf(R"(
CREATE TABLE `%s` (
Key Uint32,
Value Utf8,
PRIMARY KEY (Key)
);
)",
table
));
ExecuteDataModificationQuery(session, Sprintf(R"(
UPSERT INTO `%s` (Key, Value)
VALUES (1, "one");
)",
table
));

NDump::TClient backupClient(driver);
{
const auto result = backupClient.Dump(dbPath, pathToBackup, NDump::TDumpSettings().Database(dbPath));
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

auto opts = NDump::TRestoreSettings().Mode(NDump::TRestoreSettings::EMode::ImportData);
using TYdbErrorException = V3::NStatusHelpers::TYdbErrorException;

ExecuteDataDefinitionQuery(session, Sprintf(R"(
DROP TABLE `%s`;
)", table
));
ExecuteDataDefinitionQuery(session, Sprintf(R"(
CREATE TABLE `%s` (
Key Utf8,
Value Uint32,
PRIMARY KEY (Key)
);
)", table
));
UNIT_ASSERT_EXCEPTION_SATISFIES(backupClient.Restore(pathToBackup, dbPath, opts), TYdbErrorException,
[](const TYdbErrorException& e) { return e.GetStatus().GetStatus() == EStatus::BAD_REQUEST; });

ExecuteDataDefinitionQuery(session, Sprintf(R"(
DROP TABLE `%s`;
)", table
));
ExecuteDataDefinitionQuery(session, Sprintf(R"(
CREATE TABLE `%s` (
Key Uint32,
PRIMARY KEY (Key)
);
)", table
));
UNIT_ASSERT_EXCEPTION_SATISFIES(backupClient.Restore(pathToBackup, dbPath, opts), TYdbErrorException,
[](const TYdbErrorException& e) { return e.GetStatus().GetStatus() == EStatus::BAD_REQUEST; });

ExecuteDataDefinitionQuery(session, Sprintf(R"(
DROP TABLE `%s`;
)", table
));
ExecuteDataDefinitionQuery(session, Sprintf(R"(
CREATE TABLE `%s` (
Key Uint32,
Value Utf8,
PRIMARY KEY (Key),
INDEX Idx GLOBAL SYNC ON (Value)
);
)", table
));
UNIT_ASSERT_EXCEPTION_SATISFIES(backupClient.Restore(pathToBackup, dbPath, opts), TYdbErrorException,
[](const TYdbErrorException& e) { return e.GetStatus().GetStatus() == EStatus::SCHEME_ERROR; });
}

// TO DO: test index impl table split boundaries restoration from a backup

Y_UNIT_TEST(RestoreViewQueryText) {
Expand Down

0 comments on commit f33f619

Please sign in to comment.