Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v0.3.22: fix readable stream lock #169

Merged
merged 1 commit into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"module": "./dist/index.mjs",
"name": "chameleon-ultra.js",
"type": "commonjs",
"version": "0.3.21",
"version": "0.3.22",
"bugs": {
"url": "https://github.com/taichunmin/chameleon-ultra.js/issues"
},
Expand All @@ -30,31 +30,31 @@
"webbluetooth": "^3.2.1"
},
"devDependencies": {
"@tsconfig/node-lts": "^20.1.3",
"@tsconfig/node-lts": "^22.0.0",
"@types/debug": "^4.1.12",
"@types/finalhandler": "^1.2.3",
"@types/html-minifier": "^4.0.5",
"@types/jest": "^29.5.14",
"@types/livereload": "^0.9.5",
"@types/lodash": "^4.17.13",
"@types/node": "^22.8.4",
"@types/node": "^22.8.7",
"@types/pug": "^2.0.10",
"@types/serve-static": "^1.15.7",
"@types/uglify-js": "^3.17.5",
"@types/web-bluetooth": "^0.0.20",
"@typescript-eslint/eslint-plugin": "^7.18.0",
"@typescript-eslint/parser": "^7.18.0",
"chokidar": "^4.0.1",
"concurrently": "^9.0.1",
"concurrently": "^9.1.0",
"dayjs": "^1.11.13",
"dotenv": "^16.4.5",
"esbuild-plugins-node-modules-polyfill": "^1.6.6",
"esbuild-plugins-node-modules-polyfill": "^1.6.7",
"eslint": "^8.57.0",
"eslint-config-love": "^43",
"eslint-config-standard": "^17.1.0",
"eslint-plugin-import": "^2.31.0",
"eslint-plugin-local-rules": "^3.0.2",
"eslint-plugin-n": "^17.11.1",
"eslint-plugin-n": "^17.12.0",
"eslint-plugin-promise": "^7.1.0",
"eslint-plugin-pug": "^1.2.5",
"eslint-plugin-tsdoc": "^0.3.0",
Expand All @@ -74,8 +74,8 @@
"ts-node": "^10.9.2",
"tsup": "^8.3.5",
"tsx": "^4.19.2",
"typedoc": "^0.26.10",
"typedoc-plugin-mdn-links": "^3.3.5",
"typedoc": "^0.26.11",
"typedoc-plugin-mdn-links": "^3.3.6",
"typedoc-plugin-missing-exports": "^3.0.0",
"typedoc-plugin-rename-defaults": "^0.7.1",
"typescript": "^5.6.3",
Expand Down
61 changes: 22 additions & 39 deletions src/ChameleonUltra.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ function toUpperHex (buf: Buffer): string {
export class ChameleonUltra {
#deviceMode: DeviceMode | null = null
#isDisconnecting: boolean = false
#readAsyncGenerator: ReadableToAsyncGenerator<Uint8Array> | null = null
#rxReader: ReadableStreamDefaultReader<Uint8Array> | null = null
#supportedCmds: Set<Cmd> = new Set<Cmd>()
readonly #emitErr: (err: Error) => void
readonly #hooks = new Map<string, ReturnType<typeof middlewareCompose>>()
Expand Down Expand Up @@ -226,9 +226,9 @@ export class ChameleonUltra {
// serial.readable pipeTo this.rxSink
const promiseConnected = new Promise<Date>(resolve => this.emitter.once('connected', resolve))
if (_.isNil(this.port.readable)) throw new Error('this.port.readable is nil')
this.#readAsyncGenerator = new ReadableToAsyncGenerator(this.port.readable)
if (this.isDfu()) void this.#startDfuReadAsyncGenerator()
else void this.#startUltraReadAsyncGenerator()
this.#rxReader = this.port.readable.getReader()
if (this.isDfu()) void this.#dfuStartReading()
else void this.#ultraStartReading()

const connectedAt = await promiseConnected
this.#debug('core', `connected at ${connectedAt.toISOString()}`)
Expand All @@ -241,14 +241,16 @@ export class ChameleonUltra {
}
}

async #startUltraReadAsyncGenerator (): Promise<void> {
const generator = this.#readAsyncGenerator
if (_.isNil(generator)) throw new Error('this.#readAsyncGenerator is nil')
async #ultraStartReading (): Promise<void> {
const reader = this.#rxReader
if (_.isNil(reader)) throw new Error('this.#rxReader is nil')

try {
const bufs: Buffer[] = []
this.emitter.emit('connected', new Date())
for await (const chunk of generator) {
while (true) {
const { done, value: chunk } = await reader.read().catch(err => { throw _.set(new Error(err.message), 'originalError', err) })
if (_.isNil(chunk)) break
bufs.push(Buffer.isBuffer(chunk) ? chunk : Buffer.fromView(chunk))
let concated = Buffer.concat(bufs.splice(0, bufs.length))
try {
Expand All @@ -274,6 +276,7 @@ export class ChameleonUltra {
} finally {
if (concated.length > 0) bufs.push(concated)
}
if (done) break
}
this.emitter.emit('disconnected', new Date())
} catch (err) {
Expand All @@ -282,14 +285,17 @@ export class ChameleonUltra {
}
}

async #startDfuReadAsyncGenerator (): Promise<void> {
const generator = this.#readAsyncGenerator
if (_.isNil(generator)) throw new Error('this.#readAsyncGenerator is nil')
async #dfuStartReading (): Promise<void> {
const reader = this.#rxReader
if (_.isNil(reader)) throw new Error('this.#rxReader is nil')

try {
this.emitter.emit('connected', new Date())
for await (const chunk of generator) {
while (true) {
const { done, value: chunk } = await reader.read().catch(err => { throw _.set(new Error(err.message), 'originalError', err) })
if (_.isNil(chunk)) break
this.emitter.emit('resp', new DfuFrame(Buffer.isBuffer(chunk) ? chunk : Buffer.fromView(chunk)))
if (done) break
}
this.emitter.emit('disconnected', new Date())
} catch (err) {
Expand Down Expand Up @@ -317,13 +323,14 @@ export class ChameleonUltra {
const promiseDisconnected: Promise<[Date, string | undefined]> = this.isConnected() ? new Promise(resolve => {
this.emitter.once('disconnected', (disconnected: Date, reason?: string) => { resolve([disconnected, reason]) })
}) : Promise.resolve([new Date(), err.message])
await this.#readAsyncGenerator?.reader?.cancel(err).catch(this.#emitErr)
await this.#rxReader?.cancel(err).catch(this.#emitErr)
if (this.port?.readable?.locked) this.#rxReader?.releaseLock() // if cancel() not implemented
await this.port?.writable?.close().catch(this.#emitErr)
this.#debug('core', `locked: readable = ${this.port?.readable?.locked ?? '?'}, writable = ${this.port?.writable?.locked ?? '?'}`)
this.port = null

const [disconnectedAt, reason] = await promiseDisconnected
this.#debug('core', `disconnected at ${disconnectedAt.toISOString()}, reason = ${reason ?? '?'}`)
this.#debug('core', `disconnected at ${disconnectedAt.toISOString()}, reason = ${reason ?? err.message}`)
} catch (err) {
throw _.merge(new Error(err.message ?? 'Failed to disconnect'), { originalError: err })
}
Expand Down Expand Up @@ -400,7 +407,7 @@ export class ChameleonUltra {
}): Promise<() => Promise<T>> {
try {
if (!this.isConnected()) await this.connect()
if (_.isNil(this.#readAsyncGenerator)) throw new Error('#readAsyncGenerator is undefined')
if (_.isNil(this.#rxReader)) throw new Error('#rxReader is undefined')
if (_.isNil(args.timeout)) args.timeout = this.readDefaultTimeout
const respGenerator = new EventAsyncGenerator<T>()
this.emitter.on('resp', respGenerator.onData)
Expand Down Expand Up @@ -4264,28 +4271,4 @@ function mfuCheckRespNakCrc16a (resp: Buffer): Buffer {
return data
}

class ReadableToAsyncGenerator<T> implements AsyncGenerator<T> {
readonly reader: ReadableStreamDefaultReader<T>

constructor (readable: ReadableStream<T>) {
this.reader = readable.getReader()
}

async next (): Promise<IteratorResult<T>> {
return await this.reader.read() as IteratorResult<T>
}

async return (): Promise<IteratorResult<T>> {
await this.reader.cancel()
return { done: true, value: undefined }
}

async throw (err: any): Promise<IteratorResult<T>> {
await this.reader.cancel(err)
return { done: true, value: undefined }
}

[Symbol.asyncIterator] (): AsyncGenerator<T> { return this }
}

export { Decoder as ResponseDecoder }
Loading