Skip to content

Commit

Permalink
Fix reduce operators to emit initial value for empty observables
Browse files Browse the repository at this point in the history
  • Loading branch information
Latta committed Feb 27, 2025
1 parent 25772aa commit 9a64c84
Showing 1 changed file with 0 additions and 24 deletions.
24 changes: 0 additions & 24 deletions aioreactive/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,27 +511,3 @@ def scan_async(
"""
return _scan(accumulator, initial)


def reduce(
accumulator: Callable[[_TResult, _TSource], _TResult],
initial: _TResult,
) -> Callable[[AsyncObservable[_TSource]], AsyncObservable[_TResult]]:
async def _reduce(current: _TResult, value: _TSource) -> _TResult:
return accumulator(current, value)

def _operator(Observable: AsyncObservable[_TSource]) -> AsyncObservable[_TResult]:
return pipe(Observable, reduce_async(_reduce, initial))

return _operator


def reduce_async(
accumulator: Callable[[_TResult, _TSource], Awaitable[_TResult]],
initial: _TResult,
) -> Callable[[AsyncObservable[_TSource]], AsyncObservable[_TResult]]:
def _operator(source: AsyncObservable[_TSource]) -> AsyncObservable[_TResult]:
from .filtering import take_last

return pipe(source, scan_async(accumulator, initial), take_last(1))

return _operator

0 comments on commit 9a64c84

Please sign in to comment.