Skip to content

Commit

Permalink
Merge pull request #269 from WilliamParker/issue268
Browse files Browse the repository at this point in the history
Issue 268: Cache insertions and retractions until the rules are fired…
  • Loading branch information
WilliamParker authored Feb 24, 2017
2 parents bc20e39 + 2698b6f commit e1cdd75
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 128 deletions.
11 changes: 8 additions & 3 deletions src/main/clojure/clara/rules.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
This function does not modify the given session to mark rules as fired. Instead, it returns
a new session in which the rules are marked as fired."
[session]
(eng/fire-rules session))

[session] (eng/fire-rules session))

(defn query
"Runs the given query with the optional given parameters against the session.
Expand Down Expand Up @@ -238,7 +238,12 @@

;; ClojureScript implementation doesn't support salience yet, so
;; no activation group functions are used.
(eng/LocalSession. rulebase (eng/local-memory rulebase transport activation-group-sort-fn activation-group-fn get-alphas-fn) transport listener get-alphas-fn))))
(eng/LocalSession. rulebase
(eng/local-memory rulebase transport activation-group-sort-fn activation-group-fn get-alphas-fn)
transport
listener
get-alphas-fn
[]))))

#?(:clj
(extend-type clojure.lang.Symbol
Expand Down
92 changes: 63 additions & 29 deletions src/main/clojure/clara/rules/engine.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -1748,50 +1748,82 @@
(when (flush-updates *current-session*)
(recur (mem/next-activation-group transient-memory) next-group))))))

(deftype LocalSession [rulebase memory transport listener get-alphas-fn]
(deftype LocalSession [rulebase memory transport listener get-alphas-fn pending-operations]
ISession
(insert [session facts]
(let [transient-memory (mem/to-transient memory)
transient-listener (l/to-transient listener)]

(l/insert-facts! transient-listener facts)

(binding [*pending-external-retractions* (atom [])]
;; Bind the external retractions cache so that any logical retractions as a result
;; of these insertions can be cached and executed as a batch instead of eagerly realizing
;; them. An external insertion of a fact that matches
;; a negation or accumulator condition can cause logical retractions.
(doseq [[alpha-roots fact-group] (get-alphas-fn facts)
root alpha-roots]
(alpha-activate root fact-group transient-memory transport transient-listener))
(external-retract-loop get-alphas-fn transient-memory transport transient-listener))
(let [new-pending-operations (conj pending-operations (->PendingUpdate :insertion
;; Preserve the behavior prior to https://github.com/cerner/clara-rules/issues/268
;; , particularly for the Java API, where the caller could freely mutate a
;; collection of facts after passing it to Clara for the constituent
;; facts to be inserted or retracted. If the caller passes a persistent
;; Clojure collection don't do any additional work.
(if (coll? facts)
facts
(into [] facts))))]

(LocalSession. rulebase
(mem/to-persistent! transient-memory)
memory
transport
(l/to-persistent! transient-listener)
get-alphas-fn)))
listener
get-alphas-fn
new-pending-operations)))

(retract [session facts]

(let [transient-memory (mem/to-transient memory)
transient-listener (l/to-transient listener)]

(l/retract-facts! transient-listener facts)

(binding [*pending-external-retractions* (atom facts)]
(external-retract-loop get-alphas-fn transient-memory transport transient-listener))
(let [new-pending-operations (conj pending-operations (->PendingUpdate :retraction
;; As in insert above defend against facts being a mutable collection.
(if (coll? facts)
facts
(into [] facts))))]

(LocalSession. rulebase
(mem/to-persistent! transient-memory)
memory
transport
(l/to-persistent! transient-listener)
get-alphas-fn)))
listener
get-alphas-fn
new-pending-operations)))

