Skip to content

Commit

Permalink
Merge pull request #24 from jsonjoy-com/cleanup
Browse files Browse the repository at this point in the history
Cleanup
  • Loading branch information
streamich authored Nov 1, 2024
2 parents 3af3de2 + cfbbbed commit 6dee6bb
Show file tree
Hide file tree
Showing 17 changed files with 74 additions and 126 deletions.
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@
"@jsonjoy.com/json-pack": "^1.1.0",
"@jsonjoy.com/util": "^1.4.0",
"abstract-level": "^2.0.0",
"classic-level": "^1.4.1",
"fs-zoo": "^1.1.0",
"memory-level": "^1.0.0",
"rpc-error": "^1.1.0",
"rx-use": "^1.8.1",
"sonic-forest": "^1.0.3",
"thingies": "^2.4.0",
"classic-level": "^1.4.1"
"thingies": "^2.4.0"
},
"devDependencies": {
"json-joy": "^17.0.0",
"@biomejs/biome": "^1.9.3",
"@types/benchmark": "^2.1.5",
"@types/jest": "^29.5.12",
Expand All @@ -96,6 +96,7 @@
"@types/ws": "^8.5.10",
"benchmark": "^2.1.4",
"jest": "^29.7.0",
"json-joy": "^17.0.0",
"rimraf": "^5.0.5",
"rxjs": "^7.8.1",
"ts-jest": "^29.1.2",
Expand Down
28 changes: 27 additions & 1 deletion src/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,30 @@
# Reactive-RPC

Implements [Reactive-RPC](https://onp4.com/@vadim/p/qgzwgi42cz) protocol.
Implements [Reactive-RPC](https://onp4.com/@vadim/p/qgzwgi42cz) protocol. A fast
and type-safe HTTP and WebSocket server and client.


## Features

- Very fast.
- Type-safe. Write types using JSON Type, TypeScript types are automatically
inferred on the server and client.
- Supports Reactive-RPC protocol.
- Supports JSON-RPC 2.0 protocol.
- Supports binary data.
- Supports HTTP/1.1 and WebSocket.
- Ships with a client and server
- Specify RPC methods using JSON Type.
- Supports TLS with automatic certificate reloading.


## Installation

```bash
npm install reactive-rpc rxjs
```


## Usage

See `/src/__demos__` for examples.
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,14 @@ export class BlocksServices {
public async get(id: string) {
const {store} = this;
const result = await store.get(id);
if (!result) throw RpcError.fromCode(RpcErrorCodes.NOT_FOUND);
if (!result) throw RpcError.notFound();
return result;
}

public async view(id: string) {
const {store} = this;
const result = await store.get(id);
if (!result) throw RpcError.fromCode(RpcErrorCodes.NOT_FOUND);
if (!result) throw RpcError.notFound();
const model = Model.load(result.block.snapshot.blob);
return model.view();
}
Expand All @@ -141,7 +141,7 @@ export class BlocksServices {
) {
const {store} = this;
if (typeof offset !== 'number') offset = await store.seq(id);
if (typeof offset !== 'number') throw RpcError.fromCode(RpcErrorCodes.NOT_FOUND);
if (typeof offset !== 'number') throw RpcError.notFound();
let min = 0,
max = 0;
if (!limit || Math.round(limit) !== limit) throw RpcError.badRequest('INVALID_LIMIT');
Expand Down
2 changes: 1 addition & 1 deletion src/__tests__/json-crdt-server/block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ export const runBlockTests = (_setup: ApiTestSetup, params: {staticOnly?: true}
const {call, stop} = await setup();
const id = getId();
const get1 = await of(call('block.get', {id}));
expect(get1[1]).toMatchObject({message: 'NOT_FOUND'});
expect(get1[1]).toMatchObject({code: 'NOT_FOUND'});
const result = await call('block.pull', {
id,
seq: -1,
Expand Down
2 changes: 1 addition & 1 deletion src/common/codec/compact/CompactRpcMessageCodec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type * as types from './types';
import type {TlvBinaryJsonEncoder} from '@jsonjoy.com/json-pack';

const fromJson = (arr: unknown | unknown[] | types.CompactMessage): msg.ReactiveRpcMessage => {
if (!(arr instanceof Array)) throw RpcError.fromCode(RpcErrorCodes.BAD_REQUEST);
if (!(arr instanceof Array)) throw RpcError.badRequest();
const type = arr[0];
switch (type) {
case CompactMessageType.RequestComplete: {
Expand Down
2 changes: 1 addition & 1 deletion src/common/rpc/RpcMessageBatchProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import * as msg from '../messages';
import type {RpcErrorValue} from './caller';
import {validateId, validateMethod} from './validation';
import {TypedRpcError} from './caller/error/typed';
import type {RpcCaller} from './caller/RpcCaller';
import type {RpcErrorValue} from './caller/error/types';

export type IncomingBatchMessage =
| msg.RequestDataMessage
Expand Down
20 changes: 10 additions & 10 deletions src/common/rpc/RpcMessageStreamProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import * as msg from '../messages';
import {TimedQueue} from '../util/TimedQueue';
import {RpcErrorCodes, RpcError} from './caller/error/RpcError';
import type {RpcValue} from '../messages/Value';
import {subscribeCompleteObserver} from '../util/subscribeCompleteObserver';
import {TypedRpcError} from './caller/error/typed';
import type {RpcValue} from '../messages/Value';
import type {RpcCaller} from './caller/RpcCaller';
import type {Call, RpcApiMap} from './caller/types';
import {TypedRpcError} from './caller/error/typed';

type Send = (messages: (msg.ReactiveRpcServerMessage | msg.NotificationMessage)[]) => void;

Expand Down Expand Up @@ -182,12 +182,12 @@ export class RpcMessageStreamProcessor<Ctx = unknown> {
let call = this.activeStreamCalls.get(id);
if (!call) {
if (!method) {
this.sendError(id, RpcErrorCodes.NO_METHOD_SPECIFIED);
this.sendError(id, RpcErrorCodes.METHOD_INV);
return;
}
const info = this.caller.info(method);
if (!info) {
this.sendError(id, RpcErrorCodes.METHOD_NOT_FOUND);
this.sendError(id, RpcErrorCodes.METHOD_UNK);
return;
}
if (info.isStreaming) {
Expand Down Expand Up @@ -216,12 +216,12 @@ export class RpcMessageStreamProcessor<Ctx = unknown> {
return;
}
if (!method) {
this.sendError(id, RpcErrorCodes.NO_METHOD_SPECIFIED);
this.sendError(id, RpcErrorCodes.METHOD_INV);
return;
}
const caller = this.caller;
if (!caller.exists(method)) {
this.sendError(id, RpcErrorCodes.METHOD_NOT_FOUND);
this.sendError(id, RpcErrorCodes.METHOD_UNK);
return;
}
const {isStreaming} = caller.info(method);
Expand All @@ -245,16 +245,16 @@ export class RpcMessageStreamProcessor<Ctx = unknown> {
return;
}
if (!method) {
this.sendError(id, RpcErrorCodes.NO_METHOD_SPECIFIED);
this.sendError(id, RpcErrorCodes.METHOD_INV);
return;
}
if (!this.caller.exists(method)) {
this.sendError(id, RpcErrorCodes.METHOD_NOT_FOUND);
this.sendError(id, RpcErrorCodes.METHOD_UNK);
return;
}
const {isStreaming} = this.caller.info(method);
if (!isStreaming) {
void this.sendError(id, RpcErrorCodes.INVALID_METHOD);
void this.sendError(id, RpcErrorCodes.METHOD_UNK);
return;
}
const streamCall = this.createStreamCall(id, method, ctx);
Expand All @@ -272,7 +272,7 @@ export class RpcMessageStreamProcessor<Ctx = unknown> {

public onNotificationMessage(message: msg.NotificationMessage, ctx: Ctx): void {
const {method, value} = message;
if (!method || method.length > 128) throw RpcError.fromCode(RpcErrorCodes.INVALID_METHOD);
if (!method || method.length > 128) throw RpcError.fromErrno(RpcErrorCodes.METHOD_INV);
const request = value && typeof value === 'object' ? value?.data : undefined;
this.caller.notification(method, request, ctx).catch(() => {});
}
Expand Down
10 changes: 5 additions & 5 deletions src/common/rpc/__tests__/RpcMessageStreamProcessor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ describe('pre-call checks', () => {
expect(send).toHaveBeenCalledTimes(1);
const errorValue = send.mock.calls[0][0][0];
expect(errorValue).toBeInstanceOf(ResponseErrorMessage);
expect(errorValue.value.data.message).toBe('BUFFER_OVERFLOW');
expect(errorValue.value.data.message).toBe('OVERFLOW');
});

test('buffer size can be set to 5 for the whole server', async () => {
Expand Down Expand Up @@ -766,7 +766,7 @@ describe('pre-call checks', () => {
await new Promise((r) => setTimeout(r, 1));
expect(send).toHaveBeenCalledTimes(1);
expect(send.mock.calls[0][0][0]).toBeInstanceOf(ResponseErrorMessage);
expect(send.mock.calls[0][0][0].value.data.message).toBe('BUFFER_OVERFLOW');
expect(send.mock.calls[0][0][0].value.data.message).toBe('OVERFLOW');
});

test('buffer size can be set to 5 per method', async () => {
Expand Down Expand Up @@ -802,7 +802,7 @@ describe('pre-call checks', () => {
await new Promise((r) => setTimeout(r, 1));
expect(send).toHaveBeenCalledTimes(1);
expect(send.mock.calls[0][0][0]).toBeInstanceOf(ResponseErrorMessage);
expect(send.mock.calls[0][0][0].value.data.message).toBe('BUFFER_OVERFLOW');
expect(send.mock.calls[0][0][0].value.data.message).toBe('OVERFLOW');
});

test('when pre-call checks finish just before buffer is full, can receive more request data', async () => {
Expand Down Expand Up @@ -929,9 +929,9 @@ describe('buffering', () => {
await new Promise((r) => setTimeout(r, 10));
expect(send).toHaveBeenCalledTimes(1);
expect(send.mock.calls[0][0][0]).toBeInstanceOf(ResponseErrorMessage);
expect(send.mock.calls[0][0][0].value.data.message).toBe('METHOD_NOT_FOUND');
expect(send.mock.calls[0][0][0].value.data.message).toBe('METHOD_UNK');
expect(send.mock.calls[0][0][1]).toBeInstanceOf(ResponseErrorMessage);
expect(send.mock.calls[0][0][1].value.data.message).toBe('METHOD_NOT_FOUND');
expect(send.mock.calls[0][0][1].value.data.message).toBe('METHOD_UNK');
expect(send.mock.calls[0][0][0]).toBeInstanceOf(ResponseErrorMessage);
expect(send.mock.calls[0][0][1]).toBeInstanceOf(ResponseErrorMessage);
});
Expand Down
5 changes: 3 additions & 2 deletions src/common/rpc/caller/RpcCaller.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import {firstValueFrom, from, type Observable, Subject} from 'rxjs';
import {catchError, finalize, first, map, mergeWith, share, switchMap, take, takeUntil, tap} from 'rxjs/operators';
import {RpcError, RpcErrorCodes, type RpcErrorValue} from './error/RpcError';
import {RpcError, RpcErrorCodes} from 'rpc-error';
import {TypedRpcError} from './error/typed';
import {RpcValue} from '../../messages/Value';
import {StaticRpcMethod} from '../methods/StaticRpcMethod';
import {BufferSubject} from '../../../util/rx/BufferSubject';
import type {Call} from './types';
import type {RpcMethod} from '../types';
import type {StreamingRpcMethod} from '../methods/StreamingRpcMethod';
import type {RpcErrorValue} from './error/types';

export interface RpcApiCallerOptions<Ctx = unknown> {
getMethod: (name: string) => undefined | StaticRpcMethod<Ctx> | StreamingRpcMethod<Ctx>;
Expand Down Expand Up @@ -51,7 +52,7 @@ export class RpcCaller<Ctx = unknown> {

public getMethodStrict(name: string): StaticRpcMethod<Ctx> | StreamingRpcMethod<Ctx> {
const method = this.getMethod(name);
if (!method) throw TypedRpcError.valueFromCode(RpcErrorCodes.METHOD_NOT_FOUND);
if (!method) throw TypedRpcError.valueFromCode(RpcErrorCodes.METHOD_UNK);
return method;
}

Expand Down
4 changes: 2 additions & 2 deletions src/common/rpc/caller/TypedApiCaller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {RpcErrorCodes} from './error/RpcError';
import {RpcErrorCodes} from 'rpc-error';
import {TypedRpcError} from './error/typed';
import {RpcCaller, type RpcApiCallerOptions} from './RpcCaller';
import {FunctionStreamingType, FunctionType} from '@jsonjoy.com/json-type/lib/type/classes';
Expand Down Expand Up @@ -85,7 +85,7 @@ export class TypedApiCaller<Types extends TypeMap, Ctx = unknown> extends RpcCal

public get<K extends keyof Types>(id: K): MethodDefinition<Ctx, Types[K]> {
const method = this.methods.get(id as string) as any;
if (!method) throw TypedRpcError.valueFromCode(RpcErrorCodes.METHOD_NOT_FOUND);
if (!method) throw TypedRpcError.valueFromCode(RpcErrorCodes.METHOD_UNK);
return method;
}
}
94 changes: 2 additions & 92 deletions src/common/rpc/caller/error/RpcError.ts
Original file line number Diff line number Diff line change
@@ -1,93 +1,3 @@
import type {RpcValue} from '../../../messages/Value';
import type {IRpcError} from './RpcErrorType';
import {RpcError, RpcErrorCodes, IRpcError} from 'rpc-error';

export enum RpcErrorCodes {
/** Any unknown sever error is wrapped into INTERNAL_ERROR, error 500. */
INTERNAL_ERROR = 0,

/** When request is not valid, e.g. when request validation fails, error 400. */
BAD_REQUEST = 1,

/**
* Error thrown when there was no activity on a
* stream for a long time, and timeout was reached.
*/
TIMEOUT = 2,

/** Resource not found, error 404. */
NOT_FOUND = 3,

/** When operation cannot be performed due to a conflict, error 409. */
CONFLICT = 4,

ID_TAKEN = 5,
INVALID_METHOD = 6,
INVALID_METHOD_NAME = 7,
NO_METHOD_SPECIFIED = 8,
METHOD_NOT_FOUND = 9,

STOP = 10,
DISCONNECT = 11,
BUFFER_OVERFLOW = 12,
}

export type RpcErrorValue = RpcValue<RpcError>;

export class RpcError extends Error implements IRpcError {
public static from(error: unknown): RpcError {
if (error instanceof RpcError) return error;
return RpcError.internal(error);
}

public static fromCode(
errno: RpcErrorCodes,
message = '',
meta: unknown = undefined,
originalError: unknown = undefined,
): RpcError {
const code = RpcErrorCodes[errno];
return new RpcError(message || code, code, errno, undefined, meta || undefined, originalError);
}

public static internal(originalError: unknown, message = 'Internal Server Error'): RpcError {
return RpcError.fromCode(RpcErrorCodes.INTERNAL_ERROR, message, undefined, originalError);
}

public static badRequest(message = 'Bad Request'): RpcError {
return RpcError.fromCode(RpcErrorCodes.BAD_REQUEST, message);
}

public static notFound(message = 'Not Found'): RpcError {
return RpcError.fromCode(RpcErrorCodes.NOT_FOUND, message);
}

public static validation(message: string, meta?: unknown): RpcError {
return RpcError.fromCode(RpcErrorCodes.BAD_REQUEST, message, meta);
}

public static isRpcError(error: unknown): error is RpcError {
return error instanceof RpcError;
}

constructor(
public readonly message: string,
public readonly code: string | undefined,
public readonly errno: number,
public readonly errorId: string | undefined,
public readonly meta: unknown | undefined,
public readonly originalError: unknown | undefined,
) {
super(message);
if (message === code) this.code = undefined;
Object.setPrototypeOf(this, RpcError.prototype);
}

public toJson(): IRpcError {
const err: IRpcError = {message: this.message};
if (this.code) err.code = this.code;
if (this.errno) err.errno = this.errno;
if (this.errorId) err.errorId = this.errorId;
if (this.meta) err.meta = this.meta;
return err;
}
}
export {RpcError, RpcErrorCodes, IRpcError};
5 changes: 3 additions & 2 deletions src/common/rpc/caller/error/typed.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {RpcError, type RpcErrorCodes, type RpcErrorValue} from './RpcError';
import {RpcError, type RpcErrorCodes} from 'rpc-error';
import {RpcValue} from '../../../messages/Value';
import {RpcErrorType} from './RpcErrorType';
import type {RpcErrorValue} from './types';

/**
* @protected
Expand All @@ -21,7 +22,7 @@ export class TypedRpcError {
}

public static valueFromCode(errno: RpcErrorCodes, message?: string): RpcErrorValue {
return TypedRpcError.value(RpcError.fromCode(errno, message));
return TypedRpcError.value(RpcError.fromErrno(errno, message));
}

public static internalErrorValue(originalError: unknown): RpcErrorValue {
Expand Down
4 changes: 4 additions & 0 deletions src/common/rpc/caller/error/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import type {RpcError} from 'rpc-error';
import type {RpcValue} from '../../../messages/Value';

export type RpcErrorValue = RpcValue<RpcError>;
2 changes: 1 addition & 1 deletion src/server/uws/RpcApp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import type {RpcCaller} from '../../common/rpc/caller/RpcCaller';

const HDR_BAD_REQUEST = Buffer.from('400 Bad Request', 'utf8');
const HDR_NOT_FOUND = Buffer.from('404 Not Found', 'utf8');
const ERR_NOT_FOUND = RpcError.fromCode(RpcErrorCodes.NOT_FOUND, 'Not Found');
const ERR_NOT_FOUND = RpcError.fromCode('NOT_FOUND', 'Not Found');

const noop = () => {};

Expand Down
Loading

0 comments on commit 6dee6bb

Please sign in to comment.