-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlink.ts
83 lines (73 loc) · 1.97 KB
/
link.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import { TRPCLink } from "@trpc/client";
import { AnyRouter } from "@trpc/server";
import { observable } from "@trpc/server/observable";
export const httpSseLink = <TRouter extends AnyRouter>(opts: {
baseUrl: string;
EventSource?: typeof EventSource;
}): TRPCLink<TRouter> => {
const open = ({
url,
handleEvent,
handleError,
handleCloseRequest,
handleOpen,
}: any) => {
const es = opts.EventSource
? new opts.EventSource(url)
: new EventSource(url);
es.onopen = () => {
// usually when the first packet is sent
handleOpen();
};
es.onerror = (error) => {
console.log("error", error);
handleError(error);
};
es.addEventListener("data", ({ data }) => {
handleEvent(JSON.parse(data.trim()));
});
es.addEventListener("end", () => {
handleCloseRequest();
});
return {
close: () => {
console.log("close");
es.close();
},
};
};
return (runtime) => {
return ({ op }) => {
if (op.type !== "subscription") {
throw new Error("httpSseLink must use subscription type");
}
const getUrl = () => {
let url = `${opts.baseUrl}/${op.path}`;
if (op.input !== undefined) {
url += `?input=${encodeURIComponent(JSON.stringify(op.input))}`;
}
return url;
};
return observable((observer) => {
const handleEvent = (data: /*json*/ any) => {
observer.next({ result: { type: "data", data }, context: {} });
};
const source = open({
url: getUrl(),
handleEvent,
handleCloseRequest: () => {
observer.next({ result: { type: "stopped" } });
observer.complete();
},
handleOpen: () => {
observer.next({ result: { type: "started" } });
},
});
return () => {
console.log("something something");
source.close();
};
});
};
};
};