Skip to content

Commit

Permalink
Cursor: fix apply with stateful transducer
Browse files Browse the repository at this point in the history
  • Loading branch information
cnautze committed Aug 26, 2024
1 parent 6e9d1fc commit 40d1316
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 13 deletions.
25 changes: 15 additions & 10 deletions src/exoscale/vinyl/cursor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,21 @@
(AsyncUtil/whileTrue
(reify Supplier
(get [_]
(-> cursor
.onNext
(.thenApply
(fn/make-fun
(fn [^RecordCursorResult result]
(when (ifn? cont-fn)
(-> result .getContinuation .toBytes cont-fn))
(let [next? (.hasNext result)
new-acc (when next? (swap! acc reducer (.get result)))]
(and (not (reduced? new-acc)) next?))))))))
(try
(-> cursor
.onNext
(.thenApply
(fn/make-fun
(fn [^RecordCursorResult result]
(when (ifn? cont-fn)
(-> result .getContinuation .toBytes cont-fn))
(let [next? (.hasNext result)
new-acc (when next? (swap! acc reducer (.get result)))]
(and (not (reduced? new-acc)) next?))))))
(catch Exception e
(when (some? xform)
(reducer @acc))
(throw e)))))
(.getExecutor cursor))
(fn/make-fun (fn [_]
(unreduced
Expand Down
43 changes: 40 additions & 3 deletions test/exoscale/vinyl/cursor_test.clj
Original file line number Diff line number Diff line change
@@ -1,14 +1,39 @@
(ns exoscale.vinyl.cursor-test
(:require [clojure.test :refer [deftest are]]
(:require [clojure.test :refer [deftest are is]]
[exoscale.vinyl.cursor :refer [apply-transforms]]
[exoscale.vinyl.store :as store])
(:import com.apple.foundationdb.record.RecordCursor))
[exoscale.vinyl.demostore :as ds :refer [*db*]]
[exoscale.vinyl.store :as store])
(:import [com.apple.foundationdb.record RecordCursor]
[com.apple.foundationdb FDBException]
[java.util Iterator]))

(defn from-list
"Transform a list of items to a `RecordCursor` instance"
[items]
(RecordCursor/fromList (seq items)))

(defn from-iterator
[iterator]
(RecordCursor/fromIterator iterator))

(defrecord FaultyIterator [items pos error-index]
Iterator
(forEachRemaining [_ _action])
(hasNext [_]
(< @pos (count items)))
(next [_]
(if (= @pos @error-index)
(do
(vreset! error-index -1)
(throw (FDBException. "Retryable error" 1007)))
(let [result (nth items @pos)]
(vswap! pos inc)
result)))
(remove [_]))

(defn make-faulty-iterator [items error-index]
(->FaultyIterator items (volatile! 0) (volatile! error-index)))

(defn reduce-plus
[x y]
(let [acc (+ x y)]
Expand All @@ -30,3 +55,15 @@
::store/reduce-init init}))
[0 1 2 3 4 5 6] + 0 28
[0 1 2 3 4 5 6] (completing reduce-plus) 0 15))

(deftest stateful-transducer-test
(let [processed (atom [])]
(is (= [[1 2 3] [4 5] [6 7 8] [9]]
(ds/with-build-fdb
(fn [] (let [cursor (from-iterator (make-faulty-iterator [1 2 3 4 5 6 7 8 9] 5))]
@(store/run-async *db* (fn [_store] (apply-transforms
cursor
{::store/reducer (completing (fn [_acc items] (swap! processed conj items)))
::store/reduce-init []
::store/transducer (partition-all 3)})))
@processed)))))))

0 comments on commit 40d1316

Please sign in to comment.