Skip to content

Commit

Permalink
feat: migrate to efficient event log filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
filipesilva committed Mar 28, 2022
1 parent a94fc64 commit 420a128
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 77 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ jobs:
- uses: ./.github/custom-actions/clojure-env
- uses: ./.github/custom-actions/node-env

- name: Start Fluree process
run: yarn server:fluree

- name: Run JVM tests
run: yarn server:test

Expand Down
124 changes: 59 additions & 65 deletions src/clj/athens/self_hosted/event_log.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@
UUID)))


(def ledger "events/log")
(def default-ledger "events/log")


(defn fluree-comp->ledger
  "Returns the ledger name configured on the fluree component under :ledger,
  falling back to `default-ledger` when none is set."
  [fluree]
  (or (:ledger fluree) default-ledger))


(def initial-events
Expand All @@ -31,8 +38,9 @@
:event/id (str id)
:event/data (pr-str data)
;; Compute the new order number as 1 higher than last.
;; This will be 1 if there are no events yet.
;; NOTE: is max-pred-val efficient for very large collections? I don't know.
#_#_:event/order "#(inc (max-pred-val \"event/order\"))"})
:event/order "#(inc (max-pred-val \"event/order\"))"})


(defn deserialize
Expand All @@ -45,35 +53,37 @@
(defn- events-page
"Returns {:next-page ... :items ...}, where items is a vector of events in
page-number for all events in db split by page-size. For use with `iteration`."
([db page-size page-number]
([db since-order page-size page-number]
{:next-page (inc page-number)
:items @(fdb/query db
{:select {"?event" ["*"]}
:where [["?event" "event/id", "?id"]]
;; Subject ID (?event here) is a monotonically incrementing bigint,
;; so ordering by that gives us event insertion order since events are immutable.
:opts {:orderBy ["ASC", "?event"]
:limit page-size
:offset (* page-size page-number)}})}))


(defn- event-id->subject-id
:items (fu/query db
{:select {"?event" ["*"]}
:where [["?event" "event/id", "?id"]
["?event" "event/order" (str "#(> ?order " since-order ")")]]
;; Subject ID (?event here) is a monotonically incrementing bigint,
;; so ordering by that gives us event insertion order since events are immutable.
:opts {:orderBy ["ASC", "?order"]
:limit page-size
:offset (* page-size page-number)}})}))


(defn- event-id->order
[db event-id]
(first @(fdb/query db {:select "?event"
:where [["?event" "event/id", (str event-id)]]})))
(first (fu/query db {:select "?order"
:where [["?event" "event/id", (str event-id)]
["?event" "event/order" "?order"]]})))


(defn last-event-id
[fluree]
(-> fluree
:conn-atom
deref
(fdb/db ledger)
(fdb/query {:selectOne {"?event" ["*"]}
:where [["?event" "event/id", "?id"]]
:opts {:orderBy ["DESC" "?event"]
:limit 1}})
deref
(fdb/db (fluree-comp->ledger fluree))
(fu/query {:selectOne {"?event" ["*"]}
:where [["?event" "event/id" "?id"]
["?event" "event/order" "?order"]]
:opts {:orderBy ["DESC" "?order"]
:limit 1}})
deserialize
first))

