Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: rollback-tx-snapshot atomically #1743

Merged
merged 1 commit into from
Oct 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions src/clj/athens/self_hosted/web/datascript.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
[athens.common-events.resolver.atomic :as atomic-resolver]
[athens.common.logging :as log]
[athens.self-hosted.clients :as clients]
[clojure.pprint :as pprint]
[datascript.core :as d])
[clojure.pprint :as pprint])
(:import
(clojure.lang
ExceptionInfo)))
Expand Down Expand Up @@ -73,11 +72,7 @@
txs)]
(log/debug "transact! event-id:" event-id ", normalized-txs:" (with-out-str
(pprint/pprint txs)))
(let [processed-tx (->> txs
(common-db/block-uid-nil-eater @conn)
(common-db/linkmaker @conn)
(common-db/orderkeeper @conn))
{:keys [tempids]} (d/transact! conn processed-tx)
(let [{:keys [tempids]} (common-db/transact-with-middleware! conn txs)
{:db/keys [current-tx]} tempids]
(log/debug "transact! event-id:" event-id ", transacted in tx-id:" current-tx)
(common-events/build-event-accepted event-id current-tx)))
Expand Down
16 changes: 16 additions & 0 deletions src/cljc/athens/common_db.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -768,3 +768,19 @@
(catch #?(:cljs :default
:clj Exception) e
(block-uid-nil-eater-error e input-tx)))))


(defn tx-with-middleware
[db tx-data]
(->> tx-data
(block-uid-nil-eater db)
(linkmaker db)
(orderkeeper db)))


(defn transact-with-middleware!
"Transact tx-data enriched with middleware txs into conn."
[conn tx-data]
;; 🎶 Sia "Cheap Thrills"
(d/transact! conn (tx-with-middleware @conn tx-data)))

7 changes: 2 additions & 5 deletions src/cljs/athens/effects.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@

;; Effects


(rf/reg-fx
:transact!
(fn [tx-data]
;; 🎶 Sia "Cheap Thrills"
(d/transact! db/dsdb (->> tx-data
(common-db/block-uid-nil-eater @db/dsdb)
(common-db/linkmaker @db/dsdb)
(common-db/orderkeeper @db/dsdb)))))
(common-db/transact-with-middleware! db/dsdb tx-data)))


