-
Notifications
You must be signed in to change notification settings - Fork 3k
/
Copy pathtimeout.ts
101 lines (89 loc) · 2.91 KB
/
timeout.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import { async } from '../scheduler/async';
import { isDate } from '../util/isDate';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Scheduler } from '../Scheduler';
import { Observable } from '../Observable';
import { TeardownLogic } from '../Subscription';
import { TimeoutError } from '../util/TimeoutError';
/**
* @param due
* @param errorToSend
* @param scheduler
* @return {Observable<R>|WebSocketSubject<T>|Observable<T>}
* @method timeout
* @owner Observable
*/
export function timeout<T>(this: Observable<T>, due: number | Date,
errorToSend: any = null,
scheduler: Scheduler = async): Observable<T> {
let absoluteTimeout = isDate(due);
let waitFor = absoluteTimeout ? (+due - scheduler.now()) : Math.abs(<number>due);
return this.lift(new TimeoutOperator(waitFor, absoluteTimeout, errorToSend, scheduler));
}
class TimeoutOperator<T> implements Operator<T, T> {
constructor(private waitFor: number,
private absoluteTimeout: boolean,
private errorToSend: any,
private scheduler: Scheduler) {
}
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source._subscribe(new TimeoutSubscriber<T>(
subscriber, this.absoluteTimeout, this.waitFor, this.errorToSend, this.scheduler
));
}
}
/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class TimeoutSubscriber<T> extends Subscriber<T> {
private index: number = 0;
private _previousIndex: number = 0;
get previousIndex(): number {
return this._previousIndex;
}
private _hasCompleted: boolean = false;
get hasCompleted(): boolean {
return this._hasCompleted;
}
constructor(destination: Subscriber<T>,
private absoluteTimeout: boolean,
private waitFor: number,
private errorToSend: any,
private scheduler: Scheduler) {
super(destination);
this.scheduleTimeout();
}
private static dispatchTimeout(state: any): void {
const source = state.subscriber;
const currentIndex = state.index;
if (!source.hasCompleted && source.previousIndex === currentIndex) {
source.notifyTimeout();
}
}
private scheduleTimeout(): void {
let currentIndex = this.index;
this.scheduler.schedule(TimeoutSubscriber.dispatchTimeout, this.waitFor, { subscriber: this, index: currentIndex });
this.index++;
this._previousIndex = currentIndex;
}
protected _next(value: T): void {
this.destination.next(value);
if (!this.absoluteTimeout) {
this.scheduleTimeout();
}
}
protected _error(err: any): void {
this.destination.error(err);
this._hasCompleted = true;
}
protected _complete(): void {
this.destination.complete();
this._hasCompleted = true;
}
notifyTimeout(): void {
this.error(this.errorToSend || new TimeoutError());
}
}