Skip to content

Commit

Permalink
Broke out the client reset error and action storage from PR #7542
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Wilkerson-Barker committed Apr 30, 2024
1 parent a8df133 commit 805add6
Show file tree
Hide file tree
Showing 12 changed files with 581 additions and 189 deletions.
60 changes: 31 additions & 29 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,17 +425,17 @@ void SyncSession::update_error_and_mark_file_for_deletion(SyncError& error, Shou
}
}

void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_requests_action)
void SyncSession::download_fresh_realm(const sync::SessionErrorInfo& error_info)
{
// first check that recovery will not be prevented
if (server_requests_action == sync::ProtocolErrorInfo::Action::ClientResetNoRecovery) {
if (error_info.server_requests_action == sync::ProtocolErrorInfo::Action::ClientResetNoRecovery) {
auto mode = config(&SyncConfig::client_resync_mode);
if (mode == ClientResyncMode::Recover) {
handle_fresh_realm_downloaded(
nullptr,
{ErrorCodes::RuntimeError,
"A client reset is required but the server does not permit recovery for this client"},
server_requests_action);
error_info);
return;
}
}
Expand Down Expand Up @@ -472,7 +472,7 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re
catch (...) {
// Failed to open the fresh path after attempting to delete it, so we
// just can't do automatic recovery.
handle_fresh_realm_downloaded(nullptr, exception_to_status(), server_requests_action);
handle_fresh_realm_downloaded(nullptr, exception_to_status(), error_info);
return;
}

Expand Down Expand Up @@ -514,7 +514,7 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re
using SubscriptionState = sync::SubscriptionSet::State;
fresh_sub.get_state_change_notification(SubscriptionState::Complete)
.then([=](SubscriptionState) -> util::Future<sync::SubscriptionSet> {
if (server_requests_action != sync::ProtocolErrorInfo::Action::MigrateToFLX) {
if (error_info.server_requests_action != sync::ProtocolErrorInfo::Action::MigrateToFLX) {
return fresh_sub;
}
if (!self->m_migration_store->is_migration_in_progress()) {
Expand All @@ -539,11 +539,10 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re
// it immediately
fresh_sync_session->force_close();
if (subs.is_ok()) {
self->handle_fresh_realm_downloaded(db, Status::OK(), server_requests_action,
std::move(subs.get_value()));
self->handle_fresh_realm_downloaded(db, Status::OK(), error_info, std::move(subs.get_value()));
}
else {
self->handle_fresh_realm_downloaded(nullptr, subs.get_status(), server_requests_action);
self->handle_fresh_realm_downloaded(nullptr, subs.get_status(), error_info);
}
});
}
Expand All @@ -553,15 +552,14 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re
// it immediately
fresh_sync_session->force_close();
if (auto strong_self = weak_self.lock()) {
strong_self->handle_fresh_realm_downloaded(db, s, server_requests_action);
strong_self->handle_fresh_realm_downloaded(db, s, error_info);
}
});
}
fresh_sync_session->revive_if_needed();
}

