SSE with serve documentation missing #2663

Open
simylein opened this issue Apr 15, 2023 · 5 comments
Labels
docs Improvements or additions to documentation

Comments

@simylein
Contributor

What is the type of issue?

Documentation is missing

What is the issue?

I would love to use serve for SSE but cannot find any documentation about it apart from this issue. If serve is ready for handling SSE with the help of ReadableStream there should be documentation about it.

Where did you find it?

No response

@simylein simylein added the docs Improvements or additions to documentation label Apr 15, 2023
@gtrabanco

gtrabanco commented May 2, 2023

No, it's not working at the moment. It pushes data in batches every 5 seconds, when it should be sent almost instantly after each write.

Note that node:http2 is not implemented yet, and it is necessary for this to work well:

@cirospaciari
Member

You can use HTTP/1.1 for now, until we add support for HTTP/2.

Flushing was fixed by #3073.

This should work in the next version:

function sendSSECustom(controller: ReadableStreamDirectController, eventName: string, data: string) {
  return controller.write(`event: ${eventName}\ndata:${JSON.stringify(data)}\n\n`);
}
function sendSSEMessage(controller: ReadableStreamDirectController, data: string) {
  return controller.write(`data:${JSON.stringify(data)}\n\n`);
}
function sse(req: Request): Response {
  const signal = req.signal;
  return new Response(
    new ReadableStream({
      type: "direct",
      async pull(controller: ReadableStreamDirectController) {
        while (!signal.aborted) {
          await sendSSECustom(controller, "bun", "Hello, World!");
          await sendSSEMessage(controller, "Hello, World!");
          await controller.flush();
          await Bun.sleep(1000);
        }
        controller.close();
      },
    }),
    { status: 200, headers: { "Content-Type": "text/event-stream" } },
  );
}
Bun.serve({
  port: 3000,
  fetch(req) {
    if (new URL(req.url).pathname === "/stream") {
      return sse(req);
    }
    return new Response("Hello, World!");
  },
});

You will probably also want to use something like lastEventId to resume after reconnections (by sending id:), and send retry: to control the retry interval.

const lastEventId = req.headers.get("last-event-id")
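
The id: and retry: fields mentioned above are plain text lines in the same frame as data:. A minimal sketch of a formatter follows, assuming a hypothetical helper named `formatSSE` (it is not part of Bun or of the code in this thread). The field names come from the Server-Sent Events wire format, and multi-line data is split into one data: line per line:

```typescript
// Hypothetical helper, not part of Bun: builds one SSE frame as a string.
interface SSEFrame {
  data: string;
  event?: string; // custom event name, sent as `event:`
  id?: string | number; // sent back by the client as Last-Event-ID on reconnect
  retry?: number; // reconnection delay in milliseconds
}

function formatSSE(frame: SSEFrame): string {
  const lines: string[] = [];
  if (frame.retry !== undefined) lines.push(`retry: ${frame.retry}`);
  if (frame.event !== undefined) lines.push(`event: ${frame.event}`);
  if (frame.id !== undefined) lines.push(`id: ${frame.id}`);
  // A newline inside the payload would end the field early,
  // so emit one `data:` line per line of the payload.
  for (const line of frame.data.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the frame.
  return lines.join("\n") + "\n\n";
}
```

With the direct controller from the example above, this could be used as `await controller.write(formatSSE({ id: 7, event: "bun", data: "hello", retry: 3000 }))`.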

@simylein
Contributor Author

simylein commented May 26, 2023

Thanks a lot 😀. This really helps building real time stuff 🙂.

For those interested, here is my implementation:

import EventEmitter from 'events';
import { debug, info } from '../logger/logger';

export const emitter = new EventEmitter();

export const subscribe = (req: Request, channel: string): Response => {
	info(`subscribing to channel '${channel}'`);
	return new Response(
		new ReadableStream({
			type: 'direct',
			pull(controller: ReadableStreamDirectController) {
				let id = +(req.headers.get('last-event-id') ?? 1);
				const handler = async (data: unknown): Promise<void> => {
					await controller.write(`id:${id}\ndata:${data !== undefined ? JSON.stringify(data) : ''}\n\n`);
					await controller.flush();
					id++;
				};
				emitter.on(channel, handler);
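				// Clean up when the client disconnects, not only if the request is already aborted here.
				const unsubscribe = (): void => {
					info(`unsubscribing from channel '${channel}'`);
					emitter.off(channel, handler);
					controller.close();
				};
				if (req.signal.aborted) unsubscribe();
				else req.signal.addEventListener('abort', unsubscribe, { once: true });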
				return new Promise(() => void 0);
			},
		}),
		{
			status: 200,
			headers: { 'content-type': 'text/event-stream' },
		},
	);
};

export const emit = (channel: string, data?: unknown): void => {
	debug(`emitting to channel '${channel}'`);
	emitter.emit(channel, data);
};
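
A note on the id bookkeeping: coercing the header with unary plus yields NaN for a malformed value, and a reconnecting client that sends last-event-id 5 would receive 5 again as its next id. A minimal sketch of a stricter parser, assuming sequential integer ids as in the code above and a hypothetical helper name `nextEventId`:

```typescript
// Hypothetical helper: derives the next event id from a Last-Event-ID header value.
function nextEventId(lastEventId: string | null): number {
  // Missing or malformed header (anything but decimal digits): start from 1.
  if (lastEventId === null || !/^\d+$/.test(lastEventId)) return 1;
  const last = Number(lastEventId);
  // Continue after the last id the client saw; guard against huge values.
  return Number.isSafeInteger(last) ? last + 1 : 1;
}
```

It would replace the initialisation with `let id = nextEventId(req.headers.get('last-event-id'));`. Whether continuing after the last seen id is the right behaviour depends on the application, since the ids here are per-connection counters rather than keys into stored events.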

@simylein simylein changed the title SSE with serve documentation missing SSE with serve documentation missing Jun 8, 2023
@gtrabanco

gtrabanco commented Jul 27, 2023

> Thanks a lot 😀. This really helps building real time stuff 🙂.
>
> For those interested, here is my implementation:
>
> (...)

I modified your solution a little to add an "onClose" callback, which is useful for clearing intervals or doing whatever else is needed when the connection is closed. I just wanted to share.

import EventEmitter from 'node:events';
export const emitter = new EventEmitter();

export type SSEEvent = {
  event?: string;
  data?: unknown;
}

export type SSEOptions = {
  retry: number;
  onClose: () => void;
}

function info(...args: unknown[]): void {
  console.info(...args);
}

function debug(...args: unknown[]): void {
  console.debug(...args);
}

function channelSubscribe(channel: string[], handler: (payload: SSEEvent) => void): void {
  channel.forEach((channel) => {
    emitter.on(channel, handler);
  });
}

function channelUnsubscribe(channel: string[], handler: (payload: SSEEvent) => void): void {
  channel.forEach((channel) => {
    emitter.off(channel, handler);
  });
}


export const sseSubscribe = (req: Request, channel: string | Array<string>, options: Partial<SSEOptions> = {
  retry: 1000,
  onClose: () => { }
}): Response => {
  info(`subscribing to channel '${channel}'`);
  const stream = new ReadableStream({
    type: 'direct',
    async pull(controller: ReadableStreamDirectController) {
      let id = +(req.headers.get('last-event-id') ?? 1);

      if (options.retry !== undefined) {
        await controller.write(`retry:${options.retry}\n`);
      }

      const handler = async (payload: SSEEvent = {}): Promise<void> => {
        // sseEmit may be called without a payload, so default to an empty event.
        const { event, data } = payload;
        if (event !== undefined) {
          await controller.write(`event:${event}\n`);
        }
        await controller.write(`id:${id}\n`);
        await controller.write(`data:${data !== undefined ? JSON.stringify(data) : ''}\n\n`);
        await controller.flush();
        id++;
      };

      function closeConnection(reason: string | undefined = 'reason unknown') {
        return () => {
          info(`unsubscribing from channel '${channel}': ${reason}`);
          channelUnsubscribe(Array.isArray(channel) ? channel : [channel], handler);
          options.onClose?.();
          controller.close();
        }
      }

      channelSubscribe(Array.isArray(channel) ? channel : [channel], handler);

      // AbortSignal only dispatches 'abort'; it has no 'close' event, so one listener is enough.
      req.signal.addEventListener('abort', closeConnection('Connection aborted'), { once: true });

      if (req.signal.aborted) {
        closeConnection('Connection aborted originally')();
      }
      return new Promise(() => void 0);
    },
  });

  return new Response(stream, {
    status: 200,
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive'
    }
  });
};

export const sseEmit = (channel: string, payload?: SSEEvent): void => {
  debug(`emitting to channel '${channel}'`);
  emitter.emit(channel, payload);
};
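
Cleanup code like closeConnection above is easiest to reason about when it runs exactly once, whether the signal aborts later or was already aborted when the stream started. A minimal sketch of that guard, assuming a hypothetical helper named `onceAborted` (it is not part of Bun or of the code in this thread):

```typescript
// Hypothetical helper: runs `cleanup` at most once, either when `signal`
// aborts or immediately if it has already aborted.
function onceAborted(signal: AbortSignal, cleanup: () => void): () => void {
  let done = false;
  const run = (): void => {
    if (done) return; // ignore repeated calls
    done = true;
    signal.removeEventListener("abort", run);
    cleanup();
  };
  if (signal.aborted) run();
  else signal.addEventListener("abort", run);
  // The returned function lets callers trigger the same cleanup manually.
  return run;
}
```

Inside pull, this could wrap the unsubscribe, onClose, and controller.close() calls, for example `onceAborted(req.signal, () => { /* unsubscribe, onClose, close */ })`, so that the controller is never closed twice.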

@7heMech

7heMech commented Aug 6, 2024

@cirospaciari What does type: 'direct' do?
I mean my IDE tells me I can only put bytes in there.
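
As far as the Bun documentation describes it, type: 'direct' selects Bun's non-standard direct stream: the controller exposes write() and flush(), and chunks are written to the destination without the usual internal queue. The standard typings only know 'bytes', which is likely what the IDE reports. For comparison, here is a sketch of the same idea with a standard ReadableStream that enqueues encoded chunks; `sseStream` and its parameter are illustrative names, not an API from Bun or from this thread:

```typescript
// Hypothetical helper: a standard (non-direct) stream that emits one SSE
// message per entry in `messages`, then closes.
function sseStream(messages: string[]): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  let index = 0;
  return new ReadableStream<Uint8Array>({
    // pull() is called whenever the consumer is ready for more data.
    pull(controller) {
      if (index >= messages.length) {
        controller.close();
        return;
      }
      // Standard streams take bytes via enqueue(); there is no write() or flush().
      const frame = `data: ${JSON.stringify(messages[index])}\n\n`;
      controller.enqueue(encoder.encode(frame));
      index++;
    },
  });
}
```

It can be returned the same way as the direct version, for example `new Response(sseStream(["Hello, World!"]), { headers: { "Content-Type": "text/event-stream" } })`. This sketch says nothing about how promptly each chunk is flushed to the client, which is the part the direct controller's flush() addresses.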
