Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bindings/nodejs): impl Readable Stream for rows #531

Merged
merged 45 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions bindings/nodejs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,28 @@ while (row) {
console.log(row.data());
row = await rows.next();
}

// iter rows
const rows = await conn.queryIter("SELECT * FROM test");
for await (const row of rows) {
console.log(row.values());
}

// pipe rows
import { Transform } from "node:stream";
import { finished, pipeline } from "node:stream/promises";

const rows = await conn.queryIter("SELECT * FROM test");
const stream = rows.stream();
const transformer = new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(row, _, callback) {
console.log(row.data());
},
});
await pipeline(stream, transformer);
await finished(stream);
```

## Type Mapping
Expand Down
1 change: 0 additions & 1 deletion bindings/nodejs/cucumber.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{
"default": {
"publishQuiet": true,
"paths": ["tests/*.feature"],
"require": ["tests/*.js"]
}
Expand Down
64 changes: 61 additions & 3 deletions bindings/nodejs/generated.js
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,72 @@ switch (platform) {
}
break
case 'arm':
if (isMusl()) {
localFileExisted = existsSync(
join(__dirname, 'databend-driver.linux-arm-musleabihf.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./databend-driver.linux-arm-musleabihf.node')
} else {
nativeBinding = require('@databend-driver/lib-linux-arm-musleabihf')
}
} catch (e) {
loadError = e
}
} else {
localFileExisted = existsSync(
join(__dirname, 'databend-driver.linux-arm-gnueabihf.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./databend-driver.linux-arm-gnueabihf.node')
} else {
nativeBinding = require('@databend-driver/lib-linux-arm-gnueabihf')
}
} catch (e) {
loadError = e
}
}
break
case 'riscv64':
if (isMusl()) {
localFileExisted = existsSync(
join(__dirname, 'databend-driver.linux-riscv64-musl.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./databend-driver.linux-riscv64-musl.node')
} else {
nativeBinding = require('@databend-driver/lib-linux-riscv64-musl')
}
} catch (e) {
loadError = e
}
} else {
localFileExisted = existsSync(
join(__dirname, 'databend-driver.linux-riscv64-gnu.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./databend-driver.linux-riscv64-gnu.node')
} else {
nativeBinding = require('@databend-driver/lib-linux-riscv64-gnu')
}
} catch (e) {
loadError = e
}
}
break
case 's390x':
localFileExisted = existsSync(
join(__dirname, 'databend-driver.linux-arm-gnueabihf.node')
join(__dirname, 'databend-driver.linux-s390x-gnu.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./databend-driver.linux-arm-gnueabihf.node')
nativeBinding = require('./databend-driver.linux-s390x-gnu.node')
} else {
nativeBinding = require('@databend-driver/lib-linux-arm-gnueabihf')
nativeBinding = require('@databend-driver/lib-linux-s390x-gnu')
}
} catch (e) {
loadError = e
Expand Down
28 changes: 14 additions & 14 deletions bindings/nodejs/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@

/* auto-generated by NAPI-RS */

export class ValueOptions {
export declare class ValueOptions {
variantAsObject: boolean
}
export class Client {
export declare class Client {
/** Create a new databend client with a given DSN. */
constructor(dsn: string, opts?: ValueOptions | undefined | null)
/** Get a connection from the client. */
getConn(): Promise<Connection>
}
export class Connection {
export declare class Connection {
/** Get the connection information. */
info(): Promise<ConnectionInfo>
/** Get the databend version. */
Expand All @@ -49,49 +49,49 @@ export class Connection {
*/
streamLoad(sql: string, data: Array<Array<string>>): Promise<ServerStats>
}
export class ConnectionInfo {
export declare class ConnectionInfo {
get handler(): string
get host(): string
get port(): number
get user(): string
get database(): string | null
get warehouse(): string | null
}
export class Schema {
export declare class Schema {
fields(): Array<Field>
}
export class Field {
export declare class Field {
get name(): string
get dataType(): string
}
export class RowIterator {
export declare class RowIterator {
/** Get Schema for rows. */
schema(): Schema
/**
* Fetch next row.
* Returns `None` if there are no more rows.
*/
next(): Promise<Error | Row | null>
/** Get Schema for rows. */
schema(): Schema
}
export class RowIteratorExt {
export declare class RowIteratorExt {
schema(): Schema
/**
* Fetch next row or stats.
* Returns `None` if there are no more rows.
*/
next(): Promise<Error | RowOrStats | null>
schema(): Schema
}
/** Must contain either row or stats. */
export class RowOrStats {
export declare class RowOrStats {
get row(): Row | null
get stats(): ServerStats | null
}
export class Row {
export declare class Row {
setOpts(opts: ValueOptions): void
values(): Array<any>
data(): Record<string, any>
}
export class ServerStats {
export declare class ServerStats {
get totalRows(): bigint
get totalBytes(): bigint
get readRows(): bigint
Expand Down
36 changes: 35 additions & 1 deletion bindings/nodejs/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,40 @@

/// <reference types="node" />

const { Client } = require("./generated.js");
const { Readable } = require("node:stream");

const { Client, RowIterator } = require("./generated.js");

class RowsStream extends Readable {
constructor(reader, options) {
super({ objectMode: true, ...options });
this.reader = reader;
}

_read() {
this.reader
.next()
.then((item) => {
this.push(item);
})
.catch((e) => {
this.emit("error", e);
});
}
}

RowIterator.prototype[Symbol.asyncIterator] = async function* () {
while (true) {
const item = await this.next();
if (item === null) {
break;
}
yield item;
}
};

RowIterator.prototype.stream = function () {
return new RowsStream(this);
};

module.exports.Client = Client;
16 changes: 8 additions & 8 deletions bindings/nodejs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@
"LICENSE"
],
"devDependencies": {
"@cucumber/cucumber": "^9.6.0",
"@napi-rs/cli": "^2.16.3",
"@types/node": "^18.14.5",
"prettier": "^3.0.3",
"typedoc": "^0.25.1",
"typescript": "^5.2.2"
"@cucumber/cucumber": "^11.1.0",
"@napi-rs/cli": "^2.18.4",
"@types/node": "^22.10.1",
"prettier": "^3.4.2",
"typedoc": "^0.27.4",
"typescript": "^5.7.2"
},
"engines": {
"node": ">= 10"
"node": ">= 16"
},
"scripts": {
"build": "napi build --platform --target=$NAPI_TARGET --release --js generated.js && node ./scripts/header.js",
Expand All @@ -72,5 +72,5 @@
"registry": "https://registry.npmjs.org/",
"access": "public"
},
"packageManager": "pnpm@9.14.2+sha512.6e2baf77d06b9362294152c851c4f278ede37ab1eba3a55fda317a4a17b209f4dbb973fb250a77abc463a341fcb1f17f17cfa24091c4eb319cda0d9b84278387"
"packageManager": "pnpm@9.15.0+sha512.76e2379760a4328ec4415815bcd6628dee727af3779aaa4c914e3944156c4299921a89f976381ee107d41f12cfa4b66681ca9c718f0668fa0831ed4c6d8ba56c"
}
Loading
Loading