(fire-rules [session]

(let [transient-memory (mem/to-transient memory)
transient-listener (l/to-transient listener)]

;; We originally performed insertions and retractions immediately after the insert and retract calls,
;; but this had the downside of making a pattern like "Retract facts, insert other facts, and fire the rules"
;; perform at least three transitions between a persistent and transient memory. Delaying the actual execution
;; of the insertions and retractions until firing the rules allows us to cut this down to a single transition
;; between persistent and transient memory. There is some cost to the runtime dispatch on operation types here,
;; but this is presumably less significant than the cost of memory transitions.
;;
;; We perform the insertions and retractions in the same order as they were applied to the session since
;; if a fact is not in the session, retracted, and then subsequently inserted it should be in the session at
;; the end.
(doseq [{op-type :type facts :facts} pending-operations]

(case op-type

:insertion
(do
(l/insert-facts! transient-listener facts)

(binding [*pending-external-retractions* (atom [])]
;; Bind the external retractions cache so that any logical retractions as a result
;; of these insertions can be cached and executed as a batch instead of eagerly realizing
;; them. An external insertion of a fact that matches
;; a negation or accumulator condition can cause logical retractions.
(doseq [[alpha-roots fact-group] (get-alphas-fn facts)
root alpha-roots]
(alpha-activate root fact-group transient-memory transport transient-listener))
(external-retract-loop get-alphas-fn transient-memory transport transient-listener)))

:retraction
(do
(l/retract-facts! transient-listener facts)

(binding [*pending-external-retractions* (atom facts)]
(external-retract-loop get-alphas-fn transient-memory transport transient-listener)))))

(fire-rules* rulebase
(:production-nodes rulebase)
transient-memory
Expand All @@ -1803,7 +1835,8 @@
(mem/to-persistent! transient-memory)
transport
(l/to-persistent! transient-listener)
get-alphas-fn)))
get-alphas-fn
[])))

;; TODO: queries shouldn't require the use of transient memory.
(query [session query params]
Expand Down Expand Up @@ -1849,7 +1882,8 @@
(if (> (count listeners) 0)
(l/delegating-listener listeners)
l/default-listener)
get-alphas-fn))
get-alphas-fn
[]))

(defn local-memory
"Returns a local, in-process working memory."
Expand Down
64 changes: 45 additions & 19 deletions src/test/clojure/clara/test_accumulators.clj
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,15 @@
session (-> empty-session
(insert (->Temperature 30 "MCI"))
(insert (->Temperature 10 "MCI"))
(insert (->Temperature 80 "MCI")))
(insert (->Temperature 80 "MCI"))
fire-rules)

min-retracted (retract session (->Temperature 10 "MCI"))
max-retracted (retract session (->Temperature 80 "MCI"))]
min-retracted (-> session
(retract (->Temperature 10 "MCI"))
fire-rules)
max-retracted (-> session
(retract (->Temperature 80 "MCI"))
fire-rules)]

(is (empty (query empty-session coldest)))
(is (= {:?t 10} (first (query session coldest))))
Expand All @@ -86,9 +91,12 @@
session (-> (mk-session [sum])
(insert (->Temperature 30 "MCI"))
(insert (->Temperature 10 "MCI"))
(insert (->Temperature 80 "MCI")))
(insert (->Temperature 80 "MCI"))
fire-rules)

retracted (retract session (->Temperature 30 "MCI"))]
retracted (-> session
(retract (->Temperature 30 "MCI"))
fire-rules)]

(is (= {:?t 120} (first (query session sum))))
(is (= {:?t 90} (first (query retracted sum))))))
Expand All @@ -99,9 +107,12 @@
session (-> (mk-session [count])
(insert (->Temperature 30 "MCI"))
(insert (->Temperature 10 "MCI"))
(insert (->Temperature 80 "MCI")))
(insert (->Temperature 80 "MCI"))
fire-rules)

retracted (retract session (->Temperature 30 "MCI"))]
retracted (-> session
(retract (->Temperature 30 "MCI"))
fire-rules)]