Expand All @@ -82,44 +92,27 @@
"Returns a lazy-seq of all events in conn up to now, starting at optional event-id.
Can potentially be very large, so don't hold on to the seq head while
processing, and don't use fns that realize the whole coll (e.g. count)."
([fluree]
(let [db (fdb/db (-> fluree :conn-atom deref) ledger)
step (partial events-page db 100)]
;; New core fn added in Clojure 11.
;; See https://www.juxt.pro/blog/new-clojure-iteration for usage example.
(->> (iteration step
:kf :next-page
:vf :items
:somef #(-> % :events seq)
:initk 0)
(sequence cat)
(map deserialize))))
([fluree event-id]
(let [db (fdb/db (-> fluree :conn-atom deref) ledger)
step (partial events-page db 100)]
(when-not (event-id->subject-id db event-id)
(throw (ex-info "Cannot find starting id" {:event-id event-id})))
(->> (iteration step
:kf :next-page
:vf :items
:somef #(-> % :events seq)
:initk 0)
(sequence cat)
(map deserialize)
;; The point here is to get all events since event-id.
;; We're getting all the events, dropping every one until the first one we care about,
;; then dropping that one too.
;; This is a terrible way to do what we want.
;; Instead we should filter out all the events we don't care about on the fluree query.
;; But when I (filipe) tried to do that, it made each events-page query take 30s instead of 0.3s.
;; See https://github.com/fluree/db/issues/160
;; This seems good enough for now.
;; TODO: figure out a performant way to do this.
(drop-while (fn [[id]]
(if event-id
(not= event-id id)
false)))
(drop 1)))))
[fluree & {:keys [since-event-id since-order]}]
(let [db (fdb/db (-> fluree :conn-atom deref)
(fluree-comp->ledger fluree))
since-order' (cond
since-order since-order
since-event-id (or (event-id->order db since-event-id)
(throw (ex-info "Cannot find starting id"
{:since-event-id since-event-id})))
;; First order number is 1, so if we start
;; on 0 we will get all events.
:else 0)
step (partial events-page db since-order' 100)]
;; New core fn `iteration` added in Clojure 1.11.
;; See https://www.juxt.pro/blog/new-clojure-iteration for usage example.
(->> (iteration step
:kf :next-page
:vf :items
:somef #(-> % :items seq)
:initk 0)
(sequence cat)
(map deserialize))))


(defn double-write?
Expand Down Expand Up @@ -147,6 +140,7 @@
(throw (ex-info (str "add-event! timed-out 3 times on " id)
{:id id}))
(let [conn (-> fluree :conn-atom deref)
ledger (fluree-comp->ledger fluree)
ch (fdb/transact-async conn ledger [(serialize id data)])
{:keys [status block]
:as r} (async/<!! (athens.async/with-timeout ch timeout :timed-out))]
Expand Down Expand Up @@ -178,7 +172,8 @@
([fluree]
(init! fluree initial-events))
([fluree seed-events]
(let [conn (-> fluree :conn-atom deref)]
(let [conn (-> fluree :conn-atom deref)
ledger (fluree-comp->ledger fluree)]
(log/info "Looking for event-log fluree ledger")
(when (empty? @(fdb/ledger-info conn ledger))
(log/info "Fluree ledger for event-log not found, creating" ledger)
Expand All @@ -192,7 +187,7 @@
(reset! block (add-event! fluree id data)))
(log/info "✅ Populated fresh ledger.")
(log/info "Bringing local ledger to to date with latest transactions...")
(events-page (fdb/db conn ledger {:syncTo @block}) 1 0)
(fu/wait-for-block conn ledger @block)
(log/info "✅ Fluree local ledger up to date.")))
(log/info "✅ Fluree ledger for event-log created."))
(migrate/migrate-ledger! conn ledger event-log-migrations/migrations))))
Expand Down Expand Up @@ -248,7 +243,7 @@
"Returns {:stop ... :next-page ... :items ...}, where items is a seq of recovered
events in conn for block=idx+1 in conn. For use with `iteration`."
[conn idx]
(let [res @(fdb/block-query conn ledger {:block (inc idx) :pretty-print true})
(let [res @(fdb/block-query conn default-ledger {:block (inc idx) :pretty-print true})
ex-msg (ex-message res)]
(println res)
;; If the query fails because the requested block is higher than the total blocks,
Expand Down Expand Up @@ -317,7 +312,6 @@
(count (recovered-events fluree-comp))

;; Debug event recovery
(events-page (fdb/db (-> fluree-comp :conn-atom deref) ledger) 1 1)
(recover-block-events (-> fluree-comp :conn-atom deref) 3)
(take 3 (recovered-events fluree-comp))
(take 3 (events fluree-comp))
Expand All @@ -327,4 +321,4 @@
(take 3 (events fluree-comp)))

;; Delete ledger.
@(fdb/delete-ledger (-> fluree-comp :conn-atom deref) ledger))
@(fdb/delete-ledger (-> fluree-comp :conn-atom deref) default-ledger))
75 changes: 74 additions & 1 deletion src/clj/athens/self_hosted/event_log_migrations.clj
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,74 @@
;; Adds a order number for fast partial event queries via a filter in a where-triple.
;; Existing events are updated to contain the right order number.