(rf/reg-fx
Expand Down
66 changes: 38 additions & 28 deletions src/cljs/athens/events/remote.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,32 @@
(rf/reg-fx
:remote/snapshot-dsdb!
(fn []
(log/debug ":remote/snapshot-dsdb! event at time" (:max-tx @db/dsdb))
(log/debug ":remote/snapshot-dsdb! at time" (:max-tx @db/dsdb))
(reset! db/dsdb-snapshot @db/dsdb)))


(rf/reg-event-fx
:remote/rollback-dsdb
(fn [_ _]
(log/debug ":remote/rollback-dsdb event to time" (:max-tx @db/dsdb-snapshot))
(log/debug ":remote/rollback-dsdb to time" (:max-tx @db/dsdb-snapshot))
{:reset-conn! @db/dsdb-snapshot}))


;; NB: this operation needs to perform all these stateful operations one after another
;; and can't be split, so we can't reuse some of the other events we have.
(rf/reg-event-fx
:remote/rollback-resolve-transact-snapshot
(fn [_ [_ event]]
(log/debug ":remote/rollback-resolve-transact-snapshot rollback db to time" (:max-tx @db/dsdb-snapshot))
(d/reset-conn! db/dsdb @db/dsdb-snapshot)
(let [txs (atomic-resolver/resolve-to-tx @db/dsdb-snapshot event)]
(log/debug ":remote/rollback-resolve-transact-snapshot resolved txs:" (pr-str txs))
(common-db/transact-with-middleware! db/dsdb txs))
(log/debug ":remote/rollback-resolve-transact-snapshot snapshot at time" (:max-tx @db/dsdb))
(reset! db/dsdb-snapshot @db/dsdb)
{}))


(rf/reg-event-db
:remote/start-event-sync
(fn [db _]
Expand All @@ -129,28 +144,24 @@
{:db (update db :event-sync #(event-sync/remove % :server (:event/id event) event))}))


(defn- changed-order?
(defn- new-event?
[[type _ _ _ noop?]]
;; TODO: if we support rejections via event removal, this also needs to check
;; if the :remove changed order, while still ignoring removal from the tail.
(and (= type :add) (not noop?)))


(rf/reg-event-fx
:remote/snapshot-transact
(fn [_ [_ tx-data]]
(fn [_ [_ event]]
(log/debug ":remote/snapshot-transact update to time" (inc (:max-tx @db/dsdb-snapshot)))
{:remote/snapshot-transact! tx-data}))
{:remote/snapshot-transact! (atomic-resolver/resolve-to-tx @db/dsdb-snapshot event)}))


(rf/reg-fx
:remote/snapshot-transact!
(fn [tx-data]
(swap! db/dsdb-snapshot
(fn [db]
(d/db-with db (->> tx-data
(common-db/linkmaker db)
(common-db/linkmaker db)))))))
(d/db-with db (common-db/tx-with-middleware db tx-data))))))


(rf/reg-event-fx
Expand All @@ -162,36 +173,35 @@
{:db db'
;; Rollback and reapply all events in the memory stage.
;; If there's no events to reapply, just mark as synced.
:fx [[:dispatch-n (into [[:remote/rollback-dsdb]] (if (empty? memory-log)
[[:db/sync]]
(map (fn [e] [:resolve-transact (second e)])
(-> db' :event-sync :stages :memory))))]]})))
:fx [[:dispatch-n (into [[:remote/rollback-dsdb]]
(if (empty? memory-log)
[[:db/sync]]
(map (fn [e] [:resolve-transact (second e)])
(-> db' :event-sync :stages :memory))))]]})))


(rf/reg-event-fx
:remote/apply-forwarded-event
(fn [{db :db} [_ {:event/keys [id] :as event}]]
(log/debug ":remote/apply-forwarded-event event:" (pr-str event))
(let [db' (update db :event-sync #(event-sync/add % :server id event))
changed-order? (changed-order? (-> db' :event-sync :last-op))
memory-log (event-sync/stage-log (:event-sync db') :memory)
txs (atomic-resolver/resolve-to-tx @db/dsdb-snapshot event)]
(log/debug ":remote/apply-forwarded-event event changed order?:" changed-order?)
(log/debug ":remote/apply-forwarded-event resolved txs:" (pr-str txs))
(let [db' (update db :event-sync #(event-sync/add % :server id event))
new-event? (new-event? (-> db' :event-sync :last-op))
memory-log (event-sync/stage-log (:event-sync db') :memory)]
(log/debug ":remote/apply-forwarded-event new event?:" new-event?)
{:db db'
:fx [[:dispatch-n (cond-> []
;; Mark as synced if there's no events left in memory.
(empty? memory-log) (into [[:db/sync]])
;; If order does not change, just update the snapshot with tx.
(not changed-order?) (into [[:remote/snapshot-transact txs]])
;; If order changes, apply the tx over the last dsdb snapshot from,
;; If no new event was added, just update the snapshot with event.
(not new-event?) (into [[:remote/snapshot-transact event]])
;; If there's a new event, apply it over the last dsdb snapshot from
;; the server, then use that as the new snapshot, then reapply
;; all events in the memory stage.
changed-order? (into [[:remote/rollback-dsdb]
[:transact txs]
[:remote/snapshot-dsdb]])
changed-order? (into (map (fn [e] [:resolve-transact (second e)])
(-> db' :event-sync :stages :memory)))
;; NB: would be more performant to just transact over dsdb and dsdb-snapshot
;; if there's no txs in-memory to reapply, but this is ok for now.
new-event? (into [[:remote/rollback-resolve-transact-snapshot event]])
new-event? (into (map (fn [e] [:resolve-transact (second e)])
(-> db' :event-sync :stages :memory)))
;; Remove the server event after everything is done.
true (into [[:remote/clear-server-event event]]))]]})))

Expand Down