[Feature Request] Server Sent Events (SSE) Plugin #24

Closed
go4cas opened this issue Mar 17, 2023 · 10 comments

Comments

@go4cas

go4cas commented Mar 17, 2023

The Elysia ecosystem includes a great collection of plugins (including websockets). It would be great if the team could add an official SSE plugin.

@JulianCataldo

Giving it a try:

  .get("/sse", ({ set }) => {
    const stream = new ReadableStream({
      start(controller) {
        setInterval(() => {
          controller.enqueue(`data: hey\n\n`);
        }, 1000);
      },
      cancel() {},
    });

/*
Argument of type 'ReadableStream<any>' is not assignable to parameter of type 'BlobPart | BlobPart[]'.
  Type 'ReadableStream<any>' is missing the following properties from type 'BlobPart[]': length, pop, push, concat, and 29 more.ts(2345)
const stream: ReadableStream<any>
*/

    return new Response(stream, {
      headers: {
        "Content-Type": "text/event-stream",
        Connection: "keep-alive",
        "Cache-Control": "no-cache",
      },
    });
  })
    <script>
      const source = new EventSource("/sse");

      console.log({ source });

      source.addEventListener("message", (e) => {
        console.log(e);
      });
    </script>

Not really working, but it's a start.
Events don't arrive one by one every second as they should,
but in chunks of ~20 or so, probably because of the timeout.

Cheers
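
One thing the attempt above leaves open is cleanup: its setInterval is never cleared, so the timer keeps firing after the consumer is gone. The sketch below is one possible way to handle that with a standard ReadableStream. The name tickStream is hypothetical, and whether a client disconnect actually triggers cancel() depends on the runtime. It does not by itself address the buffering described above, which the thread later attributes to a flush issue in Bun.

```typescript
// Sketch only: emits one SSE frame per tick and stops the timer on cancel.
// `tickStream` is a hypothetical helper name, not part of Elysia or Bun.
function tickStream(intervalMs: number): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  let timer: ReturnType<typeof setInterval> | undefined;

  return new ReadableStream<Uint8Array>({
    start(controller) {
      timer = setInterval(() => {
        // Encode the frame to bytes before enqueueing it.
        controller.enqueue(encoder.encode("data: hey\n\n"));
      }, intervalMs);
    },
    cancel() {
      // Runs when the consumer cancels the stream.
      if (timer !== undefined) clearInterval(timer);
    },
  });
}

// Possible use inside the route handler, with the same headers as above:
// return new Response(tickStream(1000), {
//   headers: { "Content-Type": "text/event-stream", "Cache-Control": "no-cache" },
// });
```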

@SaltyAom
Member

SaltyAom commented May 3, 2023

Blocking on: oven-sh/bun#2443

@cirospaciari

You can use HTTP/1.1 for now, until we add support for HTTP/2.

Flush was fixed by oven-sh/bun#3073

This should work on the next version:

function sendSSECustom(controller: ReadableStreamDirectController, eventName: string, data: string) {
  return controller.write(`event: ${eventName}\ndata:${JSON.stringify(data)}\n\n`);
}
function sendSSEMessage(controller: ReadableStreamDirectController, data: string) {
  return controller.write(`data:${JSON.stringify(data)}\n\n`);
}
function sse(req: Request): Response {
  const signal = req.signal;
  return new Response(
    new ReadableStream({
      type: "direct",
      async pull(controller: ReadableStreamDirectController) {
        while (!signal.aborted) {
          await sendSSECustom(controller, "bun", "Hello, World!");
          await sendSSEMessage(controller, "Hello, World!");
          await controller.flush();
          await Bun.sleep(1000);
        }
        controller.close();
      },
    }),
    { status: 200, headers: { "Content-Type": "text/event-stream" } },
  );
}
Bun.serve({
  port: 3000,
  fetch(req) {
    if (new URL(req.url).pathname === "/stream") {
      return sse(req);
    }
    return new Response("Hello, World!");
  },
});

You will probably want to use something like lastEventId to resume after reconnections (by sending id:), and send retry: to control the retry interval.

const lastEventId = req.headers.get("last-event-id")
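
To make the id: and retry: fields concrete, here is a small sketch of how a frame could be built and how a resume point could be derived from the Last-Event-ID header. The names SSEMessage, formatSSE and nextIndex are hypothetical, and the sketch assumes event ids are plain integer counters, which is only one possible scheme.

```typescript
// Hypothetical shape of a single server-sent event.
interface SSEMessage {
  data: string;
  event?: string;
  id?: string;
  retry?: number; // reconnection delay in milliseconds
}

// Serialize one event into the text/event-stream wire format.
function formatSSE(msg: SSEMessage): string {
  const lines: string[] = [];
  if (msg.event !== undefined) lines.push(`event: ${msg.event}`);
  if (msg.id !== undefined) lines.push(`id: ${msg.id}`);
  if (msg.retry !== undefined) lines.push(`retry: ${msg.retry}`);
  // A multi-line payload needs one "data:" line per line of text.
  for (const line of msg.data.split("\n")) lines.push(`data: ${line}`);
  // A blank line terminates the event.
  return lines.join("\n") + "\n\n";
}

// Assumption: ids are integer counters, so resuming means "last id + 1".
// A missing or non-numeric header restarts from 0.
function nextIndex(lastEventId: string | null): number {
  const n = lastEventId === null ? NaN : Number.parseInt(lastEventId, 10);
  return Number.isNaN(n) ? 0 : n + 1;
}
```

In the handler above, nextIndex(req.headers.get("last-event-id")) would give the first event to replay, and each write would go through formatSSE with an increasing id.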

For consuming SSE on the server side, we will add client support soon.
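
For anyone who needs to read such a stream by hand in the meantime, the following is a rough sketch of parsing already-received event-stream text. parseSSE and ParsedEvent are hypothetical names. The sketch assumes "\n" line endings and that the text contains only complete events, so a real consumer would still need to buffer partial chunks.

```typescript
// Hypothetical result type for one parsed event.
interface ParsedEvent {
  event: string;
  data: string;
  id: string | undefined;
}

// Parse a string holding zero or more complete SSE events.
function parseSSE(text: string): ParsedEvent[] {
  const events: ParsedEvent[] = [];
  // Events are separated by a blank line.
  for (const block of text.split("\n\n")) {
    let event = "message"; // default event type
    let id: string | undefined;
    const data: string[] = [];
    for (const line of block.split("\n")) {
      if (line === "" || line.startsWith(":")) continue; // blank line or comment
      const colon = line.indexOf(":");
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? "" : line.slice(colon + 1);
      if (value.startsWith(" ")) value = value.slice(1); // one optional leading space
      if (field === "event") event = value;
      else if (field === "data") data.push(value);
      else if (field === "id") id = value;
    }
    // Blocks without any data line produce no event.
    if (data.length > 0) events.push({ event, data: data.join("\n"), id });
  }
  return events;
}
```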

@paperclover
Contributor

bun will have a builtin helper for doing SSE: Bun.EventStream (oven-sh/bun#3390). It would be cool to see Elysia and other frameworks integrate this, and it should be pretty straightforward too.

today you can use new ReadableStream({ type: "direct", pull }) which is a bun-specific feature that passes a direct writer to pull, making it extremely low overhead. cancel on this is broken at the moment but will be fixed in the pr.

the regular readable streams don't work just yet but i'm looking into how much work it would be to make it happen.

@gtrabanco

gtrabanco commented Jul 27, 2023

With the latest version of Bun this is working great.

@go4cas
Author

go4cas commented Jul 29, 2023

@SaltyAom ... there seems to be movement on this. You think you'll be able to add an SSE plugin for Elysia?

@gtrabanco

gtrabanco commented Aug 1, 2023

I was thinking of writing the middleware. I'm not sure I will actually do it, but here is some sample code that works, followed by a proposal for how the plugin should work (after the sample, at the end of this message). I'm posting because, if I do write a plugin when I have time, I'm not sure how I would design it. In any case, please share your opinions.

Check this code: oven-sh/bun#2663 (comment)

I made a sample app:

import staticPlugin from "@elysiajs/static";
import { Context, Elysia } from "elysia";
import { sseEmit, sseSubscribe } from "./sse"; // sse.ts => https://github.com/oven-sh/bun/issues/2663#issuecomment-1653585011



const app = new Elysia()
  .use(staticPlugin({
    prefix: '', // I have index.html in /public folder to check all is working great... Normal html file using EventSource as expected =)

  }))
  .get("/stream", (ctx: Context) => {

    console.log('headers', ctx.request.headers);
    const req = ctx.request;

    const int1 = setInterval(() => {
      sseEmit("timestamp", {
        event: "timestamp",
        data: Date.now()
      });
    }, 1000);


    const int2 = setInterval(() => {
      sseEmit("timestamp", {
        event: "title",
        data: 'A new title ' + Date.now().toString().substring(7)
      });
    }, 5000);

    const response = sseSubscribe(req, "timestamp", {
      onClose: () => {
        clearInterval(int1);
        clearInterval(int2);
      }
    });
    return response;
  })
  .listen(3000);

console.log(
  `🦊 Elysia is running at http://${app.server?.hostname}:${app.server?.port}`
);

How should the plugin work?

I'm not sure how you would expect to use SSE as a plugin. Maybe a new method instead of using app.get? Should it auto-subscribe to a channel with the same name as the path?

import { sse, emit } from 'sse-plugin'; // emit is a singleton of EventEmitter
const sseapp = new Elysia()
  .use(sse({ retry: 1000 })) // It will send retry as 1000ms to clients that request SSE
  // It will listen to all events sent to the channel named after the path,
  // and send them to clients that request this path.
  // You can send different events by using the event property in the payload.
  .sse('/sse-path-channel', beforeHandleHandler /* optional */, optionalSchema /* optional */)
  .listen(3000);


// This can be in any cron job
emit('/sse-path-channel', {
  event: 'event-name',
  data: 'data'
})

emit('*', payload); // Should this send payload to every channel?
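
To make the channel question easier to discuss, here is a minimal in-memory sketch of the emit side of this proposal, with '*' treated as a broadcast to every channel. Everything in it (Payload, subscribe, emit, the wildcard behaviour) is an assumption for illustration, not an existing Elysia or plugin API.

```typescript
// Hypothetical payload and listener types for the proposed plugin.
type Payload = { event?: string; data: unknown };
type Listener = (payload: Payload) => void;

// One set of listeners per channel name (here: the route path).
const channels = new Map<string, Set<Listener>>();

// Register a listener and return a function that removes it again.
function subscribe(channel: string, listener: Listener): () => void {
  const listeners = channels.get(channel) ?? new Set<Listener>();
  channels.set(channel, listeners);
  listeners.add(listener);
  return () => {
    listeners.delete(listener);
  };
}

// Deliver a payload to one channel, or to all channels when channel is "*".
// Returns the number of listeners that received it.
function emit(channel: string, payload: Payload): number {
  let delivered = 0;
  channels.forEach((listeners, name) => {
    if (channel !== "*" && channel !== name) return;
    listeners.forEach((listener) => {
      listener(payload);
      delivered++;
    });
  });
  return delivered;
}
```

In a plugin, each connected client would call subscribe for its path and write every payload it receives to its response stream, calling the returned function when the connection closes.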

@gtrabanco

Maybe it's better to use new BroadcastChannel rather than EventEmitter?

@gtrabanco

If anyone is interested in a sample of how to use SSE with Elysia, I made a playground:

@SaltyAom
Member

Closing with the introduction of the Stream plugin.

demo: https://x.com/saltyAom/status/1716759147109330972?s=20
