Skip to content

Commit

Permalink
Merge pull request #271 from WilliamParker/issue249-2
Browse files Browse the repository at this point in the history
Issue 249: Introduce experimental support for insertions and retracti…
  • Loading branch information
WilliamParker authored Mar 10, 2017
2 parents e1cdd75 + 56233da commit ab9f6c0
Show file tree
Hide file tree
Showing 5 changed files with 448 additions and 80 deletions.
15 changes: 12 additions & 3 deletions src/main/clojure/clara/rules.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,18 @@
state and will not be re-fired unless facts affecting the rule are added or retracted.
This function does not modify the given session to mark rules as fired. Instead, it returns
a new session in which the rules are marked as fired."

[session] (eng/fire-rules session))
a new session in which the rules are marked as fired.
This take an additional map of options as a second argument. Current options:
:cancelling true (EXPERIMENTAL, subject to change/removal. Not supported in ClojureScript.):
Simultaneously propagate insertions and retractions through the rules network, at every step using the insertion and retractions of equals facts to cancel each
other out and avoid operations deeper in the rules network. The behavior of unconditional insertions and RHS (right-hand side) retractions
is undefined when this option is enabled and this option should not be used when calling fire-rules can result in these operations.
Note that this is purely a performance optimization and no guarantees are made at this time on whether a given rule's RHS will be called.
When this option is used rule RHS code that is executed shouldn't do anything that impacts state other than perform logical insertions."
([session] (eng/fire-rules session {}))
([session opts] (eng/fire-rules session opts)))

(defn query
"Runs the given query with the optional given parameters against the session.
Expand Down
204 changes: 129 additions & 75 deletions src/main/clojure/clara/rules/engine.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
[clojure.string :as string]
[clara.rules.memory :as mem]
[clara.rules.listener :as l]
[clara.rules.platform :as platform]))
[clara.rules.platform :as platform]
[clara.rules.update-cache.core :as uc]
#?(:clj [clara.rules.update-cache.cancelling :as ca])))

;; The accumulator is a Rete extension to run an accumulation (such as sum, average, or similar operation)
;; over a collection of values passing through the Rete network. This object defines the behavior
Expand Down Expand Up @@ -72,14 +74,19 @@
;; Returns a new session with the additional facts inserted.
(defprotocol ISession

;; Inserts a fact.
(insert [session fact])
;; Inserts facts.
(insert [session facts])

;; Retracts a fact.
(retract [session fact])
;; Retracts facts.
(retract [session facts])

;; Fires pending rules and returns a new session where they are in a fired state.
(fire-rules [session])
;;
;; Note that clara.rules/fire-rules, the public API for these methods, will handle
;; calling the two-arg fire-rules with an empty map itself, but we add handle it in the fire-rules implementation
;; as well in case anyone is directly calling the fire-rules protocol function or interface method on the LocalSession.
;; The two-argument version of fire-rules was added for issue 249.
(fire-rules [session] [session opts])

;; Runs a query agains thte session.
(query [session query params])
Expand Down Expand Up @@ -163,7 +170,6 @@
(alpha-activate [node facts memory transport listener])
(alpha-retract [node facts memory transport listener]))


;; Protocol for getting the type (e.g. :production and :query) and name of a
;; terminal node.
(defprotocol ITerminalNode
Expand Down Expand Up @@ -192,10 +198,6 @@
(map get-conditions-and-rule-names)
(reduce (partial merge-with conj) {})))))


;; Record indicating pending insertion or removal of a sequence of facts.
(defrecord PendingUpdate [type facts])

;; Active session during rule execution.
(def ^:dynamic *current-session* nil)

Expand Down Expand Up @@ -234,15 +236,12 @@
[current-session]
(letfn [(flush-all [current-session flushed-items?]
(let [{:keys [rulebase transient-memory transport insertions get-alphas-fn listener]} current-session
pending-updates @(:pending-updates current-session)]

;; Remove the facts here so they are re-inserted if we flush recursively.
(reset! (:pending-updates current-session) [])
pending-updates (-> current-session :pending-updates uc/get-updates-and-reset!)]

(if (empty? pending-updates)
flushed-items?
(do
(doseq [partition (partition-by :type pending-updates)
(doseq [partition pending-updates
:let [facts (mapcat :facts partition)]
[alpha-roots fact-group] (get-alphas-fn facts)
root alpha-roots]
Expand All @@ -251,7 +250,7 @@
(alpha-activate root fact-group transient-memory transport listener)
(alpha-retract root fact-group transient-memory transport listener)))

;; There may be new :pending-updates due to the flush just
;; There may be new pending updates due to the flush just
;; made. So keep flushing until there are none left. Items
;; were flushed though, so flush-items? is now true.
(flush-all current-session true)))))]
Expand Down Expand Up @@ -307,12 +306,12 @@
(mem/add-insertions! transient-memory node token facts)
(l/insert-facts-logical! listener node token facts)))

