Skip to content

Commit

Permalink
Use uid as idempotency key KIKIMR-21059 (ydb-platform#1887)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Feb 13, 2024
1 parent e90285a commit 5c443ab
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 12 deletions.
19 changes: 13 additions & 6 deletions ydb/core/tx/schemeshard/schemeshard_export__create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,19 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
}

const TString& uid = GetUid(request.GetRequest().GetOperationParams().labels());
if (uid && Self->ExportsByUid.contains(uid)) {
return Reply(
std::move(response),
Ydb::StatusIds::ALREADY_EXISTS,
TStringBuilder() << "Export with uid '" << uid << "' already exists"
);
if (uid) {
if (auto it = Self->ExportsByUid.find(uid); it != Self->ExportsByUid.end()) {
if (IsSameDomain(it->second, request.GetDatabaseName())) {
Self->FromXxportInfo(*response->Record.MutableResponse()->MutableEntry(), it->second);
return Reply(std::move(response));
} else {
return Reply(
std::move(response),
Ydb::StatusIds::ALREADY_EXISTS,
TStringBuilder() << "Export with uid '" << uid << "' already exists"
);
}
}
}

const TPath domainPath = TPath::Resolve(request.GetDatabaseName(), Self);
Expand Down
19 changes: 13 additions & 6 deletions ydb/core/tx/schemeshard/schemeshard_import__create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,19 @@ struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
}

const TString& uid = GetUid(request.GetRequest().GetOperationParams().labels());
if (uid && Self->ImportsByUid.contains(uid)) {
return Reply(
std::move(response),
Ydb::StatusIds::ALREADY_EXISTS,
TStringBuilder() << "Import with uid '" << uid << "' already exists"
);
if (uid) {
if (auto it = Self->ImportsByUid.find(uid); it != Self->ImportsByUid.end()) {
if (IsSameDomain(it->second, request.GetDatabaseName())) {
Self->FromXxportInfo(*response->Record.MutableResponse()->MutableEntry(), it->second);
return Reply(std::move(response));
} else {
return Reply(
std::move(response),
Ydb::StatusIds::ALREADY_EXISTS,
TStringBuilder() << "Import with uid '" << uid << "' already exists"
);
}
}
}

const TPath domainPath = TPath::Resolve(request.GetDatabaseName(), Self);
Expand Down
48 changes: 48 additions & 0 deletions ydb/core/tx/schemeshard/ut_export/ut_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1394,4 +1394,52 @@ partitioning_settings {

TestGetExport(runtime, txId, "/MyRoot", Ydb::StatusIds::CANCELLED);
}

Y_UNIT_TEST(UidAsIdempotencyKey) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
ui64 txId = 100;

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

TPortManager portManager;
const ui16 port = portManager.GetPort();

TS3Mock s3Mock({}, TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());

const auto request = Sprintf(R"(
OperationParams {
labels {
key: "uid"
value: "foo"
}
}
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_path: "/MyRoot/Table"
destination_prefix: ""
}
}
)", port);

// create operation
TestExport(runtime, ++txId, "/MyRoot", request);
const ui64 exportId = txId;
// create operation again with same uid
TestExport(runtime, ++txId, "/MyRoot", request);
// new operation was not created
TestGetExport(runtime, txId, "/MyRoot", Ydb::StatusIds::NOT_FOUND);
// check previous operation
TestGetExport(runtime, exportId, "/MyRoot");
env.TestWaitNotification(runtime, exportId);
}
}
53 changes: 53 additions & 0 deletions ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2451,6 +2451,59 @@ Y_UNIT_TEST_SUITE(TImportTests) {
Run(runtime, env, ConvertTestData(data), request, Ydb::StatusIds::PRECONDITION_FAILED);
Run(runtime, env, ConvertTestData(data), request, Ydb::StatusIds::SUCCESS, "/MyRoot", false, userSID);
}

Y_UNIT_TEST(UidAsIdempotencyKey) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions());
ui64 txId = 100;

const auto data = GenerateTestData(R"(
columns {
name: "key"
type { optional_type { item { type_id: UTF8 } } }
}
columns {
name: "value"
type { optional_type { item { type_id: UTF8 } } }
}
primary_key: "key"
)", {{"a", 1}});

TPortManager portManager;
const ui16 port = portManager.GetPort();

TS3Mock s3Mock(ConvertTestData(data), TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());

const auto request = Sprintf(R"(
OperationParams {
labels {
key: "uid"
value: "foo"
}
}
ImportFromS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_prefix: ""
destination_path: "/MyRoot/Table"
}
}
)", port);

// create operation
TestImport(runtime, ++txId, "/MyRoot", request);
const ui64 importId = txId;
// create operation again with same uid
TestImport(runtime, ++txId, "/MyRoot", request);
// new operation was not created
TestGetImport(runtime, txId, "/MyRoot", Ydb::StatusIds::NOT_FOUND);
// check previous operation
TestGetImport(runtime, importId, "/MyRoot");
env.TestWaitNotification(runtime, importId);
}

}

Y_UNIT_TEST_SUITE(TImportWithRebootsTests) {
Expand Down

0 comments on commit 5c443ab

Please sign in to comment.