(is (= {:?c 3} (first (query session count))))
(is (= {:?c 2} (first (query retracted count))))))
Expand All @@ -113,9 +124,12 @@
session (-> (mk-session [distinct distinct-field])
(insert (->Temperature 80 "MCI"))
(insert (->Temperature 80 "MCI"))
(insert (->Temperature 90 "MCI")))
(insert (->Temperature 90 "MCI"))
fire-rules)

retracted (retract session (->Temperature 90 "MCI"))]
retracted (-> session
(retract (->Temperature 90 "MCI"))
fire-rules)]

(is (= #{{:?t #{ (->Temperature 80 "MCI")
(->Temperature 90 "MCI")}}}
Expand All @@ -139,7 +153,8 @@
session (-> (mk-session [max-min-avg])
(insert (->Temperature 30 "MCI")
(->Temperature 10 "MCI")
(->Temperature 80 "MCI")))]
(->Temperature 80 "MCI"))
fire-rules)]

(is (= {:?max 80 :?min 10 :?avg 40} (first (query session max-min-avg))))))

Expand All @@ -156,7 +171,8 @@
[?c <- (acc/count) from [Temperature (= ?loc location)]]])

session (-> (mk-session [count])
(insert (->WindSpeed 20 "MCI")))]
(insert (->WindSpeed 20 "MCI"))
fire-rules)]

(is (= {:?c 0 :?loc "MCI"} (first (query session count))))))

Expand All @@ -169,7 +185,8 @@
[WindSpeed (> windspeed 10) (= ?loc location)]])

session (-> (mk-session [count])
(insert (->WindSpeed 20 "MCI")))]
(insert (->WindSpeed 20 "MCI"))
fire-rules)]

(is (= {:?c 0 :?loc "MCI"} (first (query session count))))))

Expand All @@ -182,7 +199,8 @@
(insert (->WindSpeed 20 "MCI")
(->WindSpeed 20 "SFO")
(->Temperature 40 "SFO")
(->Temperature 50 "SFO")))]
(->Temperature 50 "SFO"))
fire-rules)]

;; Zero count at MCI, since no temperatures were inserted.
(is (= {:?c 0 :?loc "MCI"} (first (query session count :?loc "MCI"))))
Expand Down Expand Up @@ -288,9 +306,12 @@
session (-> (mk-session [max-query])
(insert (->Temperature 30 "MCI"))
(insert (->Temperature 10 "MCI"))
(insert (->Temperature 80 "MCI")))
(insert (->Temperature 80 "MCI"))
fire-rules)

retracted (retract session (->Temperature 80 "MCI"))]
retracted (-> session
(retract (->Temperature 80 "MCI"))
fire-rules)]

(is (= {:?t (->Temperature 80 "MCI")} (first (query session max-query))))
(is (= {:?t (->Temperature 30 "MCI")} (first (query retracted max-query))))))
Expand All @@ -308,9 +329,12 @@
session (-> (mk-session [sum])
(insert (->Temperature 30 "MCI"))
(insert (->Temperature 10 "MCI"))
(insert (->Temperature 80 "MCI")))
(insert (->Temperature 80 "MCI"))
fire-rules)

retracted (retract session (->Temperature 30 "MCI"))]
retracted (-> session
(retract (->Temperature 30 "MCI"))
fire-rules)]

(is (= {:?t 120} (first (query session sum))))
(is (= {:?t 90} (first (query retracted sum))))))
Expand All @@ -334,11 +358,13 @@
(insert-all [(->Temperature 30 "MCI")
(->Temperature 10 "MCI")
(->Temperature 80 "MCI")
(->Temperature 80 "MCI")]))
(->Temperature 80 "MCI")])
fire-rules)

retracted-session (-> session
(retract (->Temperature 80 "MCI")
(->Temperature 80 "MCI")))]
(->Temperature 80 "MCI"))
fire-rules)]

(testing "grouping-accum"
(is (= {:?t {30 [(->Temperature 30 "MCI")]
Expand Down
Loading

0 comments on commit e1cdd75

Please sign in to comment.