Skip to content

Commit

Permalink
add single transaction support for long range transduce
Browse files Browse the repository at this point in the history
  • Loading branch information
cnautze committed Jan 9, 2025
1 parent 6e9d1fc commit fbd857b
Showing 1 changed file with 19 additions and 6 deletions.
25 changes: 19 additions & 6 deletions src/exoscale/vinyl/store.clj
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,15 @@
AutoCloseable
(close [_] (.close runner)))))

(defn wrapped-record-store [^FDBRecordStore store]
(reify
DatabaseContext
(get-metadata [_] store)
(new-runner [_] (throw (UnsupportedOperationException. "new-runner not supported for wrapped FDBRecordStore")))
(run-async [_ _] (throw (UnsupportedOperationException. "run-async not supported for wrapped FDBRecordStore")))
(run-in-context [_ f]
(f store))))

(def start component/start)
(def stop component/stop)

Expand Down Expand Up @@ -583,16 +592,20 @@
;; When interrupted, the function will be retried, which pops the last seen
;; continuation.
;;
[db xform f val continuing-fn transduce-fn]
([db xform f val continuing-fn transduce-fn]
(continuation-traversing-transduce db xform f val nil continuing-fn transduce-fn))
([db xform f val transaction-fn continuing-fn transduce-fn]
(let [cont (atom nil)
result (atom val)
runner (wrapped-runner db runner-params)]
(-> (run-async
runner
(fn [^FDBRecordStore store]
(when (some? transaction-fn)
(transaction-fn (wrapped-record-store store)))
(-> (continuing-fn store @cont)
(transduce-fn xform f result #(reset! cont %)))))
(fn/close-on-complete runner))))
(fn/close-on-complete runner)))))

(defn- get-range-fn [^TupleRange tuple-range {::keys [limit]}]
(fn [^FDBRecordStore store ^bytes cont]
Expand All @@ -609,20 +622,20 @@
(.getRange transaction (to-range store tuple-range) ^int limit)
(.getRange transaction (to-range store tuple-range))))))

(defn- scan-records-transduce [db xform f val record-type items {::keys [continuation] :as opts}]
(defn- scan-records-transduce [db xform f val record-type items {::keys [continuation transaction-fn] :as opts}]
(let [range (continuation-range db record-type items continuation)
props (scan-properties opts)]
(continuation-traversing-transduce
db xform f val
db xform f val transaction-fn
(fn [^FDBRecordStore store ^bytes cont]
(.scanRecords store range cont props))
cursor/apply-transduce)))

(defn- get-range-transduce [db xform f val record-type items {::keys [continuation] :as opts}]
(defn- get-range-transduce [db xform f val record-type items {::keys [continuation transaction-fn] :as opts}]
(let [range (continuation-range db record-type items continuation)
continuing-fn (get-range-fn range opts)]
(continuation-traversing-transduce
db xform f val continuing-fn
db xform f val transaction-fn continuing-fn
cursor/apply-iterable-transduce)))

(defn long-range-transduce
Expand Down

0 comments on commit fbd857b

Please sign in to comment.