;; Schema tx for migration 3: adds the event/order predicate used for fast
;; partial event queries (filtering by order in a where-triple).
(def migration-3-schema
  [{:_id :_predicate
    :_predicate/name :event/order
    ;; TODO: the "strictly increasing" condition could be validated via specs:
    ;; - collection spec to ensure order is there
    ;; - predicate spec to ensure the new number is bigger than the max
    ;; This validation isn't happening here, we're just transacting "correct" data.
    :_predicate/doc "Strictly increasing big int for event ordering."
    :_predicate/unique true
    :_predicate/type :bigint
    ;; Once written, an order number can never be changed.
    :_predicate/spec [[:_fn/name :immutable]]}])


(defn add-order!
  "Transacts an :event/order onto the event with subject id `sid`, computed
  server-side as one more than the current maximum event/order in the ledger."
  [conn ledger sid]
  (fu/transact! conn ledger [{:_id sid
                              ;; Would be nice to do multiple order numbers in the same tx,
                              ;; but max-pred-val seems to compute to the value before the tx
                              ;; and thus will be the same for all events in the tx.
                              :event/order "#(inc (max-pred-val \"event/order\"))"}]))


(defn sid+order-page
  "Returns {:next-page ... :items ...}, where items is a vector of [subject-id order] in
  page-number for all events with an event/id in db, ordered by subject-id, split by page-size.
  Before event/order was added, events were ordered by subject-id as it's a strictly increasing
  bigint that acts as insertion order.
  Events without event/order will return nil as order. For use with `iteration`."
  [db page-size page-number]
  (let [items (fu/query db {:select ["?event" "?order"]
                            :where  [["?event" "event/id" "?id"]
                                     ;; Optional triple so events that predate
                                     ;; event/order still appear (with nil order).
                                     {:optional [["?event" "event/order" "?order"]]}]
                            :opts   {:orderBy ["ASC" "?event"]
                                     :limit   page-size
                                     :offset  (* page-size page-number)}})]
    {:next-page (inc page-number)
     :items     items}))


(defn sids-with-missing-order
  "Returns a lazy seq of subject ids for events that have an event/id but no
  event/order yet, in subject-id (insertion) order."
  [conn ledger]
  (let [db   (fdb/db conn ledger)
        ;; Page 100 at a time, matching the pager in event-log; a page size
        ;; of 1 would issue one Fluree query per event subject.
        step (partial sid+order-page db 100)]
    (->> (iteration step
                    :kf :next-page
                    :vf :items
                    :somef #(-> % :items seq)
                    :initk 0)
         (sequence cat)
         ;; Items are [sid order] pairs; keep only those missing an order.
         (remove second)
         (map first))))


(defn migrate-to-3
  "Migration 3: ensures the event/order predicate exists, then backfills an
  order number onto every event that is still missing one."
  [conn ledger]
  (when-not ((migrate/predicates conn ledger) "event/order")
    (->> (fu/transact! conn ledger migration-3-schema)
         :block
         ;; Force sync to ensure query recognizes the new schema predicate.
         (fu/wait-for-block conn ledger)))
  ;; Backfill one event per transaction; see add-order! for why these can't
  ;; be batched into a single tx.
  (run! (partial add-order! conn ledger)
        (sids-with-missing-order conn ledger)))


;; Ordered [migration-number migration-fn] pairs, passed to
;; migrate/migrate-ledger! (tests run them with an :up-to bound).
(def migrations
  [[1 migrate-to-1]
   [2 migrate-to-2]
   [3 migrate-to-3]
   ;; [4 migrate-to-4]
   ])

Expand Down Expand Up @@ -141,4 +204,14 @@
(fu/transact! conn ledger [{:_id [:event/id (nth ids 1)]
:event/id (str (random-uuid))
:event/data "10"}])

;; Migration 3
;; The events inserted above don't have an order id
(all-events)

;; Running the migration should add it.
(migrate-to-3 conn ledger)
(all-events)

;;
)
13 changes: 4 additions & 9 deletions src/clj/athens/self_hosted/fluree/utils.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns athens.self-hosted.fluree.utils
(:require
[clojure.core.async :as async]
[fluree.db.api :as fdb]))


Expand All @@ -20,12 +21,6 @@
(query q-data))))


