Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make explicitly pausing the sync session a separate state #6183

Merged
merged 12 commits into from
Jan 11, 2023
5 changes: 2 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
# NEXT RELEASE

### Enhancements
* <New feature description> (PR [#????](https://github.com/realm/realm-core/pull/????))
* None.
* `SyncSession::pause()` and `SyncSession::resume()` allow users to suspend a Realm's sync session until it is explicitly resumed in ([#6183](https://github.com/realm/realm-core/pull/6183)). Previously `SyncSession::log_out()` and `SyncSession::close()` could be resumed under a number of circumstances where `SyncSession::revive_if_needed()` were called (like when freezing a realm) - fixes ([#6085](https://github.com/realm/realm-core/issues/6085))

### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* None.

### Breaking changes
* None.
* `SyncSession::log_out()` has been renamed to `SyncSession::force_close()` to reflect what it actually does ([#6183](https://github.com/realm/realm-core/pull/6183))

### Compatibility
* Fileformat: Generates files with format v23. Reads and automatically upgrade from fileformat v5.
Expand Down
1 change: 1 addition & 0 deletions src/realm.h
Original file line number Diff line number Diff line change
Expand Up @@ -3391,6 +3391,7 @@ typedef enum realm_sync_session_state {
RLM_SYNC_SESSION_STATE_DYING,
RLM_SYNC_SESSION_STATE_INACTIVE,
RLM_SYNC_SESSION_STATE_WAITING_FOR_ACCESS_TOKEN,
RLM_SYNC_SESSION_STATE_PAUSED,
} realm_sync_session_state_e;

typedef enum realm_sync_connection_state {
Expand Down
2 changes: 1 addition & 1 deletion src/realm/object-store/audit.mm
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ bool write_event(Timestamp timestamp, StringData activity, StringData event_type
wait_for_upload(sync_session);
}
else {
sync_session->log_out();
sync_session->force_close();
m_open_paths.erase(m_current_realm->config().path);
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/realm/object-store/c_api/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ static_assert(realm_sync_session_state_e(SyncSession::State::Dying) == RLM_SYNC_
static_assert(realm_sync_session_state_e(SyncSession::State::Inactive) == RLM_SYNC_SESSION_STATE_INACTIVE);
static_assert(realm_sync_session_state_e(SyncSession::State::WaitingForAccessToken) ==
RLM_SYNC_SESSION_STATE_WAITING_FOR_ACCESS_TOKEN);
static_assert(realm_sync_session_state_e(SyncSession::State::Paused) == RLM_SYNC_SESSION_STATE_PAUSED);

static_assert(realm_sync_connection_state_e(SyncSession::ConnectionState::Disconnected) ==
RLM_SYNC_CONNECTION_STATE_DISCONNECTED);
Expand Down Expand Up @@ -909,12 +910,12 @@ RLM_API const char* realm_sync_session_get_file_path(const realm_sync_session_t*

RLM_API void realm_sync_session_pause(realm_sync_session_t* session) noexcept
{
(*session)->log_out();
(*session)->pause();
}

RLM_API void realm_sync_session_resume(realm_sync_session_t* session) noexcept
{
(*session)->revive_if_needed();
(*session)->resume();
}

RLM_API bool realm_sync_immediately_run_file_actions(realm_app_t* realm_app, const char* sync_path,
Expand Down
2 changes: 1 addition & 1 deletion src/realm/object-store/sync/async_open_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ void AsyncOpenTask::cancel()
// thus deadlocking.
if (session) {
// Does a better way exists for canceling the download?
session->log_out();
session->force_close();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/realm/object-store/sync/sync_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ void SyncManager::close_all_sessions()
}

for (auto& [_, session] : sessions) {
session->log_out();
session->force_close();
}

get_sync_client().wait_for_session_terminations();
Expand Down
103 changes: 82 additions & 21 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,26 @@ void SyncSession::become_inactive(util::CheckedUniqueLock lock, std::error_code
REALM_ASSERT(m_state != State::Inactive);
m_state = State::Inactive;

do_become_inactive(std::move(lock), ec);
}

void SyncSession::become_paused(util::CheckedUniqueLock lock)
{
REALM_ASSERT(m_state != State::Paused);
auto old_state = m_state;
m_state = State::Paused;

// Nothing to do if we're already inactive besides update the state.
if (old_state == State::Inactive) {
m_state_mutex.unlock(lock);
return;
}

do_become_inactive(std::move(lock), {});
}

void SyncSession::do_become_inactive(util::CheckedUniqueLock lock, std::error_code ec)
{
// Manually set the disconnected state. Sync would also do this, but
// since the underlying SyncSession object already have been destroyed,
// we are not able to get the callback.
Expand Down Expand Up @@ -653,7 +673,7 @@ void SyncSession::handle_error(SyncError error)

// Dont't bother invoking m_config.error_handler if the sync is inactive.
// It does not make sense to call the handler when the session is closed.
if (m_state == State::Inactive) {
if (m_state == State::Inactive || m_state == State::Paused) {
return;
}

Expand Down Expand Up @@ -882,6 +902,7 @@ void SyncSession::nonsync_transact_notify(sync::version_type version)
break;
case State::Dying:
case State::Inactive:
case State::Paused:
break;
}
}
Expand All @@ -892,25 +913,12 @@ void SyncSession::revive_if_needed()
switch (m_state) {
case State::Active:
case State::WaitingForAccessToken:
case State::Paused:
return;
case State::Dying:
case State::Inactive: {
// Revive.
auto u = user();
if (!u || !u->access_token_refresh_required()) {
become_active();
return;
}

become_waiting_for_access_token();
// Release the lock for SDKs with a single threaded
// networking implementation such as our test suite
// so that the update can trigger a state change from
// the completion handler.
lock.unlock();
initiate_access_token_refresh();
case State::Inactive:
do_revive(std::move(lock));
break;
}
}
}

Expand All @@ -924,11 +932,12 @@ void SyncSession::handle_reconnect()
case State::Dying:
case State::Inactive:
case State::WaitingForAccessToken:
case State::Paused:
break;
}
}

void SyncSession::log_out()
void SyncSession::force_close()
{
util::CheckedUniqueLock lock(m_state_mutex);
switch (m_state) {
Expand All @@ -938,10 +947,59 @@ void SyncSession::log_out()
become_inactive(std::move(lock));
break;
case State::Inactive:
case State::Paused:
break;
}
}

void SyncSession::pause()
{
util::CheckedUniqueLock lock(m_state_mutex);
switch (m_state) {
case State::Active:
case State::Dying:
case State::WaitingForAccessToken:
case State::Inactive:
become_paused(std::move(lock));
break;
case State::Paused:
break;
}
}

void SyncSession::resume()
{
util::CheckedUniqueLock lock(m_state_mutex);
switch (m_state) {
case State::Active:
case State::WaitingForAccessToken:
return;
case State::Paused:
case State::Dying:
case State::Inactive:
do_revive(std::move(lock));
break;
}
}

void SyncSession::do_revive(util::CheckedUniqueLock&& lock)
{
auto u = user();
if (!u || !u->access_token_refresh_required()) {
become_active();
m_state_mutex.unlock(lock);
return;
}

become_waiting_for_access_token();
// Release the lock for SDKs with a single threaded
// networking implementation such as our test suite
// so that the update can trigger a state change from
// the completion handler.
m_state_mutex.unlock(lock);
initiate_access_token_refresh();
}

void SyncSession::close()
{
util::CheckedUniqueLock lock(m_state_mutex);
Expand Down Expand Up @@ -970,6 +1028,9 @@ void SyncSession::close(util::CheckedUniqueLock lock)
case State::Dying:
m_state_mutex.unlock(lock);
break;
case State::Paused:
m_state_mutex.unlock(lock);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind adding a comment here that mentions paused is supposed to be sticky and that state should not be changed to inactive.

break;
case State::Inactive: {
if (m_sync_manager) {
m_sync_manager->unregister_session(m_db->get_path());
Expand All @@ -994,7 +1055,7 @@ void SyncSession::shutdown_and_wait()
// Realm file to be closed. This works so long as this SyncSession object remains in the
// `inactive` state after the invocation of shutdown_and_wait().
util::CheckedUniqueLock lock(m_state_mutex);
if (m_state != State::Inactive) {
if (m_state != State::Inactive && m_state != State::Paused) {
become_inactive(std::move(lock));
}
}
Expand Down Expand Up @@ -1111,7 +1172,7 @@ void SyncSession::update_configuration(SyncConfig new_config)
{
while (true) {
util::CheckedUniqueLock state_lock(m_state_mutex);
if (m_state != State::Inactive) {
if (m_state != State::Inactive && m_state != State::Paused) {
// Changing the state releases the lock, which means that by the
// time we reacquire the lock the state may have changed again
// (either due to one of the callbacks being invoked or another
Expand All @@ -1122,7 +1183,7 @@ void SyncSession::update_configuration(SyncConfig new_config)
}

util::CheckedUniqueLock config_lock(m_config_mutex);
REALM_ASSERT(m_state == State::Inactive);
REALM_ASSERT(m_state == State::Inactive || m_state == State::Paused);
REALM_ASSERT(!m_session);
REALM_ASSERT(m_config.sync_config->user == new_config.user);
m_config.sync_config = std::make_shared<SyncConfig>(std::move(new_config));
Expand Down
29 changes: 26 additions & 3 deletions src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
Dying,
Inactive,
WaitingForAccessToken,
Paused,
};

enum class ConnectionState {
Expand Down Expand Up @@ -175,17 +176,30 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
// Specifically:
// If the sync session is currently `Dying`, ask it to stay alive instead.
// If the sync session is currently `Inactive`, recreate it.
// If the sync session is currently `Paused`, do nothing - call resume() instead.
// Otherwise, a no-op.
void revive_if_needed() REQUIRES(!m_state_mutex, !m_config_mutex);

// Perform any actions needed in response to regaining network connectivity.
void handle_reconnect() REQUIRES(!m_state_mutex);

// Inform the sync session that it should close.
// Inform the sync session that it should close. This will respect the stop policy specified in
// the SyncConfig, so its possible the session will remain open either until all pending local
// changes are uploaded or possibly forever.
void close() REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex);

// Inform the sync session that it should log out.
void log_out() REQUIRES(!m_state_mutex, !m_connection_state_mutex);
// Inform the sync session that it should close immediately, regardless of the stop policy.
// The session may resume after calling this if a new Realm is opened for the underlying DB
// of the SyncSession. Use pause() to close the sync session until you want to explicitly
// resume it.
void force_close() REQUIRES(!m_state_mutex, !m_connection_state_mutex);

// Closes the sync session so that it will not resume until resume() is called.
void pause() REQUIRES(!m_state_mutex, !m_connection_state_mutex);

// Resumes the sync session after it was paused by calling pause(). If the sync session is inactive
// for any other reason this will also resume it.
void resume() REQUIRES(!m_state_mutex, !m_config_mutex);

// Shut down the synchronization session (sync::Session) and wait for the Realm file to no
// longer be open on behalf of it.
Expand Down Expand Up @@ -357,8 +371,17 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
void become_dying(util::CheckedUniqueLock) RELEASE(m_state_mutex) REQUIRES(!m_connection_state_mutex);
void become_inactive(util::CheckedUniqueLock, std::error_code ec = {}) RELEASE(m_state_mutex)
REQUIRES(!m_connection_state_mutex);
void become_paused(util::CheckedUniqueLock) RELEASE(m_state_mutex) REQUIRES(!m_connection_state_mutex);
void become_waiting_for_access_token() REQUIRES(m_state_mutex);

// do_become_inactive is called from both become_paused()/become_inactive() and does all the steps to
// shutdown and cleanup the sync session besides setting m_state.
void do_become_inactive(util::CheckedUniqueLock, std::error_code ec) RELEASE(m_state_mutex)
REQUIRES(!m_connection_state_mutex);
// do_revive is called from both revive_if_needed() and resume(). It does all the steps to transition
// from a state that is not Active to Active.
void do_revive(util::CheckedUniqueLock&& lock) RELEASE(m_state_mutex) REQUIRES(!m_config_mutex);

void add_completion_callback(util::UniqueFunction<void(std::error_code)> callback, ProgressDirection direction)
REQUIRES(m_state_mutex);

Expand Down
2 changes: 1 addition & 1 deletion src/realm/object-store/sync/sync_user.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ void SyncUser::log_out()
// logged back in, they will automatically be reactivated.
for (auto& [path, weak_session] : m_sessions) {
if (auto ptr = weak_session.lock()) {
ptr->log_out();
ptr->force_close();
m_waiting_sessions[path] = std::move(ptr);
}
}
Expand Down
2 changes: 1 addition & 1 deletion test/object-store/audit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1796,7 +1796,7 @@ TEST_CASE("audit integration tests") {
std::move(mut_subs).commit();
}

realm->sync_session()->log_out();
realm->sync_session()->force_close();
generate_event(realm, 0);
get_audit_events_from_baas(session, *config.audit_config->audit_user, 1);
}
Expand Down
31 changes: 29 additions & 2 deletions test/object-store/sync/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2690,7 +2690,7 @@ TEST_CASE("app: sync integration", "[sync][app]") {
std::mutex mutex;
bool done = false;
auto r = Realm::get_shared_realm(config);
r->sync_session()->close();
r->sync_session()->pause();

// Create 26 MB worth of dogs in 26 transactions, which should work but
// will result in an error from the server if the changesets are batched
Expand All @@ -2710,7 +2710,7 @@ TEST_CASE("app: sync integration", "[sync][app]") {
REQUIRE(!ec);
done = true;
});
r->sync_session()->revive_if_needed();
r->sync_session()->resume();

// If we haven't gotten an error in more than 5 minutes, then something has gone wrong
// and we should fail the test.
Expand Down Expand Up @@ -2753,6 +2753,33 @@ TEST_CASE("app: sync integration", "[sync][app]") {
REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ClientReset);
}

SECTION("freezing realm does not resume session") {
SyncTestFile config(app, partition, schema);
auto realm = Realm::get_shared_realm(config);
wait_for_download(*realm);

auto state = realm->sync_session()->state();
REQUIRE(state == SyncSession::State::Active);

realm->sync_session()->pause();
state = realm->sync_session()->state();
REQUIRE(state == SyncSession::State::Paused);

realm->read_group();

{
auto frozen = realm->freeze();
REQUIRE(realm->sync_session() == realm->sync_session());
REQUIRE(realm->sync_session()->state() == SyncSession::State::Paused);
}

{
auto frozen = Realm::get_frozen_realm(config, realm->read_transaction_version());
REQUIRE(realm->sync_session() == realm->sync_session());
REQUIRE(realm->sync_session()->state() == SyncSession::State::Paused);
}
}

SECTION("validation") {
SyncTestFile config(app, partition, schema);

Expand Down
Loading