void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status,
sync::ProtocolErrorInfo::Action server_requests_action,
void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status, const sync::SessionErrorInfo& error_info,
std::optional<sync::SubscriptionSet> new_subs)
{
util::CheckedUniqueLock lock(m_state_mutex);
Expand Down Expand Up @@ -596,7 +594,8 @@ void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status,
// that moving to the inactive state doesn't clear them - they will be
// re-registered when the session becomes active again.
{
m_server_requests_action = server_requests_action;
m_client_reset_error = error_info.status;
m_server_requests_action = error_info.server_requests_action;
m_client_reset_fresh_copy = db;
CompletionCallbacks callbacks;
std::swap(m_completion_callbacks, callbacks);
Expand All @@ -613,8 +612,8 @@ void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status,
become_inactive(std::move(lock), Status::OK(), cancel_subscription_notifications); // unlocks the lock

// Once the session is inactive, update sync config and subscription store after migration.
if (server_requests_action == sync::ProtocolErrorInfo::Action::MigrateToFLX ||
server_requests_action == sync::ProtocolErrorInfo::Action::RevertToPBS) {
if (error_info.server_requests_action == sync::ProtocolErrorInfo::Action::MigrateToFLX ||
error_info.server_requests_action == sync::ProtocolErrorInfo::Action::RevertToPBS) {
apply_sync_config_after_migration_or_rollback();
auto flx_sync_requested = config(&SyncConfig::flx_sync_requested);
update_subscription_store(flx_sync_requested, std::move(new_subs));
Expand Down Expand Up @@ -698,7 +697,7 @@ void SyncSession::handle_error(sync::SessionErrorInfo error)
case ClientResyncMode::RecoverOrDiscard:
[[fallthrough]];
case ClientResyncMode::Recover:
download_fresh_realm(error.server_requests_action);
download_fresh_realm(error);
return; // do not propagate the error to the user at this point
}
break;
Expand All @@ -710,7 +709,7 @@ void SyncSession::handle_error(sync::SessionErrorInfo error)
m_migration_store->migrate_to_flx(*error.migration_query_string,
m_original_sync_config->partition_value);
save_sync_config_after_migration_or_rollback();
download_fresh_realm(error.server_requests_action);
download_fresh_realm(error);
return;
case sync::ProtocolErrorInfo::Action::RevertToPBS:
// If the client was updated to use FLX natively, but the server was rolled back to PBS,
Expand All @@ -724,7 +723,7 @@ void SyncSession::handle_error(sync::SessionErrorInfo error)
// Original config was PBS, rollback the migration
m_migration_store->rollback_to_pbs();
save_sync_config_after_migration_or_rollback();
download_fresh_realm(error.server_requests_action);
download_fresh_realm(error);
return;
case sync::ProtocolErrorInfo::Action::RefreshUser:
if (auto u = user()) {
Expand Down Expand Up @@ -827,17 +826,23 @@ void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloada
upload_estimate);
}

static sync::Session::Config::ClientReset make_client_reset_config(const RealmConfig& base_config,
const std::shared_ptr<SyncConfig>& sync_config,
DBRef&& fresh_copy, bool recovery_is_allowed,
bool schema_migration_detected)

static sync::Session::Config::ClientReset
make_client_reset_config(const RealmConfig& base_config, const std::shared_ptr<SyncConfig>& sync_config,
DBRef&& fresh_copy, sync::ProtocolErrorInfo::Action action, std::optional<Status> error,
bool schema_migration_detected)
{
REALM_ASSERT(sync_config->client_resync_mode != ClientResyncMode::Manual);

sync::Session::Config::ClientReset config;
config.mode = sync_config->client_resync_mode;
config.fresh_copy = std::move(fresh_copy);
config.recovery_is_allowed = recovery_is_allowed;
config.action = action;
config.error = std::move(error);
// Migrations are allowed to recover local data.
config.recovery_is_allowed = action == sync::ProtocolErrorInfo::Action::ClientReset ||
action == sync::ProtocolErrorInfo::Action::MigrateToFLX ||
action == sync::ProtocolErrorInfo::Action::RevertToPBS;

// The conditions here are asymmetric because if we have *either* a before
// or after callback we need to make sure to initialize the local schema
Expand Down Expand Up @@ -942,16 +947,13 @@ void SyncSession::create_sync_session()
session_config.custom_http_headers = sync_config.custom_http_headers;

if (m_server_requests_action != sync::ProtocolErrorInfo::Action::NoAction) {
// Migrations are allowed to recover local data.
const bool allowed_to_recover = m_server_requests_action == sync::ProtocolErrorInfo::Action::ClientReset ||
m_server_requests_action == sync::ProtocolErrorInfo::Action::MigrateToFLX ||
m_server_requests_action == sync::ProtocolErrorInfo::Action::RevertToPBS;
// Use the original sync config, not the updated one from the migration store
session_config.client_reset_config =
make_client_reset_config(m_config, m_original_sync_config, std::move(m_client_reset_fresh_copy),
allowed_to_recover, m_previous_schema_version.has_value());
session_config.client_reset_config = make_client_reset_config(
m_config, m_original_sync_config, std::move(m_client_reset_fresh_copy), m_server_requests_action,
std::move(m_client_reset_error), m_previous_schema_version.has_value());
session_config.schema_version = m_previous_schema_version.value_or(m_config.schema_version);
m_server_requests_action = sync::ProtocolErrorInfo::Action::NoAction;
m_client_reset_error.reset();
}

m_session = m_client.make_session(m_db, m_flx_subscription_store, m_migration_store, std::move(session_config));
Expand Down
6 changes: 3 additions & 3 deletions src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,9 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
void apply_sync_config_after_migration_or_rollback() REQUIRES(!m_config_mutex, !m_state_mutex);
void save_sync_config_after_migration_or_rollback() REQUIRES(!m_config_mutex);

void download_fresh_realm(sync::ProtocolErrorInfo::Action server_requests_action)
void download_fresh_realm(const sync::SessionErrorInfo& error_info)
REQUIRES(!m_config_mutex, !m_state_mutex, !m_connection_state_mutex);
void handle_fresh_realm_downloaded(DBRef db, Status status,
sync::ProtocolErrorInfo::Action server_requests_action,
void handle_fresh_realm_downloaded(DBRef db, Status status, const sync::SessionErrorInfo& error_info,
std::optional<sync::SubscriptionSet> new_subs = std::nullopt)
REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex);
void handle_error(sync::SessionErrorInfo) REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex);
Expand Down Expand Up @@ -506,6 +505,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
std::optional<int64_t> m_migration_sentinel_query_version GUARDED_BY(m_state_mutex);
sync::ProtocolErrorInfo::Action
m_server_requests_action GUARDED_BY(m_state_mutex) = sync::ProtocolErrorInfo::Action::NoAction;
std::optional<Status> m_client_reset_error GUARDED_BY(m_state_mutex);
DBRef m_client_reset_fresh_copy GUARDED_BY(m_state_mutex);
_impl::SyncClient& m_client;
SyncManager* m_sync_manager GUARDED_BY(m_state_mutex) = nullptr;
Expand Down
32 changes: 19 additions & 13 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1983,35 +1983,41 @@ void SessionWrapper::handle_pending_client_reset_acknowledgement()

auto pending_reset = _impl::client_reset::has_pending_reset(*m_db->start_frozen());
REALM_ASSERT(pending_reset);
m_sess->logger.info("Tracking pending client reset of type \"%1\" from %2", pending_reset->type,
pending_reset->time);
m_sess->logger.info(util::LogCategory::reset, "Tracking pending %1", pending_reset->to_string());
if (pending_reset->error) {
m_sess->logger.info(util::LogCategory::reset, "Originating client reset error: %1", *pending_reset->error);
}
// Now that the client reset merge is complete, wait for the changes to synchronize with the server
async_wait_for(true, true, [self = util::bind_ptr(this), pending_reset = *pending_reset](Status status) {
if (status == ErrorCodes::OperationAborted) {
return;
}
auto& logger = self->m_sess->logger;
if (!status.is_ok()) {
logger.error("Error while tracking client reset acknowledgement: %1", status);
logger.error(util::LogCategory::reset, "Error while tracking client reset acknowledgement: %1", status);
return;
}

logger.debug(util::LogCategory::reset, "%1 has been acknowledged by the server. ", pending_reset.to_string());
if (pending_reset.error) {
logger.info(util::LogCategory::reset, "Originating client reset error: %1", *pending_reset.error);
}

auto wt = self->m_db->start_write();
auto cur_pending_reset = _impl::client_reset::has_pending_reset(*wt);
if (!cur_pending_reset) {
logger.debug(
"Was going to remove client reset tracker for type \"%1\" from %2, but it was already removed",
pending_reset.type, pending_reset.time);
logger.debug(util::LogCategory::reset, "Client reset cycle detection tracker already removed.");
return;
}
else if (cur_pending_reset->type != pending_reset.type || cur_pending_reset->time != pending_reset.time) {
logger.debug(
"Was going to remove client reset tracker for type \"%1\" from %2, but found type \"%3\" from %4.",
pending_reset.type, pending_reset.time, cur_pending_reset->type, cur_pending_reset->time);
if (cur_pending_reset->mode != pending_reset.mode || cur_pending_reset->action != pending_reset.action ||
cur_pending_reset->time != pending_reset.time) {
logger.info(util::LogCategory::reset, "Found new pending %1", cur_pending_reset->to_string());
if (cur_pending_reset->error) {
logger.info(util::LogCategory::reset, "New client reset error: %1", *cur_pending_reset->error);
}
}
else {
logger.debug("Client reset of type \"%1\" from %2 has been acknowledged by the server. "
"Removing cycle detection tracker.",
pending_reset.type, pending_reset.time);
logger.debug(util::LogCategory::reset, "Removing client reset cycle detection tracker.");
}
_impl::client_reset::remove_pending_client_resets(*wt);
wt->commit();
Expand Down
5 changes: 3 additions & 2 deletions src/realm/sync/client_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ class SyncSocketProvider;
struct ClientReset {
realm::ClientResyncMode mode;
DBRef fresh_copy;
bool recovery_is_allowed = true;
bool recovery_is_allowed;
sync::ProtocolErrorInfo::Action action = sync::ProtocolErrorInfo::Action::ClientReset;
std::optional<Status> error;
util::UniqueFunction<VersionID()> notify_before_client_reset;
util::UniqueFunction<void(VersionID before_version, bool did_recover)> notify_after_client_reset;
};

static constexpr milliseconds_type default_connect_timeout = 120000; // 2 minutes
static constexpr milliseconds_type default_connection_linger_time = 30000; // 30 seconds
static constexpr milliseconds_type default_ping_keepalive_period = 60000; // 1 minute
Expand Down
8 changes: 3 additions & 5 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2253,11 +2253,9 @@ bool Session::client_reset_if_needed()
auto on_flx_version_complete = [this](int64_t version) {
this->on_flx_sync_version_complete(version);
};
bool did_reset = client_reset::perform_client_reset(
logger, *get_db(), *client_reset_config->fresh_copy, client_reset_config->mode,
std::move(client_reset_config->notify_before_client_reset),
std::move(client_reset_config->notify_after_client_reset), m_client_file_ident, get_flx_subscription_store(),
on_flx_version_complete, client_reset_config->recovery_is_allowed);
bool did_reset =
client_reset::perform_client_reset(logger, *get_db(), std::move(*client_reset_config), m_client_file_ident,
get_flx_subscription_store(), on_flx_version_complete);
if (!did_reset) {
return false;
}
Expand Down
Loading

0 comments on commit 805add6

Please sign in to comment.