(defn sync-to
[conn ledger block]
(query conn ledger
;; Look up the first collection name.
;; This can be any query, the cheaper the better, all that
;; matters is the :syncTo option.
{:selectOne "?o"
:where [["?s" "_collection/name" "?o"]]
:opts {:syncTo block}}))
(defn wait-for-block
  "Blocks the calling thread until the local ledger has synced to at least
  expected-block, by taking from the channel returned by fdb/db with the
  :syncTo option."
  [conn ledger expected-block]
  (async/<!! (fdb/db conn ledger {:syncTo expected-block})))
4 changes: 2 additions & 2 deletions src/clj/athens/self_hosted/save_load.clj
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
deref)
previous-events (edn/read-string (slurp filename))
total (count previous-events)
ledger-exists? (seq @(fdb/ledger-info conn event-log/ledger))
ledger-exists? (seq @(fdb/ledger-info conn event-log/default-ledger))
progress (atom 0)
last-added-event-id (when resume
(event-log/last-event-id comp))
Expand All @@ -68,7 +68,7 @@
(do
(log/info "Deleting the current ledger before loading data....")
@(fdb/delete-ledger conn
event-log/ledger)
event-log/default-ledger)
(log/warn "Please restart the fluree docker."))

(and (not ledger-exists?) resume)
Expand Down
49 changes: 49 additions & 0 deletions test/athens/self_hosted/event_log_migrations_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
(ns athens.self-hosted.event-log-migrations-test
(:require
[athens.self-hosted.event-log-migrations :as event-log-migrations]
[athens.self-hosted.fluree.test-helpers :as fth]
[athens.self-hosted.fluree.utils :as fu]
[athens.self-hosted.migrate :as migrate]
[clojure.test :as t :refer [deftest testing is]])
(:import
(clojure.lang
ExceptionInfo)))


(t/use-fixtures :each fth/with-ledger)


(defn all-events-subjects
  "Returns every subject in the event collection with all its predicates."
  []
  (let [conn   @fth/conn-atom
        ledger @fth/ledger-atom]
    (fu/query conn ledger {:select ["*"]
                           :from   "event"})))


(deftest migration-to-1
  ;; After migration 1, event subjects with an event/id can be transacted
  ;; and queried back.
  (let [[conn ledger] (fth/conn+ledger)]
    (migrate/migrate-ledger! conn ledger event-log-migrations/migrations :up-to 1)
    ;; Insert 4 events, one per transaction.
    (run! #(fth/transact! [{:_id :event :event/id %}]) (range 4))
    (fth/wait-for-block)
    (is (= 4 (count (all-events-subjects))) "Should have 4 events")))


(deftest migration-to-2
  ;; After migration 2, event/id and event/data are immutable: rewriting
  ;; either on an existing event fails its predicate spec.
  (let [[conn ledger] (fth/conn+ledger)]
    (migrate/migrate-ledger! conn ledger event-log-migrations/migrations :up-to 2)
    (fth/transact! [{:_id :event :event/id "1" :event/data "1"}])
    (is (thrown-with-msg? ExceptionInfo #"Predicate spec failed"
          (fth/transact! [{:_id [:event/id "1"]
                           :event/id "2"}]))
        "Should not allow changing :event/id")
    (is (thrown-with-msg? ExceptionInfo #"Predicate spec failed"
          (fth/transact! [{:_id [:event/id "1"]
                           :event/data "2"}]))
        "Should not allow changing :event/data")))


(deftest migration-to-3
  ;; Events created before migration 3 have no event/order; running the
  ;; migration should backfill strictly increasing order numbers.
  (let [[conn ledger] (fth/conn+ledger)]
    (migrate/migrate-ledger! conn ledger event-log-migrations/migrations :up-to 1)
    (run! #(fth/transact! [{:_id :event :event/id %}]) (range 4))
    (migrate/migrate-ledger! conn ledger event-log-migrations/migrations :up-to 3)
    ;; Orders 1..4 come back newest-first here — presumably the query returns
    ;; subjects in descending subject-id order; verify against all-events-subjects.
    (is (= '(4 3 2 1) (map #(get % "event/order") (all-events-subjects))))))
Loading

0 comments on commit 420a128

Please sign in to comment.