Skip to content

Commit

Permalink
fix: EZSP: Fix socket error handling & serial/socket reset/close logic (
Browse files Browse the repository at this point in the history
#886)

* Fix socket error handling & serial/socket reset logic.

* Reworked reset/close logic.

* Remove open error listener when socket ready.
  • Loading branch information
Nerivec authored Jan 24, 2024
1 parent a9af9e7 commit 43b02ee
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 76 deletions.
18 changes: 13 additions & 5 deletions src/adapter/ezsp/adapter/ezspAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class EZSPAdapter extends Adapter {
private interpanLock: boolean;
private backupMan: EZSPAdapterBackup;
private queue: Queue;
private closing: boolean;


public constructor(networkOptions: NetworkOptions,
Expand All @@ -50,12 +51,14 @@ class EZSPAdapter extends Adapter {
this.waitressValidator, this.waitressTimeoutFormatter
);
this.interpanLock = false;
this.closing = false;

const concurrent = adapterOptions && adapterOptions.concurrent ? adapterOptions.concurrent : 8;
debug(`Adapter concurrent: ${concurrent}`);
this.queue = new Queue(concurrent);

this.driver = new Driver(this.serialPortOptions, this.networkOptions, this.greenPowerGroup);
this.driver.on('close', this.onDriverClose.bind(this));
this.driver.on('deviceJoined', this.handleDeviceJoin.bind(this));
this.driver.on('deviceLeft', this.handleDeviceLeft.bind(this));
this.driver.on('incomingMessage', this.processMessage.bind(this));
Expand Down Expand Up @@ -175,17 +178,22 @@ class EZSPAdapter extends Adapter {
* Adapter methods
*/
public async start(): Promise<StartResult> {
try {
return await this.driver.startup();
} catch {
return await this.driver.reset();
}
return this.driver.startup();
}

public async stop(): Promise<void> {
this.closing = true;
await this.driver.stop();
}

public async onDriverClose(): Promise<void> {
debug(`onDriverClose()`);

if (!this.closing) {
this.emit(Events.Events.disconnected);
}
}

public static async isValidPath(path: string): Promise<boolean> {
// For TCP paths we cannot get device information, therefore we cannot validate it.
if (SocketPortUtils.isTcpPath(path)) {
Expand Down
70 changes: 36 additions & 34 deletions src/adapter/ezsp/driver/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,42 +103,55 @@ export class Driver extends EventEmitter {
this.waitressValidator, this.waitressTimeoutFormatter);
}

public async reset(): Promise<TsType.StartResult> {
let attempts = 0;
const pauses = [10, 30, 60];
let pause = 0;

// infinite retries XXX: might want to hard fail after a while..?
while (true) {
debug.log(`Reset connection. Try ${attempts}`);
try {
await this.stop();
await Wait(1000);
return await this.startup();
} catch (e) {
debug.error(`Reset error ${e.stack}`);
attempts += 1;
/**
* Requested by the EZSP watchdog after too many failures, or by UART layer after port closed unexpectedly.
* Tries to stop the layers below and startup again.
* @returns
*/
public async reset(): Promise<void> {
debug.log(`Reset connection.`);

if (pauses.length) {
pause = pauses.shift();
}
try {
// don't emit 'close' on stop since we don't want this to bubble back up as 'disconnected' to the controller.
await this.stop(false);
await Wait(1000);
await this.startup();
} catch (err) {
debug.error(`Reset error ${err.stack}`);

debug.log(`Pause ${pause}sec before try ${attempts}`);
await Wait(pause*1000);
try {
// here we let emit
await this.stop();
} catch (stopErr) {
debug.error(`Failed to stop after failed reset ${stopErr.stack}`);
}
}
}

private async onReset(): Promise<void> {
private async onEzspReset(): Promise<void> {
debug.log('onEzspReset()');
await this.reset();
}

private onEzspClose(): void {
debug.log('onEzspClose()');
this.emit('close');
}

public async stop(emitClose: boolean = true): Promise<void> {
debug.log('Stopping driver');

if (this.ezsp) {
return this.ezsp.close(emitClose);
}
}

public async startup(): Promise<TsType.StartResult> {
let result: TsType.StartResult = 'resumed';
this.transactionID = 1;
this.ezsp = undefined;
this.ezsp = new Ezsp();
this.ezsp.on('close', this.onClose.bind(this));
this.ezsp.on('close', this.onEzspClose.bind(this));

try {
await this.ezsp.connect(this.serialOpt);
Expand All @@ -148,7 +161,7 @@ export class Driver extends EventEmitter {
throw error;
}

this.ezsp.on('reset', this.onReset.bind(this));
this.ezsp.on('reset', this.onEzspReset.bind(this));

await this.ezsp.version();
await this.ezsp.updateConfig();
Expand Down Expand Up @@ -664,17 +677,6 @@ export class Driver extends EventEmitter {
}
}

private onClose(): void {
debug.log('Close driver');
}

public async stop(): Promise<void> {
if (this.ezsp) {
debug.log('Stop driver');
return this.ezsp.close();
}
}

public async networkIdToEUI64(nwk: number): Promise<EmberEUI64> {
for (const [eUI64, value] of this.eui64ToNodeId) {
if (value === nwk) return new EmberEUI64(eUI64);
Expand Down
33 changes: 16 additions & 17 deletions src/adapter/ezsp/driver/ezsp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const debug = {


const MAX_SERIAL_CONNECT_ATTEMPTS = 4;
/** In ms. This is multiplied by tries count (above), e.g. 4 tries = 5000, 10000, 15000 */
const SERIAL_CONNECT_NEW_ATTEMPT_MIN_DELAY = 5000;
const MTOR_MIN_INTERVAL = 10;
const MTOR_MAX_INTERVAL = 90;
const MTOR_ROUTE_ERROR_THRESHOLD = 4;
Expand Down Expand Up @@ -336,7 +338,7 @@ export class Ezsp extends EventEmitter {

this.serialDriver = new SerialDriver();
this.serialDriver.on('received', this.onFrameReceived.bind(this));
this.serialDriver.on('close', this.onClose.bind(this));
this.serialDriver.on('close', this.onSerialClose.bind(this));
}

public async connect(options: SerialPortOptions): Promise<void> {
Expand All @@ -350,7 +352,7 @@ export class Ezsp extends EventEmitter {
debug.error(`Connection attempt ${i} error: ${error.stack}`);

if (i < MAX_SERIAL_CONNECT_ATTEMPTS) {
await Wait(5000);
await Wait(SERIAL_CONNECT_NEW_ATTEMPT_MIN_DELAY * i);
debug.log(`Next attempt ${i+1}`);
}

Expand All @@ -362,7 +364,7 @@ export class Ezsp extends EventEmitter {
throw new Error("Failure to connect", {cause: lastError});
}

this.serialDriver.on('reset', this.onReset.bind(this));
this.serialDriver.on('reset', this.onSerialReset.bind(this));

if (WATCHDOG_WAKE_PERIOD) {
this.watchdogTimer = setInterval(
Expand All @@ -372,18 +374,22 @@ export class Ezsp extends EventEmitter {
}
}

private onClose(): void {
debug.log('Close ezsp');
private onSerialReset(): void {
debug.log('onSerialReset()');
this.emit('reset');
}

private onSerialClose(): void {
debug.log('onSerialClose()');
this.emit('close');
}

public async close(): Promise<void> {
debug.log('Stop ezsp');
public async close(emitClose: boolean): Promise<void> {
debug.log('Closing Ezsp');

clearTimeout(this.watchdogTimer);
this.queue.clear();
await this.serialDriver.close();
await this.serialDriver.close(emitClose);
}

/**
Expand Down Expand Up @@ -749,16 +755,9 @@ export class Ezsp extends EventEmitter {

if (this.failures > MAX_WATCHDOG_FAILURES) {
this.failures = 0;
this.reset();

this.emit('reset');
}
}
}

private reset(): void {
this.emit('reset');
}

private onReset(): void {
this.reset();
}
}
50 changes: 30 additions & 20 deletions src/adapter/ezsp/driver/uart.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,29 +126,29 @@ export class SerialDriver extends EventEmitter {
this.parser.on('parsed', this.onParsed.bind(this));

return new Promise((resolve, reject): void => {
const openError = (err: Error): void => {
this.initialized = false;

reject(err);
};

this.socketPort.on('connect', () => {
debug('Socket connected');
});

this.socketPort.on('ready', async (): Promise<void> => {
debug('Socket ready');
this.socketPort.removeListener('error', openError);
this.socketPort.once('close', this.onPortClose.bind(this));
this.socketPort.on('error', this.onPortError.bind(this));

// reset
await this.reset();

this.initialized = true;

resolve();
});

this.socketPort.once('close', this.onPortClose.bind(this));

this.socketPort.on('error', () => {
debug('Socket error');

this.initialized = false;

reject(new Error(`Error while opening socket`));
});
this.socketPort.once('error', openError);

this.socketPort.connect(info.port, info.host);
});
Expand Down Expand Up @@ -318,8 +318,8 @@ export class SerialDriver extends EventEmitter {
});
}

public async close(): Promise<void> {
debug('closing');
public async close(emitClose: boolean): Promise<void> {
debug('Closing UART');
this.queue.clear();

if (this.initialized) {
Expand All @@ -329,7 +329,9 @@ export class SerialDriver extends EventEmitter {
try {
await this.serialPort.asyncFlushAndClose();
} catch (error) {
this.emit('close');
if (emitClose) {
this.emit('close');
}

throw error;
}
Expand All @@ -338,19 +340,27 @@ export class SerialDriver extends EventEmitter {
}
}

this.emit('close');
if (emitClose) {
this.emit('close');
}
}

private onPortError(error: Error): void {
debug(`Port error: ${error}`);
}

private onPortClose(): void {
debug('Port closed');

this.initialized = false;
private onPortClose(err: boolean | Error): void {
debug(`Port closed. Error? ${err}`);

this.emit('close');
// on error: serialport passes an Error object (in case of disconnect)
// net.Socket passes a boolean (in case of a transmission error)
// try to reset instead of failing immediately
if (err != null && err !== false) {
this.emit('reset');
} else {
this.initialized = false;
this.emit('close');
}
}

public isInitialized(): boolean {
Expand Down

0 comments on commit 43b02ee

Please sign in to comment.