From 8a68a7fded440ef894a54f4f8bf24c4a3560df33 Mon Sep 17 00:00:00 2001 From: Thomas Goyne Date: Wed, 10 Apr 2024 12:20:03 -0700 Subject: [PATCH] Simplify the SessionWrapper lifecycle a bit Initializing a sync::Session was a multi-step process of creating the Session with a config, configuring some addition things via mutation functions, and then calling bind(). We can simplify this a bit by pushing everything into the config struct and binding inside the wrapper's constructor, eliminating the constructed-but-not-initialized state and letting us make more of the members const. --- CHANGELOG.md | 2 + src/realm/db.cpp | 5 +- .../object-store/sync/impl/sync_client.hpp | 2 +- src/realm/object-store/sync/sync_session.cpp | 74 +-- src/realm/sync/client.cpp | 609 +++++++----------- src/realm/sync/client.hpp | 312 ++++----- src/realm/sync/network/network.hpp | 21 +- src/realm/sync/noinst/client_impl_base.cpp | 16 + src/realm/sync/noinst/client_impl_base.hpp | 41 +- src/realm/util/future.hpp | 2 +- src/realm/util/scope_exit.hpp | 58 +- test/benchmark-sync/bench_transform.cpp | 6 - test/sync_fixtures.hpp | 59 +- test/test_client_reset.cpp | 81 +-- test/test_sync.cpp | 577 ++++++----------- test/test_util_scope_exit.cpp | 53 +- 16 files changed, 747 insertions(+), 1171 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a8b3a25238..8983cb86887 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ ### Internals * Work around a bug in VC++ that resulted in runtime errors when running the tests in a debug build (#[7741](https://github.com/realm/realm-core/issues/7741)). +* Refactor `sync::Session` to eliminate the bind() step of session creation ([#7609](https://github.com/realm/realm-core/pull/7609)). +* Add ScopeExitFail which only calls the handler if exiting the scope via an uncaught exception ([#7609](https://github.com/realm/realm-core/pull/7609)). ---------------------------------------------- diff --git a/src/realm/db.cpp b/src/realm/db.cpp index ac2967bfdb9..e85824b8458 100644 --- a/src/realm/db.cpp +++ b/src/realm/db.cpp @@ -1189,10 +1189,9 @@ void DB::open(const std::string& path, const DBOptions& options) SlabAlloc::DetachGuard alloc_detach_guard(alloc); alloc.note_reader_start(this); // must come after the alloc detach guard - auto handler = [this, &alloc]() noexcept { + auto reader_end_guard = make_scope_exit([this, &alloc]() noexcept { alloc.note_reader_end(this); - }; - auto reader_end_guard = make_scope_exit(handler); + }); // Check validity of top array (to give more meaningful errors // early) diff --git a/src/realm/object-store/sync/impl/sync_client.hpp b/src/realm/object-store/sync/impl/sync_client.hpp index 7e07cd750fe..941f4db7da2 100644 --- a/src/realm/object-store/sync/impl/sync_client.hpp +++ b/src/realm/object-store/sync/impl/sync_client.hpp @@ -123,7 +123,7 @@ struct SyncClient { std::unique_ptr make_session(std::shared_ptr db, std::shared_ptr flx_sub_store, std::shared_ptr migration_store, - sync::Session::Config config) + sync::Session::Config&& config) { return std::make_unique(m_client, std::move(db), std::move(flx_sub_store), std::move(migration_store), std::move(config)); diff --git a/src/realm/object-store/sync/sync_session.cpp b/src/realm/object-store/sync/sync_session.cpp index 9ecafa81693..9cbbc5cf9f9 100644 --- a/src/realm/object-store/sync/sync_session.cpp +++ b/src/realm/object-store/sync/sync_session.cpp @@ -108,10 +108,7 @@ void SyncSession::become_active() } // when entering from the Dying state the session will still be bound - if (!m_session) { - create_sync_session(); - m_session->bind(); - } + create_sync_session(); // Register all the pending wait-for-completion blocks. This can // potentially add a redundant callback if we're coming from the Dying @@ -896,6 +893,8 @@ void SyncSession::create_sync_session() SyncConfig& sync_config = *m_config.sync_config; REALM_ASSERT(sync_config.user); + std::weak_ptr weak_self = weak_from_this(); + sync::Session::Config session_config; session_config.signed_user_token = sync_config.user->access_token(); session_config.user_id = sync_config.user->user_id(); @@ -912,8 +911,8 @@ void SyncSession::create_sync_session() if (sync_config.on_sync_client_event_hook) { session_config.on_sync_client_event_hook = [hook = sync_config.on_sync_client_event_hook, - anchor = weak_from_this()](const SyncClientHookData& data) { - return hook(anchor, data); + weak_self](const SyncClientHookData& data) { + return hook(weak_self, data); }; } @@ -954,46 +953,41 @@ void SyncSession::create_sync_session() m_server_requests_action = sync::ProtocolErrorInfo::Action::NoAction; } - m_session = m_client.make_session(m_db, m_flx_subscription_store, m_migration_store, std::move(session_config)); - - std::weak_ptr weak_self = weak_from_this(); - - // Set up the wrapped progress handler callback - m_session->set_progress_handler([weak_self](uint_fast64_t downloaded, uint_fast64_t downloadable, - uint_fast64_t uploaded, uint_fast64_t uploadable, - uint_fast64_t snapshot_version, double download_estimate, - double upload_estimate, int64_t query_version) { + session_config.progress_handler = [weak_self](uint_fast64_t downloaded, uint_fast64_t downloadable, + uint_fast64_t uploaded, uint_fast64_t uploadable, + uint_fast64_t snapshot_version, double download_estimate, + double upload_estimate, int64_t query_version) { if (auto self = weak_self.lock()) { self->handle_progress_update(downloaded, downloadable, uploaded, uploadable, snapshot_version, download_estimate, upload_estimate, query_version); } - }); + }; - // Sets up the connection state listener. This callback is used for both reporting errors as well as changes to - // the connection state. - m_session->set_connection_state_change_listener( - [weak_self](sync::ConnectionState state, std::optional error) { - using cs = sync::ConnectionState; - ConnectionState new_state = [&] { - switch (state) { - case cs::disconnected: - return ConnectionState::Disconnected; - case cs::connecting: - return ConnectionState::Connecting; - case cs::connected: - return ConnectionState::Connected; - } - REALM_UNREACHABLE(); - }(); - // If the OS SyncSession object is destroyed, we ignore any events from the underlying Session as there is - // nothing useful we can do with them. - if (auto self = weak_self.lock()) { - self->update_connection_state(new_state); - if (error) { - self->handle_error(std::move(*error)); - } + session_config.connection_state_change_listener = [weak_self](sync::ConnectionState state, + std::optional error) { + using cs = sync::ConnectionState; + ConnectionState new_state = [&] { + switch (state) { + case cs::disconnected: + return ConnectionState::Disconnected; + case cs::connecting: + return ConnectionState::Connecting; + case cs::connected: + return ConnectionState::Connected; } - }); + REALM_UNREACHABLE(); + }(); + // If the OS SyncSession object is destroyed, we ignore any events from the underlying Session as there is + // nothing useful we can do with them. + if (auto self = weak_self.lock()) { + self->update_connection_state(new_state); + if (error) { + self->handle_error(std::move(*error)); + } + } + }; + + m_session = m_client.make_session(m_db, m_flx_subscription_store, m_migration_store, std::move(session_config)); } void SyncSession::update_connection_state(ConnectionState new_state) diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index 127872157f2..b2786796105 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -1,29 +1,14 @@ - -#include -#include -#include - -#include "realm/sync/client_base.hpp" -#include "realm/sync/protocol.hpp" -#include "realm/util/optional.hpp" #include + #include -#include -#include #include +#include #include +#include #include #include -#include -#include -#include -#include -#include -#include - -namespace realm { -namespace sync { +namespace realm::sync { namespace { using namespace realm::util; @@ -44,47 +29,35 @@ using ProxyConfig = SyncConfig::ProxyConfig; // Life cycle states of a session wrapper: // -// - Uninitiated -// - Unactualized -// - Actualized -// - Finalized -// -// The session wrapper moves from the Uninitiated to the Unactualized state when -// it is initiated, i.e., when initiate() is called. This may happen on any -// thread. -// -// The session wrapper moves from the Unactualized to the Actualized state when -// it is associated with a session object, i.e., when `m_sess` is made to refer -// to an object of type SessionImpl. This always happens on the event loop -// thread. -// -// The session wrapper moves from the Actualized to the Finalized state when it -// is dissociated from the session object. This happens in response to the -// session wrapper having been abandoned by the application. This always happens -// on the event loop thread. -// -// The session wrapper will exist in the Finalized state only while referenced -// from a post handler waiting to be executed. -// -// If the session wrapper is abandoned by the application while in the -// Uninitiated state, it will be destroyed immediately, since no post handlers -// can have been scheduled prior to initiation. +// The session wrapper begins life with an associated Client, but no underlying +// SessionImpl. On construction, it begins the actualization process by posting +// a job to the client's event loop. That job will set `m_sess` to a session impl +// and then set `m_actualized = true`. Once this happens `m_actualized` will +// never change again. // -// If the session wrapper is abandoned while in the Unactivated state, it will -// move immediately to the Finalized state. This may happen on any thread. +// When the external reference to the session (`sync::Session`, which in +// non-test code is always owned by a `SyncSession`) is destroyed, the wrapper +// begins finalization. If the wrapper has not yet been actualized this takes +// place immediately and `m_finalized = true` is set directly on the calling +// thread. If it has been actualized, a job is posted to the client's event loop +// which will tear down the session and then set `m_finalized = true`. Regardless +// of whether or not the session has been actualized, `m_abandoned = true` is +// immediately set when the external reference is released. // -// The moving of a session wrapper to, or from the Actualized state always -// happen on the event loop thread. All other state transitions may happen on -// any thread. +// When the associated Client is destroyed it calls force_close() on all +// actualized wrappers from its event loop. This causes the wrapper to tear down +// the session, but not not make it proceed to the finalized state. In normal +// usage the client will outlive all sessions, but in tests getting the teardown +// correct and race-free can be tricky so we permit either order. // -// NOTE: Activation of the session happens no later than during actualization, -// and initiation of deactivation happens no earlier than during -// finalization. See also activate_session() and initiate_session_deactivation() -// in ClientImpl::Connection. +// The wrapper will exist with `m_abandoned = true` and `m_finalized = false` +// only while waiting for finalization to happen. It will exist with +// `m_finalized = true` only while there are pending post handlers yet to be +// executed. class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener { public: SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr, std::shared_ptr, - Session::Config); + Session::Config&&); ~SessionWrapper() noexcept; ClientReplication& get_replication() noexcept; @@ -96,48 +69,48 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener MigrationStore* get_migration_store(); - void set_progress_handler(util::UniqueFunction); - void set_connection_state_change_listener(util::UniqueFunction); - - void initiate(); - + // Immediately initiate deactivation of the wrapped session. Sets m_closed + // but *not* m_finalized. + // Must be called from event loop thread. void force_close(); + // Can be called from any thread. void on_commit(version_type new_version) override; + // Can be called from any thread. void cancel_reconnect_delay(); + // Can be called from any thread. void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler); + // Can be called from any thread. bool wait_for_upload_complete_or_client_stopped(); + // Can be called from any thread. bool wait_for_download_complete_or_client_stopped(); + // Can be called from any thread. void refresh(std::string_view signed_access_token); + // Can be called from any thread. static void abandon(util::bind_ptr) noexcept; // These are called from ClientImpl - void actualize(ServerEndpoint); + // Must be called from event loop thread. + void actualize(); void finalize(); void finalize_before_actualization() noexcept; + // Can be called from any thread. util::Future send_test_command(std::string body); void handle_pending_client_reset_acknowledgement(); void update_subscription_version_info(); + // Can be called from any thread. std::string get_appservices_connection_id(); -protected: - friend class ClientImpl; - - // m_initiated/m_abandoned is used to check that we aren't trying to update immutable properties like the progress - // handler or connection state listener after we've bound the session. We read the variable a bunch in - // REALM_ASSERTS on the event loop and on the user's thread, but we only set it once and while we're registering - // the session wrapper to be actualized. This function gets called from - // ClientImpl::register_unactualized_session_wrapper() to synchronize updating this variable on the main thread - // with reading the variable on the event loop. - void mark_initiated(); - void mark_abandoned(); + // Can be called from any thread, but inherently cannot be called + // concurrently with calls to any of the other non-confined functions. + bool mark_abandoned(); private: ClientImpl& m_client; @@ -154,22 +127,20 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener const std::map m_custom_http_headers; const bool m_verify_servers_ssl_certificate; const bool m_simulate_integration_error; - const Optional m_ssl_trust_certificate_path; + const std::optional m_ssl_trust_certificate_path; const std::function m_ssl_verify_callback; const size_t m_flx_bootstrap_batch_size_bytes; + const std::string m_http_request_path_prefix; + const std::string m_virt_path; + const std::optional m_proxy_config; // This one is different from null when, and only when the session wrapper // is in ClientImpl::m_abandoned_session_wrappers. SessionWrapper* m_next = nullptr; - // After initiation, these may only be accessed by the event loop thread. - std::string m_http_request_path_prefix; - std::string m_virt_path; + // These may only be accessed by the event loop thread. std::string m_signed_access_token; - - util::Optional m_client_reset_config; - - util::Optional m_proxy_config; + std::optional m_client_reset_config; struct ReportedProgress { uint64_t snapshot = 0; @@ -181,13 +152,13 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener uint64_t final_downloaded = 0; } m_reported_progress; - util::UniqueFunction m_progress_handler; + const util::UniqueFunction m_progress_handler; util::UniqueFunction m_connection_state_change_listener; - std::function m_debug_hook; + const util::UniqueFunction m_debug_hook; bool m_in_debug_hook = false; - SessionReason m_session_reason; + const SessionReason m_session_reason; const uint64_t m_schema_version; @@ -199,27 +170,23 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener std::shared_ptr m_migration_store; - bool m_initiated = false; - - // Set to true when this session wrapper is actualized (or when it is - // finalized before proper actualization). It is then never modified again. + // Set to true when this session wrapper is actualized (i.e. the wrapped + // session is created), or when the wrapper is finalized before actualization. + // It is then never modified again. // - // A session specific post handler submitted after the initiation of the - // session wrapper (initiate()) will always find that `m_actualized` is - // true. This is the case, because the scheduling of such a post handler - // will have been preceded by the triggering of - // `ClientImpl::m_actualize_and_finalize` (in - // ClientImpl::register_unactualized_session_wrapper()), which ensures that - // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute - // before the post handler. If the session wrapper is no longer in - // `ClientImpl::m_unactualized_session_wrappers` when - // ClientImpl::actualize_and_finalize_session_wrappers() executes, it must - // have been abandoned already, but in that case, - // finalize_before_actualization() has already been called. + // Actualization is scheduled during the construction of SessionWrapper, and + // so a session specific post handler will always find that `m_actualized` + // is true as the handler will always be run after the actualization job. + // This holds even if the wrapper is finalized or closed before actualization. bool m_actualized = false; - bool m_force_closed = false; + // Set to true when session deactivation is begun, either via force_close() + // or finalize(). + bool m_closed = false; + // Set to true in on_suspended() and then false in on_resumed(). Used to + // suppress spurious connection state and error reporting while the session + // is already in an error state. bool m_suspended = false; // Set when the session has been abandoned. After this point none of the @@ -244,16 +211,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener // // If a session specific post handler, that is submitted after the // initiation of the session wrapper, sees that `m_sess` is null, it can - // conclude that the session wrapper has been both abandoned and - // finalized. This is true, because the scheduling of such a post handler - // will have been preceded by the triggering of - // `ClientImpl::m_actualize_and_finalize` (in - // ClientImpl::register_unactualized_session_wrapper()), which ensures that - // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute - // before the post handler, so the session wrapper must have been actualized - // unless it was already abandoned by the application. If it was abandoned - // before it was actualized, it will already have been finalized by - // finalize_before_actualization(). + // conclude that the session wrapper has either been force closed or has + // been both abandoned and finalized. // // Must only be accessed from the event loop thread. SessionImpl* m_sess = nullptr; @@ -276,7 +235,7 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener void on_download_completion(); void on_suspended(const SessionErrorInfo& error_info); void on_resumed(); - void on_connection_state_changed(ConnectionState, const util::Optional&); + void on_connection_state_changed(ConnectionState, const std::optional&); void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state); void on_flx_sync_error(int64_t version, std::string_view err_msg); void on_flx_sync_version_complete(int64_t version); @@ -326,10 +285,18 @@ inline void SessionWrapperStack::clear() noexcept } -inline SessionWrapperStack::SessionWrapperStack(SessionWrapperStack&& q) noexcept - : m_back{q.m_back} +inline bool SessionWrapperStack::erase(SessionWrapper* w) noexcept { - q.m_back = nullptr; + SessionWrapper** p = &m_back; + while (*p && *p != w) { + p = &(*p)->m_next; + } + if (!*p) { + return false; + } + *p = w->m_next; + util::bind_ptr{w, util::bind_ptr_base::adopt_tag{}}; + return true; } @@ -351,18 +318,14 @@ ClientImpl::~ClientImpl() // are abandoned. REALM_ASSERT(m_stopped); REALM_ASSERT(m_unactualized_session_wrappers.empty()); + REALM_ASSERT(m_abandoned_session_wrappers.empty()); } void ClientImpl::cancel_reconnect_delay() { // Thread safety required - post([this](Status status) { - if (status == ErrorCodes::OperationAborted) - return; - else if (!status.is_ok()) - throw Exception(status); - + post([this] { for (auto& p : m_server_slots) { ServerSlot& slot = p.second; if (m_one_connection_per_session) { @@ -392,36 +355,21 @@ void ClientImpl::cancel_reconnect_delay() void ClientImpl::voluntary_disconnect_all_connections() { auto done_pf = util::make_promise_future(); - post([this, promise = std::move(done_pf.promise)](Status status) mutable { - if (status == ErrorCodes::OperationAborted) { - return; - } - - REALM_ASSERT(status.is_ok()); - + post([this, promise = std::move(done_pf.promise)]() mutable { try { for (auto& p : m_server_slots) { ServerSlot& slot = p.second; if (m_one_connection_per_session) { REALM_ASSERT(!slot.connection); - for (const auto& p : slot.alt_connections) { - ClientImpl::Connection& conn = *p.second; - if (conn.get_state() == ConnectionState::disconnected) { - continue; - } - conn.voluntary_disconnect(); + for (const auto& [_, conn] : slot.alt_connections) { + conn->voluntary_disconnect(); } } else { REALM_ASSERT(slot.alt_connections.empty()); - if (!slot.connection) { - continue; + if (slot.connection) { + slot.connection->voluntary_disconnect(); } - ClientImpl::Connection& conn = *slot.connection; - if (conn.get_state() == ConnectionState::disconnected) { - continue; - } - conn.voluntary_disconnect(); } } } @@ -461,12 +409,7 @@ bool ClientImpl::wait_for_session_terminations_or_client_stopped() // will happen after the session wrapper has been added to // `m_abandoned_session_wrappers`, but before the post handler submitted // below gets to execute. - post([this](Status status) mutable { - if (status == ErrorCodes::OperationAborted) - return; - else if (!status.is_ok()) - throw Exception(status); - + post([this] { { util::CheckedLockGuard lock{m_mutex}; m_sessions_terminated = true; @@ -505,7 +448,7 @@ util::Future ClientImpl::notify_session_terminated() void ClientImpl::drain_connections_on_loop() { - post([this](Status status) mutable { + post([this](Status status) { REALM_ASSERT(status.is_ok()); drain_connections(); }); @@ -541,14 +484,20 @@ void ClientImpl::shutdown() noexcept } -void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper, ServerEndpoint endpoint) +void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper) { // Thread safety required. { util::CheckedLockGuard lock{m_mutex}; + // We can't actualize the session if we've already been stopped, so + // just finalize it immediately. + if (m_stopped) { + wrapper->finalize_before_actualization(); + return; + } + REALM_ASSERT(m_actualize_and_finalize); - wrapper->mark_initiated(); - m_unactualized_session_wrappers.emplace(wrapper, std::move(endpoint)); // Throws + m_unactualized_session_wrappers.push(util::bind_ptr(wrapper)); } m_actualize_and_finalize->trigger(); } @@ -560,15 +509,16 @@ void ClientImpl::register_abandoned_session_wrapper(util::bind_ptrmark_abandoned(); + // The wrapper may have already been finalized before being abandoned + // if we were stopped when it was created. + if (wrapper->mark_abandoned()) + return; // If the session wrapper has not yet been actualized (on the event loop // thread), it can be immediately finalized. This ensures that we will // generally not actualize a session wrapper that has already been // abandoned. - auto i = m_unactualized_session_wrappers.find(wrapper.get()); - if (i != m_unactualized_session_wrappers.end()) { - m_unactualized_session_wrappers.erase(i); + if (m_unactualized_session_wrappers.erase(wrapper.get())) { wrapper->finalize_before_actualization(); return; } @@ -581,32 +531,38 @@ void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr unactualized_session_wrappers; - SessionWrapperStack abandoned_session_wrappers; - bool stopped; - { - util::CheckedLockGuard lock{m_mutex}; - swap(m_unactualized_session_wrappers, unactualized_session_wrappers); - swap(m_abandoned_session_wrappers, abandoned_session_wrappers); - stopped = m_stopped; - } - // Note, we need to finalize old session wrappers before we actualize new - // ones. This ensures that deactivation of old sessions is initiated before - // new session are activated. This, in turn, ensures that the server does - // not see two overlapping sessions for the same local Realm file. - while (util::bind_ptr wrapper = abandoned_session_wrappers.pop()) - wrapper->finalize(); // Throws - if (stopped) { - for (auto& p : unactualized_session_wrappers) { - SessionWrapper& wrapper = *p.first; - wrapper.finalize_before_actualization(); + // We need to pop from the wrapper stacks while holding the lock to ensure + // that all updates to `SessionWrapper:m_next` are thread-safe, but then + // release the lock before finalizing or actualizing because those functions + // invoke user callbacks which may try to access the client and reacquire + // the lock. + // + // Finalization must always happen before actualization because we may be + // finalizing and actualizing sessions for the same Realm file, and + // actualizing first would result in overlapping sessions. Because we're + // releasing the lock new sessions may come in as we're looping, so we need + // a single loop that checks both fields. + while (true) { + bool finalize = true; + bool stopped; + util::bind_ptr wrapper; + { + util::CheckedLockGuard lock{m_mutex}; + wrapper = m_abandoned_session_wrappers.pop(); + if (!wrapper) { + wrapper = m_unactualized_session_wrappers.pop(); + finalize = false; + } + stopped = m_stopped; } - return; - } - for (auto& p : unactualized_session_wrappers) { - SessionWrapper& wrapper = *p.first; - ServerEndpoint server_endpoint = std::move(p.second); - wrapper.actualize(std::move(server_endpoint)); // Throws + if (!wrapper) + break; + if (finalize) + wrapper->finalize(); // Throws + else if (stopped) + wrapper->finalize_before_actualization(); + else + wrapper->actualize(); // Throws } } @@ -698,7 +654,7 @@ void SessionImpl::force_close() } void SessionImpl::on_connection_state_changed(ConnectionState state, - const util::Optional& error_info) + const std::optional& error_info) { // Only used to report errors back to the SyncSession while the Session is active if (m_state == SessionImpl::Active) { @@ -740,7 +696,7 @@ ClientHistory& SessionImpl::get_history() const noexcept return get_repl().get_history(); } -util::Optional& SessionImpl::get_client_reset_config() noexcept +std::optional& SessionImpl::get_client_reset_config() noexcept { // Can only be called if the session is active or being activated REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state); @@ -856,7 +812,7 @@ bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, Do } auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store(); - util::Optional maybe_progress; + std::optional maybe_progress; if (batch_state == DownloadBatchState::LastInBatch) { maybe_progress = progress; } @@ -1188,7 +1144,6 @@ util::Future SessionImpl::send_test_command(std::string body) } auto pf = util::make_promise_future(); - get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable { // Includes operation_aborted if (!status.is_ok()) { @@ -1210,7 +1165,7 @@ util::Future SessionImpl::send_test_command(std::string body) // provides a link to the ClientImpl::Session that creates and receives messages with the server with // the ClientImpl::Connection that owns the ClientImpl::Session. SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr flx_sub_store, - std::shared_ptr migration_store, Session::Config config) + std::shared_ptr migration_store, Session::Config&& config) : m_client{client} , m_db(std::move(db)) , m_replication(m_db->get_replication()) @@ -1221,7 +1176,7 @@ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptrget_replication()); REALM_ASSERT(dynamic_cast(m_db->get_replication())); - if (m_client_reset_config) { - m_session_reason = SessionReason::ClientReset; - } + + // SessionWrapper begins at +1 retain count because Client retains and + // releases it while performing async operations, and these need to not + // take it to 0 or it could be deleted before the caller can retain it. + bind_ptr(); + m_client.register_unactualized_session_wrapper(this); } SessionWrapper::~SessionWrapper() noexcept { - if (m_db && m_actualized) { - m_db->remove_commit_listener(this); - m_db->release_sync_agent(); - } + // We begin actualization in the constructor and do not delete the wrapper + // until both the Client is done with it and the Session has abandoned it, + // so at this point we must have actualized, finalized, and been abandoned. + REALM_ASSERT(m_actualized); + REALM_ASSERT(m_abandoned); + REALM_ASSERT(m_finalized); + REALM_ASSERT(m_closed); + REALM_ASSERT(!m_db); } @@ -1345,62 +1309,19 @@ MigrationStore* SessionWrapper::get_migration_store() return m_migration_store.get(); } -inline void SessionWrapper::mark_initiated() -{ - REALM_ASSERT(!m_initiated); - REALM_ASSERT(!m_abandoned); - m_initiated = true; -} - - -inline void SessionWrapper::mark_abandoned() +inline bool SessionWrapper::mark_abandoned() { REALM_ASSERT(!m_abandoned); m_abandoned = true; -} - - -inline void SessionWrapper::set_progress_handler(util::UniqueFunction handler) -{ - REALM_ASSERT(!m_initiated); - m_progress_handler = std::move(handler); -} - - -inline void -SessionWrapper::set_connection_state_change_listener(util::UniqueFunction listener) -{ - REALM_ASSERT(!m_initiated); - m_connection_state_change_listener = std::move(listener); -} - - -void SessionWrapper::initiate() -{ - ServerEndpoint server_endpoint{m_protocol_envelope, m_server_address, m_server_port, - m_user_id, m_sync_mode, m_server_verified}; - m_client.register_unactualized_session_wrapper(this, std::move(server_endpoint)); // Throws - m_db->add_commit_listener(this); + return m_finalized; } void SessionWrapper::on_commit(version_type new_version) { // Thread safety required - REALM_ASSERT(m_initiated); - - util::bind_ptr self{this}; - m_client.post([self = std::move(self), new_version](Status status) { - if (status == ErrorCodes::OperationAborted) - return; - else if (!status.is_ok()) - throw Exception(status); - + m_client.post([self = util::bind_ptr{this}, new_version] { REALM_ASSERT(self->m_actualized); - if (REALM_UNLIKELY(self->m_finalized || self->m_force_closed)) { - return; - } - if (REALM_UNLIKELY(!self->m_sess)) return; // Already finalized SessionImpl& sess = *self->m_sess; @@ -1413,17 +1334,10 @@ void SessionWrapper::on_commit(version_type new_version) void SessionWrapper::cancel_reconnect_delay() { // Thread safety required - REALM_ASSERT(m_initiated); - - util::bind_ptr self{this}; - m_client.post([self = std::move(self)](Status status) { - if (status == ErrorCodes::OperationAborted) - return; - else if (!status.is_ok()) - throw Exception(status); + m_client.post([self = util::bind_ptr{this}] { REALM_ASSERT(self->m_actualized); - if (REALM_UNLIKELY(self->m_finalized || self->m_force_closed)) { + if (REALM_UNLIKELY(self->m_closed)) { return; } @@ -1440,16 +1354,9 @@ void SessionWrapper::async_wait_for(bool upload_completion, bool download_comple WaitOperCompletionHandler handler) { REALM_ASSERT(upload_completion || download_completion); - REALM_ASSERT(m_initiated); - - util::bind_ptr self{this}; - m_client.post([self = std::move(self), handler = std::move(handler), upload_completion, - download_completion](Status status) mutable { - if (status == ErrorCodes::OperationAborted) - return; - else if (!status.is_ok()) - throw Exception(status); + m_client.post([self = util::bind_ptr{this}, handler = std::move(handler), upload_completion, + download_completion]() mutable { REALM_ASSERT(self->m_actualized); if (REALM_UNLIKELY(!self->m_sess)) { // Already finalized @@ -1482,7 +1389,6 @@ void SessionWrapper::async_wait_for(bool upload_completion, bool download_comple bool SessionWrapper::wait_for_upload_complete_or_client_stopped() { // Thread safety required - REALM_ASSERT(m_initiated); REALM_ASSERT(!m_abandoned); std::int_fast64_t target_mark; @@ -1491,15 +1397,8 @@ bool SessionWrapper::wait_for_upload_complete_or_client_stopped() target_mark = ++m_target_upload_mark; } - util::bind_ptr self{this}; - m_client.post([self = std::move(self), target_mark](Status status) { - if (status == ErrorCodes::OperationAborted) - return; - else if (!status.is_ok()) - throw Exception(status); - + m_client.post([self = util::bind_ptr{this}, target_mark] { REALM_ASSERT(self->m_actualized); - REALM_ASSERT(!self->m_finalized); // The session wrapper may already have been finalized. This can only // happen if it was abandoned, but in that case, the call of // wait_for_upload_complete_or_client_stopped() must have returned @@ -1528,7 +1427,6 @@ bool SessionWrapper::wait_for_upload_complete_or_client_stopped() bool SessionWrapper::wait_for_download_complete_or_client_stopped() { // Thread safety required - REALM_ASSERT(m_initiated); REALM_ASSERT(!m_abandoned); std::int_fast64_t target_mark; @@ -1537,15 +1435,8 @@ bool SessionWrapper::wait_for_download_complete_or_client_stopped() target_mark = ++m_target_download_mark; } - util::bind_ptr self{this}; - m_client.post([self = std::move(self), target_mark](Status status) { - if (status == ErrorCodes::OperationAborted) - return; - else if (!status.is_ok()) - throw Exception(status); - + m_client.post([self = util::bind_ptr{this}, target_mark] { REALM_ASSERT(self->m_actualized); - REALM_ASSERT(!self->m_finalized); // The session wrapper may already have been finalized. This can only // happen if it was abandoned, but in that case, the call of // wait_for_download_complete_or_client_stopped() must have returned @@ -1574,15 +1465,9 @@ bool SessionWrapper::wait_for_download_complete_or_client_stopped() void SessionWrapper::refresh(std::string_view signed_access_token) { // Thread safety required - REALM_ASSERT(m_initiated); REALM_ASSERT(!m_abandoned); - m_client.post([self = util::bind_ptr(this), token = std::string(signed_access_token)](Status status) { - if (status == ErrorCodes::OperationAborted) - return; - else if (!status.is_ok()) - throw Exception(status); - + m_client.post([self = util::bind_ptr{this}, token = std::string(signed_access_token)] { REALM_ASSERT(self->m_actualized); if (REALM_UNLIKELY(!self->m_sess)) return; // Already finalized @@ -1597,67 +1482,66 @@ void SessionWrapper::refresh(std::string_view signed_access_token) } -inline void SessionWrapper::abandon(util::bind_ptr wrapper) noexcept +void SessionWrapper::abandon(util::bind_ptr wrapper) noexcept { - if (wrapper->m_initiated) { - ClientImpl& client = wrapper->m_client; - client.register_abandoned_session_wrapper(std::move(wrapper)); - } + ClientImpl& client = wrapper->m_client; + client.register_abandoned_session_wrapper(std::move(wrapper)); } // Must be called from event loop thread -void SessionWrapper::actualize(ServerEndpoint endpoint) +void SessionWrapper::actualize() { - REALM_ASSERT_DEBUG(m_initiated); + // actualize() can only ever be called once REALM_ASSERT(!m_actualized); REALM_ASSERT(!m_sess); - // Cannot be actualized if it's already been finalized or force closed + // The client should have removed this wrapper from those pending + // actualization if it called force_close() or finalize_before_actualize() REALM_ASSERT(!m_finalized); - REALM_ASSERT(!m_force_closed); - try { - m_db->claim_sync_agent(); - } - catch (const MultipleSyncAgents&) { - finalize_before_actualization(); - throw; - } - auto sync_mode = endpoint.server_mode; + REALM_ASSERT(!m_closed); + + m_actualized = true; + + ScopeExitFail close_on_error([&]() noexcept { + m_closed = true; + }); + + m_db->claim_sync_agent(); + m_db->add_commit_listener(this); + ScopeExitFail remove_commit_listener([&]() noexcept { + m_db->remove_commit_listener(this); + }); + ServerEndpoint endpoint{m_protocol_envelope, m_server_address, m_server_port, + m_user_id, m_sync_mode, m_server_verified}; bool was_created = false; ClientImpl::Connection& conn = m_client.get_connection( std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate, m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config, was_created); // Throws - try { - // FIXME: This only makes sense when each session uses a separate connection. - conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token); // Throws - std::unique_ptr sess = std::make_unique(*this, conn); // Throws - if (sync_mode == SyncServerMode::FLX) { - m_flx_pending_bootstrap_store = std::make_unique(m_db, sess->logger); - } - - sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws - m_sess = sess.get(); - conn.activate_session(std::move(sess)); // Throws - } - catch (...) { + ScopeExitFail remove_connection([&]() noexcept { if (was_created) m_client.remove_connection(conn); + }); - // finalize_before_actualization() expects m_sess to be nullptr, but it's possible that we - // reached its assignment above before throwing. Unset it here so we get a clean unhandled - // exception failure instead of a REALM_ASSERT in finalize_before_actualization(). - m_sess = nullptr; - finalize_before_actualization(); - throw; + // FIXME: This only makes sense when each session uses a separate connection. + conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token); // Throws + std::unique_ptr sess = std::make_unique(*this, conn); // Throws + if (m_sync_mode == SyncServerMode::FLX) { + m_flx_pending_bootstrap_store = std::make_unique(m_db, sess->logger); } + sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws + m_sess = sess.get(); + ScopeExitFail clear_sess([&]() noexcept { + m_sess = nullptr; + }); + conn.activate_session(std::move(sess)); // Throws + // Initialize the variables relying on the bootstrap store from the event loop to guarantee that a previous // session cannot change the state of the bootstrap store at the same time. update_subscription_version_info(); - m_actualized = true; if (was_created) conn.activate(); // Throws @@ -1676,16 +1560,20 @@ void SessionWrapper::actualize(ServerEndpoint endpoint) void SessionWrapper::force_close() { - if (m_force_closed || m_finalized) { + if (m_closed) { return; } REALM_ASSERT(m_actualized); REALM_ASSERT(m_sess); - m_force_closed = true; + m_closed = true; ClientImpl::Connection& conn = m_sess->get_connection(); conn.initiate_session_deactivation(m_sess); // Throws + // We need to keep the DB open until finalization, but we no longer want to + // know when commits are made + m_db->remove_commit_listener(this); + // Delete the pending bootstrap store since it uses a reference to the logger in m_sess m_flx_pending_bootstrap_store.reset(); // Clear the subscription and migration store refs since they are owned by SyncSession @@ -1705,30 +1593,12 @@ void SessionWrapper::finalize() { REALM_ASSERT(m_actualized); REALM_ASSERT(m_abandoned); + REALM_ASSERT(!m_finalized); - // Already finalized? - if (m_finalized) { - return; - } - - // Must be before marking as finalized as we expect m_finalized == false in on_change() - m_db->remove_commit_listener(this); + force_close(); m_finalized = true; - if (!m_force_closed) { - REALM_ASSERT(m_sess); - ClientImpl::Connection& conn = m_sess->get_connection(); - conn.initiate_session_deactivation(m_sess); // Throws - - // Delete the pending bootstrap store since it uses a reference to the logger in m_sess - m_flx_pending_bootstrap_store.reset(); - // Clear the subscription and migration store refs since they are owned by SyncSession - m_flx_subscription_store.reset(); - m_migration_store.reset(); - m_sess = nullptr; - } - // The Realm file can be closed now, as no access to the Realm file is // supposed to happen on behalf of a session after initiation of // deactivation. @@ -1761,9 +1631,14 @@ void SessionWrapper::finalize() // Called with a lock on `m_client.m_mutex`. inline void SessionWrapper::finalize_before_actualization() noexcept { + REALM_ASSERT(!m_finalized); REALM_ASSERT(!m_sess); m_actualized = true; - m_force_closed = true; + m_finalized = true; + m_closed = true; + m_db->remove_commit_listener(this); + m_db->release_sync_agent(); + m_db = nullptr; } inline void SessionWrapper::on_upload_progress(bool only_if_new_uploadable_data) @@ -1855,11 +1730,10 @@ void SessionWrapper::on_resumed() void SessionWrapper::on_connection_state_changed(ConnectionState state, - const util::Optional& error_info) + const std::optional& error_info) { - if (m_connection_state_change_listener) { - if (!m_suspended) - m_connection_state_change_listener(state, error_info); // Throws + if (m_connection_state_change_listener && !m_suspended) { + m_connection_state_change_listener(state, error_info); // Throws } } @@ -2044,10 +1918,8 @@ void SessionWrapper::update_subscription_version_info() std::string SessionWrapper::get_appservices_connection_id() { auto pf = util::make_promise_future(); - REALM_ASSERT(m_initiated); - util::bind_ptr self(this); - get_client().post([self, promise = std::move(pf.promise)](Status status) mutable { + m_client.post([self = util::bind_ptr{this}, promise = std::move(pf.promise)](Status status) mutable { if (!status.is_ok()) { promise.set_error(status); return; @@ -2082,7 +1954,6 @@ ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ide , m_ssl_verify_callback{std::move(ssl_verify_callback)} // DEPRECATED , m_proxy_config{std::move(proxy_config)} // DEPRECATED , m_reconnect_info{reconnect_info} - , m_session_history{} , m_ident{ident} , m_server_endpoint{std::move(endpoint)} , m_authorization_header_name{authorization_header_name} // DEPRECATED @@ -2155,15 +2026,12 @@ std::string ClientImpl::Connection::get_http_request_path() const std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ident) { - std::ostringstream out; - out.imbue(std::locale::classic()); - out << "Connection[" << ident << "]: "; // Throws - return out.str(); // Throws + return util::format("Connection[%1] ", ident); } void ClientImpl::Connection::report_connection_state_change(ConnectionState state, - util::Optional error_info) + std::optional error_info) { if (m_force_closed) { return; @@ -2231,32 +2099,8 @@ bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& prot Session::Session(Client& client, DBRef db, std::shared_ptr flx_sub_store, std::shared_ptr migration_store, Config&& config) { - util::bind_ptr sess; - sess.reset(new SessionWrapper{*client.m_impl, std::move(db), std::move(flx_sub_store), std::move(migration_store), - std::move(config)}); // Throws - // The reference count passed back to the application is implicitly - // owned by a naked pointer. This is done to avoid exposing - // implementation details through the header file (that is, through the - // Session object). - m_impl = sess.release(); -} - - -void Session::set_progress_handler(util::UniqueFunction handler) -{ - m_impl->set_progress_handler(std::move(handler)); // Throws -} - - -void Session::set_connection_state_change_listener(util::UniqueFunction listener) -{ - m_impl->set_connection_state_change_listener(std::move(listener)); // Throws -} - - -void Session::bind() -{ - m_impl->initiate(); // Throws + m_impl = new SessionWrapper{*client.m_impl, std::move(db), std::move(flx_sub_store), std::move(migration_store), + std::move(config)}; // Throws } @@ -2326,5 +2170,4 @@ std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType) REALM_TERMINATE("Invalid Proxy Type object."); } -} // namespace sync -} // namespace realm +} // namespace realm::sync diff --git a/src/realm/sync/client.hpp b/src/realm/sync/client.hpp index bff50c91d72..5f5cd1eb3e4 100644 --- a/src/realm/sync/client.hpp +++ b/src/realm/sync/client.hpp @@ -3,13 +3,11 @@ #include #include -#include #include #include #include #include -#include #include #include #include @@ -106,29 +104,22 @@ class Client { /// synchronize multiple local Realm files, you need multiple sessions. /// /// A session object is always associated with a particular client object (\ref -/// Client). The application must ensure that the destruction of the associated -/// client object never happens before the destruction of the session -/// object. The consequences of a violation are unspecified. -/// -/// A session object is always associated with a particular local Realm file, -/// however, a session object does not represent a session until it is bound to -/// a server side Realm, i.e., until bind() is called. From the point of view of -/// the thread that calls bind(), the session starts precisely when the -/// execution of bind() starts, i.e., before bind() returns. +/// Client). Destroying the client while sessions still exist will forcibly +/// close the sessions. This is intended only for the convenience of code which +/// finds it difficult to ensure that objects are torn down in the correct +/// order, and using closed sessions has unspecified results. /// /// At most one session is allowed to exist for a particular local Realm file -/// (file system inode) at any point in time. Multiple session objects may -/// coexists for a single file, as long as bind() has been called on at most one -/// of them. Additionally, two bound session objects for the same file are -/// allowed to exist at different times, if they have no overlap in time (in -/// their bound state), as long as they are associated with the same client -/// object, or with two different client objects that do not overlap in -/// time. This means, in particular, that it is an error to create two bound -/// session objects for the same local Realm file, if they are associated with -/// two different client objects that overlap in time, even if the session -/// objects do not overlap in time (in their bound state). It is the -/// responsibility of the application to ensure that these rules are adhered -/// to. The consequences of a violation are unspecified. +/// (file system inode) at any point in time. Two session objects for the same +/// file are allowed to exist at different times, if they have no overlap in +/// time as long as they are associated with the same client object, or with +/// two different client objects that do not overlap in time. This means, in +/// particular, that it is an error to create two session objects for the same +/// local Realm file, if they are associated with two different client objects +/// that overlap in time, even if the session objects do not overlap in time +/// (in their bound state). It is the responsibility of the application to +/// ensure that these rules are adhered to. The consequences of a violation are +/// unspecified. /// /// Thread-safety: It is safe for multiple threads to construct, use (with some /// exceptions), and destroy session objects concurrently, regardless of whether @@ -137,10 +128,8 @@ class Client { /// thread-safe, while others are not. /// /// Callback semantics: All session specific callback functions will be executed -/// by the event loop thread, i.e., the thread that calls Client::run(). No -/// callback function will be called before Session::bind() is called. Callback -/// functions that are specified prior to calling bind() (e.g., any passed to -/// set_progress_handler()) may start to execute before bind() returns, as long +/// by the event loop thread, i.e., the thread that calls Client::run(). Callback +/// functions may start to execute before Session's constructor returns, as long /// as some thread is executing Client::run(). Likewise, completion handlers, /// such as those passed to async_wait_for_sync_completion() may start to /// execute before the submitting function returns. All session specific @@ -163,6 +152,7 @@ class Session { std::uint_fast64_t uploaded_bytes, std::uint_fast64_t uploadable_bytes, std::uint_fast64_t snapshot_version, double download_estimate, double upload_estimate, int64_t query_version); + using ConnectionStateChangeListener = void(ConnectionState, std::optional); using WaitOperCompletionHandler = util::UniqueFunction; using SSLVerifyCallback = bool(const std::string& server_address, port_type server_port, const char* pem_data, size_t pem_size, int preverify_ok, int depth); @@ -258,7 +248,7 @@ class Session { /// If ssl_trust_certificate_path is None (default), ssl_verify_callback /// (see below) is used if set, and the default device trust/anchor /// store is used otherwise. - util::Optional ssl_trust_certificate_path; + std::optional ssl_trust_certificate_path; /// /// DEPRECATED - Will be removed in a future release @@ -323,12 +313,12 @@ class Session { std::string signed_user_token; using ClientReset = sync::ClientReset; - util::Optional client_reset_config; + std::optional client_reset_config; /// /// DEPRECATED - Will be removed in a future release /// - util::Optional proxy_config; + std::optional proxy_config; /// When integrating a flexible sync bootstrap, process this many bytes of /// changeset data in a single integration attempt. @@ -340,11 +330,98 @@ class Session { /// This feature exists exclusively for testing purposes at this time. bool simulate_integration_error = false; - std::function on_sync_client_event_hook; + util::UniqueFunction on_sync_client_event_hook; + - /// The reason this synchronization session is used for. + /// Set a handler to monitor the state of download and upload progress. + /// + /// The handler must have signature + /// + /// void(uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, + /// uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, + /// uint_fast64_t progress_version); + /// + /// downloaded_bytes is the size in bytes of all downloaded changesets. + /// downloadable_bytes is equal to downloaded_bytes plus an estimate of + /// the size of the remaining server history. + /// + /// uploaded_bytes is the size in bytes of all locally produced changesets + /// that have been received and acknowledged by the server. + /// uploadable_bytes is the size in bytes of all locally produced changesets. + /// + /// Due to the nature of the merge rules, it is possible that the size of an + /// uploaded changeset uploaded from one client is not equal to the size of + /// the changesets that other clients will download. + /// + /// Typical uses of this function: + /// + /// Upload completion can be checked by + /// + /// bool upload_complete = (uploaded_bytes == uploadable_bytes); + /// + /// Download completion could be checked by + /// + /// bool download_complete = (downloaded_bytes == downloadable_bytes); + /// + /// However, download completion might never be reached because the server + /// can receive new changesets from other clients. downloadable_bytes can + /// decrease for two reasons: server side compaction and changesets of + /// local origin. Code using downloadable_bytes must not assume that it + /// is increasing. + /// + /// Upload progress can be calculated by caching an initial value of + /// uploaded_bytes from the last, or next, callback. Then + /// + /// double upload_progress = + /// (uploaded_bytes - initial_uploaded_bytes) + /// ------------------------------------------- + /// (uploadable_bytes - initial_uploaded_bytes) + /// + /// Download progress can be calculates similarly: + /// + /// double download_progress = + /// (downloaded_bytes - initial_downloaded_bytes) + /// ----------------------------------------------- + /// (downloadable_bytes - initial_downloaded_bytes) + /// + /// progress_version is 0 at the start of a session. When at least one + /// DOWNLOAD message has been received from the server, progress_version is + /// positive. progress_version can be used to ensure that the reported + /// progress contains information obtained from the server in the current + /// session. The server will send a message as soon as possible, and the + /// progress handler will eventually be called with a positive progress_version + /// unless the session is interrupted before a message from the server has + /// been received. /// - /// Note: Currently only used in FLX sync. + /// The handler is called on the event loop thread.The handler after bind(), + /// after each DOWNLOAD message, and after each local transaction + /// (nonsync_transact_notify). + util::UniqueFunction progress_handler; + + /// Install a connection state change listener. + /// + /// Sets a function to be called whenever the state of the underlying + /// network connection changes between "disconnected", "connecting", and + /// "connected". The initial state is always "disconnected". The next state + /// after "disconnected" is always "connecting". The next state after + /// "connecting" is either "connected" or "disconnected". The next state + /// after "connected" is always "disconnected". A switch to the + /// "disconnected" state only happens when an error occurs. + /// + /// Whenever the installed function is called, an SessionErrorInfo object is passed + /// when, and only when the passed state is ConnectionState::disconnected. + /// + /// When multiple sessions share a single connection, the state changes will + /// be reported for each session in turn. + /// + /// The callback function will always be called by the thread that executes + /// the event loop (Client::run()). If the + /// callback function throws an exception, that exception will "travel" out + /// through Client::run(). + util::UniqueFunction connection_state_change_listener; + + /// The purpose of this sync session. Reported to the server for informational purposes and has no functional + /// effect. SessionReason session_reason = SessionReason::Sync; /// Schema version @@ -354,8 +431,6 @@ class Session { }; /// \brief Start a new session for the specified client-side Realm. - /// - /// Note that the session is not fully activated until you call bind(). Session(Client&, std::shared_ptr, std::shared_ptr, std::shared_ptr, Config&& = {}); @@ -395,143 +470,6 @@ class Session { /// constructor and assignment operator. void detach() noexcept; - /// \brief Set a handler to monitor the state of download and upload - /// progress. - /// - /// The handler must have signature - /// - /// void(uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, - /// uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - /// uint_fast64_t progress_version); - /// - /// downloaded_bytes is the size in bytes of all downloaded changesets. - /// downloadable_bytes is equal to downloaded_bytes plus an estimate of - /// the size of the remaining server history. - /// - /// uploaded_bytes is the size in bytes of all locally produced changesets - /// that have been received and acknowledged by the server. - /// uploadable_bytes is the size in bytes of all locally produced changesets. - /// - /// Due to the nature of the merge rules, it is possible that the size of an - /// uploaded changeset uploaded from one client is not equal to the size of - /// the changesets that other clients will download. - /// - /// Typical uses of this function: - /// - /// Upload completion can be checked by - /// - /// bool upload_complete = (uploaded_bytes == uploadable_bytes); - /// - /// Download completion could be checked by - /// - /// bool download_complete = (downloaded_bytes == downloadable_bytes); - /// - /// However, download completion might never be reached because the server - /// can receive new changesets from other clients. downloadable_bytes can - /// decrease for two reasons: server side compaction and changesets of - /// local origin. Code using downloadable_bytes must not assume that it - /// is increasing. - /// - /// Upload progress can be calculated by caching an initial value of - /// uploaded_bytes from the last, or next, callback. Then - /// - /// double upload_progress = - /// (uploaded_bytes - initial_uploaded_bytes) - /// ------------------------------------------- - /// (uploadable_bytes - initial_uploaded_bytes) - /// - /// Download progress can be calculates similarly: - /// - /// double download_progress = - /// (downloaded_bytes - initial_downloaded_bytes) - /// ----------------------------------------------- - /// (downloadable_bytes - initial_downloaded_bytes) - /// - /// progress_version is 0 at the start of a session. When at least one - /// DOWNLOAD message has been received from the server, progress_version is - /// positive. progress_version can be used to ensure that the reported - /// progress contains information obtained from the server in the current - /// session. The server will send a message as soon as possible, and the - /// progress handler will eventually be called with a positive progress_version - /// unless the session is interrupted before a message from the server has - /// been received. - /// - /// The handler is called on the event loop thread.The handler after bind(), - /// after each DOWNLOAD message, and after each local transaction - /// (nonsync_transact_notify). - /// - /// set_progress_handler() is not thread safe and it must be called before - /// bind() is called. Subsequent calls to set_progress_handler() overwrite - /// the previous calls. Typically, this function is called once per session. - /// - /// CAUTION: The specified callback function may get called before the call - /// to bind() returns, and it may get called (or continue to execute) after - /// the session object is destroyed. Please see "Callback semantics" section - /// under Session for more on this. - void set_progress_handler(util::UniqueFunction); - - using ConnectionStateChangeListener = void(ConnectionState, util::Optional); - - /// \brief Install a connection state change listener. - /// - /// Sets a function to be called whenever the state of the underlying - /// network connection changes between "disconnected", "connecting", and - /// "connected". The initial state is always "disconnected". The next state - /// after "disconnected" is always "connecting". The next state after - /// "connecting" is either "connected" or "disconnected". The next state - /// after "connected" is always "disconnected". A switch to the - /// "disconnected" state only happens when an error occurs. - /// - /// Whenever the installed function is called, an SessionErrorInfo object is passed - /// when, and only when the passed state is ConnectionState::disconnected. - /// - /// When multiple sessions share a single connection, the state changes will - /// be reported for each session in turn. - /// - /// The callback function will always be called by the thread that executes - /// the event loop (Client::run()), but not until bind() is called. If the - /// callback function throws an exception, that exception will "travel" out - /// through Client::run(). - /// - /// Note: Any call to this function must have returned before bind() is - /// called. If this function is called multiple times, each call overrides - /// the previous setting. - /// - /// Note: This function is **not thread-safe**. That is, it is an error if - /// it is called while another thread is executing any member function on - /// the same Session object. - /// - /// CAUTION: The specified callback function may get called before the call - /// to bind() returns, and it may get called (or continue to execute) after - /// the session object is destroyed. Please see "Callback semantics" section - /// under Session for more on this. - void set_connection_state_change_listener(util::UniqueFunction); - - //@{ - /// Deprecated! Use set_connection_state_change_listener() instead. - using ErrorHandler = void(const SessionErrorInfo&); - void set_error_handler(util::UniqueFunction); - //@} - - /// @{ \brief Bind this session to the specified server side Realm. - /// - /// No communication takes place on behalf of this session before the - /// session is bound, but as soon as the session becomes bound, the server - /// will start to push changes to the client, and vice versa. - /// - /// Note: It is an error if this function is called more than once per - /// Session object. - /// - /// Note: This function is **not thread-safe**. That is, it is an error if - /// it is called while another thread is executing any member function on - /// the same Session object. - /// - /// bind() binds this session to the specified server side Realm using the - /// parameters specified in the Session::Config object. - /// - /// The two other forms of bind() are convenience functions. - void bind(); - /// @} /// \brief Refresh the access token associated with this session. @@ -558,8 +496,6 @@ class Session { /// condition by detecting the `ProtocolError::token_expired` error, and /// always initiate a token renewal in this case. /// - /// It is an error to call this function before calling `Client::bind()`. - /// /// Note: This function is thread-safe. /// /// \param signed_user_token A cryptographically signed token describing the @@ -572,9 +508,6 @@ class Session { /// performed on its behalf, that is, after a transaction that is not /// performed to integrate a changeset that was downloaded from the server. /// - /// It is an error to call this function before bind() has been called, and - /// has returned. - /// /// Note: This function is fully thread-safe. That is, it may be called by /// any thread, and by multiple threads concurrently. void nonsync_transact_notify(version_type new_version); @@ -590,7 +523,7 @@ class Session { /// Upload is considered complete when all non-empty changesets of local /// origin have been uploaded to the server, and the server has acknowledged /// reception of them. Changesets of local origin introduced after the - /// initiation of the session (after bind() is called) will generally not be + /// initiation of the session will generally not be /// considered for upload unless they are announced to this client through /// nonsync_transact_notify() prior to the initiation of the wait operation, /// i.e., prior to the invocation of async_wait_for_upload_completion() or @@ -617,9 +550,6 @@ class Session { /// will not affect the waiting period of /// async_wait_for_download_completion(), and vice versa. /// - /// It is an error to call these functions before bind() has been called, - /// and has returned. - /// /// The specified completion handlers will always be executed by the thread /// that executes the event loop (the thread that calls Client::run()). If /// the handler throws an exception, that exception will "travel" out @@ -656,9 +586,6 @@ class Session { /// client's event loop thread exits from Client::run(), whichever happens /// first. /// - /// It is an error to call these functions before bind() has been called, - /// and has returned. - /// /// CAUTION: If Client::run() returns while a wait operation is in progress, /// these waiting functions return immediately, even if the completion /// condition is not yet satisfied. The completion condition is guaranteed @@ -690,9 +617,6 @@ class Session { /// periods of time, as that would effectively disable the built-in "server /// hammering" protection. /// - /// It is an error to call this function before bind() has been called, and - /// has returned. - /// /// This function is fully thread-safe. That is, it may be called by any /// thread, and by multiple threads concurrently. void cancel_reconnect_delay(); @@ -706,6 +630,8 @@ class Session { std::string get_appservices_connection_id(); private: + // This is a bare pointer rather than bind_ptr to avoid requiring the + // definition of SessionWrapper here. SessionWrapper* m_impl = nullptr; void abandon() noexcept; @@ -744,18 +670,6 @@ inline void Session::detach() noexcept m_impl = nullptr; } -inline void Session::set_error_handler(util::UniqueFunction handler) -{ - auto handler_2 = [handler = std::move(handler)](ConnectionState state, - const util::Optional& error_info) { - if (state != ConnectionState::disconnected) - return; - REALM_ASSERT(error_info); - handler(*error_info); // Throws - }; - set_connection_state_change_listener(std::move(handler_2)); // Throws -} - inline void Session::async_wait_for_sync_completion(WaitOperCompletionHandler handler) { bool upload_completion = true, download_completion = true; diff --git a/src/realm/sync/network/network.hpp b/src/realm/sync/network/network.hpp index 595d14c2a55..a08c843bd66 100644 --- a/src/realm/sync/network/network.hpp +++ b/src/realm/sync/network/network.hpp @@ -1998,6 +1998,8 @@ class Service::PostOperBase : public AsyncOper { template class Service::PostOper : public PostOperBase { + static_assert(std::is_nothrow_move_constructible_v); + public: PostOper(std::size_t size, Impl& service, H&& handler) : PostOperBase{size, service} @@ -2009,21 +2011,10 @@ class Service::PostOper : public PostOperBase { // Recycle the operation object before the handler is exceuted, such // that the memory is available for a new post operation that might be // initiated during the execution of the handler. - bool was_recycled = false; - try { - H handler = std::move(m_handler); // Throws - // Service::recycle_post_oper() destroys this operation object - Service::recycle_post_oper(m_service, this); - was_recycled = true; - handler(Status::OK()); // Throws - } - catch (...) { - if (!was_recycled) { - // Service::recycle_post_oper() destroys this operation object - Service::recycle_post_oper(m_service, this); - } - throw; - } + H handler = std::move(m_handler); + // Service::recycle_post_oper() destroys this operation object + Service::recycle_post_oper(m_service, this); + handler(Status::OK()); // Throws } private: diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index be37a084313..21e2a4c3453 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -260,6 +260,22 @@ void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler) }); } +void ClientImpl::post(util::UniqueFunction&& handler) +{ + REALM_ASSERT(m_socket_provider); + incr_outstanding_posts(); + m_socket_provider->post([handler = std::move(handler), this](Status status) { + auto decr_guard = util::make_scope_exit([&]() noexcept { + decr_outstanding_posts(); + }); + if (status == ErrorCodes::OperationAborted) + return; + if (!status.is_ok()) + throw Exception(status); + handler(); + }); +} + void ClientImpl::drain_connections() { diff --git a/src/realm/sync/noinst/client_impl_base.hpp b/src/realm/sync/noinst/client_impl_base.hpp index f987aee7f5a..f6a6d4a84a3 100644 --- a/src/realm/sync/noinst/client_impl_base.hpp +++ b/src/realm/sync/noinst/client_impl_base.hpp @@ -31,11 +31,10 @@ namespace realm::sync { -// (protocol, address, port, session_multiplex_ident) +// (protocol, address, port, user_id) // // `protocol` is included for convenience, even though it is not strictly part // of an endpoint. - struct ServerEndpoint { ProtocolEnvelope envelope; std::string address; @@ -47,6 +46,8 @@ struct ServerEndpoint { private: auto to_tuple() const { + // Does not include server_mode because all endpoints for a single Client + // must have the same mode. is_verified is not part of an endpoint's identity. return std::make_tuple(server_mode, envelope, std::ref(address), port, std::ref(user_id)); } @@ -56,7 +57,6 @@ struct ServerEndpoint { return lhs.to_tuple() == rhs.to_tuple(); } - friend inline bool operator<(const ServerEndpoint& lhs, const ServerEndpoint& rhs) { return lhs.to_tuple() < rhs.to_tuple(); @@ -71,13 +71,9 @@ class SessionWrapperStack { void push(util::bind_ptr) noexcept; util::bind_ptr pop() noexcept; void clear() noexcept; + bool erase(SessionWrapper*) noexcept; SessionWrapperStack() noexcept = default; - SessionWrapperStack(SessionWrapperStack&&) noexcept; ~SessionWrapperStack(); - friend void swap(SessionWrapperStack& q_1, SessionWrapperStack& q_2) noexcept - { - std::swap(q_1.m_back, q_2.m_back); - } private: SessionWrapper* m_back = nullptr; @@ -218,6 +214,7 @@ class ClientImpl { // Functions to post onto the event loop and create an event loop timer using the // SyncSocketProvider void post(SyncSocketProvider::FunctionHandler&& handler) REQUIRES(!m_drain_mutex); + void post(util::UniqueFunction&& handler) REQUIRES(!m_drain_mutex); SyncSocketProvider::SyncTimer create_timer(std::chrono::milliseconds delay, SyncSocketProvider::FunctionHandler&& handler) REQUIRES(!m_drain_mutex); @@ -262,10 +259,10 @@ class ClientImpl { SyncTrigger m_actualize_and_finalize; // Note: There is one server slot per server endpoint (hostname, port, - // session_multiplex_ident), and it survives from one connection object to - // the next, which is important because it carries information about a - // possible reconnect delay applying to the new connection object (server - // hammering protection). + // user_id), and it survives from one connection object to the next, which + // is important because it carries information about a possible reconnect + // delay applying to the new connection object (server hammering + // protection). struct ServerSlot { explicit ServerSlot(ReconnectInfo reconnect_info); ~ServerSlot(); @@ -297,7 +294,7 @@ class ClientImpl { // The set of session wrappers that are not yet wrapping a session object, // and are not yet abandoned (still referenced by the application). - std::map m_unactualized_session_wrappers GUARDED_BY(m_mutex); + SessionWrapperStack m_unactualized_session_wrappers GUARDED_BY(m_mutex); // The set of session wrappers that were successfully actualized, but are // now abandoned (no longer referenced by the application), and have not yet @@ -307,7 +304,7 @@ class ClientImpl { // Used with m_mutex std::condition_variable m_wait_or_client_stopped_cond; - void register_unactualized_session_wrapper(SessionWrapper*, ServerEndpoint) REQUIRES(!m_mutex); + void register_unactualized_session_wrapper(SessionWrapper*) REQUIRES(!m_mutex); void register_abandoned_session_wrapper(util::bind_ptr) noexcept REQUIRES(!m_mutex); void actualize_and_finalize_session_wrappers() REQUIRES(!m_mutex); @@ -325,13 +322,6 @@ class ClientImpl { // approach would be to allow for per-endpoint SSL parameters to be // specifiable through public member functions of ClientImpl from where they // could then be picked up as new connections are created on demand. - // - // FIXME: `session_multiplex_ident` should be eliminated from ServerEndpoint - // as it effectively disables part of the hammering protection scheme if it - // is used to ensure that each session gets a separate connection. With the - // alternative approach outlined in the previous FIXME (specify per endpoint - // SSL parameters at the client object level), there seems to be no more use - // for `session_multiplex_ident`. ClientImpl::Connection& get_connection(ServerEndpoint, const std::string& authorization_header_name, const std::map& custom_http_headers, bool verify_servers_ssl_certificate, @@ -952,13 +942,9 @@ class ClientImpl::Session { const SyncProgress& progress, const ReceivedChangesets&); /// See request_upload_completion_notification(). - /// - /// The default implementation does nothing. void on_upload_completion(); /// See request_download_completion_notification(). - /// - /// The default implementation does nothing. void on_download_completion(); //@{ @@ -969,8 +955,6 @@ class ClientImpl::Session { /// A switch to the suspended state only happens when an error occurs, /// and information about that error is passed to on_suspended(). /// - /// The default implementations of these functions do nothing. - /// /// These functions are always called by the event loop thread of the /// associated client object. /// @@ -1287,6 +1271,9 @@ void ClientImpl::Connection::for_each_active_session(H handler) inline void ClientImpl::Connection::voluntary_disconnect() { + if (m_state == ConnectionState::disconnected) { + return; + } m_reconnect_info.update(ConnectionTerminationReason::closed_voluntarily, std::nullopt); SessionErrorInfo error_info{Status{ErrorCodes::ConnectionClosed, "Connection closed"}, IsFatal{false}}; error_info.server_requests_action = ProtocolErrorInfo::Action::Transient; diff --git a/src/realm/util/future.hpp b/src/realm/util/future.hpp index aad7be1ef41..6057cfb63e6 100644 --- a/src/realm/util/future.hpp +++ b/src/realm/util/future.hpp @@ -1225,7 +1225,7 @@ class REALM_NODISCARD future_details::Future { * Returns a bound Promise and Future in a struct with friendly names (promise and future) that also * works well with C++17 structured bindings. */ -template +template inline auto make_promise_future() { return Promise::make_promise_future_impl(); diff --git a/src/realm/util/scope_exit.hpp b/src/realm/util/scope_exit.hpp index a8b054f3581..b553d2d7ad6 100644 --- a/src/realm/util/scope_exit.hpp +++ b/src/realm/util/scope_exit.hpp @@ -19,19 +19,27 @@ #ifndef REALM_UTIL_SCOPE_EXIT_HPP #define REALM_UTIL_SCOPE_EXIT_HPP +#include #include -namespace realm { -namespace util { +namespace realm::util { +// A guard which invokes the given function when exiting the scope (either via +// an exception or normal flow), used to clean up state which is not owned by +// an explicit RAII type. +// +// void foo() +// { +// begin_foo(); +// ScopeExit cleanup([&]() noexcept { +// end_foo(); +// }); +// +// // Do some things which may throw an exception +// } template class ScopeExit { public: - explicit ScopeExit(const H& handler) noexcept(std::is_nothrow_copy_constructible::value) - : m_handler(handler) - { - } - explicit ScopeExit(H&& handler) noexcept(std::is_nothrow_move_constructible::value) : m_handler(std::move(handler)) { @@ -61,13 +69,39 @@ class ScopeExit { }; template -ScopeExit::type> make_scope_exit(H&& handler) noexcept( - noexcept(ScopeExit::type>(std::forward(handler)))) +ScopeExit(H&&) -> ScopeExit>; + +// Similar to ScopeExit, but the handler is *only* invoked if the scope is +// exited via throwing an exception. +template +class ScopeExitFail : public ScopeExit { +public: + explicit ScopeExitFail(H&& handler) noexcept(std::is_nothrow_move_constructible::value) + : ScopeExit(std::move(handler)) + { + } + + ~ScopeExitFail() + { + if (std::uncaught_exceptions() == m_exception_count) + this->cancel(); + } + +private: + int m_exception_count = std::uncaught_exceptions(); +}; + +template +ScopeExitFail(H&&) -> ScopeExitFail>; + +// A helper which was required pre-C++17. New code should prefer `ScopeExit cleanup([&]() noexcept { ... })`. +template +ScopeExit> make_scope_exit(H&& handler) noexcept( + noexcept(ScopeExit>(std::forward(handler)))) { - return ScopeExit::type>(std::forward(handler)); + return ScopeExit>(std::forward(handler)); } -} // namespace util -} // namespace realm +} // namespace realm::util #endif // REALM_UTIL_SCOPE_EXIT_HPP diff --git a/test/benchmark-sync/bench_transform.cpp b/test/benchmark-sync/bench_transform.cpp index 84674866e94..d0377a1e918 100644 --- a/test/benchmark-sync/bench_transform.cpp +++ b/test/benchmark-sync/bench_transform.cpp @@ -86,9 +86,7 @@ void transform_transactions(TestContext& test_context) }; Session session_1 = fixture.make_session(0, 0, db_1, "/test", std::move(session_config)); - session_1.bind(); Session session_2 = fixture.make_session(1, 0, db_2, "/test"); - session_2.bind(); // Start server and upload changes of second client. fixture.start_server(0); @@ -167,9 +165,7 @@ void transform_instructions(TestContext& test_context) return SyncClientHookAction::NoAction; }; Session session_1 = fixture.make_session(0, 0, db_1, "/test", std::move(session_config)); - session_1.bind(); Session session_2 = fixture.make_session(1, 0, db_2, "/test"); - session_2.bind(); // Start server and upload changes of second client. fixture.start_server(0); @@ -246,9 +242,7 @@ void connected_objects(TestContext& test_context) return SyncClientHookAction::NoAction; }; Session session_1 = fixture.make_session(0, 0, db_1, "/test", std::move(session_config)); - session_1.bind(); Session session_2 = fixture.make_session(1, 0, db_2, "/test"); - session_2.bind(); // Start server and upload changes of second client. fixture.start_server(0); diff --git a/test/sync_fixtures.hpp b/test/sync_fixtures.hpp index d0d2e7641ca..b73e9ea8bda 100644 --- a/test/sync_fixtures.hpp +++ b/test/sync_fixtures.hpp @@ -699,17 +699,15 @@ class MultiClientServerFixture { Session::Config config = {}) { // *ClientServerFixture uses the service identifier "/realm-sync" to distinguish Sync - // connections, while the MongoDB/Stitch-based Sync server does not. + // connections, while BaaS does not. config.service_identifier = "/realm-sync"; config.realm_identifier = std::move(realm_identifier); config.server_port = m_server_ports[server_index]; config.server_address = "localhost"; - - Session session{*m_clients[client_index], std::move(db), nullptr, nullptr, std::move(config)}; if (m_connection_state_change_listeners[client_index]) { - session.set_connection_state_change_listener(m_connection_state_change_listeners[client_index]); + config.connection_state_change_listener = m_connection_state_change_listeners[client_index]; } - else { + else if (!config.connection_state_change_listener) { auto fallback_listener = [this](ConnectionState state, std::optional error) { if (state != ConnectionState::disconnected) return; @@ -720,9 +718,10 @@ class MultiClientServerFixture { CHECK_NOT(client_error_occurred); stop(); }; - session.set_connection_state_change_listener(fallback_listener); + config.connection_state_change_listener = fallback_listener; } - return session; + + return Session{*m_clients[client_index], std::move(db), nullptr, nullptr, std::move(config)}; } Session make_bound_session(int client_index, DBRef db, int server_index, std::string server_path, @@ -736,10 +735,7 @@ class MultiClientServerFixture { std::string signed_user_token, Session::Config config = {}) { config.signed_user_token = std::move(signed_user_token); - Session session = - make_session(client_index, server_index, std::move(db), std::move(server_path), std::move(config)); - session.bind(); - return session; + return make_session(client_index, server_index, std::move(db), std::move(server_path), std::move(config)); } void cancel_reconnect_delay(int client_index) @@ -945,7 +941,7 @@ class RealmFixture { using ErrorHandler = MultiClientServerFixture::ErrorHandler; struct Config : Session::Config { - std::function error_handler; + util::UniqueFunction error_handler; }; RealmFixture(ClientServerFixture&, const std::string& real_path, const std::string& virt_path, Config = {}); @@ -984,19 +980,16 @@ class RealmFixture { DBRef m_db; sync::Session m_session; - void setup_error_handler(util::UniqueFunction); + Config setup_error_handler(Config&&); }; inline RealmFixture::RealmFixture(ClientServerFixture& client_server_fixture, const std::string& real_path, const std::string& virt_path, Config config) - : m_self_ref{std::make_shared(this)} // Throws - , m_db{DB::create(make_client_replication(), real_path)} // Throws - , m_session{client_server_fixture.make_session(m_db, virt_path, std::move(config))} // Throws + : m_self_ref{std::make_shared(this)} // Throws + , m_db{DB::create(make_client_replication(), real_path)} // Throws + , m_session{client_server_fixture.make_session(m_db, virt_path, setup_error_handler(std::move(config)))} // Throws { - if (config.error_handler) - setup_error_handler(std::move(config.error_handler)); - m_session.bind(); } @@ -1004,12 +997,9 @@ inline RealmFixture::RealmFixture(MultiClientServerFixture& client_server_fixtur const std::string& real_path, const std::string& virt_path, Config config) : m_self_ref{std::make_shared(this)} // Throws , m_db{DB::create(make_client_replication(), real_path)} // Throws - , m_session{client_server_fixture.make_session(client_index, server_index, m_db, virt_path, std::move(config))} -// Throws + , m_session{client_server_fixture.make_session(client_index, server_index, m_db, virt_path, + setup_error_handler(std::move(config)))} // Throws { - if (config.error_handler) - setup_error_handler(std::move(config.error_handler)); - m_session.bind(); } inline RealmFixture::~RealmFixture() noexcept @@ -1073,15 +1063,18 @@ inline void RealmFixture::async_wait_for_download_completion(WaitOperCompletionH m_session.async_wait_for_download_completion(std::move(handler)); } -inline void RealmFixture::setup_error_handler(util::UniqueFunction handler) +inline RealmFixture::Config RealmFixture::setup_error_handler(Config&& config) { - auto listener = [handler = std::move(handler)](ConnectionState state, - const std::optional& error_info) { - if (state != ConnectionState::disconnected) - return; - REALM_ASSERT(error_info); - handler(error_info->status, error_info->is_fatal); - }; - m_session.set_connection_state_change_listener(std::move(listener)); + if (config.error_handler) { + config.connection_state_change_listener = + [handler = std::move(config.error_handler)](ConnectionState state, + const std::optional& error_info) { + if (state != ConnectionState::disconnected) + return; + REALM_ASSERT(error_info); + handler(error_info->status, error_info->is_fatal); + }; + } + return std::move(config); } } // namespace realm::fixtures diff --git a/test/test_client_reset.cpp b/test/test_client_reset.cpp index 077fa5626d6..846ec976115 100644 --- a/test/test_client_reset.cpp +++ b/test/test_client_reset.cpp @@ -124,7 +124,6 @@ TEST(ClientReset_NoLocalChanges) session.wait_for_upload_complete_or_client_stopped(); Session session_2 = fixture.make_session(path_2, server_path); - session_2.bind(); session_2.wait_for_download_complete_or_client_stopped(); } @@ -150,7 +149,9 @@ TEST(ClientReset_NoLocalChanges) // The session that receives an error. { BowlOfStonesSemaphore bowl; - auto listener = [&](ConnectionState state, util::Optional error_info) { + Session::Config config; + config.connection_state_change_listener = [&](ConnectionState state, + util::Optional error_info) { if (state != ConnectionState::disconnected) return; REALM_ASSERT(error_info); @@ -160,9 +161,7 @@ TEST(ClientReset_NoLocalChanges) bowl.add_stone(); }; - Session session = fixture.make_session(path_2, server_path); - session.set_connection_state_change_listener(listener); - session.bind(); + Session session = fixture.make_session(path_2, server_path, std::move(config)); bowl.get_stone(); } @@ -170,7 +169,6 @@ TEST(ClientReset_NoLocalChanges) SHARED_GROUP_TEST_PATH(path_fresh); { Session session_fresh = fixture.make_session(path_fresh, server_path); - session_fresh.bind(); session_fresh.wait_for_download_complete_or_client_stopped(); } DBRef sg_fresh = DB::create(make_client_replication(), path_fresh); @@ -192,7 +190,6 @@ TEST(ClientReset_NoLocalChanges) session_config.client_reset_config = std::move(client_reset_config); } Session session = fixture.make_session(sg, server_path, std::move(session_config)); - session.bind(); session.wait_for_download_complete_or_client_stopped(); } } @@ -225,7 +222,6 @@ TEST(ClientReset_InitialLocalChanges) DBRef db_2 = DB::create(make_client_replication(), path_2); Session session_1 = fixture.make_session(db_1, server_path); - session_1.bind(); // First we make a changeset and upload it { @@ -248,7 +244,6 @@ TEST(ClientReset_InitialLocalChanges) SHARED_GROUP_TEST_PATH(path_fresh); { Session session_fresh = fixture.make_session(path_fresh, server_path); - session_fresh.bind(); session_fresh.wait_for_download_complete_or_client_stopped(); } DBRef sg_fresh = DB::create(make_client_replication(), path_fresh); @@ -262,7 +257,6 @@ TEST(ClientReset_InitialLocalChanges) session_config_2.client_reset_config = std::move(client_reset_config); } Session session_2 = fixture.make_session(db_2, server_path, std::move(session_config_2)); - session_2.bind(); session_2.wait_for_upload_complete_or_client_stopped(); session_2.wait_for_download_complete_or_client_stopped(); @@ -329,7 +323,6 @@ TEST_TYPES(ClientReset_LocalChangesWhenOffline, std::true_type, std::false_type) // Download a new Realm. The state is empty. Session::Config session_config_1; Session session_1 = fixture.make_session(sg, server_path, std::move(session_config_1)); - session_1.bind(); session_1.wait_for_download_complete_or_client_stopped(); WriteTransaction wt{sg}; @@ -342,7 +335,6 @@ TEST_TYPES(ClientReset_LocalChangesWhenOffline, std::true_type, std::false_type) DBRef sg_2 = DB::create(make_client_replication(), path_2); Session session_2 = fixture.make_session(sg_2, server_path); - session_2.bind(); session_2.wait_for_upload_complete_or_client_stopped(); session_2.wait_for_download_complete_or_client_stopped(); @@ -367,7 +359,6 @@ TEST_TYPES(ClientReset_LocalChangesWhenOffline, std::true_type, std::false_type) SHARED_GROUP_TEST_PATH(path_fresh1); { Session session4 = fixture.make_session(path_fresh1, server_path); - session4.bind(); session4.wait_for_download_complete_or_client_stopped(); } DBRef sg_fresh1 = DB::create(make_client_replication(), path_fresh1); @@ -377,7 +368,6 @@ TEST_TYPES(ClientReset_LocalChangesWhenOffline, std::true_type, std::false_type) session_config_3.client_reset_config->mode = recover ? ClientResyncMode::Recover : ClientResyncMode::DiscardLocal; session_config_3.client_reset_config->fresh_copy = std::move(sg_fresh1); Session session_3 = fixture.make_session(sg, server_path, std::move(session_config_3)); - session_3.bind(); session_3.wait_for_upload_complete_or_client_stopped(); session_3.wait_for_download_complete_or_client_stopped(); @@ -468,9 +458,7 @@ TEST(ClientReset_ThreeClients) } Session session_1 = fixture.make_session(path_1, server_path); - session_1.bind(); Session session_2 = fixture.make_session(path_2, server_path); - session_2.bind(); session_1.wait_for_upload_complete_or_client_stopped(); session_2.wait_for_upload_complete_or_client_stopped(); @@ -577,22 +565,23 @@ TEST(ClientReset_ThreeClients) // The clients get session errors. { BowlOfStonesSemaphore bowl; - auto listener = [&](ConnectionState state, util::Optional error_info) { - if (state != ConnectionState::disconnected) - return; - REALM_ASSERT(error_info); - CHECK_EQUAL(error_info->status, ErrorCodes::SyncClientResetRequired); - CHECK_EQUAL(static_cast(error_info->raw_error_code), - ProtocolError::bad_server_version); - bowl.add_stone(); + auto config = [&] { + Session::Config config; + config.connection_state_change_listener = [&](ConnectionState state, + util::Optional error_info) { + if (state != ConnectionState::disconnected) + return; + REALM_ASSERT(error_info); + CHECK_EQUAL(error_info->status, ErrorCodes::SyncClientResetRequired); + CHECK_EQUAL(static_cast(error_info->raw_error_code), + ProtocolError::bad_server_version); + bowl.add_stone(); + }; + return config; }; - Session session_1 = fixture.make_session(path_1, server_path); - session_1.set_connection_state_change_listener(listener); - session_1.bind(); - Session session_2 = fixture.make_session(path_2, server_path); - session_2.set_connection_state_change_listener(listener); - session_2.bind(); + Session session_1 = fixture.make_session(path_1, server_path, config()); + Session session_2 = fixture.make_session(path_2, server_path, config()); bowl.get_stone(); bowl.get_stone(); } @@ -602,14 +591,12 @@ TEST(ClientReset_ThreeClients) SHARED_GROUP_TEST_PATH(path_fresh2); { Session session4 = fixture.make_session(path_fresh1, server_path); - session4.bind(); session4.wait_for_download_complete_or_client_stopped(); } DBRef sg_fresh1 = DB::create(make_client_replication(), path_fresh1); { Session session4 = fixture.make_session(path_fresh2, server_path); - session4.bind(); session4.wait_for_download_complete_or_client_stopped(); } DBRef sg_fresh2 = DB::create(make_client_replication(), path_fresh2); @@ -631,9 +618,7 @@ TEST(ClientReset_ThreeClients) session_config_2.client_reset_config = std::move(client_reset_config); } Session session_1 = fixture.make_session(path_1, server_path, std::move(session_config_1)); - session_1.bind(); Session session_2 = fixture.make_session(path_2, server_path, std::move(session_config_2)); - session_2.bind(); session_1.wait_for_download_complete_or_client_stopped(); session_2.wait_for_download_complete_or_client_stopped(); @@ -659,9 +644,7 @@ TEST(ClientReset_ThreeClients) // Upload and download complete the clients. Session session_1 = fixture.make_session(path_1, server_path); - session_1.bind(); Session session_2 = fixture.make_session(path_2, server_path); - session_2.bind(); session_1.wait_for_upload_complete_or_client_stopped(); session_2.wait_for_upload_complete_or_client_stopped(); @@ -672,9 +655,7 @@ TEST(ClientReset_ThreeClients) // A third client downloads the state { - Session::Config session_config; - Session session = fixture.make_session(path_3, server_path, std::move(session_config)); - session.bind(); + Session session = fixture.make_session(path_3, server_path); session.wait_for_download_complete_or_client_stopped(); } } @@ -733,7 +714,6 @@ TEST(ClientReset_DoNotRecoverSchema) SHARED_GROUP_TEST_PATH(path_fresh1); { Session session_fresh = fixture.make_session(path_fresh1, server_path_2); - session_fresh.bind(); session_fresh.wait_for_download_complete_or_client_stopped(); } DBRef sg_fresh1 = DB::create(make_client_replication(), path_fresh1); @@ -749,17 +729,17 @@ TEST(ClientReset_DoNotRecoverSchema) client_reset_config.fresh_copy = std::move(sg_fresh1); session_config.client_reset_config = std::move(client_reset_config); } - Session session = fixture.make_session(path_1, server_path_2, std::move(session_config)); + BowlOfStonesSemaphore bowl; - session.set_connection_state_change_listener( - [&](ConnectionState state, util::Optional error_info) { - if (state != ConnectionState::disconnected) - return; - REALM_ASSERT(error_info); - CHECK_EQUAL(error_info->status, ErrorCodes::AutoClientResetFailed); - bowl.add_stone(); - }); - session.bind(); + session_config.connection_state_change_listener = [&](ConnectionState state, + util::Optional error_info) { + if (state != ConnectionState::disconnected) + return; + REALM_ASSERT(error_info); + CHECK_EQUAL(error_info->status, ErrorCodes::AutoClientResetFailed); + bowl.add_stone(); + }; + Session session = fixture.make_session(path_1, server_path_2, std::move(session_config)); bowl.get_stone(); } @@ -824,7 +804,6 @@ TEST(ClientReset_PinnedVersion) SHARED_GROUP_TEST_PATH(path_fresh); { Session session_fresh = fixture.make_session(path_fresh, server_path_1); - session_fresh.bind(); session_fresh.wait_for_download_complete_or_client_stopped(); } DBRef sg_fresh = DB::create(make_client_replication(), path_fresh); diff --git a/test/test_sync.cpp b/test/test_sync.cpp index 5aec76830e0..cdcb7177055 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -1,17 +1,17 @@ +#include +#include +#include #include #include -#include #include #include #include -#include +#include #include -#include #include -#include -#include +#include #include -#include +#include #include #include @@ -142,28 +142,24 @@ TEST(Sync_BadVirtualPath) int nerrors = 0; - auto listener = [&](ConnectionState state, util::Optional error_info) { - if (state != ConnectionState::disconnected) - return; - REALM_ASSERT(error_info); - CHECK_EQUAL(error_info->status, ErrorCodes::BadSyncPartitionValue); - CHECK(error_info->is_fatal); - ++nerrors; - if (nerrors == 3) - fixture.stop(); + auto config = [&] { + Session::Config config; + config.connection_state_change_listener = [&](ConnectionState state, util::Optional error_info) { + if (state != ConnectionState::disconnected) + return; + REALM_ASSERT(error_info); + CHECK_EQUAL(error_info->status, ErrorCodes::BadSyncPartitionValue); + CHECK(error_info->is_fatal); + ++nerrors; + if (nerrors == 3) + fixture.stop(); + }; + return config; }; - Session session_1 = fixture.make_session(db_1, "/test.realm"); - session_1.set_connection_state_change_listener(listener); - session_1.bind(); - - Session session_2 = fixture.make_session(db_2, "/../test"); - session_2.set_connection_state_change_listener(listener); - session_2.bind(); - - Session session_3 = fixture.make_session(db_3, "test%abc "); - session_3.set_connection_state_change_listener(listener); - session_3.bind(); + Session session_1 = fixture.make_session(db_1, "/test.realm", config()); + Session session_2 = fixture.make_session(db_2, "/../test", config()); + Session session_3 = fixture.make_session(db_3, "test%abc ", config()); session_1.wait_for_download_complete_or_client_stopped(); session_2.wait_for_download_complete_or_client_stopped(); @@ -577,9 +573,8 @@ TEST(Sync_TokenWithoutExpirationAllowed) Session::Config sess_config; sess_config.signed_user_token = g_signed_test_user_token_expiration_unspecified; + sess_config.connection_state_change_listener = listener; Session session = fixture.make_session(db, "/test", std::move(sess_config)); - session.set_connection_state_change_listener(listener); - session.bind(); write_transaction(db, [](WriteTransaction& wt) { wt.get_group().add_table_with_primary_key("class_foo", type_Int, "id"); }); @@ -607,7 +602,6 @@ TEST(Sync_TokenWithNullExpirationAllowed) Session::Config config; config.signed_user_token = g_signed_test_user_token_expiration_null; Session session = fixture.make_session(db, "/test", std::move(config)); - session.bind(); { write_transaction(db, [](WriteTransaction& wt) { wt.get_group().add_table_with_primary_key("class_foo", type_Int, "id"); @@ -659,9 +653,7 @@ TEST(Sync_Replication) fixture.start(); Session session_1 = fixture.make_bound_session(db_1); - Session session_2 = fixture.make_session(db_2, "/test"); - session_2.bind(); // Create schema write_transaction(db_1, [](WriteTransaction& wt) { @@ -706,10 +698,7 @@ TEST(Sync_Merge) fixture.start(); Session session_1 = fixture.make_session(0, 0, db_1, "/test"); - session_1.bind(); - Session session_2 = fixture.make_session(1, 0, db_2, "/test"); - session_2.bind(); // Create schema on both clients. auto create_schema = [](DBRef db) { @@ -793,17 +782,16 @@ void test_schema_mismatch(unit_test::TestContext& test_context, util::FunctionRe fixture.allow_server_errors(0, 1); fixture.start(); - Session session_1 = fixture.make_session(0, 0, db_1, "/test"); - Session session_2 = fixture.make_session(1, 0, db_2, "/test"); - if (!expected_error_2) expected_error_2 = expected_error_1; - session_1.set_connection_state_change_listener(ExpectChangesetError{test_context, fixture, expected_error_1}); - session_2.set_connection_state_change_listener(ExpectChangesetError{test_context, fixture, expected_error_2}); + Session::Config config_1; + config_1.connection_state_change_listener = ExpectChangesetError{test_context, fixture, expected_error_1}; + Session::Config config_2; + config_2.connection_state_change_listener = ExpectChangesetError{test_context, fixture, expected_error_2}; - session_1.bind(); - session_2.bind(); + Session session_1 = fixture.make_session(0, 0, db_1, "/test", std::move(config_1)); + Session session_2 = fixture.make_session(1, 0, db_2, "/test", std::move(config_2)); session_1.wait_for_upload_complete_or_client_stopped(); session_2.wait_for_upload_complete_or_client_stopped(); @@ -1045,47 +1033,6 @@ TEST(Sync_UnbindBeforeActivation) } -TEST(Sync_AbandonUnboundSessions) -{ - TEST_DIR(dir); - TEST_CLIENT_DB(db_1); - TEST_CLIENT_DB(db_2); - TEST_CLIENT_DB(db_3); - ClientServerFixture fixture(dir, test_context); - fixture.start(); - - int n = 32; - for (int i = 0; i < n; ++i) { - fixture.make_session(db_1, "/test"); - fixture.make_session(db_2, "/test"); - fixture.make_session(db_3, "/test"); - } - - for (int i = 0; i < n; ++i) { - fixture.make_session(db_1, "/test"); - Session session = fixture.make_session(db_2, "/test"); - fixture.make_session(db_3, "/test"); - session.bind(); - } - - for (int i = 0; i < n; ++i) { - fixture.make_session(db_1, "/test"); - Session session = fixture.make_session(db_2, "/test"); - fixture.make_session(db_3, "/test"); - session.bind(); - session.wait_for_upload_complete_or_client_stopped(); - } - - for (int i = 0; i < n; ++i) { - fixture.make_session(db_1, "/test"); - Session session = fixture.make_session(db_2, "/test"); - fixture.make_session(db_3, "/test"); - session.bind(); - session.wait_for_download_complete_or_client_stopped(); - } -} - - #if 0 // FIXME: Disabled because substring operations are not yet supported in Core 6. // This test illustrates that our instruction set and merge rules @@ -1391,11 +1338,10 @@ TEST(Sync_Randomized) client_shared_groups[i] = DB::create(make_client_replication(), test_path); } - std::vector> sessions(num_clients); + std::vector sessions(num_clients); for (size_t i = 0; i < num_clients; ++i) { auto db = client_shared_groups[i]; - sessions[i] = std::make_unique(fixture.make_session(int(i), 0, db, "/test")); - sessions[i]->bind(); + sessions[i] = fixture.make_session(int(i), 0, db, "/test"); } auto run_client_test_program = [&](size_t i) { @@ -1422,14 +1368,14 @@ TEST(Sync_Randomized) // Wait until all local changes are uploaded, and acknowledged by the // server. for (size_t i = 0; i < num_clients; ++i) - sessions[i]->wait_for_upload_complete_or_client_stopped(); + sessions[i].wait_for_upload_complete_or_client_stopped(); log("Everything uploaded"); // Now wait for all previously uploaded changes to be downloaded by all // others. for (size_t i = 0; i < num_clients; ++i) - sessions[i]->wait_for_download_complete_or_client_stopped(); + sessions[i].wait_for_download_complete_or_client_stopped(); log("Everything downloaded"); @@ -2114,7 +2060,6 @@ TEST(Sync_MultipleServers) for (int i = 0; i < num_sessions_per_file; ++i) { int client_index = 0; Session session = fixture.make_session(client_index, server_index, db, server_path); - session.bind(); for (int j = 0; j < num_transacts_per_session; ++j) { WriteTransaction wt(db); TableRef table = wt.get_table("class_table"); @@ -2142,7 +2087,6 @@ TEST(Sync_MultipleServers) DBRef db = DB::create(make_client_replication(), path); std::string server_path = "/" + std::to_string(realm_index); Session session = fixture.make_session(client_index, server_index, db, server_path); - session.bind(); session.wait_for_download_complete_or_client_stopped(); } catch (...) { @@ -2673,7 +2617,6 @@ TEST(Sync_SSL_Certificate_1) session_config.signed_user_token = g_signed_test_user_token; Session session = fixture.make_session(db, "/test", std::move(session_config)); - session.bind(); fixture.start(); session.wait_for_download_complete_or_client_stopped(); @@ -2769,7 +2712,6 @@ TEST(Sync_SSL_Certificate_DER) session_config.signed_user_token = g_signed_test_user_token; Session session = fixture.make_session(db, "/test", std::move(session_config)); - session.bind(); fixture.start(); session.wait_for_download_complete_or_client_stopped(); @@ -2963,7 +2905,6 @@ TEST_IF(Sync_SSL_Certificate_Verify_Callback_External, false) session_config.ssl_verify_callback = ssl_verify_callback; Session session(client, db, nullptr, nullptr, std::move(session_config)); - session.bind(); session.wait_for_download_complete_or_client_stopped(); client.shutdown_and_wait(); @@ -2994,17 +2935,12 @@ TEST(Sync_UploadDownloadProgress_1) { int handler_entry = 0; - bool cond_var_signaled = false; - std::mutex mutex; - std::condition_variable cond_var; - ClientServerFixture fixture(server_dir, test_context); fixture.start(); - Session session = fixture.make_session(db, "/test"); - - auto progress_handler = [&](uint_fast64_t downloaded, uint_fast64_t downloadable, uint_fast64_t uploaded, - uint_fast64_t uploadable, uint_fast64_t snapshot, double, double, int64_t) { + Session::Config config; + config.progress_handler = [&](uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, + uint64_t uploadable, uint64_t snapshot, double, double, int64_t) { downloaded_bytes = downloaded; downloadable_bytes = downloadable; uploaded_bytes = uploaded; @@ -3013,20 +2949,15 @@ TEST(Sync_UploadDownloadProgress_1) ++handler_entry; }; - std::unique_lock lock(mutex); - session.set_progress_handler(progress_handler); - session.set_connection_state_change_listener([&](ConnectionState state, util::Optional) { + auto pf = util::make_promise_future(); + config.connection_state_change_listener = [&](ConnectionState state, util::Optional) { if (state == ConnectionState::connected) { - std::unique_lock lock(mutex); - cond_var_signaled = true; - lock.unlock(); - cond_var.notify_one(); + pf.promise.emplace_value(); } - }); - session.bind(); - cond_var.wait(lock, [&] { - return cond_var_signaled; - }); + }; + + Session session = fixture.make_session(db, "/test", std::move(config)); + pf.future.get(); CHECK_EQUAL(handler_entry, 0); auto commit_version = write_transaction(db, [](WriteTransaction& wt) { @@ -3063,42 +2994,29 @@ TEST(Sync_UploadDownloadProgress_1) // are the ones stored in the Realm in the previous // session. - bool cond_var_signaled = false; - std::mutex mutex; - std::condition_variable cond_var; - ClientServerFixture fixture(server_dir, test_context); fixture.start(); - Session session = fixture.make_session(db, "/test"); int number_of_handler_calls = 0; - auto progress_handler = [&](uint_fast64_t downloaded, uint_fast64_t downloadable, uint_fast64_t uploaded, - uint_fast64_t uploadable, uint_fast64_t snapshot, double, double, int64_t) { + auto pf = util::make_promise_future(); + Session::Config config; + config.progress_handler = [&](uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, + uint64_t uploadable, uint64_t snapshot, double, double, int64_t) { CHECK_EQUAL(downloaded, downloaded_bytes); CHECK_EQUAL(downloadable, downloaded_bytes); CHECK_EQUAL(uploaded, uploaded_bytes); CHECK_GREATER(uploadable, uploaded_bytes); CHECK_GREATER(snapshot, snapshot_version); number_of_handler_calls++; - - std::unique_lock lock(mutex); - cond_var_signaled = true; - lock.unlock(); - cond_var.notify_one(); + pf.promise.emplace_value(number_of_handler_calls); }; - std::unique_lock lock(mutex); - session.set_progress_handler(progress_handler); - session.bind(); + Session session = fixture.make_session(db, "/test", std::move(config)); write_transaction(db, [](WriteTransaction& wt) { wt.get_table("class_table")->create_object_with_primary_key(2); }); - cond_var.wait(lock, [&] { - return cond_var_signaled; - }); - - CHECK_EQUAL(number_of_handler_calls, 1); + CHECK_EQUAL(pf.future.get(), 1); } } @@ -3118,18 +3036,15 @@ TEST(Sync_UploadDownloadProgress_2) ClientServerFixture fixture(server_dir, test_context); fixture.start(); - Session session_1 = fixture.make_session(db_1, "/test"); - Session session_2 = fixture.make_session(db_2, "/test"); - uint_fast64_t downloaded_bytes_1 = 123; // Not zero uint_fast64_t downloadable_bytes_1 = 123; uint_fast64_t uploaded_bytes_1 = 123; uint_fast64_t uploadable_bytes_1 = 123; uint_fast64_t snapshot_version_1 = 0; - auto progress_handler_1 = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, - uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t snapshot_version, double, double, int64_t) { + Session::Config config_1; + config_1.progress_handler = [&](uint64_t downloaded_bytes, uint64_t downloadable_bytes, uint64_t uploaded_bytes, + uint64_t uploadable_bytes, uint64_t snapshot_version, double, double, int64_t) { downloaded_bytes_1 = downloaded_bytes; downloadable_bytes_1 = downloadable_bytes; uploaded_bytes_1 = uploaded_bytes; @@ -3137,17 +3052,15 @@ TEST(Sync_UploadDownloadProgress_2) snapshot_version_1 = snapshot_version; }; - session_1.set_progress_handler(progress_handler_1); - uint_fast64_t downloaded_bytes_2 = 123; uint_fast64_t downloadable_bytes_2 = 123; uint_fast64_t uploaded_bytes_2 = 123; uint_fast64_t uploadable_bytes_2 = 123; uint_fast64_t snapshot_version_2 = 0; - auto progress_handler_2 = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, - uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t snapshot_version, double, double, int64_t) { + Session::Config config_2; + config_2.progress_handler = [&](uint64_t downloaded_bytes, uint64_t downloadable_bytes, uint64_t uploaded_bytes, + uint64_t uploadable_bytes, uint64_t snapshot_version, double, double, int64_t) { downloaded_bytes_2 = downloaded_bytes; downloadable_bytes_2 = downloadable_bytes; uploaded_bytes_2 = uploaded_bytes; @@ -3155,10 +3068,8 @@ TEST(Sync_UploadDownloadProgress_2) snapshot_version_2 = snapshot_version; }; - session_2.set_progress_handler(progress_handler_2); - - session_1.bind(); - session_2.bind(); + Session session_1 = fixture.make_session(db_1, "/test", std::move(config_1)); + Session session_2 = fixture.make_session(db_2, "/test", std::move(config_2)); session_1.wait_for_upload_complete_or_client_stopped(); session_2.wait_for_upload_complete_or_client_stopped(); @@ -3327,16 +3238,6 @@ TEST(Sync_UploadDownloadProgress_3) client_config.reconnect_mode = ReconnectMode::testing; Client client(client_config); - // when connecting to the C++ server, use URL prefix: - Session::Config config; - config.service_identifier = "/realm-sync"; - config.server_address = server_address; - config.signed_user_token = g_signed_test_user_token; - config.server_port = server_port; - config.realm_identifier = "/test"; - - Session session(client, db, nullptr, nullptr, std::move(config)); - // entry is used to count the number of calls to // progress_handler. At the first call, the server is // not running, and it is started by progress_handler(). @@ -3350,10 +3251,15 @@ TEST(Sync_UploadDownloadProgress_3) uint_fast64_t uploadable_bytes_1 = 123; uint_fast64_t snapshot_version_1 = 0; - auto progress_handler = [&, entry = int(0), promise = util::CopyablePromiseHolder(std::move(signal_pf.promise))]( - uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, - uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t snapshot_version, double, double, int64_t) mutable { + Session::Config config; + config.service_identifier = "/realm-sync"; + config.server_address = server_address; + config.signed_user_token = g_signed_test_user_token; + config.server_port = server_port; + config.realm_identifier = "/test"; + config.progress_handler = [&, entry = 0](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, + uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, + uint_fast64_t snapshot_version, double, double, int64_t) mutable { downloaded_bytes_1 = downloaded_bytes; downloadable_bytes_1 = downloadable_bytes; uploaded_bytes_1 = uploaded_bytes; @@ -3369,19 +3275,17 @@ TEST(Sync_UploadDownloadProgress_3) } if (should_signal_cond_var) { - promise.get_promise().emplace_value(); + signal_pf.promise.emplace_value(); } entry++; }; - session.set_progress_handler(progress_handler); - server_thread.start([&] { server.run(); }); - session.bind(); + Session session(client, db, nullptr, nullptr, std::move(config)); session.wait_for_upload_complete_or_client_stopped(); session.wait_for_download_complete_or_client_stopped(); @@ -3459,13 +3363,11 @@ TEST(Sync_UploadDownloadProgress_4) ClientServerFixture fixture(server_dir, test_context, std::move(config)); fixture.start(); - Session session_1 = fixture.make_session(db_1, "/test"); - int entry_1 = 0; - - auto progress_handler_1 = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, - uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t snapshot_version, double, double, int64_t) { + Session::Config config_1; + config_1.progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, + uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, + uint_fast64_t snapshot_version, double, double, int64_t) { CHECK_EQUAL(downloaded_bytes, 0); CHECK_EQUAL(downloadable_bytes, 0); CHECK_NOT_EQUAL(uploadable_bytes, 0); @@ -3496,22 +3398,18 @@ TEST(Sync_UploadDownloadProgress_4) ++entry_1; }; - session_1.set_progress_handler(progress_handler_1); - - session_1.bind(); - + Session session_1 = fixture.make_session(db_1, "/test", std::move(config_1)); session_1.wait_for_upload_complete_or_client_stopped(); session_1.wait_for_download_complete_or_client_stopped(); CHECK_EQUAL(entry_1, 3); - Session session_2 = fixture.make_session(db_2, "/test"); - int entry_2 = 0; - auto progress_handler_2 = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, - uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t snapshot_version, double, double, int64_t) { + Session::Config config_2; + config_2.progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, + uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, + uint_fast64_t snapshot_version, double, double, int64_t) { CHECK_EQUAL(uploaded_bytes, 0); CHECK_EQUAL(uploadable_bytes, 0); @@ -3536,9 +3434,7 @@ TEST(Sync_UploadDownloadProgress_4) ++entry_2; }; - session_2.set_progress_handler(progress_handler_2); - - session_2.bind(); + Session session_2 = fixture.make_session(db_2, "/test", std::move(config_2)); session_2.wait_for_upload_complete_or_client_stopped(); session_2.wait_for_download_complete_or_client_stopped(); @@ -3554,39 +3450,24 @@ TEST(Sync_UploadDownloadProgress_5) TEST_DIR(server_dir); TEST_CLIENT_DB(db); - std::mutex mutex; - std::condition_variable session_cv; - bool signaled = false; - ClientServerFixture fixture(server_dir, test_context); fixture.start(); - Session session = fixture.make_session(db, "/test"); - - auto progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, - uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t snapshot_version, double, double, int64_t) mutable { + auto pf = util::make_promise_future(); + Session::Config config; + config.progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, + uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, + uint_fast64_t snapshot_version, double, double, int64_t) { CHECK_EQUAL(downloaded_bytes, 0); CHECK_EQUAL(downloadable_bytes, 0); CHECK_EQUAL(uploaded_bytes, 0); CHECK_EQUAL(uploadable_bytes, 0); CHECK_EQUAL(snapshot_version, 3); - std::lock_guard lock{mutex}; - signaled = true; - session_cv.notify_one(); + pf.promise.emplace_value(); }; - session.set_progress_handler(progress_handler); - - { - std::unique_lock lock{mutex}; - session.bind(); - // Wait until the progress handler is called on the session before tearing down the client - session_cv.wait_for(lock, std::chrono::seconds(5), [&]() { - return signaled; - }); - } - CHECK(signaled); + Session session = fixture.make_session(db, "/test", std::move(config)); + pf.future.get(); // The check is that we reach this point. } @@ -3630,44 +3511,29 @@ TEST(Sync_UploadDownloadProgress_6) server_thread.join(); }); + auto session_pf = util::make_promise_future*>(); + auto complete_pf = util::make_promise_future(); Session::Config session_config; session_config.server_address = "localhost"; session_config.server_port = server_port; session_config.realm_identifier = "/test"; session_config.service_identifier = "/realm-sync"; session_config.signed_user_token = g_signed_test_user_token; - - std::mutex mutex; - std::condition_variable session_cv; - bool signaled = false; - auto session = std::make_unique(client, db, nullptr, nullptr, std::move(session_config)); - - auto progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, - uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t snapshot_version, double, double, int64_t) { + session_config.progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, + uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, + uint_fast64_t snapshot_version, double, double, int64_t) { CHECK_EQUAL(downloaded_bytes, 0); CHECK_EQUAL(downloadable_bytes, 0); CHECK_EQUAL(uploaded_bytes, 0); CHECK_EQUAL(uploadable_bytes, 0); CHECK_EQUAL(snapshot_version, 3); - std::lock_guard lock{mutex}; - session.reset(); - signaled = true; - session_cv.notify_one(); + session_pf.future.get()->reset(); + complete_pf.promise.emplace_value(); }; - - session->set_progress_handler(progress_handler); - - { - std::unique_lock lock{mutex}; - session->bind(); - // Wait until the progress handler is called on the session before tearing down the client - session_cv.wait_for(lock, std::chrono::seconds(5), [&]() { - return signaled; - }); - } - CHECK(signaled); - CHECK(!(session)); + auto session = std::make_unique(client, db, nullptr, nullptr, std::move(session_config)); + session_pf.promise.emplace_value(&session); + complete_pf.future.get(); + CHECK(!session); // The check is that we reach this point without deadlocking or throwing an assert while tearing // down the active session @@ -3714,8 +3580,7 @@ TEST(Sync_UploadDownloadProgress_7) session_config.realm_identifier = "/test"; session_config.signed_user_token = g_signed_test_user_token; - auto session = std::make_unique(client, db, nullptr, nullptr, std::move(session_config)); - session->bind(); + Session session(client, db, nullptr, nullptr, std::move(session_config)); client.shutdown_and_wait(); server.stop(); @@ -3732,7 +3597,6 @@ TEST(Sync_UploadProgress_EmptyCommits) ClientServerFixture fixture(server_dir, test_context); fixture.start(); - Session session = fixture.make_session(db, "/test"); { WriteTransaction wt{db}; @@ -3741,11 +3605,13 @@ TEST(Sync_UploadProgress_EmptyCommits) } std::atomic entry = 0; - session.set_progress_handler( - [&](uint_fast64_t, uint_fast64_t, uint_fast64_t, uint_fast64_t, uint_fast64_t, double, double, int64_t) { - ++entry; - }); - session.bind(); + Session::Config config; + config.progress_handler = [&](uint_fast64_t, uint_fast64_t, uint_fast64_t, uint_fast64_t, uint_fast64_t, double, + double, int64_t) { + ++entry; + }; + + Session session = fixture.make_session(db, "/test", std::move(config)); // Each step calls wait_for_upload_complete twice because upload completion // is fired before progress handlers, so we need another hop through the @@ -3807,7 +3673,7 @@ TEST(Sync_MultipleSyncAgentsNotAllowed) TEST_DIR(server_dir); TEST_CLIENT_DB(db); - auto pf = util::make_promise_future(); + auto pf = util::make_promise_future(); struct Observer : BindingCallbackThreadObserver { unit_test::TestContext& test_context; util::Promise& got_error; @@ -3837,9 +3703,7 @@ TEST(Sync_MultipleSyncAgentsNotAllowed) { Session session = fixture.make_session(db, "/test"); - session.bind(); Session session2 = fixture.make_session(db, "/test"); - session2.bind(); pf.future.get(); // The exception caused the event loop to stop so we need to restart it @@ -3849,7 +3713,6 @@ TEST(Sync_MultipleSyncAgentsNotAllowed) // Verify that after the error occurs (and is ignored) things are still // in a functional state Session session = fixture.make_session(db, "/test"); - session.bind(); session.wait_for_upload_complete_or_client_stopped(); } @@ -3862,19 +3725,26 @@ TEST(Sync_CancelReconnectDelay) ClientServerFixture::Config fixture_config; fixture_config.one_connection_per_session = false; + auto expect_status = [&](BowlOfStonesSemaphore& bowl, ErrorCodes::Error code) { + Session::Config config; + config.connection_state_change_listener = [&, code](ConnectionState state, + std::optional error) { + if (state != ConnectionState::disconnected) + return; + CHECK(error); + if (CHECK_EQUAL(error->status, code)) + bowl.add_stone(); + }; + return config; + }; + // After connection-level error, and at session-level. { ClientServerFixture fixture{server_dir, test_context, std::move(fixture_config)}; fixture.start(); BowlOfStonesSemaphore bowl; - auto handler = [&](const SessionErrorInfo& info) { - if (CHECK_EQUAL(info.status, ErrorCodes::ConnectionClosed)) - bowl.add_stone(); - }; - Session session = fixture.make_session(db, "/test"); - session.set_error_handler(std::move(handler)); - session.bind(); + Session session = fixture.make_session(db, "/test", expect_status(bowl, ErrorCodes::ConnectionClosed)); session.wait_for_download_complete_or_client_stopped(); fixture.close_server_side_connections(); bowl.get_stone(); @@ -3890,13 +3760,7 @@ TEST(Sync_CancelReconnectDelay) fixture.start(); BowlOfStonesSemaphore bowl; - auto handler = [&](const SessionErrorInfo& info) { - if (CHECK_EQUAL(info.status, ErrorCodes::ConnectionClosed)) - bowl.add_stone(); - }; - Session session = fixture.make_session(db, "/test"); - session.set_error_handler(std::move(handler)); - session.bind(); + Session session = fixture.make_session(db, "/test", expect_status(bowl, ErrorCodes::ConnectionClosed)); session.wait_for_download_complete_or_client_stopped(); fixture.close_server_side_connections(); bowl.get_stone(); @@ -3913,13 +3777,7 @@ TEST(Sync_CancelReconnectDelay) { BowlOfStonesSemaphore bowl; - auto handler = [&](const SessionErrorInfo& info) { - if (CHECK_EQUAL(info.status, ErrorCodes::ConnectionClosed)) - bowl.add_stone(); - }; - Session session = fixture.make_session(db, "/test"); - session.set_error_handler(std::move(handler)); - session.bind(); + Session session = fixture.make_session(db, "/test", expect_status(bowl, ErrorCodes::ConnectionClosed)); session.wait_for_download_complete_or_client_stopped(); fixture.close_server_side_connections(); bowl.get_stone(); @@ -3949,13 +3807,7 @@ TEST(Sync_CancelReconnectDelay) session_x.wait_for_download_complete_or_client_stopped(); BowlOfStonesSemaphore bowl; - auto handler = [&](const SessionErrorInfo& info) { - if (CHECK_EQUAL(info.status, ErrorCodes::BadSyncPartitionValue)) - bowl.add_stone(); - }; - Session session = fixture.make_session(db, "/.."); - session.set_error_handler(std::move(handler)); - session.bind(); + Session session = fixture.make_session(db, "/..", expect_status(bowl, ErrorCodes::BadSyncPartitionValue)); bowl.get_stone(); session.cancel_reconnect_delay(); @@ -3972,13 +3824,7 @@ TEST(Sync_CancelReconnectDelay) session_x.wait_for_download_complete_or_client_stopped(); BowlOfStonesSemaphore bowl; - auto handler = [&](const SessionErrorInfo& info) { - if (CHECK_EQUAL(info.status, ErrorCodes::BadSyncPartitionValue)) - bowl.add_stone(); - }; - Session session = fixture.make_session(db, "/.."); - session.set_error_handler(std::move(handler)); - session.bind(); + Session session = fixture.make_session(db, "/..", expect_status(bowl, ErrorCodes::BadSyncPartitionValue)); bowl.get_stone(); fixture.cancel_reconnect_delay(); @@ -4090,24 +3936,24 @@ TEST_IF(Sync_MergeLargeBinary, !(REALM_ARCHITECTURE_X86_32)) fixture.start(); { - Session session_1 = fixture.make_session(0, 0, db_1, "/test"); - session_1.set_progress_handler(progress_handler_1); - session_1.bind(); + Session::Config config; + config.progress_handler = progress_handler_1; + Session session_1 = fixture.make_session(0, 0, db_1, "/test", std::move(config)); session_1.wait_for_upload_complete_or_client_stopped(); } { - Session session_2 = fixture.make_session(1, 0, db_2, "/test"); - session_2.set_progress_handler(progress_handler_2); - session_2.bind(); + Session::Config config; + config.progress_handler = progress_handler_2; + Session session_2 = fixture.make_session(1, 0, db_2, "/test", std::move(config)); session_2.wait_for_download_complete_or_client_stopped(); session_2.wait_for_upload_complete_or_client_stopped(); } { - Session session_1 = fixture.make_session(0, 0, db_1, "/test"); - session_1.set_progress_handler(progress_handler_1); - session_1.bind(); + Session::Config config; + config.progress_handler = progress_handler_1; + Session session_1 = fixture.make_session(0, 0, db_1, "/test", std::move(config)); session_1.wait_for_download_complete_or_client_stopped(); } } @@ -4244,24 +4090,24 @@ TEST(Sync_MergeLargeBinaryReducedMemory) fixture.start(); { - Session session_1 = fixture.make_session(0, 0, db_1, "/test"); - session_1.set_progress_handler(progress_handler_1); - session_1.bind(); + Session::Config config; + config.progress_handler = progress_handler_1; + Session session_1 = fixture.make_session(0, 0, db_1, "/test", std::move(config)); session_1.wait_for_upload_complete_or_client_stopped(); } { - Session session_2 = fixture.make_session(1, 0, db_2, "/test"); - session_2.set_progress_handler(progress_handler_2); - session_2.bind(); + Session::Config config; + config.progress_handler = progress_handler_2; + Session session_2 = fixture.make_session(1, 0, db_2, "/test", std::move(config)); session_2.wait_for_download_complete_or_client_stopped(); session_2.wait_for_upload_complete_or_client_stopped(); } { - Session session_1 = fixture.make_session(0, 0, db_1, "/test"); - session_1.set_progress_handler(progress_handler_1); - session_1.bind(); + Session::Config config; + config.progress_handler = progress_handler_1; + Session session_1 = fixture.make_session(0, 0, db_1, "/test", std::move(config)); session_1.wait_for_download_complete_or_client_stopped(); } } @@ -4357,9 +4203,7 @@ TEST(Sync_MergeLargeChangesets) MultiClientServerFixture fixture(2, 1, dir, test_context); Session session_1 = fixture.make_session(0, 0, db_1, "/test"); - session_1.bind(); Session session_2 = fixture.make_session(1, 0, db_2, "/test"); - session_2.bind(); fixture.start(); @@ -4433,9 +4277,7 @@ TEST(Sync_MergeMultipleChangesets) // Start server and upload changes of first client. Session session_1 = fixture.make_session(0, 0, db_1, "/test"); - session_1.bind(); Session session_2 = fixture.make_session(1, 0, db_2, "/test"); - session_2.bind(); fixture.start_server(0); fixture.start_client(0); @@ -4612,11 +4454,9 @@ TEST(Sync_Quadratic_Merge) fixture.start(); Session session_1 = fixture.make_session(0, 0, db_1, "/test"); - session_1.bind(); session_1.wait_for_upload_complete_or_client_stopped(); Session session_2 = fixture.make_session(1, 0, db_2, "/test"); - session_2.bind(); session_2.wait_for_upload_complete_or_client_stopped(); session_1.wait_for_download_complete_or_client_stopped(); @@ -4632,8 +4472,6 @@ TEST(Sync_BatchedUploadMessages) ClientServerFixture fixture(server_dir, test_context); fixture.start(); - Session session = fixture.make_session(db, "/test"); - { WriteTransaction wt{db}; TableRef tr = wt.get_group().add_table_with_primary_key("class_foo", type_Int, "id"); @@ -4650,9 +4488,11 @@ TEST(Sync_BatchedUploadMessages) wt.commit(); } - auto progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, - uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t snapshot_version, double, double, int64_t) { + Session::Config config; + auto pf = util::make_promise_future(); + config.progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, + uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, uint_fast64_t, double, + double, int64_t) { CHECK_GREATER(uploadable_bytes, 1000); // This is the important check. If the changesets were not batched, @@ -4661,12 +4501,14 @@ TEST(Sync_BatchedUploadMessages) CHECK(uploaded_bytes == 0 || uploaded_bytes == uploadable_bytes); CHECK_EQUAL(0, downloaded_bytes); CHECK_EQUAL(0, downloadable_bytes); - static_cast(snapshot_version); + if (uploaded_bytes == uploadable_bytes) { + pf.promise.emplace_value(); + } }; - session.set_progress_handler(progress_handler); - session.bind(); + Session session = fixture.make_session(db, "/test", std::move(config)); session.wait_for_upload_complete_or_client_stopped(); + pf.future.get(); } @@ -4681,9 +4523,6 @@ TEST(Sync_UploadLogCompactionEnabled) ClientServerFixture fixture(server_dir, test_context, std::move(config)); fixture.start(); - Session session_1 = fixture.make_session(db_1, "/test"); - Session session_2 = fixture.make_session(db_2, "/test"); - // Create a changeset with lots of overwrites of the // same fields. { @@ -4699,12 +4538,13 @@ TEST(Sync_UploadLogCompactionEnabled) wt.commit(); } - session_1.bind(); + Session session_1 = fixture.make_session(db_1, "/test"); session_1.wait_for_upload_complete_or_client_stopped(); - auto progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, - uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t snapshot_version, double, double, int64_t) { + Session::Config session_config; + session_config.progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, + uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, + uint_fast64_t snapshot_version, double, double, int64_t) { CHECK_EQUAL(downloaded_bytes, downloadable_bytes); CHECK_EQUAL(0, uploaded_bytes); CHECK_EQUAL(0, uploadable_bytes); @@ -4712,10 +4552,7 @@ TEST(Sync_UploadLogCompactionEnabled) CHECK_NOT_EQUAL(downloadable_bytes, 0); }; - session_2.set_progress_handler(progress_handler); - - session_2.bind(); - + Session session_2 = fixture.make_session(db_2, "/test", std::move(session_config)); session_2.wait_for_download_complete_or_client_stopped(); { @@ -4760,9 +4597,10 @@ TEST(Sync_UploadLogCompactionDisabled) Session session_1 = fixture.make_bound_session(db_1, "/test"); session_1.wait_for_upload_complete_or_client_stopped(); - auto progress_handler = [&](std::uint_fast64_t downloaded_bytes, std::uint_fast64_t downloadable_bytes, - std::uint_fast64_t uploaded_bytes, std::uint_fast64_t uploadable_bytes, - std::uint_fast64_t snapshot_version, double, double, int64_t) { + Session::Config session_config; + session_config.progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, + uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, + uint_fast64_t snapshot_version, double, double, int64_t) { CHECK_EQUAL(downloaded_bytes, downloadable_bytes); CHECK_EQUAL(0, uploaded_bytes); CHECK_EQUAL(0, uploadable_bytes); @@ -4770,9 +4608,7 @@ TEST(Sync_UploadLogCompactionDisabled) CHECK_NOT_EQUAL(0, downloadable_bytes); }; - Session session_2 = fixture.make_session(db_2, "/test"); - session_2.set_progress_handler(progress_handler); - session_2.bind(); + Session session_2 = fixture.make_session(db_2, "/test", std::move(session_config)); session_2.wait_for_download_complete_or_client_stopped(); { @@ -4952,14 +4788,14 @@ TEST(Sync_ConnectionStateChange) bowl_2.add_stone(); }; - Session session_1 = fixture.make_session(db_1, "/test"); - session_1.set_connection_state_change_listener(listener_1); - session_1.bind(); + Session::Config config_1; + config_1.connection_state_change_listener = listener_1; + Session session_1 = fixture.make_session(db_1, "/test", std::move(config_1)); session_1.wait_for_download_complete_or_client_stopped(); - Session session_2 = fixture.make_session(db_2, "/test"); - session_2.set_connection_state_change_listener(listener_2); - session_2.bind(); + Session::Config config_2; + config_2.connection_state_change_listener = listener_2; + Session session_2 = fixture.make_session(db_2, "/test", std::move(config_2)); session_2.wait_for_download_complete_or_client_stopped(); fixture.close_server_side_connections(); @@ -4973,28 +4809,6 @@ TEST(Sync_ConnectionStateChange) } -TEST(Sync_ClientErrorHandler) -{ - TEST_DIR(dir); - TEST_CLIENT_DB(db); - ClientServerFixture fixture(dir, test_context); - fixture.start(); - - BowlOfStonesSemaphore bowl; - auto handler = [&](const SessionErrorInfo&) { - bowl.add_stone(); - }; - - Session session = fixture.make_session(db, "/test"); - session.set_error_handler(std::move(handler)); - session.bind(); - session.wait_for_download_complete_or_client_stopped(); - - fixture.close_server_side_connections(); - bowl.get_stone(); -} - - TEST(Sync_VerifyServerHistoryAfterLargeUpload) { TEST_DIR(server_dir); @@ -5019,7 +4833,6 @@ TEST(Sync_VerifyServerHistoryAfterLargeUpload) wt->commit(); Session session = fixture.make_session(db, "/test"); - session.bind(); session.wait_for_upload_complete_or_client_stopped(); } @@ -5148,9 +4961,8 @@ TEST_IF(Sync_SSL_Certificates, false) // Invalid token for the cloud. session_config.signed_user_token = g_signed_test_user_token; - Session session{client, db, nullptr, nullptr, std::move(session_config)}; - - auto listener = [&](ConnectionState state, const util::Optional& error_info) { + session_config.connection_state_change_listener = [&](ConnectionState state, + const util::Optional& error_info) { if (state == ConnectionState::disconnected) { CHECK(error_info); client_logger->debug("State change: disconnected, error_code = %1, is_fatal = %2", error_info->status, @@ -5161,9 +4973,7 @@ TEST_IF(Sync_SSL_Certificates, false) } }; - session.set_connection_state_change_listener(listener); - session.bind(); - + Session session{client, db, nullptr, nullptr, std::move(session_config)}; session.wait_for_download_complete_or_client_stopped(); } } @@ -5192,7 +5002,6 @@ TEST(Sync_AuthorizationHeaderName) custom_http_headers["Header-Name-2"] = "Header-Value-2"; session_config.custom_http_headers = std::move(custom_http_headers); Session session = fixture.make_session(db, "/test", std::move(session_config)); - session.bind(); session.wait_for_download_complete_or_client_stopped(); } @@ -5226,7 +5035,9 @@ TEST(Sync_BadChangeset) wt.commit(); } - auto listener = [&](ConnectionState state, const util::Optional& error_info) { + Session::Config session_config; + session_config.connection_state_change_listener = [&](ConnectionState state, + const util::Optional& error_info) { if (state != ConnectionState::disconnected) return; REALM_ASSERT(error_info); @@ -5235,11 +5046,7 @@ TEST(Sync_BadChangeset) did_fail = true; fixture.stop(); }; - - Session session = fixture.make_session(db, "/test"); - session.set_connection_state_change_listener(listener); - session.bind(); - + Session session = fixture.make_session(db, "/test", std::move(session_config)); session.wait_for_upload_complete_or_client_stopped(); session.wait_for_download_complete_or_client_stopped(); } @@ -5273,17 +5080,15 @@ TEST(Sync_GoodChangeset_AccentCharacterInFieldName) wt.commit(); } - auto listener = [&](ConnectionState state, const util::Optional) { + Session::Config session_config; + session_config.connection_state_change_listener = [&](ConnectionState state, + const util::Optional) { if (state != ConnectionState::disconnected) return; did_fail = true; fixture.stop(); }; - - Session session = fixture.make_session(db, "/test"); - session.set_connection_state_change_listener(listener); - session.bind(); - + Session session = fixture.make_session(db, "/test", std::move(session_config)); session.wait_for_upload_complete_or_client_stopped(); } CHECK_NOT(did_fail); @@ -5768,8 +5573,6 @@ NONCONCURRENT_TEST_TYPES(Sync_PrimaryKeyTypes, Int, String, ObjectId, UUID, util Session session_1 = fixture.make_session(db_1, "/test"); Session session_2 = fixture.make_session(db_2, "/test"); - session_1.bind(); - session_2.bind(); TEST_TYPE obj_1_id; TEST_TYPE obj_2_id; @@ -5846,8 +5649,6 @@ TEST(Sync_Mixed) Session session_1 = fixture.make_session(db_1, "/test"); Session session_2 = fixture.make_session(db_2, "/test"); - session_1.bind(); - session_2.bind(); { WriteTransaction tr{db_1}; @@ -5926,8 +5727,6 @@ TEST(Sync_TypedLinks) Session session_1 = fixture.make_session(db_1, "/test"); Session session_2 = fixture.make_session(db_2, "/test"); - session_1.bind(); - session_2.bind(); write_transaction(db_1, [](WriteTransaction& tr) { auto& g = tr.get_group(); @@ -5988,8 +5787,6 @@ TEST(Sync_Dictionary) Session session_1 = fixture.make_session(db_1, "/test"); Session session_2 = fixture.make_session(db_2, "/test"); - session_1.bind(); - session_2.bind(); Timestamp now{std::chrono::system_clock::now()}; @@ -6090,8 +5887,6 @@ TEST(Sync_CollectionInMixed) Session session_1 = fixture.make_session(db_1, "/test"); Session session_2 = fixture.make_session(db_2, "/test"); - session_1.bind(); - session_2.bind(); Timestamp now{std::chrono::system_clock::now()}; @@ -6248,8 +6043,6 @@ TEST(Sync_CollectionInCollection) Session session_1 = fixture.make_session(db_1, "/test"); Session session_2 = fixture.make_session(db_2, "/test"); - session_1.bind(); - session_2.bind(); Timestamp now{std::chrono::system_clock::now()}; @@ -6347,8 +6140,6 @@ TEST(Sync_DeleteCollectionInCollection) Session session_1 = fixture.make_session(db_1, "/test"); Session session_2 = fixture.make_session(db_2, "/test"); - session_1.bind(); - session_2.bind(); Timestamp now{std::chrono::system_clock::now()}; @@ -6413,8 +6204,6 @@ TEST(Sync_Dictionary_Links) Session session_1 = fixture.make_session(db_1, "/test"); Session session_2 = fixture.make_session(db_2, "/test"); - session_1.bind(); - session_2.bind(); // Test that we can transmit links. @@ -6517,8 +6306,6 @@ TEST(Sync_Set) Session session_1 = fixture.make_session(db_1, "/test"); Session session_2 = fixture.make_session(db_2, "/test"); - session_1.bind(); - session_2.bind(); ColKey col_ints, col_strings, col_mixeds; { @@ -6737,8 +6524,6 @@ TEST(Sync_UpgradeToClientHistory) Session session_1 = fixture.make_session(db_1, "/test"); Session session_2 = fixture.make_session(db_2, "/test"); - session_1.bind(); - session_2.bind(); write_transaction(db_1, [](WriteTransaction& tr) { auto foos = tr.get_group().get_table("class_Foo"); diff --git a/test/test_util_scope_exit.cpp b/test/test_util_scope_exit.cpp index 15182427c1d..f2656efe5c4 100644 --- a/test/test_util_scope_exit.cpp +++ b/test/test_util_scope_exit.cpp @@ -58,14 +58,59 @@ namespace { TEST(Util_ScopeExit_Basics) { bool called = false; - auto handler = [&]() noexcept { - called = true; - }; { - auto seg = util::make_scope_exit(handler); + util::ScopeExit se([&]() noexcept { + called = true; + }); CHECK_NOT(called); } CHECK(called); + + called = false; + try { + util::ScopeExit se([&]() noexcept { + called = true; + }); + CHECK_NOT(called); + throw 0; + } + catch (int) { + } + CHECK(called); + + called = false; + { + util::ScopeExit se([&]() noexcept { + called = true; + }); + CHECK_NOT(called); + se.cancel(); + } + CHECK_NOT(called); +} + +TEST(Util_ScopeExit_Fail) +{ + bool called = false; + { + util::ScopeExitFail se([&]() noexcept { + called = true; + }); + CHECK_NOT(called); + } + CHECK_NOT(called); + + called = false; + try { + util::ScopeExit se([&]() noexcept { + called = true; + }); + CHECK_NOT(called); + throw 0; + } + catch (int) { + } + CHECK(called); } } // unnamed namespace