Skip to content

Commit

Permalink
test: added more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nichita-pasecinic committed Feb 11, 2024
1 parent fc9d4a6 commit 4feb51f
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 15 deletions.
26 changes: 13 additions & 13 deletions src/core/web-socket-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,28 +183,29 @@ export class WebSocketConnector {
const requests$ = new BehaviorSubject<
undefined | SendRequestParams<TEvent, TRes, TReqIn, TReqOut>
>(undefined);

const uninitializedValue: StreamResponse<TRes, TReqOut, TErr> = {
status: STREAM_STATUS.uninitialized,
response: defaultResponse,
};
const $ = new BehaviorSubject<StreamResponse<TRes, TReqOut, TErr>>(uninitializedValue);
const userRequests$ = requests$.pipe(

const transformedRequests$ = requests$.pipe(
filterNullAndUndefined(),
transformRequests,
shareReplay(1),
);

userRequests$
transformedRequests$
.pipe(
concatMap((currentProcessingRequest) => {
concatMap((params) => {
const defaultTransformResponse = (() => identity) as TransformResponse<
TEvent,
TRes,
TReqOut
>;

const { request, transformResponse = defaultTransformResponse } =
currentProcessingRequest;
const { request, transformResponse = defaultTransformResponse } = params;

const ready$ = this.messages<TEvent>().pipe(
transformResponse(request),
Expand Down Expand Up @@ -233,8 +234,10 @@ export class WebSocketConnector {
);

const newRequest$ = defer(() => {
const nextRequest$ = userRequests$.pipe(
filter((x) => x !== currentProcessingRequest),
const nextRequest$ = transformedRequests$.pipe(
// ignore requests that are currently in processing
// transformedRequests$ is multicasted - so it will always emit the last client request
filter((x) => x !== params),
take(1),
);
return nextRequest$.pipe(
Expand Down Expand Up @@ -272,8 +275,8 @@ export class WebSocketConnector {
);

const concat$ = concat(loading$, ready$.pipe(takeUntil(takeUntil$))).pipe(
tap((value) => {
$.next({ ...$.value, ...value });
tap((streamResponse) => {
$.next({ ...$.value, ...streamResponse });
}),
);

Expand All @@ -291,9 +294,6 @@ export class WebSocketConnector {
requests$.next({ ...params });
};

return {
send,
$,
};
return { send, $ };
};
}
126 changes: 124 additions & 2 deletions src/test/get-stream-handler.spec.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { beforeEach, describe, expect, it } from 'vitest';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { TestScheduler } from 'rxjs/testing';
import { getMockWebsocketConnector } from './get-mock-websocket.connector';
import { STREAM_STATUS } from '../core/constants';
import { delay, filter, scan, tap } from 'rxjs';
import { delay, filter, Observable, scan, tap } from 'rxjs';
import { SendRequestParams } from '../core/types';

interface TestEvent {
from: string;
Expand Down Expand Up @@ -490,4 +491,125 @@ describe('[getStreamHandler] rxjs marbles tests', () => {
expect(socket.send).toHaveBeenNthCalledWith(4, JSON.stringify({ from: 'b2' }));
/* eslint-enable @typescript-eslint/unbound-method */
});

it('does not reset response on next request `resetResponseOnNextRequest: false`', () => {
const { wsConnector, socket } = getMockWebsocketConnector();

const handler = wsConnector.getStreamHandler<TestEvent, TestEvent, TestEvent, unknown>({
resetResponseOnNextRequest: false,
});
const expectedMarbles = 'a(bc)de';
const expectedValues = {
a: {
status: STREAM_STATUS.uninitialized,
response: undefined,
},
b: {
status: STREAM_STATUS.loading,
request: { from: 'b' },
response: undefined,
error: undefined,
},
c: {
status: STREAM_STATUS.ready,
request: { from: 'b' },
response: { from: 'b' },
error: undefined,
},
d: {
status: STREAM_STATUS.loading,
request: { from: 'c' },
response: { from: 'b' },
error: undefined,
},
e: {
status: STREAM_STATUS.ready,
request: { from: 'c' },
response: { from: 'd' },
error: undefined,
},
};

const triggerMarbles = 'ab 3ms cd';
const triggerValues = {
a: () => {
wsConnector.connect();
socket.onopen!({} as Event);
socket.send(JSON.stringify({ from: 'a' }));
},
b: () => {
handler.send({ request: { from: 'b' } });
socket.send(JSON.stringify({ from: 'b' }));
},
c: () => {
handler.send({ request: { from: 'c' } });
},
d: () => {
socket.send(JSON.stringify({ from: 'd' }));
},
};

testScheduler.run(({ expectObservable, cold }) => {
expectObservable(handler.$).toBe(expectedMarbles, expectedValues);
expectObservable(cold(triggerMarbles, triggerValues).pipe(tap((fn) => fn())));
});
});

it('applies `transformRequest` operator', () => {
const { wsConnector, socket } = getMockWebsocketConnector();

const tapFn = vi.fn();
const transformRequests = vi.fn(
(source$: Observable<SendRequestParams<TestEvent, TestEvent, TestEvent>>) =>
source$.pipe(delay(10), tap(tapFn)),
);

const handler = wsConnector.getStreamHandler<TestEvent, TestEvent, TestEvent>({
transformRequests,
});
const expectedMarbles = 'a 9ms bc';
const expectedValues = {
a: {
status: STREAM_STATUS.uninitialized,
response: undefined,
},
b: {
status: STREAM_STATUS.loading,
request: { from: 'a' },
response: undefined,
error: undefined,
},
c: {
status: STREAM_STATUS.ready,
request: { from: 'a' },
response: { from: 'b' },
error: undefined,
},
};

const request: SendRequestParams<TestEvent, TestEvent, TestEvent> = { request: { from: 'a' } };

const triggerMarbles = 'a 10ms b';
const triggerValues = {
a: () => {
wsConnector.connect();
socket.onopen!({} as Event);
handler.send(request);
socket.send(JSON.stringify({ from: 'a' })); // <- ignored
},
b: () => {
socket.send(JSON.stringify({ from: 'b' }));
},
};

testScheduler.run(({ expectObservable, cold }) => {
expectObservable(handler.$).toBe(expectedMarbles, expectedValues);
expectObservable(cold(triggerMarbles, triggerValues).pipe(tap((fn) => fn())));
});

/* eslint-disable @typescript-eslint/unbound-method */
expect(transformRequests).toHaveBeenCalledOnce();
expect(tapFn).toHaveBeenNthCalledWith(1, request);
/* eslint-enable @typescript-eslint/unbound-method */
});
});
32 changes: 32 additions & 0 deletions src/test/types.test-d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { describe, it, expectTypeOf } from 'vitest';
import { getMockWebsocketConnector } from './get-mock-websocket.connector';
import { SendRequestParams, StreamResponse } from '../core/types';
import { BehaviorSubject } from 'rxjs';

describe('[types]', () => {
it('[getStreamHandler]', () => {
const { wsConnector } = getMockWebsocketConnector();

interface WsEvent {
id: number;
data: string;
}

interface Request {
from: string;
timestamp: number;
}

const handler = wsConnector.getStreamHandler<WsEvent, WsEvent, Request>();

/* eslint-disable @typescript-eslint/unbound-method */
expectTypeOf(handler.send)
.parameter(0)
.toMatchTypeOf<SendRequestParams<WsEvent, WsEvent, Request>>();

expectTypeOf(handler.$).toMatchTypeOf<
BehaviorSubject<StreamResponse<WsEvent, Request, unknown>>
>();
/* eslint-enable @typescript-eslint/unbound-method */
});
});

0 comments on commit 4feb51f

Please sign in to comment.