Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RCORE-2063 Fix some client resets potentially failing with AutoClientResetFailed if a new client reset condition occurred before the first one completed #7542

Merged
merged 9 commits into from
Jun 4, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* Refactor `sync::Session` to eliminate the bind() step of session creation ([#7609](https://github.com/realm/realm-core/pull/7609)).
* Add ScopeExitFail which only calls the handler if exiting the scope via an uncaught exception ([#7609](https://github.com/realm/realm-core/pull/7609)).
* Add the originating error and server requests action that caused a client reset to occur to the client reset tracking metadata storage. ([PR #7649](https://github.com/realm/realm-core/pull/7649))
* Fix client reset failure during sync migration due to previous incomplete client reset. ([PR #7542](https://github.com/realm/realm-core/pull/7542), since v13.11.0)

----------------------------------------------

Expand Down
5 changes: 5 additions & 0 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,11 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con
return call_debug_hook(data);
}

SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event)
{
return call_debug_hook(event, m_progress, m_last_sent_flx_query_version, DownloadBatchState::SteadyState, 0);
}

bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
{
// Should never be called if session is not active
Expand Down
5 changes: 5 additions & 0 deletions src/realm/sync/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ enum class SyncClientHookEvent {
SessionActivating,
SessionSuspended,
BindMessageSent,
IdentMessageReceived,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't seem to use (yet) most of these events, so maybe we should add them when they're needed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the unused events

IdentMessageSent,
ClientResetMergeStarting,
ClientResetMergeComplete,
ClientResetMergeFailed,
BootstrapBatchAboutToProcess,
};

Expand Down
13 changes: 9 additions & 4 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1727,8 +1727,7 @@ void Session::activate()
reset_protocol_state();
m_state = Active;

call_debug_hook(SyncClientHookEvent::SessionActivating, m_progress, m_last_sent_flx_query_version,
DownloadBatchState::SteadyState, 0);
call_debug_hook(SyncClientHookEvent::SessionActivating);

REALM_ASSERT(!m_suspended);
m_conn.one_more_active_unsuspended_session(); // Throws
Expand Down Expand Up @@ -1946,8 +1945,7 @@ void Session::send_bind_message()
m_conn.initiate_write_message(out, this); // Throws

m_bind_message_sent = true;
call_debug_hook(SyncClientHookEvent::BindMessageSent, m_progress, m_last_sent_flx_query_version,
DownloadBatchState::SteadyState, 0);
call_debug_hook(SyncClientHookEvent::BindMessageSent);

// Ready to send the IDENT message if the file identifier pair is already
// available.
Expand Down Expand Up @@ -1994,6 +1992,7 @@ void Session::send_ident_message()
m_conn.initiate_write_message(out, this); // Throws

m_ident_message_sent = true;
call_debug_hook(SyncClientHookEvent::IdentMessageSent);

// Other messages may be waiting to be sent
enlist_to_send(); // Throws
Expand Down Expand Up @@ -2270,9 +2269,12 @@ bool Session::client_reset_if_needed()
auto on_flx_version_complete = [this](int64_t version) {
this->on_flx_sync_version_complete(version);
};
call_debug_hook(SyncClientHookEvent::ClientResetMergeStarting);
bool did_reset =
client_reset::perform_client_reset(logger, *get_db(), std::move(*client_reset_config), m_client_file_ident,
get_flx_subscription_store(), on_flx_version_complete);

call_debug_hook(SyncClientHookEvent::ClientResetMergeComplete);
if (!did_reset) {
return false;
}
Expand Down Expand Up @@ -2335,6 +2337,7 @@ Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
}

m_client_file_ident = client_file_ident;
call_debug_hook(SyncClientHookEvent::IdentMessageReceived);

if (REALM_UNLIKELY(get_client().is_dry_run())) {
// Ready to send the IDENT message
Expand All @@ -2351,6 +2354,8 @@ Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
catch (const std::exception& e) {
auto err_msg = util::format("A fatal error occurred during client reset: '%1'", e.what());
logger.error(err_msg.c_str());
ProtocolErrorInfo prot_info = {ErrorCodes::AutoClientResetFailed, err_msg, IsFatal{true}};
call_debug_hook(SyncClientHookEvent::ClientResetMergeFailed, prot_info);
SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
suspend(err_info);
return Status::OK();
Expand Down
1 change: 1 addition & 0 deletions src/realm/sync/noinst/client_impl_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,7 @@ class ClientImpl::Session {
size_t);
SyncClientHookAction call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo&);
SyncClientHookAction call_debug_hook(const SyncClientHookData& data);
SyncClientHookAction call_debug_hook(SyncClientHookEvent event);

bool is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version);

Expand Down
75 changes: 43 additions & 32 deletions src/realm/sync/noinst/client_reset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,38 +416,49 @@ ClientResyncMode reset_precheck_guard(const TransactionRef& wt_local, ClientResy
{
if (auto previous_reset = sync::PendingResetStore::has_pending_reset(wt_local)) {
logger.info(util::LogCategory::reset, "Found a previous %1", *previous_reset);
switch (previous_reset->mode) {
case ClientResyncMode::Manual:
REALM_UNREACHABLE();
case ClientResyncMode::DiscardLocal:
throw ClientResetFailed(util::format("A previous '%1' mode reset from %2 did not succeed, "
"giving up on '%3' mode to prevent a cycle",
previous_reset->mode, previous_reset->time, mode));
case ClientResyncMode::Recover:
switch (mode) {
case ClientResyncMode::Recover:
throw ClientResetFailed(util::format("A previous '%1' mode reset from %2 did not succeed, "
"giving up on '%3' mode to prevent a cycle",
previous_reset->mode, previous_reset->time, mode));
case ClientResyncMode::RecoverOrDiscard:
mode = ClientResyncMode::DiscardLocal;
logger.info(util::LogCategory::reset,
"A previous '%1' mode reset from %2 downgrades this mode ('%3') to DiscardLocal",
previous_reset->mode, previous_reset->time, mode);
sync::PendingResetStore::clear_pending_reset(wt_local);
break;
case ClientResyncMode::DiscardLocal:
sync::PendingResetStore::clear_pending_reset(wt_local);
// previous mode Recover and this mode is Discard, this is not a cycle yet
break;
case ClientResyncMode::Manual:
REALM_UNREACHABLE();
}
break;
case ClientResyncMode::RecoverOrDiscard:
throw ClientResetFailed(util::format("Unexpected previous '%1' mode reset from %2 did not "
"succeed, giving up on '%3' mode to prevent a cycle",
previous_reset->mode, previous_reset->time, mode));
if (action != previous_reset->action) {
// IF a different client reset is being performed, cler the pending client reset and start over.
logger.info(util::LogCategory::reset,
"New '%1' client reset of type: '%2' is incompatible - clearing previous reset", action,
mode);
sync::PendingResetStore::clear_pending_reset(wt_local);
}
else {
switch (previous_reset->mode) {
case ClientResyncMode::Manual:
REALM_UNREACHABLE();
case ClientResyncMode::DiscardLocal:
throw ClientResetFailed(util::format("A previous '%1' mode reset from %2 did not succeed, "
"giving up on '%3' mode to prevent a cycle",
previous_reset->mode, previous_reset->time, mode));
case ClientResyncMode::Recover:
switch (mode) {
case ClientResyncMode::Recover:
throw ClientResetFailed(
util::format("A previous '%1' mode reset from %2 did not succeed, "
"giving up on '%3' mode to prevent a cycle",
previous_reset->mode, previous_reset->time, mode));
case ClientResyncMode::RecoverOrDiscard:
mode = ClientResyncMode::DiscardLocal;
logger.info(
util::LogCategory::reset,
"A previous '%1' mode reset from %2 downgrades this mode ('%3') to DiscardLocal",
previous_reset->mode, previous_reset->time, mode);
sync::PendingResetStore::clear_pending_reset(wt_local);
break;
case ClientResyncMode::DiscardLocal:
sync::PendingResetStore::clear_pending_reset(wt_local);
// previous mode Recover and this mode is Discard, this is not a cycle yet
break;
case ClientResyncMode::Manual:
REALM_UNREACHABLE();
}
break;
case ClientResyncMode::RecoverOrDiscard:
throw ClientResetFailed(util::format("Unexpected previous '%1' mode reset from %2 did not "
"succeed, giving up on '%3' mode to prevent a cycle",
previous_reset->mode, previous_reset->time, mode));
}
}
}
if (action == PendingReset::Action::ClientResetNoRecovery) {
Expand Down
40 changes: 33 additions & 7 deletions test/object-store/sync/flx_migration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ TEST_CASE("Test client migration and rollback", "[sync][flx][flx migration][baas

TEST_CASE("Test client migration and rollback with recovery", "[sync][flx][flx migration][baas]") {
auto logger_ptr = util::Logger::get_default_logger();
enum TestState { idle, wait_for_merge, merge_complete, rollback_complete };
TestingStateMachine<TestState> test_state(TestState::idle);

const std::string partition = "migration-test";
const Schema mig_schema{
Expand All @@ -331,6 +333,20 @@ TEST_CASE("Test client migration and rollback with recovery", "[sync][flx][flx m
SyncTestFile config(session.app()->current_user(), partition, server_app_config.schema);
config.sync_config->client_resync_mode = ClientResyncMode::Recover;
config.schema_version = 0;
config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession>, const SyncClientHookData& data) {
test_state.transition_with([data](TestState cur_state) -> std::optional<TestState> {
if (data.event == SyncClientHookEvent::ClientResetMergeComplete &&
cur_state == TestState::wait_for_merge) {
return TestState::merge_complete;
}
return std::nullopt;
});
if (test_state.get() == TestState::merge_complete) {
// Wait for the FLX->PBS rollback to complete before continuing
test_state.wait_for(TestState::rollback_complete, std::chrono::seconds(25));
}
return SyncClientHookAction::NoAction;
};

// Fill some objects
auto objects = fill_test_data(config); // 5 objects starting at 1 with no partition value set
Expand Down Expand Up @@ -437,15 +453,16 @@ TEST_CASE("Test client migration and rollback with recovery", "[sync][flx][flx m
REALM_ASSERT(result.get_value() == sync::SubscriptionSet::State::Superseded);
}

test_state.transition_to(TestState::wait_for_merge);

// Migrate back to FLX - and keep the realm session open
trigger_server_migration(session.app_session(), MigrateToFLX, logger_ptr);

// wait for the subscription store to initialize after downloading
timed_wait_for(
[&outer_realm]() {
return outer_realm->sync_session() && outer_realm->sync_session()->get_flx_subscription_store();
},
std::chrono::seconds(180));
// Cancel any connect waits (since sync session is still active) and try to connect now
outer_realm->sync_session()->handle_reconnect();

// wait for the fresh realm to download and merge with the current local realm
test_state.wait_for(TestState::merge_complete, std::chrono::seconds(180));

// Verify data has been sync'ed and there is only 1 subscription for the Object table
{
Expand All @@ -460,9 +477,18 @@ TEST_CASE("Test client migration and rollback with recovery", "[sync][flx][flx m
REQUIRE(active_subs.find("flx_migrated_Object"));
}

// Roll back to PBS once again - and keep the realm session open
// Roll back to PBS once again before the client reset is complete and keep the realm session open
// NOTE: the realm session is blocked in the hook callback until the rollback is complete
trigger_server_migration(session.app_session(), RollbackToPBS, logger_ptr);

// Release the realm session; will reconnect and perform the rollback to PBS client reset
test_state.transition_to(TestState::rollback_complete);

// Cancel any connect waits (since sync session is still active) and try to connect now
outer_realm->sync_session()->handle_reconnect();

// During the rollback client reset, the previous migrate to flx client reset operation is still
// tracked, but will be removed since the new rollback server requests action is incompatible.
REQUIRE(!wait_for_upload(*outer_realm));
REQUIRE(!wait_for_download(*outer_realm));

Expand Down
4 changes: 2 additions & 2 deletions test/object-store/util/test_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ class TestingStateMachine {
m_cv.notify_one();
}

void wait_for(E target)
bool wait_for(E target, std::chrono::milliseconds period = std::chrono::seconds(15))
{
std::unique_lock lock{m_mutex};
m_cv.wait(lock, [&] {
return m_cv.wait_for(lock, period, [&] {
return m_cur_state == target;
});
}
Expand Down
Loading