Skip to content

Commit

Permalink
feat: 🎸 make use of the rpc-error package
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Nov 1, 2024
1 parent 3af3de2 commit 4749546
Show file tree
Hide file tree
Showing 13 changed files with 40 additions and 118 deletions.
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@
"@jsonjoy.com/json-pack": "^1.1.0",
"@jsonjoy.com/util": "^1.4.0",
"abstract-level": "^2.0.0",
"classic-level": "^1.4.1",
"fs-zoo": "^1.1.0",
"memory-level": "^1.0.0",
"rpc-error": "^1.1.0",
"rx-use": "^1.8.1",
"sonic-forest": "^1.0.3",
"thingies": "^2.4.0",
"classic-level": "^1.4.1"
"thingies": "^2.4.0"
},
"devDependencies": {
"json-joy": "^17.0.0",
"@biomejs/biome": "^1.9.3",
"@types/benchmark": "^2.1.5",
"@types/jest": "^29.5.12",
Expand All @@ -96,6 +96,7 @@
"@types/ws": "^8.5.10",
"benchmark": "^2.1.4",
"jest": "^29.7.0",
"json-joy": "^17.0.0",
"rimraf": "^5.0.5",
"rxjs": "^7.8.1",
"ts-jest": "^29.1.2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,14 @@ export class BlocksServices {
public async get(id: string) {
const {store} = this;
const result = await store.get(id);
if (!result) throw RpcError.fromCode(RpcErrorCodes.NOT_FOUND);
if (!result) throw RpcError.notFound();
return result;
}

public async view(id: string) {
const {store} = this;
const result = await store.get(id);
if (!result) throw RpcError.fromCode(RpcErrorCodes.NOT_FOUND);
if (!result) throw RpcError.notFound();
const model = Model.load(result.block.snapshot.blob);
return model.view();
}
Expand All @@ -141,7 +141,7 @@ export class BlocksServices {
) {
const {store} = this;
if (typeof offset !== 'number') offset = await store.seq(id);
if (typeof offset !== 'number') throw RpcError.fromCode(RpcErrorCodes.NOT_FOUND);
if (typeof offset !== 'number') throw RpcError.notFound();
let min = 0,
max = 0;
if (!limit || Math.round(limit) !== limit) throw RpcError.badRequest('INVALID_LIMIT');
Expand Down
2 changes: 1 addition & 1 deletion src/common/codec/compact/CompactRpcMessageCodec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type * as types from './types';
import type {TlvBinaryJsonEncoder} from '@jsonjoy.com/json-pack';

const fromJson = (arr: unknown | unknown[] | types.CompactMessage): msg.ReactiveRpcMessage => {
if (!(arr instanceof Array)) throw RpcError.fromCode(RpcErrorCodes.BAD_REQUEST);
if (!(arr instanceof Array)) throw RpcError.badRequest();
const type = arr[0];
switch (type) {
case CompactMessageType.RequestComplete: {
Expand Down
2 changes: 1 addition & 1 deletion src/common/rpc/RpcMessageBatchProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import * as msg from '../messages';
import type {RpcErrorValue} from './caller';
import {validateId, validateMethod} from './validation';
import {TypedRpcError} from './caller/error/typed';
import type {RpcCaller} from './caller/RpcCaller';
import type {RpcErrorValue} from './caller/error/types';

export type IncomingBatchMessage =
| msg.RequestDataMessage
Expand Down
20 changes: 10 additions & 10 deletions src/common/rpc/RpcMessageStreamProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import * as msg from '../messages';
import {TimedQueue} from '../util/TimedQueue';
import {RpcErrorCodes, RpcError} from './caller/error/RpcError';
import type {RpcValue} from '../messages/Value';
import {subscribeCompleteObserver} from '../util/subscribeCompleteObserver';
import {TypedRpcError} from './caller/error/typed';
import type {RpcValue} from '../messages/Value';
import type {RpcCaller} from './caller/RpcCaller';
import type {Call, RpcApiMap} from './caller/types';
import {TypedRpcError} from './caller/error/typed';

type Send = (messages: (msg.ReactiveRpcServerMessage | msg.NotificationMessage)[]) => void;

Expand Down Expand Up @@ -182,12 +182,12 @@ export class RpcMessageStreamProcessor<Ctx = unknown> {
let call = this.activeStreamCalls.get(id);
if (!call) {
if (!method) {
this.sendError(id, RpcErrorCodes.NO_METHOD_SPECIFIED);
this.sendError(id, RpcErrorCodes.METHOD_INV);
return;
}
const info = this.caller.info(method);
if (!info) {
this.sendError(id, RpcErrorCodes.METHOD_NOT_FOUND);
this.sendError(id, RpcErrorCodes.METHOD_UNK);
return;
}
if (info.isStreaming) {
Expand Down Expand Up @@ -216,12 +216,12 @@ export class RpcMessageStreamProcessor<Ctx = unknown> {
return;
}
if (!method) {
this.sendError(id, RpcErrorCodes.NO_METHOD_SPECIFIED);
this.sendError(id, RpcErrorCodes.METHOD_INV);
return;
}
const caller = this.caller;
if (!caller.exists(method)) {
this.sendError(id, RpcErrorCodes.METHOD_NOT_FOUND);
this.sendError(id, RpcErrorCodes.METHOD_UNK);
return;
}
const {isStreaming} = caller.info(method);
Expand All @@ -245,16 +245,16 @@ export class RpcMessageStreamProcessor<Ctx = unknown> {
return;
}
if (!method) {
this.sendError(id, RpcErrorCodes.NO_METHOD_SPECIFIED);
this.sendError(id, RpcErrorCodes.METHOD_INV);
return;
}
if (!this.caller.exists(method)) {
this.sendError(id, RpcErrorCodes.METHOD_NOT_FOUND);
this.sendError(id, RpcErrorCodes.METHOD_UNK);
return;
}
const {isStreaming} = this.caller.info(method);
if (!isStreaming) {
void this.sendError(id, RpcErrorCodes.INVALID_METHOD);
void this.sendError(id, RpcErrorCodes.METHOD_UNK);
return;
}
const streamCall = this.createStreamCall(id, method, ctx);
Expand All @@ -272,7 +272,7 @@ export class RpcMessageStreamProcessor<Ctx = unknown> {

public onNotificationMessage(message: msg.NotificationMessage, ctx: Ctx): void {
const {method, value} = message;
if (!method || method.length > 128) throw RpcError.fromCode(RpcErrorCodes.INVALID_METHOD);
if (!method || method.length > 128) throw RpcError.fromErrno(RpcErrorCodes.METHOD_INV);
const request = value && typeof value === 'object' ? value?.data : undefined;
this.caller.notification(method, request, ctx).catch(() => {});
}
Expand Down
5 changes: 3 additions & 2 deletions src/common/rpc/caller/RpcCaller.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import {firstValueFrom, from, type Observable, Subject} from 'rxjs';
import {catchError, finalize, first, map, mergeWith, share, switchMap, take, takeUntil, tap} from 'rxjs/operators';
import {RpcError, RpcErrorCodes, type RpcErrorValue} from './error/RpcError';
import {RpcError, RpcErrorCodes} from 'rpc-error';
import {TypedRpcError} from './error/typed';
import {RpcValue} from '../../messages/Value';
import {StaticRpcMethod} from '../methods/StaticRpcMethod';
import {BufferSubject} from '../../../util/rx/BufferSubject';
import type {Call} from './types';
import type {RpcMethod} from '../types';
import type {StreamingRpcMethod} from '../methods/StreamingRpcMethod';
import type {RpcErrorValue} from './error/types';

export interface RpcApiCallerOptions<Ctx = unknown> {
getMethod: (name: string) => undefined | StaticRpcMethod<Ctx> | StreamingRpcMethod<Ctx>;
Expand Down Expand Up @@ -51,7 +52,7 @@ export class RpcCaller<Ctx = unknown> {

public getMethodStrict(name: string): StaticRpcMethod<Ctx> | StreamingRpcMethod<Ctx> {
const method = this.getMethod(name);
if (!method) throw TypedRpcError.valueFromCode(RpcErrorCodes.METHOD_NOT_FOUND);
if (!method) throw TypedRpcError.valueFromCode(RpcErrorCodes.METHOD_UNK);
return method;
}

Expand Down
4 changes: 2 additions & 2 deletions src/common/rpc/caller/TypedApiCaller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {RpcErrorCodes} from './error/RpcError';
import {RpcErrorCodes} from 'rpc-error';
import {TypedRpcError} from './error/typed';
import {RpcCaller, type RpcApiCallerOptions} from './RpcCaller';
import {FunctionStreamingType, FunctionType} from '@jsonjoy.com/json-type/lib/type/classes';
Expand Down Expand Up @@ -85,7 +85,7 @@ export class TypedApiCaller<Types extends TypeMap, Ctx = unknown> extends RpcCal

public get<K extends keyof Types>(id: K): MethodDefinition<Ctx, Types[K]> {
const method = this.methods.get(id as string) as any;
if (!method) throw TypedRpcError.valueFromCode(RpcErrorCodes.METHOD_NOT_FOUND);
if (!method) throw TypedRpcError.valueFromCode(RpcErrorCodes.METHOD_UNK);
return method;
}
}
94 changes: 2 additions & 92 deletions src/common/rpc/caller/error/RpcError.ts
Original file line number Diff line number Diff line change
@@ -1,93 +1,3 @@
import type {RpcValue} from '../../../messages/Value';
import type {IRpcError} from './RpcErrorType';
import {RpcError, RpcErrorCodes, IRpcError} from 'rpc-error';

export enum RpcErrorCodes {
/** Any unknown sever error is wrapped into INTERNAL_ERROR, error 500. */
INTERNAL_ERROR = 0,

/** When request is not valid, e.g. when request validation fails, error 400. */
BAD_REQUEST = 1,

/**
* Error thrown when there was no activity on a
* stream for a long time, and timeout was reached.
*/
TIMEOUT = 2,

/** Resource not found, error 404. */
NOT_FOUND = 3,

/** When operation cannot be performed due to a conflict, error 409. */
CONFLICT = 4,

ID_TAKEN = 5,
INVALID_METHOD = 6,
INVALID_METHOD_NAME = 7,
NO_METHOD_SPECIFIED = 8,
METHOD_NOT_FOUND = 9,

STOP = 10,
DISCONNECT = 11,
BUFFER_OVERFLOW = 12,
}

export type RpcErrorValue = RpcValue<RpcError>;

export class RpcError extends Error implements IRpcError {
public static from(error: unknown): RpcError {
if (error instanceof RpcError) return error;
return RpcError.internal(error);
}

public static fromCode(
errno: RpcErrorCodes,
message = '',
meta: unknown = undefined,
originalError: unknown = undefined,
): RpcError {
const code = RpcErrorCodes[errno];
return new RpcError(message || code, code, errno, undefined, meta || undefined, originalError);
}

public static internal(originalError: unknown, message = 'Internal Server Error'): RpcError {
return RpcError.fromCode(RpcErrorCodes.INTERNAL_ERROR, message, undefined, originalError);
}

public static badRequest(message = 'Bad Request'): RpcError {
return RpcError.fromCode(RpcErrorCodes.BAD_REQUEST, message);
}

public static notFound(message = 'Not Found'): RpcError {
return RpcError.fromCode(RpcErrorCodes.NOT_FOUND, message);
}

public static validation(message: string, meta?: unknown): RpcError {
return RpcError.fromCode(RpcErrorCodes.BAD_REQUEST, message, meta);
}

public static isRpcError(error: unknown): error is RpcError {
return error instanceof RpcError;
}

constructor(
public readonly message: string,
public readonly code: string | undefined,
public readonly errno: number,
public readonly errorId: string | undefined,
public readonly meta: unknown | undefined,
public readonly originalError: unknown | undefined,
) {
super(message);
if (message === code) this.code = undefined;
Object.setPrototypeOf(this, RpcError.prototype);
}

public toJson(): IRpcError {
const err: IRpcError = {message: this.message};
if (this.code) err.code = this.code;
if (this.errno) err.errno = this.errno;
if (this.errorId) err.errorId = this.errorId;
if (this.meta) err.meta = this.meta;
return err;
}
}
export {RpcError, RpcErrorCodes, IRpcError};
5 changes: 3 additions & 2 deletions src/common/rpc/caller/error/typed.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {RpcError, type RpcErrorCodes, type RpcErrorValue} from './RpcError';
import {RpcError, type RpcErrorCodes} from 'rpc-error';
import {RpcValue} from '../../../messages/Value';
import {RpcErrorType} from './RpcErrorType';
import type {RpcErrorValue} from './types';

/**
* @protected
Expand All @@ -21,7 +22,7 @@ export class TypedRpcError {
}

public static valueFromCode(errno: RpcErrorCodes, message?: string): RpcErrorValue {
return TypedRpcError.value(RpcError.fromCode(errno, message));
return TypedRpcError.value(RpcError.fromErrno(errno, message));
}

public static internalErrorValue(originalError: unknown): RpcErrorValue {
Expand Down
4 changes: 4 additions & 0 deletions src/common/rpc/caller/error/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import type {RpcError} from 'rpc-error';
import type {RpcValue} from '../../../messages/Value';

export type RpcErrorValue = RpcValue<RpcError>;
2 changes: 1 addition & 1 deletion src/server/uws/RpcApp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import type {RpcCaller} from '../../common/rpc/caller/RpcCaller';

const HDR_BAD_REQUEST = Buffer.from('400 Bad Request', 'utf8');
const HDR_NOT_FOUND = Buffer.from('404 Not Found', 'utf8');
const ERR_NOT_FOUND = RpcError.fromCode(RpcErrorCodes.NOT_FOUND, 'Not Found');
const ERR_NOT_FOUND = RpcError.fromCode('NOT_FOUND', 'Not Found');

const noop = () => {};

Expand Down
2 changes: 1 addition & 1 deletion src/util/rx/BufferSubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export class BufferSubject<T> extends Subject<T> {
public next(value: T): void {
if (this.isBuffering) {
if (this.buffer.length >= this.bufferSize) {
this.error(RpcError.fromCode(RpcErrorCodes.BUFFER_OVERFLOW));
this.error(RpcError.fromErrno(RpcErrorCodes.OVERFLOW));
return;
}
this.buffer.push(value);
Expand Down
5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2829,6 +2829,11 @@ rimraf@^5.0.5:
dependencies:
glob "^10.3.7"

rpc-error@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/rpc-error/-/rpc-error-1.1.0.tgz#ce38ef99f416b562da7008cb04d88d34dbacaeff"
integrity sha512-RNrbrCi0EOw9kNRqrCVBu7oEvekKt35tzW333mEzZvr4i+WZ/lp4c34l2FtIwmfrqdP+lkubZTdI9aVqN5tTLQ==

rx-use@^1.8.1:
version "1.8.1"
resolved "https://registry.yarnpkg.com/rx-use/-/rx-use-1.8.1.tgz#d6dc272eed2197fe785b4b678a7220d856cff1d3"
Expand Down

0 comments on commit 4749546

Please sign in to comment.