Skip to content

Commit

Permalink
Updated PR 7542 with changes from master (and PR 7649)
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Wilkerson-Barker committed May 31, 2024
1 parent 25212bc commit 807aa83
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* Refactor `sync::Session` to eliminate the bind() step of session creation ([#7609](https://github.com/realm/realm-core/pull/7609)).
* Add ScopeExitFail which only calls the handler if exiting the scope via an uncaught exception ([#7609](https://github.com/realm/realm-core/pull/7609)).
* Add the originating error and server requests action that caused a client reset to occur to the client reset tracking metadata storage. ([PR #7649](https://github.com/realm/realm-core/pull/7649))
* Fix client reset failure during sync migration due to previous incomplete client reset. ([PR #7542](https://github.com/realm/realm-core/pull/7542), since v13.11.0)

----------------------------------------------

Expand Down
5 changes: 5 additions & 0 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,11 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con
return call_debug_hook(data);
}

SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event)
{
return call_debug_hook(event, m_progress, m_last_sent_flx_query_version, DownloadBatchState::SteadyState, 0);
}

bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
{
// Should never be called if session is not active
Expand Down
5 changes: 5 additions & 0 deletions src/realm/sync/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ enum class SyncClientHookEvent {
SessionActivating,
SessionSuspended,
BindMessageSent,
IdentMessageReceived,
IdentMessageSent,
ClientResetMergeStarting,
ClientResetMergeComplete,
ClientResetMergeFailed,
BootstrapBatchAboutToProcess,
};

Expand Down
16 changes: 10 additions & 6 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1727,8 +1727,7 @@ void Session::activate()
reset_protocol_state();
m_state = Active;

call_debug_hook(SyncClientHookEvent::SessionActivating, m_progress, m_last_sent_flx_query_version,
DownloadBatchState::SteadyState, 0);
call_debug_hook(SyncClientHookEvent::SessionActivating);

REALM_ASSERT(!m_suspended);
m_conn.one_more_active_unsuspended_session(); // Throws
Expand Down Expand Up @@ -1946,8 +1945,7 @@ void Session::send_bind_message()
m_conn.initiate_write_message(out, this); // Throws

m_bind_message_sent = true;
call_debug_hook(SyncClientHookEvent::BindMessageSent, m_progress, m_last_sent_flx_query_version,
DownloadBatchState::SteadyState, 0);
call_debug_hook(SyncClientHookEvent::BindMessageSent);

// Ready to send the IDENT message if the file identifier pair is already
// available.
Expand Down Expand Up @@ -1994,6 +1992,7 @@ void Session::send_ident_message()
m_conn.initiate_write_message(out, this); // Throws

m_ident_message_sent = true;
call_debug_hook(SyncClientHookEvent::IdentMessageSent);

// Other messages may be waiting to be sent
enlist_to_send(); // Throws
Expand Down Expand Up @@ -2270,9 +2269,12 @@ bool Session::client_reset_if_needed()
auto on_flx_version_complete = [this](int64_t version) {
this->on_flx_sync_version_complete(version);
};
call_debug_hook(SyncClientHookEvent::ClientResetMergeStarting);
bool did_reset =
client_reset::perform_client_reset(logger, *get_db(), std::move(*client_reset_config), m_client_file_ident,
get_flx_subscription_store(), on_flx_version_complete);

call_debug_hook(SyncClientHookEvent::ClientResetMergeComplete);
if (!did_reset) {
return false;
}
Expand Down Expand Up @@ -2335,6 +2337,7 @@ Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
}

m_client_file_ident = client_file_ident;
call_debug_hook(SyncClientHookEvent::IdentMessageReceived);

if (REALM_UNLIKELY(get_client().is_dry_run())) {
// Ready to send the IDENT message
Expand All @@ -2351,8 +2354,9 @@ Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
catch (const std::exception& e) {
auto err_msg = util::format("A fatal error occurred during client reset: '%1'", e.what());
logger.error(err_msg.c_str());
SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
suspend(err_info);
ProtocolErrorInfo prot_info = {ErrorCodes::AutoClientResetFailed, err_msg, IsFatal{true}};
call_debug_hook(SyncClientHookEvent::ClientResetMergeFailed, prot_info);
suspend({prot_info});
return Status::OK();
}
if (!did_client_reset) {
Expand Down
1 change: 1 addition & 0 deletions src/realm/sync/noinst/client_impl_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,7 @@ class ClientImpl::Session {
size_t);
SyncClientHookAction call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo&);
SyncClientHookAction call_debug_hook(const SyncClientHookData& data);
SyncClientHookAction call_debug_hook(SyncClientHookEvent event);

bool is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version);

Expand Down
75 changes: 43 additions & 32 deletions src/realm/sync/noinst/client_reset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,38 +416,49 @@ ClientResyncMode reset_precheck_guard(const TransactionRef& wt_local, ClientResy
{
if (auto previous_reset = sync::PendingResetStore::has_pending_reset(wt_local)) {
logger.info(util::LogCategory::reset, "Found a previous %1", *previous_reset);
switch (previous_reset->mode) {
case ClientResyncMode::Manual:
REALM_UNREACHABLE();
case ClientResyncMode::DiscardLocal:
throw ClientResetFailed(util::format("A previous '%1' mode reset from %2 did not succeed, "
"giving up on '%3' mode to prevent a cycle",
previous_reset->mode, previous_reset->time, mode));
case ClientResyncMode::Recover:
switch (mode) {
case ClientResyncMode::Recover:
throw ClientResetFailed(util::format("A previous '%1' mode reset from %2 did not succeed, "
"giving up on '%3' mode to prevent a cycle",
previous_reset->mode, previous_reset->time, mode));
case ClientResyncMode::RecoverOrDiscard:
mode = ClientResyncMode::DiscardLocal;
logger.info(util::LogCategory::reset,
"A previous '%1' mode reset from %2 downgrades this mode ('%3') to DiscardLocal",
previous_reset->mode, previous_reset->time, mode);
sync::PendingResetStore::clear_pending_reset(wt_local);
break;
case ClientResyncMode::DiscardLocal:
sync::PendingResetStore::clear_pending_reset(wt_local);
// previous mode Recover and this mode is Discard, this is not a cycle yet
break;
case ClientResyncMode::Manual:
REALM_UNREACHABLE();
}
break;
case ClientResyncMode::RecoverOrDiscard:
throw ClientResetFailed(util::format("Unexpected previous '%1' mode reset from %2 did not "
"succeed, giving up on '%3' mode to prevent a cycle",
previous_reset->mode, previous_reset->time, mode));
if (action != previous_reset->action) {
// IF a different client reset is being performed, cler the pending client reset and start over.
logger.info(util::LogCategory::reset,
"New '%1' client reset of type: '%2' is incompatible - clearing previous reset", action,
mode);
sync::PendingResetStore::clear_pending_reset(wt_local);
}
else {
switch (previous_reset->mode) {
case ClientResyncMode::Manual:
REALM_UNREACHABLE();
case ClientResyncMode::DiscardLocal:
throw ClientResetFailed(util::format("A previous '%1' mode reset from %2 did not succeed, "
"giving up on '%3' mode to prevent a cycle",
previous_reset->mode, previous_reset->time, mode));
case ClientResyncMode::Recover:
switch (mode) {
case ClientResyncMode::Recover:
throw ClientResetFailed(
util::format("A previous '%1' mode reset from %2 did not succeed, "
"giving up on '%3' mode to prevent a cycle",
previous_reset->mode, previous_reset->time, mode));
case ClientResyncMode::RecoverOrDiscard:
mode = ClientResyncMode::DiscardLocal;
logger.info(
util::LogCategory::reset,
"A previous '%1' mode reset from %2 downgrades this mode ('%3') to DiscardLocal",
previous_reset->mode, previous_reset->time, mode);
sync::PendingResetStore::clear_pending_reset(wt_local);
break;
case ClientResyncMode::DiscardLocal:
sync::PendingResetStore::clear_pending_reset(wt_local);
// previous mode Recover and this mode is Discard, this is not a cycle yet
break;
case ClientResyncMode::Manual:
REALM_UNREACHABLE();
}
break;
case ClientResyncMode::RecoverOrDiscard:
throw ClientResetFailed(util::format("Unexpected previous '%1' mode reset from %2 did not "
"succeed, giving up on '%3' mode to prevent a cycle",
previous_reset->mode, previous_reset->time, mode));
}
}
}
if (action == PendingReset::Action::ClientResetNoRecovery) {
Expand Down
40 changes: 33 additions & 7 deletions test/object-store/sync/flx_migration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ TEST_CASE("Test client migration and rollback", "[sync][flx][flx migration][baas

TEST_CASE("Test client migration and rollback with recovery", "[sync][flx][flx migration][baas]") {
auto logger_ptr = util::Logger::get_default_logger();
enum TestState { idle, wait_for_merge, merge_complete, rollback_complete };
TestingStateMachine<TestState> test_state(TestState::idle);

const std::string partition = "migration-test";
const Schema mig_schema{
Expand All @@ -331,6 +333,20 @@ TEST_CASE("Test client migration and rollback with recovery", "[sync][flx][flx m
SyncTestFile config(session.app()->current_user(), partition, server_app_config.schema);
config.sync_config->client_resync_mode = ClientResyncMode::Recover;
config.schema_version = 0;
config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession>, const SyncClientHookData& data) {
test_state.transition_with([data](TestState cur_state) -> std::optional<TestState> {
if (data.event == SyncClientHookEvent::ClientResetMergeComplete &&
cur_state == TestState::wait_for_merge) {
return TestState::merge_complete;
}
return std::nullopt;
});
if (test_state.get() == TestState::merge_complete) {
// Wait for the FLX->PBS rollback to complete before continuing
test_state.wait_for(TestState::rollback_complete, std::chrono::seconds(25));
}
return SyncClientHookAction::NoAction;
};

// Fill some objects
auto objects = fill_test_data(config); // 5 objects starting at 1 with no partition value set
Expand Down Expand Up @@ -437,15 +453,16 @@ TEST_CASE("Test client migration and rollback with recovery", "[sync][flx][flx m
REALM_ASSERT(result.get_value() == sync::SubscriptionSet::State::Superseded);
}

test_state.transition_to(TestState::wait_for_merge);

// Migrate back to FLX - and keep the realm session open
trigger_server_migration(session.app_session(), MigrateToFLX, logger_ptr);

// wait for the subscription store to initialize after downloading
timed_wait_for(
[&outer_realm]() {
return outer_realm->sync_session() && outer_realm->sync_session()->get_flx_subscription_store();
},
std::chrono::seconds(180));
// Cancel any connect waits (since sync session is still active) and try to connect now
outer_realm->sync_session()->handle_reconnect();

// wait for the fresh realm to download and merge with the current local realm
test_state.wait_for(TestState::merge_complete, std::chrono::seconds(180));

// Verify data has been sync'ed and there is only 1 subscription for the Object table
{
Expand All @@ -460,9 +477,18 @@ TEST_CASE("Test client migration and rollback with recovery", "[sync][flx][flx m
REQUIRE(active_subs.find("flx_migrated_Object"));
}

// Roll back to PBS once again - and keep the realm session open
// Roll back to PBS once again before the client reset is complete and keep the realm session open
// NOTE: the realm session is blocked in the hook callback until the rollback is complete
trigger_server_migration(session.app_session(), RollbackToPBS, logger_ptr);

// Release the realm session; will reconnect and perform the rollback to PBS client reset
test_state.transition_to(TestState::rollback_complete);

// Cancel any connect waits (since sync session is still active) and try to connect now
outer_realm->sync_session()->handle_reconnect();

// During the rollback client reset, the previous migrate to flx client reset operation is still
// tracked, but will be removed since the new rollback server requests action is incompatible.
REQUIRE(!wait_for_upload(*outer_realm));
REQUIRE(!wait_for_download(*outer_realm));

Expand Down
4 changes: 2 additions & 2 deletions test/object-store/util/test_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ class TestingStateMachine {
m_cv.notify_one();
}

void wait_for(E target)
bool wait_for(E target, std::chrono::milliseconds period = std::chrono::seconds(15))
{
std::unique_lock lock{m_mutex};
m_cv.wait(lock, [&] {
return m_cv.wait_for(lock, period, [&] {
return m_cur_state == target;
});
}
Expand Down

0 comments on commit 807aa83

Please sign in to comment.