From 40d13169f969462dd65e812f94d9df3e304eb336 Mon Sep 17 00:00:00 2001 From: Christian Nautze Date: Mon, 26 Aug 2024 21:08:28 +0200 Subject: [PATCH] Cursor: fix apply with stateful transducer --- src/exoscale/vinyl/cursor.clj | 25 ++++++++++------- test/exoscale/vinyl/cursor_test.clj | 43 +++++++++++++++++++++++++++-- 2 files changed, 55 insertions(+), 13 deletions(-) diff --git a/src/exoscale/vinyl/cursor.clj b/src/exoscale/vinyl/cursor.clj index cc96821..37d502d 100644 --- a/src/exoscale/vinyl/cursor.clj +++ b/src/exoscale/vinyl/cursor.clj @@ -32,16 +32,21 @@ (AsyncUtil/whileTrue (reify Supplier (get [_] - (-> cursor - .onNext - (.thenApply - (fn/make-fun - (fn [^RecordCursorResult result] - (when (ifn? cont-fn) - (-> result .getContinuation .toBytes cont-fn)) - (let [next? (.hasNext result) - new-acc (when next? (swap! acc reducer (.get result)))] - (and (not (reduced? new-acc)) next?)))))))) + (try + (-> cursor + .onNext + (.thenApply + (fn/make-fun + (fn [^RecordCursorResult result] + (when (ifn? cont-fn) + (-> result .getContinuation .toBytes cont-fn)) + (let [next? (.hasNext result) + new-acc (when next? (swap! acc reducer (.get result)))] + (and (not (reduced? new-acc)) next?)))))) + (catch Exception e + (when (some? xform) + (reducer @acc)) + (throw e))))) (.getExecutor cursor)) (fn/make-fun (fn [_] (unreduced diff --git a/test/exoscale/vinyl/cursor_test.clj b/test/exoscale/vinyl/cursor_test.clj index 8bf5740..80183cc 100644 --- a/test/exoscale/vinyl/cursor_test.clj +++ b/test/exoscale/vinyl/cursor_test.clj @@ -1,14 +1,39 @@ (ns exoscale.vinyl.cursor-test - (:require [clojure.test :refer [deftest are]] + (:require [clojure.test :refer [deftest are is]] [exoscale.vinyl.cursor :refer [apply-transforms]] - [exoscale.vinyl.store :as store]) - (:import com.apple.foundationdb.record.RecordCursor)) + [exoscale.vinyl.demostore :as ds :refer [*db*]] + [exoscale.vinyl.store :as store]) + (:import [com.apple.foundationdb.record RecordCursor] + [com.apple.foundationdb FDBException] + [java.util Iterator])) (defn from-list "Transform a list of items to a `RecordCursor` instance" [items] (RecordCursor/fromList (seq items))) +(defn from-iterator + [iterator] + (RecordCursor/fromIterator iterator)) + +(defrecord FaultyIterator [items pos error-index] + Iterator + (forEachRemaining [_ _action]) + (hasNext [_] + (< @pos (count items))) + (next [_] + (if (= @pos @error-index) + (do + (vreset! error-index -1) + (throw (FDBException. "Retryable error" 1007))) + (let [result (nth items @pos)] + (vswap! pos inc) + result))) + (remove [_])) + +(defn make-faulty-iterator [items error-index] + (->FaultyIterator items (volatile! 0) (volatile! error-index))) + (defn reduce-plus [x y] (let [acc (+ x y)] @@ -30,3 +55,15 @@ ::store/reduce-init init})) [0 1 2 3 4 5 6] + 0 28 [0 1 2 3 4 5 6] (completing reduce-plus) 0 15)) + +(deftest stateful-transducer-test + (let [processed (atom [])] + (is (= [[1 2 3] [4 5] [6 7 8] [9]] + (ds/with-build-fdb + (fn [] (let [cursor (from-iterator (make-faulty-iterator [1 2 3 4 5 6 7 8 9] 5))] + @(store/run-async *db* (fn [_store] (apply-transforms + cursor + {::store/reducer (completing (fn [_acc items] (swap! processed conj items))) + ::store/reduce-init [] + ::store/transducer (partition-all 3)}))) + @processed)))))))