diff --git a/src/core/web-socket-connector.ts b/src/core/web-socket-connector.ts index f9772b0..1b79027 100644 --- a/src/core/web-socket-connector.ts +++ b/src/core/web-socket-connector.ts @@ -183,28 +183,29 @@ export class WebSocketConnector { const requests$ = new BehaviorSubject< undefined | SendRequestParams >(undefined); + const uninitializedValue: StreamResponse = { status: STREAM_STATUS.uninitialized, response: defaultResponse, }; const $ = new BehaviorSubject>(uninitializedValue); - const userRequests$ = requests$.pipe( + + const transformedRequests$ = requests$.pipe( filterNullAndUndefined(), transformRequests, shareReplay(1), ); - userRequests$ + transformedRequests$ .pipe( - concatMap((currentProcessingRequest) => { + concatMap((params) => { const defaultTransformResponse = (() => identity) as TransformResponse< TEvent, TRes, TReqOut >; - const { request, transformResponse = defaultTransformResponse } = - currentProcessingRequest; + const { request, transformResponse = defaultTransformResponse } = params; const ready$ = this.messages().pipe( transformResponse(request), @@ -233,8 +234,10 @@ export class WebSocketConnector { ); const newRequest$ = defer(() => { - const nextRequest$ = userRequests$.pipe( - filter((x) => x !== currentProcessingRequest), + const nextRequest$ = transformedRequests$.pipe( + // ignore requests that are currently in processing + // transformedRequests$ is multicasted - so it will always emit the last client request + filter((x) => x !== params), take(1), ); return nextRequest$.pipe( @@ -272,8 +275,8 @@ export class WebSocketConnector { ); const concat$ = concat(loading$, ready$.pipe(takeUntil(takeUntil$))).pipe( - tap((value) => { - $.next({ ...$.value, ...value }); + tap((streamResponse) => { + $.next({ ...$.value, ...streamResponse }); }), ); @@ -291,9 +294,6 @@ export class WebSocketConnector { requests$.next({ ...params }); }; - return { - send, - $, - }; + return { send, $ }; }; } diff --git a/src/test/get-stream-handler.spec.ts b/src/test/get-stream-handler.spec.ts index 748970b..e4a6d9d 100644 --- a/src/test/get-stream-handler.spec.ts +++ b/src/test/get-stream-handler.spec.ts @@ -1,8 +1,9 @@ -import { beforeEach, describe, expect, it } from 'vitest'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; import { TestScheduler } from 'rxjs/testing'; import { getMockWebsocketConnector } from './get-mock-websocket.connector'; import { STREAM_STATUS } from '../core/constants'; -import { delay, filter, scan, tap } from 'rxjs'; +import { delay, filter, Observable, scan, tap } from 'rxjs'; +import { SendRequestParams } from '../core/types'; interface TestEvent { from: string; @@ -490,4 +491,125 @@ describe('[getStreamHandler] rxjs marbles tests', () => { expect(socket.send).toHaveBeenNthCalledWith(4, JSON.stringify({ from: 'b2' })); /* eslint-enable @typescript-eslint/unbound-method */ }); + + it('does not reset response on next request `resetResponseOnNextRequest: false`', () => { + const { wsConnector, socket } = getMockWebsocketConnector(); + + const handler = wsConnector.getStreamHandler({ + resetResponseOnNextRequest: false, + }); + const expectedMarbles = 'a(bc)de'; + const expectedValues = { + a: { + status: STREAM_STATUS.uninitialized, + response: undefined, + }, + b: { + status: STREAM_STATUS.loading, + request: { from: 'b' }, + response: undefined, + error: undefined, + }, + c: { + status: STREAM_STATUS.ready, + request: { from: 'b' }, + response: { from: 'b' }, + error: undefined, + }, + d: { + status: STREAM_STATUS.loading, + request: { from: 'c' }, + response: { from: 'b' }, + error: undefined, + }, + e: { + status: STREAM_STATUS.ready, + request: { from: 'c' }, + response: { from: 'd' }, + error: undefined, + }, + }; + + const triggerMarbles = 'ab 3ms cd'; + const triggerValues = { + a: () => { + wsConnector.connect(); + socket.onopen!({} as Event); + socket.send(JSON.stringify({ from: 'a' })); + }, + b: () => { + handler.send({ request: { from: 'b' } }); + socket.send(JSON.stringify({ from: 'b' })); + }, + c: () => { + handler.send({ request: { from: 'c' } }); + }, + d: () => { + socket.send(JSON.stringify({ from: 'd' })); + }, + }; + + testScheduler.run(({ expectObservable, cold }) => { + expectObservable(handler.$).toBe(expectedMarbles, expectedValues); + expectObservable(cold(triggerMarbles, triggerValues).pipe(tap((fn) => fn()))); + }); + }); + + it('applies `transformRequest` operator', () => { + const { wsConnector, socket } = getMockWebsocketConnector(); + + const tapFn = vi.fn(); + const transformRequests = vi.fn( + (source$: Observable>) => + source$.pipe(delay(10), tap(tapFn)), + ); + + const handler = wsConnector.getStreamHandler({ + transformRequests, + }); + const expectedMarbles = 'a 9ms bc'; + const expectedValues = { + a: { + status: STREAM_STATUS.uninitialized, + response: undefined, + }, + b: { + status: STREAM_STATUS.loading, + request: { from: 'a' }, + response: undefined, + error: undefined, + }, + c: { + status: STREAM_STATUS.ready, + request: { from: 'a' }, + response: { from: 'b' }, + error: undefined, + }, + }; + + const request: SendRequestParams = { request: { from: 'a' } }; + + const triggerMarbles = 'a 10ms b'; + const triggerValues = { + a: () => { + wsConnector.connect(); + socket.onopen!({} as Event); + handler.send(request); + socket.send(JSON.stringify({ from: 'a' })); // <- ignored + }, + b: () => { + socket.send(JSON.stringify({ from: 'b' })); + }, + }; + + testScheduler.run(({ expectObservable, cold }) => { + expectObservable(handler.$).toBe(expectedMarbles, expectedValues); + expectObservable(cold(triggerMarbles, triggerValues).pipe(tap((fn) => fn()))); + }); + + /* eslint-disable @typescript-eslint/unbound-method */ + expect(transformRequests).toHaveBeenCalledOnce(); + expect(tapFn).toHaveBeenNthCalledWith(1, request); + /* eslint-enable @typescript-eslint/unbound-method */ + }); }); diff --git a/src/test/types.test-d.ts b/src/test/types.test-d.ts new file mode 100644 index 0000000..9dbf420 --- /dev/null +++ b/src/test/types.test-d.ts @@ -0,0 +1,32 @@ +import { describe, it, expectTypeOf } from 'vitest'; +import { getMockWebsocketConnector } from './get-mock-websocket.connector'; +import { SendRequestParams, StreamResponse } from '../core/types'; +import { BehaviorSubject } from 'rxjs'; + +describe('[types]', () => { + it('[getStreamHandler]', () => { + const { wsConnector } = getMockWebsocketConnector(); + + interface WsEvent { + id: number; + data: string; + } + + interface Request { + from: string; + timestamp: number; + } + + const handler = wsConnector.getStreamHandler(); + + /* eslint-disable @typescript-eslint/unbound-method */ + expectTypeOf(handler.send) + .parameter(0) + .toMatchTypeOf>(); + + expectTypeOf(handler.$).toMatchTypeOf< + BehaviorSubject> + >(); + /* eslint-enable @typescript-eslint/unbound-method */ + }); +});