Merge pull request #2098 from filipesilva/event-log-filter
event log filter
filipesilva authored Apr 5, 2022

2 parents 5a9507f + 88a98c5 commit 7b738d2
Showing 17 changed files with 647 additions and 184 deletions.
1 change: 1 addition & 0 deletions .carve/ignore
@@ -39,3 +39,4 @@ athens.common-events.graph.atomic/make-shortcut-move-op
athens.common-events.graph.schema/valid-atomic-op?
athens.common-events.graph.schema/explain-atomic-op
athens.style/unzoom
athens.self-hosted.fluree.test-helpers/query
11 changes: 10 additions & 1 deletion .github/workflows/build.yml
@@ -24,7 +24,7 @@ jobs:
# Keep version and script up to date!
- uses: DeLaGuardo/setup-clj-kondo@master
with:
version: '2021.08.06'
version: '2022.03.09'

- name: Lint
run: clj-kondo --lint src
@@ -63,6 +63,15 @@ jobs:
- name: Run Karma tests
run: yarn client:test

# TODO: these tests cause the test runner to never exit, so they
# can't be run on CI. Please run them manually for now.
# See https://github.com/fluree/db/issues/163.
# - name: Start Fluree process
# run: yarn server:fluree

# - name: Run Fluree tests
# run: yarn server:test:fluree


e2e:
runs-on: ubuntu-latest
6 changes: 3 additions & 3 deletions deps.edn
@@ -1,7 +1,7 @@
{:paths ["src/clj" "src/cljs" "src/cljc" "src/js" "src/gen" "test"]

:deps
{org.clojure/clojure #:mvn{:version "1.10.3"}
{org.clojure/clojure #:mvn{:version "1.11.0"}
org.clojure/clojurescript #:mvn{:version "1.10.879"}
org.clojure/tools.cli #:mvn{:version "1.0.206"}
thheller/shadow-cljs #:mvn{:version "2.15.3"}
@@ -67,8 +67,8 @@
:clj-kondo
{:extra-deps {borkdude/clj-kondo
#:git{:url "https://github.com/borkdude/clj-kondo"
:sha "9bb69c3188f7efe6e17d392def89e89ec9d85ae5"
:tag "v2021.08.06"}}
:sha "8937af7f4372c0d2264735ebc1439d0b61030872"
:tag "v2022.03.09"}}
:main-opts ["-m" "clj-kondo.main"]}

:outdated
3 changes: 2 additions & 1 deletion package.json
@@ -51,7 +51,8 @@
"server:fluree:wipe": "rm -rf athens-data/fluree",
"server:compile": "clojure -M -e \"(compile 'athens.self-hosted.core)\"",
"server:uberjar": "clojure -M:uberdeps --aliases compiled-classes --main-class athens.self-hosted.core --target target/athens-lan-party-standalone.jar",
"server:test": "clojure -X:test",
"server:test": "clojure -X:test :excludes [:fluree]",
"server:test:fluree": "clojure -X:test :includes [:fluree]",
"server:test:only": "clojure -M:test --var",
"server:repl": "clojure -A:repl",
"server:wipe": "rimraf athens-data/fluree athens-data/datascript",
2 changes: 1 addition & 1 deletion src/clj/athens/self_hosted/components/datascript.clj
@@ -38,7 +38,7 @@
;; In-memory setups don't use stored events at all
in-memory? event-log/initial-events
;; If we have the last id for persisted db, we can skip all events up to that one.
id (event-log/events fluree id)
id (event-log/events fluree :since-event-id id)
;; Otherwise just load all events.
:else (event-log/events fluree))]
(log/info "Processing" (pr-str id) "with" (common-events/find-event-or-atomic-op-type data))
2 changes: 1 addition & 1 deletion src/clj/athens/self_hosted/components/fluree.clj
@@ -37,7 +37,7 @@
(log/info "Starting Fluree connection, servers" servers)
(let [comp (create-fluree-comp servers)]
;; Initialize event log.
(event-log/ensure-ledger! comp)
(event-log/init! comp)
(merge component comp))))))


198 changes: 100 additions & 98 deletions src/clj/athens/self_hosted/event_log.clj
@@ -2,7 +2,9 @@
(:require
[athens.async :as athens.async]
[athens.athens-datoms :as datoms]
[athens.common.utils :as utils]
[athens.self-hosted.event-log-migrations :as event-log-migrations]
[athens.self-hosted.fluree.utils :as fu]
[athens.self-hosted.migrate :as migrate]
[clojure.core.async :as async]
[clojure.data :as data]
[clojure.data.json :as json]
@@ -15,22 +17,14 @@
UUID)))


(def ledger "events/log")
(def default-ledger "events/log")


(def schema
[{:_id :_collection
:_collection/name :event
:_collection/doc "Athens semantic events."}
{:_id :_predicate
:_predicate/name :event/id
:_predicate/doc "A globally unique event id."
:_predicate/unique true
:_predicate/type :string}
{:_id :_predicate
:_predicate/name :event/data
:_predicate/doc "Event data serialized as an EDN string."
:_predicate/type :string}])
(defn fluree-comp->ledger
[fluree]
(-> fluree
:ledger
(or default-ledger)))
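
Making the ledger name a component key, with default-ledger as the fallback, lets callers (e.g. tests) point at a throwaway ledger. Illustrative calls; the test ledger name is hypothetical:

(fluree-comp->ledger {})                         ;; => "events/log"
(fluree-comp->ledger {:ledger "events/test-1"})  ;; => "events/test-1"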


(def initial-events
@@ -39,52 +33,57 @@

(defn serialize
[id data]
(assert (uuid? id))
{:_id :event
:event/id (str id)
:event/data (pr-str data)})
:event/data (pr-str data)
;; Compute the new order number as 1 higher than last.
;; This will be 1 if there are no events yet.
;; NOTE: is max-pred-val efficient for very large collections? I don't know.
:event/order "#(inc (max-pred-val \"event/order\"))"})


(defn deserialize
[{id "event/id"
data "event/data"}]
;; In some running ledgers we have "add Welcome page shortcut" events that do not have a UUID,
;; so we want the backup to be compatible with these previous versions of ledgers.
[(if (str/blank? id)
(UUID/randomUUID)
(UUID/fromString id)) (edn/read-string data)])
[(UUID/fromString id)
(edn/read-string data)])


(defn- events-page
"Returns a seq of events in page-number for all events in db split by page-size.
If starting-subject-id is non-nil, only events after that one are returned."
([db page-size page-number]
@(fdb/query db
{:select {"?event" ["*"]}
:where [["?event" "event/id", "?id"]]
;; Subject ID (?event here) is a monotonically incrementing bigint,
;; so ordering by that gives us event insertion order since events are immutable.
:opts {:orderBy ["ASC", "?event"]
:limit page-size
:offset (* page-size page-number)}})))


(defn- event-id->subject-id
"Returns {:next-page ... :items ...}, where items is a vector of events in
page-number for all events in db split by page-size. For use with `iteration`."
([db since-order page-size page-number]
{:next-page (inc page-number)
:items (fu/query db
{:select {"?event" ["*"]}
:where [["?event" "event/id", "?id"]
["?event" "event/order" (str "#(> ?order " since-order ")")]]
;; event/order is a monotonically increasing number assigned at transact
;; time, so ordering by it gives us event insertion order.
:opts {:orderBy ["ASC", "?order"]
:limit page-size
:offset (* page-size page-number)}})}))


(defn- event-id->order
[db event-id]
(first @(fdb/query db {:select "?event"
:where [["?event" "event/id", (str event-id)]]})))
(first (fu/query db {:select "?order"
:where [["?event" "event/id", (str event-id)]
["?event" "event/order" "?order"]]})))


(defn last-event-id
[fluree]
(-> fluree
:conn-atom
deref
(fdb/db ledger)
(fdb/query {:selectOne {"?event" ["*"]}
:where [["?event" "event/id", "?id"]]
:opts {:orderBy ["DESC" "?event"]
:limit 1}})
deref
(fdb/db (fluree-comp->ledger fluree))
(fu/query {:selectOne {"?event" ["*"]}
:where [["?event" "event/id" "?id"]
["?event" "event/order" "?order"]]
:opts {:orderBy ["DESC" "?order"]
:limit 1}})
deserialize
first))

@@ -93,34 +92,27 @@
"Returns a lazy-seq of all events in conn up to now, starting at optional event-id.
Can potentially be very large, so don't hold on to the seq head while
processing, and don't use fns that realize the whole coll (e.g. count)."
([fluree]
(let [db (fdb/db (-> fluree :conn-atom deref) ledger)
f (partial events-page db 100)]
;; TODO: use `iteration` once clojure 1.11.0 is out, much simpler and standard
;; https://github.com/clojure/clojure/blob/master/changes.md#34-iteration
(->> (utils/range-mapcat-while f empty?)
(map deserialize))))
([fluree event-id]
(let [db (fdb/db (-> fluree :conn-atom deref) ledger)
f (partial events-page db 100)]
(when-not (event-id->subject-id db event-id)
(throw (ex-info "Cannot find starting id" {:event-id event-id})))
(->> (utils/range-mapcat-while f empty?)
(map deserialize)
;; The point here is to get all events since event-id.
;; We're getting all the events, dropping every one until the first one we care about,
;; then dropping that one too.
;; This is a terrible way to do what we want.
;; Instead we should filter out all the events we don't care about on the fluree query.
;; But when I (filipe) tried to do that, it made each events-page query take 30s instead of 0.3s.
;; See https://github.com/fluree/db/issues/160
;; This seems good enough for now.
;; TODO: figure out a performant way to do this.
(drop-while (fn [[id]]
(if event-id
(not= event-id id)
false)))
(drop 1)))))
[fluree & {:keys [since-event-id since-order]}]
(let [db (fdb/db (-> fluree :conn-atom deref)
(fluree-comp->ledger fluree))
since-order' (cond
since-order since-order
since-event-id (or (event-id->order db since-event-id)
(throw (ex-info "Cannot find starting id"
{:since-event-id since-event-id})))
;; First order number is 1, so if we start
;; on 0 we will get all events.
:else 0)
step (partial events-page db since-order' 100)]
;; New core fn added in Clojure 1.11.
;; See https://www.juxt.pro/blog/new-clojure-iteration for usage example.
(->> (iteration step
:kf :next-page
:vf :items
:somef #(-> % :items seq)
:initk 0)
(sequence cat)
(map deserialize))))
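
Clojure 1.11's `iteration` drives the pagination: step is called with a page number, :vf pulls each page's :items, :kf yields the next page number, and :somef stops at the first empty page. A minimal self-contained sketch of the same pattern, with a plain vector standing in for the Fluree db:

(let [all  (vec (range 10))
      step (fn [page]
             ;; mimic events-page: 3 items per page, nil :items once exhausted
             {:next-page (inc page)
              :items     (seq (take 3 (drop (* 3 page) all)))})]
  (->> (iteration step
                  :kf :next-page
                  :vf :items
                  :somef #(-> % :items seq)
                  :initk 0)
       (sequence cat)))
;; => (0 1 2 3 4 5 6 7 8 9)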


(defn double-write?
@@ -148,6 +140,7 @@
(throw (ex-info (str "add-event! timed-out 3 times on " id)
{:id id}))
(let [conn (-> fluree :conn-atom deref)
ledger (fluree-comp->ledger fluree)
ch (fdb/transact-async conn ledger [(serialize id data)])
{:keys [status block]
:as r} (async/<!! (athens.async/with-timeout ch timeout :timed-out))]
@@ -175,26 +168,29 @@
{:id id :response r}))))))))


(defn ensure-ledger!
(defn init!
([fluree]
(ensure-ledger! fluree initial-events))
(init! fluree initial-events))
([fluree seed-events]
(let [conn (-> fluree :conn-atom deref)]
(let [conn (-> fluree :conn-atom deref)
ledger (fluree-comp->ledger fluree)]
(log/info "Looking for event-log fluree ledger")
(when (empty? @(fdb/ledger-info conn ledger))
(let [block (atom nil)]
(log/info "Fluree ledger for event-log not found, creating" ledger)
@(fdb/new-ledger conn ledger)
(fdb/wait-for-ledger-ready conn ledger)
(reset! block (:block @(fdb/transact conn ledger schema)))
(log/info "Populating fresh ledger with initial events...")
(doseq [[id data] seed-events]
(reset! block (add-event! fluree id data)))
(log/info "✅ Populated fresh ledger.")
(log/info "Bringing local ledger to to date with latest transactions...")
(events-page (fdb/db conn ledger {:syncTo @block}) 1 0)
(log/info "✅ Fluree local ledger up to date.")
(log/info "✅ Fluree ledger for event-log created."))))))
(log/info "Fluree ledger for event-log not found, creating" ledger)
@(fdb/new-ledger conn ledger)
(fdb/wait-for-ledger-ready conn ledger)
(migrate/migrate-ledger! conn ledger event-log-migrations/migrations)
(when seed-events
(let [block (atom nil)]
(log/info "Populating fresh ledger with initial events...")
(doseq [[id data] seed-events]
(reset! block (add-event! fluree id data)))
(log/info "✅ Populated fresh ledger.")
(log/info "Bringing local ledger to to date with latest transactions...")
(fu/wait-for-block conn ledger @block)
(log/info "✅ Fluree local ledger up to date.")))
(log/info "✅ Fluree ledger for event-log created."))
(migrate/migrate-ledger! conn ledger event-log-migrations/migrations))))
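
Two things worth noting here: migrate-ledger! now runs on every init!, not only when the ledger is freshly created, and fu/wait-for-block replaces the inline :syncTo trick the old code used. The helper's definition lives in a file not shown on this page, but judging from the previous inline usage it plausibly amounts to something like this (an assumption, not the actual implementation):

;; Hypothetical shape of fu/wait-for-block: fdb/db with :syncTo
;; blocks until the local ledger has caught up to the given block.
(defn wait-for-block
  [conn ledger block]
  (fdb/db conn ledger {:syncTo block}))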


#_(defn events-since
@@ -244,16 +240,17 @@


(defn- recover-block-events
"Returns a seq of recovered events in conn for block=idx+1 in conn."
"Returns {:stop ... :next-page ... :items ...}, where items is a seq of recovered
events in conn for block=idx+1 in conn. For use with `iteration`."
[conn idx]
(let [res @(fdb/block-query conn ledger {:block (inc idx) :pretty-print true})
(let [res @(fdb/block-query conn default-ledger {:block (inc idx) :pretty-print true})
ex-msg (ex-message res)]
;; If the query fails because the block is higher than the total blocks, the
;; result will be an error map instead of a seq.
;; Used with `iteration`, so we return a stop value to end the iteration instead.
(if (and ex-msg (str/starts-with? ex-msg "Start block is out of range for this ledger."))
::stop
(get-block-txs res))))
{:stop true}
{:next-page (inc idx)
:items (get-block-txs res)})))


(defn recovered-events
@@ -262,8 +259,14 @@
Can potentially be very large, so don't hold on to the seq head while
processing, and don't use fns that realize the whole coll (e.g. count)."
[fluree]
(let [f (partial recover-block-events (-> fluree :conn-atom deref))]
(map deserialize (utils/range-mapcat-while f #(= % ::stop)))))
(let [step (partial recover-block-events (-> fluree :conn-atom deref))]
(->> (iteration step
:kf :next-page
:vf :items
:somef #(-> % :stop not)
:initk 0)
(sequence cat)
(map deserialize))))
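
Same `iteration` pattern as in events, but here the continuation key is a block index and the end is signalled by an explicit :stop flag rather than an empty page. The shape in miniature, with a vector of blocks standing in for the ledger:

(let [blocks [[:a] [:b] [:c]]
      step   (fn [idx]
               (if (>= idx (count blocks))
                 {:stop true}  ;; mimics the out-of-range error branch
                 {:next-page (inc idx)
                  :items     (blocks idx)}))]
  (->> (iteration step
                  :kf :next-page
                  :vf :items
                  :somef #(-> % :stop not)
                  :initk 0)
       (sequence cat)))
;; => (:a :b :c)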


(comment
@@ -280,7 +283,7 @@
((:reconnect-fn fluree-comp))

;; Create ledger if not present.
(ensure-ledger! fluree-comp)
(init! fluree-comp)

;; What's the first event in the ledger?
(take 1 (events fluree-comp))
@@ -289,7 +292,7 @@
(take 1 (events fluree-comp :since-event-id (UUID/fromString "e6dad544-ef29-43b5-911e-9c4bfdca3fda")))

;; Add a few events.
(def my-events [["uuid-2" [1 2 3]]
(def my-events [["uuid-1" [1 2 3]]
["uuid-2" [4 5 6]]
["uuid-3" [7 8 9]]])

@@ -308,7 +311,6 @@
(count (recovered-events fluree-comp))

;; Debug event recovery
(events-page (fdb/db (-> fluree-comp :conn-atom deref) ledger) 1 1)
(recover-block-events (-> fluree-comp :conn-atom deref) 3)
(take 3 (recovered-events fluree-comp))
(take 3 (events fluree-comp))
@@ -318,4 +320,4 @@
(take 3 (events fluree-comp)))

;; Delete ledger.
@(fdb/delete-ledger (-> fluree-comp :conn-atom deref) ledger))
@(fdb/delete-ledger (-> fluree-comp :conn-atom deref) default-ledger))