diff --git a/aioreactive/transform.py b/aioreactive/transform.py index 8a5eeab..66cf1ac 100644 --- a/aioreactive/transform.py +++ b/aioreactive/transform.py @@ -511,27 +511,3 @@ def scan_async( """ return _scan(accumulator, initial) - -def reduce( - accumulator: Callable[[_TResult, _TSource], _TResult], - initial: _TResult, -) -> Callable[[AsyncObservable[_TSource]], AsyncObservable[_TResult]]: - async def _reduce(current: _TResult, value: _TSource) -> _TResult: - return accumulator(current, value) - - def _operator(Observable: AsyncObservable[_TSource]) -> AsyncObservable[_TResult]: - return pipe(Observable, reduce_async(_reduce, initial)) - - return _operator - - -def reduce_async( - accumulator: Callable[[_TResult, _TSource], Awaitable[_TResult]], - initial: _TResult, -) -> Callable[[AsyncObservable[_TSource]], AsyncObservable[_TResult]]: - def _operator(source: AsyncObservable[_TSource]) -> AsyncObservable[_TResult]: - from .filtering import take_last - - return pipe(source, scan_async(accumulator, initial), take_last(1)) - - return _operator