(swap! (:pending-updates *current-session*) into [(->PendingUpdate :insert facts)])))
(-> *current-session* :pending-updates (uc/add-insertions! facts))))

(defn retract-facts!
"Perform the fact retraction."
[facts]
(swap! (:pending-updates *current-session*) into [(->PendingUpdate :retract facts)]))
(-> *current-session* :pending-updates (uc/add-retractions! facts)))

;; Record for the production node in the Rete network.
(defrecord ProductionNode [id production rhs]
Expand Down Expand Up @@ -1655,13 +1654,13 @@

(defn fire-rules*
"Fire rules for the given nodes."
[rulebase nodes transient-memory transport listener get-alphas-fn]
[rulebase nodes transient-memory transport listener get-alphas-fn update-cache]
(binding [*current-session* {:rulebase rulebase
:transient-memory transient-memory
:transport transport
:insertions (atom 0)
:get-alphas-fn get-alphas-fn
:pending-updates (atom [])
:pending-updates update-cache
:listener listener}]

(loop [next-group (mem/next-activation-group transient-memory)
Expand Down Expand Up @@ -1752,15 +1751,15 @@
ISession
(insert [session facts]

(let [new-pending-operations (conj pending-operations (->PendingUpdate :insertion
;; Preserve the behavior prior to https://github.com/cerner/clara-rules/issues/268
;; , particularly for the Java API, where the caller could freely mutate a
;; collection of facts after passing it to Clara for the constituent
;; facts to be inserted or retracted. If the caller passes a persistent
;; Clojure collection don't do any additional work.
(if (coll? facts)
facts
(into [] facts))))]
(let [new-pending-operations (conj pending-operations (uc/->PendingUpdate :insertion
;; Preserve the behavior prior to https://github.com/cerner/clara-rules/issues/268
;; , particularly for the Java API, where the caller could freely mutate a
;; collection of facts after passing it to Clara for the constituent
;; facts to be inserted or retracted. If the caller passes a persistent
;; Clojure collection don't do any additional work.
(if (coll? facts)
facts
(into [] facts))))]

(LocalSession. rulebase
memory
Expand All @@ -1771,11 +1770,11 @@

(retract [session facts]

(let [new-pending-operations (conj pending-operations (->PendingUpdate :retraction
;; As in insert above defend against facts being a mutable collection.
(if (coll? facts)
facts
(into [] facts))))]
(let [new-pending-operations (conj pending-operations (uc/->PendingUpdate :retraction
;; As in insert above defend against facts being a mutable collection.
(if (coll? facts)
facts
(into [] facts))))]

(LocalSession. rulebase
memory
Expand All @@ -1784,52 +1783,107 @@
get-alphas-fn
new-pending-operations)))

(fire-rules [session]
;; Prior to issue 249 we only had a one-argument fire-rules method. clara.rules/fire-rules will always call the two-argument method now
;; but we kept a one-argument version of the fire-rules in case anyone is calling the fire-rules protocol function or method on the session directly.
(fire-rules [session] (fire-rules session {}))
(fire-rules [session opts]

(let [transient-memory (mem/to-transient memory)
transient-listener (l/to-transient listener)]

;; We originally performed insertions and retractions immediately after the insert and retract calls,
;; but this had the downside of making a pattern like "Retract facts, insert other facts, and fire the rules"
;; perform at least three transitions between a persistent and transient memory. Delaying the actual execution
;; of the insertions and retractions until firing the rules allows us to cut this down to a single transition
;; between persistent and transient memory. There is some cost to the runtime dispatch on operation types here,
;; but this is presumably less significant than the cost of memory transitions.
;;
;; We perform the insertions and retractions in the same order as they were applied to the session since
;; if a fact is not in the session, retracted, and then subsequently inserted it should be in the session at
;; the end.
(doseq [{op-type :type facts :facts} pending-operations]

(case op-type

:insertion
(do
(l/insert-facts! transient-listener facts)

(binding [*pending-external-retractions* (atom [])]
;; Bind the external retractions cache so that any logical retractions as a result
;; of these insertions can be cached and executed as a batch instead of eagerly realizing
;; them. An external insertion of a fact that matches
;; a negation or accumulator condition can cause logical retractions.
(doseq [[alpha-roots fact-group] (get-alphas-fn facts)
root alpha-roots]
(alpha-activate root fact-group transient-memory transport transient-listener))
(external-retract-loop get-alphas-fn transient-memory transport transient-listener)))

:retraction
(do
(l/retract-facts! transient-listener facts)
(if-not (:cancelling opts)
;; We originally performed insertions and retractions immediately after the insert and retract calls,
;; but this had the downside of making a pattern like "Retract facts, insert other facts, and fire the rules"
;; perform at least three transitions between a persistent and transient memory. Delaying the actual execution
;; of the insertions and retractions until firing the rules allows us to cut this down to a single transition
;; between persistent and transient memory. There is some cost to the runtime dispatch on operation types here,
;; but this is presumably less significant than the cost of memory transitions.
;;
;; We perform the insertions and retractions in the same order as they were applied to the session since
;; if a fact is not in the session, retracted, and then subsequently inserted it should be in the session at
;; the end.
(do
(doseq [{op-type :type facts :facts} pending-operations]

(binding [*pending-external-retractions* (atom facts)]
(external-retract-loop get-alphas-fn transient-memory transport transient-listener)))))
(case op-type

(fire-rules* rulebase
(:production-nodes rulebase)
transient-memory
transport
transient-listener
get-alphas-fn)
:insertion
(do
(l/insert-facts! transient-listener facts)

(binding [*pending-external-retractions* (atom [])]
;; Bind the external retractions cache so that any logical retractions as a result
;; of these insertions can be cached and executed as a batch instead of eagerly realizing
;; them. An external insertion of a fact that matches
;; a negation or accumulator condition can cause logical retractions.
(doseq [[alpha-roots fact-group] (get-alphas-fn facts)
root alpha-roots]
(alpha-activate root fact-group transient-memory transport transient-listener))
(external-retract-loop get-alphas-fn transient-memory transport transient-listener)))

:retraction
(do
(l/retract-facts! transient-listener facts)

(binding [*pending-external-retractions* (atom facts)]
(external-retract-loop get-alphas-fn transient-memory transport transient-listener)))))

(fire-rules* rulebase
(:production-nodes rulebase)
transient-memory
transport
transient-listener
get-alphas-fn
(uc/get-ordered-update-cache)))

#?(:cljs (throw (ex-info "The :cancelling option is not supported in ClojureScript"
{:session session :opts opts}))

:clj (let [insertions (sequence
(comp (filter (fn [pending-op]
(= (:type pending-op)
:insertion)))
(mapcat :facts))
pending-operations)

retractions (sequence
(comp (filter (fn [pending-op]
(= (:type pending-op)
:retraction)))
(mapcat :facts))
pending-operations)

update-cache (ca/get-cancelling-update-cache)]

(binding [*current-session* {:rulebase rulebase
:transient-memory transient-memory
:transport transport
:insertions (atom 0)
:get-alphas-fn get-alphas-fn
:pending-updates update-cache
:listener transient-listener}]

;; Insertions should come before retractions so that if we insert and then retract the same
;; fact that is not already in the session the end result will be that the session won't have that fact.
;; If retractions came first then we'd first retract a fact that isn't in the session, which doesn't do anything,
;; and then later we would insert the fact.
(doseq [[alpha-roots fact-group] (get-alphas-fn insertions)
root alpha-roots]
(alpha-activate root fact-group transient-memory transport transient-listener))

(doseq [[alpha-roots fact-group] (get-alphas-fn retractions)
root alpha-roots]
(alpha-retract root fact-group transient-memory transport transient-listener))

(fire-rules* rulebase
(:production-nodes rulebase)
transient-memory
transport
transient-listener
get-alphas-fn
;; This continues to use the cancelling cache after the first batch of insertions and retractions.
;; If this is suboptimal for some workflows we can revisit this.
update-cache)))))

(LocalSession. rulebase
(mem/to-persistent! transient-memory)
Expand Down
Loading

0 comments on commit ab9f6c0

Please sign in to comment.