forked from c4spar/deno-fast-forward
-
Notifications
You must be signed in to change notification settings - Fork 0
/
encoding_event_stream.ts
113 lines (101 loc) · 3.12 KB
/
encoding_event_stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import type { Encoding } from "./encoding.ts";
import type { EncodingProcess } from "./encoding_process.ts";
import {
EncodingEventStreamAlreadyDisposed,
EncodingProcessNotStarted,
} from "./errors.ts";
import type { EncodingEvent } from "./events.ts";
/** Callback that fulfils a promise with a value of type `T` (or a thenable producing one). */
type Resolver<T> = (value: T | PromiseLike<T>) => void;
/** Callback that rejects a promise with an `Error`. */
type Reject = (error: Error) => void;
/**
 * Settlement callbacks of one promise that is waiting for the next encoding
 * event. Entries are queued while no buffered event is available.
 */
interface PullQueueItem {
/** Fulfils the waiting promise with an event, or with `null` to signal that the stream ended. */
resolve: Resolver<EncodingEvent | null>;
/** Rejects the waiting promise. NOTE(review): stored but never invoked in the visible code. */
reject: Reject;
}
/**
 * Async iterator over the events dispatched by an encoding.
 *
 * The stream subscribes to the `start`, `info`, `progress`, `end` and `error`
 * events of the encoding that belongs to the given process. Events that arrive
 * while nobody is waiting are buffered; `next()` calls that arrive while no
 * event is buffered are parked until an event is pushed or the stream is
 * disposed.
 *
 * Iteration finishes after an `end` or `error` event has been delivered, or
 * once the stream has been disposed and its buffer is empty.
 */
export class EncodingEventStream
  implements AsyncIterableIterator<EncodingEvent> {
  /** Set once `dispose()` has removed the event listeners. */
  #disposed = false;
  /** Consumers waiting for an event that has not arrived yet (FIFO). */
  #pullQueue: PullQueueItem[] = [];
  /** Events that arrived before a consumer asked for them (FIFO). */
  #pushQueue: (EncodingEvent | null)[] = [];
  #encodingProcess: EncodingProcess;
  #encoding: Encoding;
  /** Set after an `end` or `error` event has been handed to a consumer. */
  #done = false;

  /**
   * @param process Encoding process whose `encoding` is observed.
   */
  constructor(process: EncodingProcess) {
    this.#encodingProcess = process;
    this.#encoding = process.encoding;
    this.#encoding.addEventListener("start", this.#pushEvent);
    this.#encoding.addEventListener("info", this.#pushEvent);
    this.#encoding.addEventListener("progress", this.#pushEvent);
    this.#encoding.addEventListener("end", this.#pushEvent);
    this.#encoding.addEventListener("error", this.#pushEvent);
  }

  /** Returns the stream itself so it can be used with `for await`. */
  [Symbol.asyncIterator](): AsyncIterableIterator<EncodingEvent> {
    return this;
  }

  /**
   * Detaches all event listeners and releases every consumer that is still
   * waiting for an event (their `next()` call resolves with `done: true`).
   *
   * @throws {EncodingEventStreamAlreadyDisposed} If the stream was already disposed.
   */
  dispose = () => {
    if (this.#disposed) {
      throw new EncodingEventStreamAlreadyDisposed({
        encoding: this.#encoding,
      });
    }
    this.#encoding.removeEventListener("start", this.#pushEvent);
    this.#encoding.removeEventListener("info", this.#pushEvent);
    this.#encoding.removeEventListener("progress", this.#pushEvent);
    this.#encoding.removeEventListener("end", this.#pushEvent);
    this.#encoding.removeEventListener("error", this.#pushEvent);
    this.#disposed = true;
    // No further events can arrive once the listeners are gone, so every
    // parked consumer must be released here; releasing only the first one
    // would leave the remaining `next()` promises pending forever.
    for (const { resolve } of this.#pullQueue.splice(0)) {
      resolve(null);
    }
  };

  /**
   * Resolves with the next encoding event.
   *
   * @returns `{ done: false, value }` for each event; `{ done: true }` once an
   *   `end`/`error` event has already been delivered or the stream was
   *   disposed with nothing left in its buffer.
   * @throws {EncodingProcessNotStarted} If the encoding process has no `pid`.
   */
  async next(): Promise<IteratorResult<EncodingEvent>> {
    if (!this.#encodingProcess.pid) {
      throw new EncodingProcessNotStarted({
        encoding: this.#encoding,
      });
    }
    if (this.#done) {
      // Dispose only once: calling `next()` repeatedly after completion must
      // keep returning `done: true` rather than throw "already disposed".
      if (!this.#disposed) {
        this.dispose();
      }
      return {
        done: true,
        value: undefined,
      };
    }
    const event: EncodingEvent | null = await this.#pullEvent();
    if (!event) {
      return {
        done: true,
        value: undefined,
      };
    }
    // `end` and `error` are terminal: the following `next()` call finishes
    // the iteration and disposes the stream.
    if (event.type === "end" || event.type === "error") {
      this.#done = true;
    }
    return {
      done: false,
      value: event,
    };
  }

  /** Hands an event to the oldest waiting consumer, or buffers it if none is waiting. */
  #pushEvent = (event: EncodingEvent): void => {
    if (this.#pullQueue.length > 0) {
      const { resolve } = this.#pullQueue.shift() as PullQueueItem;
      resolve(event);
    } else {
      this.#pushQueue.push(event);
    }
  };

  /**
   * Resolves with the oldest buffered event. With an empty buffer it resolves
   * with `null` when the stream is disposed, and otherwise parks the consumer
   * until an event is pushed or the stream is disposed.
   */
  #pullEvent = (): Promise<EncodingEvent | null> => {
    return new Promise<EncodingEvent | null>(
      (resolve: Resolver<EncodingEvent | null>, reject: Reject) => {
        if (this.#pushQueue.length > 0) {
          resolve(this.#pushQueue.shift() ?? null);
        } else if (this.#disposed) {
          // Listeners are detached, so a parked consumer could never be
          // resolved; report end-of-stream right away instead.
          resolve(null);
        } else {
          this.#pullQueue.push({ resolve, reject });
        }
      },
    );
  };
}