From 420a1286627e22a9d978f936001a7c6e10443566 Mon Sep 17 00:00:00 2001 From: Filipe Silva Date: Mon, 28 Mar 2022 22:47:26 +0100 Subject: [PATCH] feat: migrate to efficient event log filtering --- .github/workflows/build.yml | 3 + src/clj/athens/self_hosted/event_log.clj | 124 +++++++++--------- .../self_hosted/event_log_migrations.clj | 75 ++++++++++- src/clj/athens/self_hosted/fluree/utils.clj | 13 +- src/clj/athens/self_hosted/save_load.clj | 4 +- .../self_hosted/event_log_migrations_test.clj | 49 +++++++ test/athens/self_hosted/event_log_test.clj | 41 ++++++ .../self_hosted/fluree/test_helpers.clj | 55 ++++++++ 8 files changed, 287 insertions(+), 77 deletions(-) create mode 100644 test/athens/self_hosted/event_log_migrations_test.clj create mode 100644 test/athens/self_hosted/event_log_test.clj create mode 100644 test/athens/self_hosted/fluree/test_helpers.clj diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 22e06d38c5..04dad235b5 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -57,6 +57,9 @@ jobs: - uses: ./.github/custom-actions/clojure-env - uses: ./.github/custom-actions/node-env + - name: Start Fluree process + run: yarn server:fluree + - name: Run JVM tests run: yarn server:test diff --git a/src/clj/athens/self_hosted/event_log.clj b/src/clj/athens/self_hosted/event_log.clj index 66637cbf8c..325b84025b 100644 --- a/src/clj/athens/self_hosted/event_log.clj +++ b/src/clj/athens/self_hosted/event_log.clj @@ -17,7 +17,14 @@ UUID))) -(def ledger "events/log") +(def default-ledger "events/log") + + +(defn fluree-comp->ledger + [fluree] + (-> fluree + :ledger + (or default-ledger))) (def initial-events @@ -31,8 +38,9 @@ :event/id (str id) :event/data (pr-str data) ;; Compute the new order number as 1 higher than last. + ;; This will be 1 if there are no events yet. ;; NOTE: is max-pred-val efficient for very large collections? I don't know. - #_#_:event/order "#(inc (max-pred-val \"event/order\"))"}) + :event/order "#(inc (max-pred-val \"event/order\"))"}) (defn deserialize @@ -45,22 +53,24 @@ (defn- events-page "Returns {:next-page ... :items ...}, where items is a vector of events in page-number for all events in db split by page-size. For use with `iteration`." - ([db page-size page-number] + ([db since-order page-size page-number] {:next-page (inc page-number) - :items @(fdb/query db - {:select {"?event" ["*"]} - :where [["?event" "event/id", "?id"]] - ;; Subject ID (?event here) is a monotonically incrementing bigint, - ;; so ordering by that gives us event insertion order since events are immutable. - :opts {:orderBy ["ASC", "?event"] - :limit page-size - :offset (* page-size page-number)}})})) - - -(defn- event-id->subject-id + :items (fu/query db + {:select {"?event" ["*"]} + :where [["?event" "event/id", "?id"] + ["?event" "event/order" (str "#(> ?order " since-order ")")]] + ;; Subject ID (?event here) is a monotonically incrementing bigint, + ;; so ordering by that gives us event insertion order since events are immutable. + :opts {:orderBy ["ASC", "?order"] + :limit page-size + :offset (* page-size page-number)}})})) + + +(defn- event-id->order [db event-id] - (first @(fdb/query db {:select "?event" - :where [["?event" "event/id", (str event-id)]]}))) + (first (fu/query db {:select "?order" + :where [["?event" "event/id", (str event-id)] + ["?event" "event/order" "?order"]]}))) (defn last-event-id @@ -68,12 +78,12 @@ (-> fluree :conn-atom deref - (fdb/db ledger) - (fdb/query {:selectOne {"?event" ["*"]} - :where [["?event" "event/id", "?id"]] - :opts {:orderBy ["DESC" "?event"] - :limit 1}}) - deref + (fdb/db (fluree-comp->ledger fluree)) + (fu/query {:selectOne {"?event" ["*"]} + :where [["?event" "event/id" "?id"] + ["?event" "event/order" "?order"]] + :opts {:orderBy ["DESC" "?order"] + :limit 1}}) deserialize first)) @@ -82,44 +92,27 @@ "Returns a lazy-seq of all events in conn up to now, starting at optional event-id. Can potentially be very large, so don't hold on to the seq head while processing, and don't use fns that realize the whole coll (e.g. count)." - ([fluree] - (let [db (fdb/db (-> fluree :conn-atom deref) ledger) - step (partial events-page db 100)] - ;; New core fn added in Clojure 11. - ;; See https://www.juxt.pro/blog/new-clojure-iteration for usage example. - (->> (iteration step - :kf :next-page - :vf :items - :somef #(-> % :events seq) - :initk 0) - (sequence cat) - (map deserialize)))) - ([fluree event-id] - (let [db (fdb/db (-> fluree :conn-atom deref) ledger) - step (partial events-page db 100)] - (when-not (event-id->subject-id db event-id) - (throw (ex-info "Cannot find starting id" {:event-id event-id}))) - (->> (iteration step - :kf :next-page - :vf :items - :somef #(-> % :events seq) - :initk 0) - (sequence cat) - (map deserialize) - ;; The point here is to get all events since event-id. - ;; We're getting all the events, dropping every one until the first one we care about, - ;; then dropping that one too. - ;; This is a terrible way to do what we want. - ;; Instead we should filter out all the events we don't care about on the fluree query. - ;; But when I (filipe) tried to do that, it made each events-page query take 30s instead of 0.3s. - ;; See https://github.com/fluree/db/issues/160 - ;; This seems good enough for now. - ;; TODO: figure out a performant way to do this. - (drop-while (fn [[id]] - (if event-id - (not= event-id id) - false))) - (drop 1))))) + [fluree & {:keys [since-event-id since-order]}] + (let [db (fdb/db (-> fluree :conn-atom deref) + (fluree-comp->ledger fluree)) + since-order' (cond + since-order since-order + since-event-id (or (event-id->order db since-event-id) + (throw (ex-info "Cannot find starting id" + {:since-event-id since-event-id}))) + ;; First order number is 1, so if we start + ;; on 0 we will get all events. + :else 0) + step (partial events-page db since-order' 100)] + ;; New core fn added in Clojure 11. + ;; See https://www.juxt.pro/blog/new-clojure-iteration for usage example. + (->> (iteration step + :kf :next-page + :vf :items + :somef #(-> % :items seq) + :initk 0) + (sequence cat) + (map deserialize)))) (defn double-write? @@ -147,6 +140,7 @@ (throw (ex-info (str "add-event! timed-out 3 times on " id) {:id id})) (let [conn (-> fluree :conn-atom deref) + ledger (fluree-comp->ledger fluree) ch (fdb/transact-async conn ledger [(serialize id data)]) {:keys [status block] :as r} (async/ fluree :conn-atom deref)] + (let [conn (-> fluree :conn-atom deref) + ledger (fluree-comp->ledger fluree)] (log/info "Looking for event-log fluree ledger") (when (empty? @(fdb/ledger-info conn ledger)) (log/info "Fluree ledger for event-log not found, creating" ledger) @@ -192,7 +187,7 @@ (reset! block (add-event! fluree id data))) (log/info "✅ Populated fresh ledger.") (log/info "Bringing local ledger to to date with latest transactions...") - (events-page (fdb/db conn ledger {:syncTo @block}) 1 0) + (fu/wait-for-block conn ledger @block) (log/info "✅ Fluree local ledger up to date."))) (log/info "✅ Fluree ledger for event-log created.")) (migrate/migrate-ledger! conn ledger event-log-migrations/migrations)))) @@ -248,7 +243,7 @@ "Returns {:stop ... :next-page ... :items ...}, where items is a seq of recovered events in conn for block=idx+1 in conn. For use with `iteration`." [conn idx] - (let [res @(fdb/block-query conn ledger {:block (inc idx) :pretty-print true}) + (let [res @(fdb/block-query conn default-ledger {:block (inc idx) :pretty-print true}) ex-msg (ex-message res)] (println res) ;; If the query because the is higher than the total blocks, @@ -317,7 +312,6 @@ (count (recovered-events fluree-comp)) ;; Debug event recovery - (events-page (fdb/db (-> fluree-comp :conn-atom deref) ledger) 1 1) (recover-block-events (-> fluree-comp :conn-atom deref) 3) (take 3 (recovered-events fluree-comp)) (take 3 (events fluree-comp)) @@ -327,4 +321,4 @@ (take 3 (events fluree-comp))) ;; Delete ledger. - @(fdb/delete-ledger (-> fluree-comp :conn-atom deref) ledger)) + @(fdb/delete-ledger (-> fluree-comp :conn-atom deref) default-ledger)) diff --git a/src/clj/athens/self_hosted/event_log_migrations.clj b/src/clj/athens/self_hosted/event_log_migrations.clj index 037e033b55..bcd1f84def 100644 --- a/src/clj/athens/self_hosted/event_log_migrations.clj +++ b/src/clj/athens/self_hosted/event_log_migrations.clj @@ -95,11 +95,74 @@ ;; Adds a order number for fast partial event queries via a filter in a where-triple. ;; Existing events are updated to contain the right order number. +(def migration-3-schema + [{:_id :_predicate + :_predicate/name :event/order + ;; TODO: the "strictly increasing" condition could be validated via specs: + ;; - collection spec to ensure order is there + ;; - predicate spec to ensure the new number is bigger than the max + ;; This validation isn't happening here, we're just transacting "correct" data. + :_predicate/doc "Strictly increasing big int for event ordering." + :_predicate/unique true + :_predicate/type :bigint + :_predicate/spec [[:_fn/name :immutable]]}]) + + +(defn add-order! + [conn ledger sid] + (fu/transact! conn ledger [{:_id sid + ;; Would be nice to do multiple order numbers in the same tx, + ;; but max-pred-val seems to compute to the value before the tx + ;; and thus will be the same for all events in the tx. + :event/order "#(inc (max-pred-val \"event/order\"))"}])) + + +(defn sid+order-page + "Returns {:next-page ... :items ...}, where items is a vector of [subject-id order] in + page-number for all events with an event/id in db, ordered by subject-id, split by page-size. + Before event/order was added, events were ordered by subject-id as it's a strictly increasing + bigint that acts as insertion order. + Events without event/order will return nil as order. For use with `iteration`." + ([db page-size page-number] + {:next-page (inc page-number) + :items (fu/query db {:select ["?event" "?order"] + :where [["?event" "event/id", "?id"] + {:optional [["?event" "event/order" "?order"]]}] + :opts {:orderBy ["ASC", "?event"] + :limit page-size + :offset (* page-size page-number)}})})) + + +(defn sids-with-missing-order + [conn ledger] + (let [db (fdb/db conn ledger) + step (partial sid+order-page db 1)] + (->> (iteration step + :kf :next-page + :vf :items + :somef #(-> % :items seq) + :initk 0) + (sequence cat) + ;; Remove items that have a non-nil order in [sid order] + (remove second) + (map first)))) + + +(defn migrate-to-3 + [conn ledger] + (when-not ((migrate/predicates conn ledger) "event/order") + (->> (fu/transact! conn ledger migration-3-schema) + :block + ;; Force sync to ensure query recognizes the new schema predicate. + (fu/wait-for-block conn ledger))) + (run! (partial add-order! conn ledger) + (sids-with-missing-order conn ledger))) + (def migrations [[1 migrate-to-1] [2 migrate-to-2] - ;; [3 migrate-to-3] + [3 migrate-to-3] ;; [4 migrate-to-4] ]) @@ -141,4 +204,14 @@ (fu/transact! conn ledger [{:_id [:event/id (nth ids 1)] :event/id (str (random-uuid)) :event/data "10"}]) + + ;; Migration 3 + ;; The events inserted above don't have an order id + (all-events) + + ;; Running the migration should add it. + (migrate-to-3 conn ledger) + (all-events) + + ;; ) diff --git a/src/clj/athens/self_hosted/fluree/utils.clj b/src/clj/athens/self_hosted/fluree/utils.clj index 79da95512e..7487287e1a 100644 --- a/src/clj/athens/self_hosted/fluree/utils.clj +++ b/src/clj/athens/self_hosted/fluree/utils.clj @@ -1,5 +1,6 @@ (ns athens.self-hosted.fluree.utils (:require + [clojure.core.async :as async] [fluree.db.api :as fdb])) @@ -20,12 +21,6 @@ (query q-data)))) -(defn sync-to - [conn ledger block] - (query conn ledger - ;; Look up the first collection name. - ;; This can be any query, the cheaper the better, all that - ;; matters is the :syncTo option. - {:selectOne "?o" - :where [["?s" "_collection/name" "?o"]] - :opts {:syncTo block}})) +(defn wait-for-block + [conn ledger expected-block] + (async/> (event-log/add-event! comp id data) + fth/wait-for-block)) + (is (= events (event-log/events comp))))) + + +(deftest lists-events-since + (let [comp (make-comp) + events (map (fn [id] [id {:id id}]) + (repeatedly 4 random-uuid)) + since (-> events second first) + events' (drop 2 events)] + (event-log/init! comp []) + (doseq [[id data] events] + (->> (event-log/add-event! comp id data) + fth/wait-for-block)) + (is (= events' (event-log/events comp :since-event-id since))))) diff --git a/test/athens/self_hosted/fluree/test_helpers.clj b/test/athens/self_hosted/fluree/test_helpers.clj new file mode 100644 index 0000000000..83bbc35cd1 --- /dev/null +++ b/test/athens/self_hosted/fluree/test_helpers.clj @@ -0,0 +1,55 @@ +(ns athens.self-hosted.fluree.test-helpers + (:require + [athens.self-hosted.fluree.utils :as utils] + [athens.self-hosted.fluree.utils :as fu] + [clojure.string :as str] + [fluree.db.api :as fdb])) + + +(def conn-atom (atom nil)) +(def ledger-atom (atom nil)) +(def last-transacted-block-atom (atom nil)) + + +(defn conn+ledger + [] + [@conn-atom @ledger-atom]) + + +(defn with-ledger + [f] + (reset! conn-atom (fdb/connect "http://localhost:8090")) + ;; Due to https://github.com/fluree/ledger/issues/98, we can't just + ;; recreate ledgers with the same name, so we have to be creating and + ;; discarding the names. + (reset! ledger-atom (str "athens/" (str/replace (random-uuid) #"-" ""))) + (reset! last-transacted-block-atom nil) + @(fdb/new-ledger @conn-atom @ledger-atom) + (fdb/wait-for-ledger-ready @conn-atom @ledger-atom) + (f) + @(fdb/delete-ledger @conn-atom @ledger-atom) + (fdb/close @conn-atom) + (reset! conn-atom nil) + (reset! ledger-atom nil) + (reset! last-transacted-block-atom nil)) + + +(defn transact! + [tx-data] + (let [res (utils/transact! @conn-atom @ledger-atom tx-data)] + (reset! last-transacted-block-atom (:block res)) + res)) + + +(defn query + [q-data] + (utils/query @conn-atom @ledger-atom q-data)) + + +(defn wait-for-block + "Useful when querying right after transacting, as fluree can be behind on queries. + Only use the 0-arity when the last transaction was via test-helpers/transact!." + ([] + (wait-for-block @last-transacted-block-atom)) + ([expected-block] + (utils/wait-for-block @conn-atom @ledger-atom expected-block)))