Skip to content

Commit

Permalink
update README.md
Browse files Browse the repository at this point in the history
  • Loading branch information
bguerout committed Jan 5, 2025
1 parent ebd7882 commit 554efff
Showing 1 changed file with 95 additions and 98 deletions.
193 changes: 95 additions & 98 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ oleoduc (french synonym of pipeline) provides tools to easily stream data.

```sh
npm install oleoduc
# or
yarn add oleoduc
```

It can be used with both CommonJS and ESM
Expand Down Expand Up @@ -51,12 +49,12 @@ Stream JSON to client through an express server
```js
const express = require("express");
const { oleoduc, transformIntoJSON } = require("oleoduc");
const { pipeStreams, transformIntoJSON } = require("oleoduc");
// Consume for example a MongoDB cursor and send documents as it flows
const app = express();
app.get("/documents", async (req, res) => {
oleoduc(
pipeStreams(
db.collection("documents").find().stream(),
    transformIntoJSON(), // Stream the documents as a JSON array
res
Expand Down Expand Up @@ -84,13 +82,13 @@ for await (const data of csvStream) {
# API
* [accumulateData](#accumulatedatacallback-options)
* [pipeStreams](#pipestreamstreams-options)
* [concatStreams](#concatstreamsstreams-options)
* [filterData](#filterdatacallback-options)
* [flattenArray](#flattenarrayoptions)
* [groupData](#groupdataoptions)
* [mergeStreams](#mergestreamsstreams-options)
* [oleoduc](#oleoducstreams-options)
* [pipeStreams](#pipestreamsstreams-options)
* [readLineByLine](#readlinebyline)
* [transformData](#transformdatacallback-options)
* [transformIntoCSV](#transformintocsvoptions)
Expand Down Expand Up @@ -121,7 +119,7 @@ const { Readable } = require("stream");
const source = Readable.from(["j", "o", "h", "n"]);
oleoduc(
await oleoduc(
source,
accumulateData((acc, value) => {
return { ...acc, value };
Expand All @@ -141,7 +139,7 @@ const { Readable } = require("stream");
const source = Readable.from(["John", "Doe", "Robert", "Hue"]);
oleoduc(
await oleoduc(
source,
accumulateData((acc, data, flush) => {
//Group firstname and lastname
Expand All @@ -167,68 +165,6 @@ oleoduc(
]
```
## pipeStreams(...streams, [options])
Same as `oleoduc`, but without the promise wrapper and the stream composition capability
#### Parameters
- `streams`: A list of streams to pipe together
- `options`:
- `*`: The rest of the options is passed
to [stream.Transform](https://nodejs.org/api/stream.html#stream_class_stream_transform)
Pipe streams
```js
const { pipeStreams, transformData, writeData } = require("oleoduc");
async function getCursor() {
const cursor = await getDataFromDB();
return pipeStreams(
cursor,
transformData((data) => data.value * 10),
)
};
const cursor = await getCursor();
await oleoduc(
cursor,
writeData((data) => console.log(data))
);
```
Iterate over a chained readable stream
```js
const { pipeStreams, transformData } = require("oleoduc");
const stream = pipeStreams(
source,
transformData((data) => data.trim()),
);
for await (const data of stream) {
console.log(data)
}
```
Handle errors in a single event listener
```js
const { pipeStreams, writeData } = require("oleoduc");
const stream = pipeStreams(
source,
  writeData((obj) => { throw new Error(); })
);
stream.on("error", (e) => {
//Handle error
});
```
## concatStreams(...streams, [options])
Allows multiple streams to be processed one after the other.
Expand Down Expand Up @@ -307,7 +243,7 @@ const { Readable } = require("stream");
const source = Readable.from([1, 2]);
oleoduc(
await oleoduc(
source,
filterData((data) => data === 1),
writeData((obj) => console.log(obj))
Expand All @@ -316,7 +252,6 @@ oleoduc(
// --> Output:
1
```
## flattenArray([options])
Allows chunks of an array to be streamed as if each was part of the source
Expand All @@ -337,7 +272,7 @@ const { Readable } = require("stream");
const source = Readable.from([["John Doe"], ["Robert Hue"]]);
oleoduc(
await oleoduc(
source,
flattenArray(),
writeData((fullname) => console.log(fullname))
Expand Down Expand Up @@ -365,7 +300,7 @@ const { Readable } = require("stream");
const source = Readable.from(["John", "Doe", "Robert", "Hue"]);
oleoduc(
await oleoduc(
source,
groupData({ size: 2 }),
writeData((array) => console.log(array))
Expand Down Expand Up @@ -406,36 +341,14 @@ await oleoduc(
;
```
## readLineByLine
Allows data to be read line by line
#### Examples
Read an [ndjson](http://ndjson.org/) file line by line
```js
const { oleoduc, readLineByLine, transformData, writeData } = require("oleoduc");
const { createReadStream } = require("stream");
await oleoduc(
createReadStream("/path/to/file.ndjson"),
readLineByLine(),
transformData(line => JSON.parse(line)),
writeData((json) => console.log(json))
);
```
## oleoduc(...streams, [options])
Pipe streams together, forwards errors and returns a promisified stream.
Pipes streams together and returns a promisified stream.
It is same as nodejs
core [pipeline](https://nodejs.org/api/stream.html#stream_stream_pipeline_source_transforms_destination_callback)
but with better error handling.
If the last stream is readable, the returned stream will be iterable
#### Parameters
- `streams`: A list of streams to pipe together
Expand Down Expand Up @@ -471,6 +384,90 @@ try {
}
```
## pipeStreams(...streams, [options])
Pipes streams together and forwards errors
If the last stream is readable, the returned stream will be iterable
#### Parameters
- `streams`: A list of streams to pipe together
- `options`:
- `*`: The rest of the options is passed
to [stream.Transform](https://nodejs.org/api/stream.html#stream_class_stream_transform)
Pipe streams
```js
const { pipeStreams, transformData, writeData } = require("oleoduc");
async function getCursor() {
const cursor = await getDataFromDB();
return pipeStreams(
cursor,
transformData((data) => data.value * 10),
)
};
const cursor = await getCursor();
await oleoduc(
cursor,
writeData((data) => console.log(data))
);
```
Iterate over a chained readable stream
```js
const { pipeStreams, transformData } = require("oleoduc");
const stream = pipeStreams(
source,
transformData((data) => data.trim()),
);
for await (const data of stream) {
console.log(data)
}
```
Handle errors in a single event listener
```js
const { pipeStreams, writeData } = require("oleoduc");
const stream = pipeStreams(
source,
  writeData((obj) => { throw new Error(); })
);
stream.on("error", (e) => {
//Handle error
});
```
## readLineByLine
Allows data to be read line by line
#### Examples
Read an [ndjson](http://ndjson.org/) file line by line
```js
const { oleoduc, readLineByLine, transformData, writeData } = require("oleoduc");
const { createReadStream } = require("stream");
await oleoduc(
createReadStream("/path/to/file.ndjson"),
readLineByLine(),
transformData(line => JSON.parse(line)),
writeData((json) => console.log(json))
);
```
## transformData(callback, [options])
Allows data to be manipulated and transformed during a stream processing.
Expand Down Expand Up @@ -500,7 +497,7 @@ const { Readable } = require("stream");
const source = Readable.from([1, 2]);
oleoduc(
await oleoduc(
source,
transformData((data) => {
return ({ value: data });
Expand Down Expand Up @@ -692,7 +689,7 @@ Writing data to stdout
```js
const { oleoduc, writeData } = require("oleoduc");
oleoduc(
await oleoduc(
source,
writeData((data) => console.log("New chunk", data))
);
Expand Down

0 comments on commit 554efff

Please sign in to comment.