-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(buffer): closingNotifier should support ObservableInput #7073
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,8 @@ | ||
import { Observable } from '../Observable'; | ||
import { OperatorFunction } from '../types'; | ||
import { OperatorFunction, ObservableInput } from '../types'; | ||
import { operate } from '../util/lift'; | ||
import { noop } from '../util/noop'; | ||
import { createOperatorSubscriber } from './OperatorSubscriber'; | ||
import { innerFrom } from '../observable/innerFrom'; | ||
|
||
/** | ||
* Buffers the source Observable values until `closingNotifier` emits. | ||
|
@@ -13,7 +13,8 @@ import { createOperatorSubscriber } from './OperatorSubscriber'; | |
* ![](buffer.png) | ||
* | ||
* Buffers the incoming Observable values until the given `closingNotifier` | ||
* Observable emits a value, at which point it emits the buffer on the output | ||
* `ObservableInput` (that internally gets converted to an Observable) | ||
* emits a value, at which point it emits the buffer on the output | ||
* Observable and starts a new buffer internally, awaiting the next time | ||
* `closingNotifier` emits. | ||
* | ||
|
@@ -36,12 +37,12 @@ import { createOperatorSubscriber } from './OperatorSubscriber'; | |
* @see {@link bufferWhen} | ||
* @see {@link window} | ||
* | ||
* @param {Observable<any>} closingNotifier An Observable that signals the | ||
* @param closingNotifier An `ObservableInput` that signals the | ||
* buffer to be emitted on the output Observable. | ||
* @return A function that returns an Observable of buffers, which are arrays | ||
* of values. | ||
*/ | ||
export function buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T, T[]> { | ||
export function buffer<T>(closingNotifier: ObservableInput<any>): OperatorFunction<T, T[]> { | ||
return operate((source, subscriber) => { | ||
// The current buffered values. | ||
let currentBuffer: T[] = []; | ||
|
@@ -59,7 +60,7 @@ export function buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T, | |
); | ||
|
||
// Subscribe to the closing notifier. | ||
closingNotifier.subscribe( | ||
innerFrom(closingNotifier).subscribe( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @benlesh, in the RxJS Core Semantics guide, it is stated that:
This is not the case with this operator (and it's unrelated to this PR as well, but I noticed this here) - the source gets subscribed to on line 51. Should we address this? If yes, should we do it in V7 or should we wait for V8? |
||
createOperatorSubscriber( | ||
subscriber, | ||
() => { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: (Can be fixed in a FLUP)... we don't need to wait 8 ms... we can just do
Promise.resolve()
.