From 4de6c2e586d701fd9b69dcb556b1f5d4aa5821bc Mon Sep 17 00:00:00 2001 From: Jennie Gao Date: Thu, 16 Jan 2025 15:15:20 -0800 Subject: [PATCH 1/9] process src output --- airbyte-local-cli-nodejs/src/command.ts | 6 +- airbyte-local-cli-nodejs/src/docker.ts | 1 + airbyte-local-cli-nodejs/src/index.ts | 7 +- airbyte-local-cli-nodejs/src/utils.ts | 134 +++++++++++++++++- .../test/__snapshots__/utils.it.test.ts.snap | 7 + .../test/resources/test_src_input | 9 ++ .../test/utils.it.test.ts | 134 ++++++++++++------ 7 files changed, 252 insertions(+), 46 deletions(-) create mode 100644 airbyte-local-cli-nodejs/test/resources/test_src_input diff --git a/airbyte-local-cli-nodejs/src/command.ts b/airbyte-local-cli-nodejs/src/command.ts index 02cf2a4..7492ead 100644 --- a/airbyte-local-cli-nodejs/src/command.ts +++ b/airbyte-local-cli-nodejs/src/command.ts @@ -1,7 +1,7 @@ import {Command, Option} from 'commander'; import {AirbyteConfig, AirbyteConfigInputType, CliOptions, FarosConfig} from './types'; -import {logger, parseConfigFile, updateLogLevel} from './utils'; +import {logger, OutputStream, parseConfigFile, updateLogLevel} from './utils'; import {CLI_VERSION} from './version'; // Command line program @@ -183,9 +183,9 @@ export function parseAndValidateInputs(argv: string[]): FarosConfig { // Convert the cli options to FarosConfig const farosConfig: FarosConfig = { - // The default source output file is stdout if `srcOnly` is true + // The default source output file is stdout(`-`) if `srcOnly` is true // Take the non-default value if provided with `srcOutputFile` option - srcOutputFile: cliOptions.srcOnly ? '/dev/null' : cliOptions.srcOutputFile, + srcOutputFile: cliOptions.srcOnly ? OutputStream.STDOUT : cliOptions.srcOutputFile, // Rename the `dstOnly` file path to `srcInputFile` srcInputFile: cliOptions.dstOnly, connectionName: cliOptions.connectionName, diff --git a/airbyte-local-cli-nodejs/src/docker.ts b/airbyte-local-cli-nodejs/src/docker.ts index bedea53..5cca968 100644 --- a/airbyte-local-cli-nodejs/src/docker.ts +++ b/airbyte-local-cli-nodejs/src/docker.ts @@ -181,6 +181,7 @@ export async function runSrcSync(tmpDir: string, config: FarosConfig): Promise { const context: AirbyteCliContext = {}; @@ -19,14 +19,19 @@ async function main(): Promise { if (cfg.srcPull && cfg.src?.image) { await pullDockerImage(cfg.src.image); } + // Check source connection if (cfg.srcCheckConnection && cfg.src?.image) { await checkSrcConnection(context.tmpDir, cfg.src.image); } + // Run airbyte source connector if (!cfg.srcInputFile) { await runSrcSync(context.tmpDir, cfg); } + + // Process source data + await processSrcData(cfg); } catch (error: any) { logger.error(error.message, 'Error'); cleanUp(context); diff --git a/airbyte-local-cli-nodejs/src/utils.ts b/airbyte-local-cli-nodejs/src/utils.ts index 8e0bb74..b136f25 100644 --- a/airbyte-local-cli-nodejs/src/utils.ts +++ b/airbyte-local-cli-nodejs/src/utils.ts @@ -1,20 +1,42 @@ import {spawnSync} from 'node:child_process'; -import {accessSync, constants, mkdtempSync, readFileSync, rmSync, writeFileSync} from 'node:fs'; +import {once} from 'node:events'; +import { + accessSync, + constants, + createReadStream, + createWriteStream, + mkdtempSync, + readFileSync, + rmSync, + writeFileSync, +} from 'node:fs'; import {tmpdir} from 'node:os'; import {sep} from 'node:path'; import pino from 'pino'; import pretty from 'pino-pretty'; +import readline from 'readline'; import {AirbyteCliContext, AirbyteConfig, FarosConfig} from './types'; // constants +export enum OutputStream { + STDERR = 'STDERR', + STDOUT = 'STDOUT', +} export const FILENAME_PREFIX = 'faros_airbyte_cli'; export const SRC_CONFIG_FILENAME = `${FILENAME_PREFIX}_src_config.json`; export const DST_CONFIG_FILENAME = `${FILENAME_PREFIX}_dst_config.json`; export const SRC_CATALOG_FILENAME = `${FILENAME_PREFIX}_src_catalog.json`; export const DST_CATALOG_FILENAME = `${FILENAME_PREFIX}_dst_catalog.json`; export const DEFAULT_STATE_FILE = 'state.json'; +export const SRC_INPUT_DATA_FILE = `${FILENAME_PREFIX}_src_data`; +export const SRC_OUTPUT_DATA_FILE = `${FILENAME_PREFIX}_src_output`; + +// Check if the value is an OutputStream +function isOutputStream(value: any): value is OutputStream { + return Object.values(OutputStream).includes(value); +} // Create a pino logger instance export const logger = pino(pretty({colorize: true})); @@ -165,3 +187,113 @@ export function writeConfig(tmpDir: string, config: FarosConfig): void { writeFileSync(dstCatalogFilePath, JSON.stringify(airbyteConfig.dst.catalog ?? {}, null, 2)); logger.debug(`Airbyte catalog files written to: ${srcCatalogFilePath}, ${dstCatalogFilePath}`); } + +// Read file content +export function readFile(file: string): any { + try { + const data = readFileSync(file, 'utf8'); + return data; + } catch (error: any) { + throw new Error(`Failed to read '${file}': ${error.message}`); + } +} + +// Write file content +export function writeFile(file: string, data: any): void { + try { + writeFileSync(file, data); + } catch (error: any) { + throw new Error(`Failed to write '${file}': ${error.message}`); + } +} + +/** + * Process the source output. + * + * Command line: + * tee >( + * jq -cR $jq_color_opt --unbuffered 'fromjson? | + * select(.type != "RECORD" and .type != "STATE")' | + * jq -rR --unbuffered "$jq_src_msg" >&2 + * ) | + * jq -cR --unbuffered "fromjson? | + * select(.type == \"RECORD\" or .type == \"STATE\") | + * .record.stream |= \"${dst_stream_prefix}\" + ." | + * tee "$output_filepath" | + * + * Note: `dst_stream_prefix` command option is dropped + */ +export async function processSrcData(cfg: FarosConfig): Promise { + // Processing the source line by line + function processLine(line: string): void { + // skip empty lines + if (line.trim() === '') { + return; + } + + try { + const data = JSON.parse(line); + + // non RECORD and STATE type messages: print to stderr + if (data.type !== 'RECORD' && data.type !== 'STATE') { + logger.info(line); // TODO: should we use logger instead? + } + // RECORD and STATE type messages: write to output file + else { + outputStream.write(`${line}\n`); + } + } catch (error: any) { + throw new Error(`Line of data: '${line}'; Error: ${error.message}\n`); + } + } + + // Close the output stream if it's a file + function closeOutputStream(): void { + if (!isOutputStream(cfg.srcOutputFile)) { + outputStream.end(); + } + logger.debug(`Closing the output stream file '${cfg.srcOutputFile}'.`); + } + + // get source data file and output file paths + const srcInputFilePath = cfg.srcInputFile ?? SRC_INPUT_DATA_FILE; + const srcOutputFilePath = cfg.srcOutputFile ?? SRC_OUTPUT_DATA_FILE; + + // create input and output streams: + // - input stream: read from the data file user provided or the one the script created in the temporary directory + // - output stream: write to a file or stdout/stderr. Overwrite the file if it exists, otherwise create a new one + const inputStream = createReadStream(srcInputFilePath); + const outputStream: any = + cfg.srcOutputFile === OutputStream.STDOUT + ? process.stdout + : cfg.srcOutputFile === OutputStream.STDERR + ? process.stderr + : createWriteStream(srcOutputFilePath, {flags: 'w'}); + + // Create readline interface + const rl = readline.createInterface({ + input: inputStream, + crlfDelay: Infinity, + }); + + rl.on('line', (line) => { + processLine(line); + }); + + rl.on('close', () => { + closeOutputStream(); + }); + + rl.on('error', (error) => { + closeOutputStream(); + throw new Error(`Failed to process the source output data: ${error.message ?? JSON.stringify(error)}`); + }); + + outputStream.on('error', (error: any) => { + throw new Error(`Failed to write to the output file: ${error.message ?? JSON.stringify(error)}`); + }); + + // wait for the processing to be done + await once(rl, 'close'); + logger.debug('Finished processing the source output data.'); +} diff --git a/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap b/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap index 8e66a18..9132de1 100644 --- a/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap +++ b/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap @@ -34,6 +34,13 @@ exports[`parseConfigFile should pass 1`] = ` } `; +exports[`processSrcData should succeed writing to stdout 1`] = ` +"{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","redactedConfig":{"user":"chris"},"sourceType":"example","sourceVersion":"0.12.3"} +{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","sourceStatus":{"status":"SUCCESS"}} +{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","logs":[{"timestamp":1736891682696,"message":{"level":30,"msg":"Source version: 0.12.3"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"Config: {\\"user\\":\\"chris\\"}"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"Catalog: {}"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"State: {}"}},{"timestamp":1736891682700,"message":{"level":30,"msg":"Syncing ExampleSource"}},{"timestamp":1736891682704,"message":{"level":30,"msg":"Finished syncing ExampleSource"}}]} +" +`; + exports[`write files to temporary dir loadStateFile should pass with existing state file 1`] = ` "{"format":"base64/gzip","data":"dGVzdA=="} " diff --git a/airbyte-local-cli-nodejs/test/resources/test_src_input b/airbyte-local-cli-nodejs/test/resources/test_src_input new file mode 100644 index 0000000..5f63b3f --- /dev/null +++ b/airbyte-local-cli-nodejs/test/resources/test_src_input @@ -0,0 +1,9 @@ +{"log":{"level":"INFO","message":"Source version: 0.12.3"},"type":"LOG"} +{"log":{"level":"INFO","message":"Config: {\"user\":\"chris\"}"},"type":"LOG"} +{"log":{"level":"INFO","message":"Catalog: {}"},"type":"LOG"} +{"log":{"level":"INFO","message":"State: {}"},"type":"LOG"} +{"log":{"level":"INFO","message":"Syncing ExampleSource"},"type":"LOG"} +{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","redactedConfig":{"user":"chris"},"sourceType":"example","sourceVersion":"0.12.3"} +{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","sourceStatus":{"status":"SUCCESS"}} +{"log":{"level":"INFO","message":"Finished syncing ExampleSource"},"type":"LOG"} +{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","logs":[{"timestamp":1736891682696,"message":{"level":30,"msg":"Source version: 0.12.3"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"Config: {\"user\":\"chris\"}"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"Catalog: {}"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"State: {}"}},{"timestamp":1736891682700,"message":{"level":30,"msg":"Syncing ExampleSource"}},{"timestamp":1736891682704,"message":{"level":30,"msg":"Finished syncing ExampleSource"}}]} diff --git a/airbyte-local-cli-nodejs/test/utils.it.test.ts b/airbyte-local-cli-nodejs/test/utils.it.test.ts index 5695462..795183e 100644 --- a/airbyte-local-cli-nodejs/test/utils.it.test.ts +++ b/airbyte-local-cli-nodejs/test/utils.it.test.ts @@ -1,5 +1,6 @@ import {existsSync, mkdtempSync, readFileSync, rmSync} from 'node:fs'; import {tmpdir} from 'node:os'; +import {Writable} from 'node:stream'; import {FarosConfig} from '../src/types'; import { @@ -8,10 +9,52 @@ import { createTmpDir, FILENAME_PREFIX, loadStateFile, + OutputStream, parseConfigFile, + processSrcData, writeConfig, } from '../src/utils'; +const testConfig: FarosConfig = { + src: { + image: 'farosai/airbyte-test-source', + config: { + username: 'test', + password: 'test', + url: 'test', + }, + catalog: { + tests: {disabled: true}, + projects: {disabled: true}, + }, + }, + dst: { + image: 'farosai/airbyte-test-destination', + config: { + edition_config: { + graph: 'default', + edition: 'cloud', + api_url: 'https://test.api.faros.ai', + }, + }, + }, + + // default values + srcCheckConnection: false, + dstUseHostNetwork: false, + srcPull: false, + dstPull: false, + fullRefresh: false, + rawMessages: false, + keepContainers: false, + logLevel: 'info', + debug: false, + stateFile: undefined, + connectionName: undefined, + srcOutputFile: undefined, + srcInputFile: undefined, +}; + describe('parseConfigFile', () => { it('should pass', () => { expect(parseConfigFile('test/resources/test_config_file.json')).toMatchSnapshot(); @@ -76,46 +119,6 @@ describe('write files to temporary dir', () => { }); describe('writeConfig', () => { - const testConfig: FarosConfig = { - src: { - image: 'farosai/airbyte-test-source', - config: { - username: 'test', - password: 'test', - url: 'test', - }, - catalog: { - tests: {disabled: true}, - projects: {disabled: true}, - }, - }, - dst: { - image: 'farosai/airbyte-test-destination', - config: { - edition_config: { - graph: 'default', - edition: 'cloud', - api_url: 'https://test.api.faros.ai', - }, - }, - }, - - // default values - srcCheckConnection: false, - dstUseHostNetwork: false, - srcPull: false, - dstPull: false, - fullRefresh: false, - rawMessages: false, - keepContainers: false, - logLevel: 'info', - debug: false, - stateFile: undefined, - connectionName: undefined, - srcOutputFile: undefined, - srcInputFile: undefined, - }; - afterEach(() => { rmSync(`${FILENAME_PREFIX}_config.json`, {force: true}); rmSync(`${tmpDirPath}/${FILENAME_PREFIX}_src_config.json`, {force: true}); @@ -151,7 +154,7 @@ describe('write files to temporary dir', () => { it('should alter config if debug is enabled', () => { const testConfigDebug = {...structuredClone(testConfig), debug: true}; - (testConfigDebug.src as any).image = 'farosai/airbyte-faros-feeds-source:v1'; + testConfigDebug.src!.image = 'farosai/airbyte-faros-feeds-source:v1'; expect(() => writeConfig(tmpDirPath, structuredClone(testConfigDebug))).not.toThrow(); expect(existsSync(`${FILENAME_PREFIX}_config.json`)).toBe(true); expect(existsSync(`${tmpDirPath}/${FILENAME_PREFIX}_src_config.json`)).toBe(true); @@ -169,3 +172,52 @@ describe('write files to temporary dir', () => { }); }); }); + +describe.only('processSrcData', () => { + const testSrcInputFile = `${process.cwd()}/test/resources/test_src_input`; + const testSrcOutputFile = `${process.cwd()}/test/resources/test_src_output`; + + afterEach(() => { + rmSync(testSrcOutputFile, {force: true}); + }); + + it('should succeed writing to an output file', async () => { + const cfg: FarosConfig = { + ...testConfig, + srcInputFile: testSrcInputFile, + srcOutputFile: testSrcOutputFile, + }; + + await expect(processSrcData(cfg)).resolves.not.toThrow(); + + const output = readFileSync(testSrcOutputFile, 'utf8'); + expect(output).toMatchSnapshot(); + }); + + it('should succeed writing to stdout', async () => { + const cfg: FarosConfig = { + ...testConfig, + srcInputFile: testSrcInputFile, + srcOutputFile: OutputStream.STDOUT, + }; + + // Capture process.stdout + let stdoutData = ''; + const originalStdoutWrite = process.stdout.write; + const stdoutStream = new Writable({ + write(chunk, _encoding, callback) { + stdoutData += chunk.toString(); + callback(); + }, + }); + process.stdout.write = stdoutStream.write.bind(stdoutStream) as any; + + try { + await expect(processSrcData(cfg)).resolves.not.toThrow(); + } finally { + process.stdout.write = originalStdoutWrite; + } + + expect(stdoutData).toMatchSnapshot(); + }); +}); From 81d46bfc9e3f8ece9276180983c30c3b693bdb58 Mon Sep 17 00:00:00 2001 From: Jennie Gao Date: Fri, 17 Jan 2025 11:16:59 -0800 Subject: [PATCH 2/9] add color/reformat msgs --- airbyte-local-cli-nodejs/package-lock.json | 439 +++++++++++++++++- airbyte-local-cli-nodejs/package.json | 3 +- airbyte-local-cli-nodejs/src/utils.ts | 28 +- .../test/__snapshots__/utils.it.test.ts.snap | 9 +- .../test/utils.it.test.ts | 6 +- 5 files changed, 455 insertions(+), 30 deletions(-) diff --git a/airbyte-local-cli-nodejs/package-lock.json b/airbyte-local-cli-nodejs/package-lock.json index 74d4d00..d09f2c4 100644 --- a/airbyte-local-cli-nodejs/package-lock.json +++ b/airbyte-local-cli-nodejs/package-lock.json @@ -12,7 +12,8 @@ "dockerode": "^4.0.2", "lodash": "^4.17.21", "pino": "^9.6.0", - "pino-pretty": "^13.0.0" + "pino-pretty": "^13.0.0", + "yoctocolors-cjs": "^2.1.2" }, "devDependencies": { "@tsconfig/node20": "^20.1.4", @@ -1467,6 +1468,24 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/@jest/console/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "peer": true, + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/@jest/core": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/@jest/core/-/core-29.7.0.tgz", @@ -1516,6 +1535,24 @@ } } }, + "node_modules/@jest/core/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "peer": true, + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/@jest/environment": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/@jest/environment/-/environment-29.7.0.tgz", @@ -1642,6 +1679,24 @@ } } }, + "node_modules/@jest/reporters/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "peer": true, + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/@jest/schemas": { "version": "29.6.3", "resolved": "https://registry.npmjs.org/@jest/schemas/-/schemas-29.6.3.tgz", @@ -1733,6 +1788,24 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/@jest/transform/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "peer": true, + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/@jest/types": { "version": "29.6.3", "resolved": "https://registry.npmjs.org/@jest/types/-/types-29.6.3.tgz", @@ -1751,6 +1824,23 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/@jest/types/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/@jridgewell/gen-mapping": { "version": "0.3.5", "resolved": "https://registry.npmjs.org/@jridgewell/gen-mapping/-/gen-mapping-0.3.5.tgz", @@ -2778,6 +2868,24 @@ "@babel/core": "^7.8.0" } }, + "node_modules/babel-jest/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "peer": true, + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/babel-plugin-istanbul": { "version": "6.1.1", "resolved": "https://registry.npmjs.org/babel-plugin-istanbul/-/babel-plugin-istanbul-6.1.1.tgz", @@ -3107,23 +3215,6 @@ "license": "CC-BY-4.0", "peer": true }, - "node_modules/chalk": { - "version": "4.1.2", - "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", - "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", - "dev": true, - "license": "MIT", - "dependencies": { - "ansi-styles": "^4.1.0", - "supports-color": "^7.1.0" - }, - "engines": { - "node": ">=10" - }, - "funding": { - "url": "https://github.com/chalk/chalk?sponsor=1" - } - }, "node_modules/char-regex": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/char-regex/-/char-regex-1.0.2.tgz", @@ -3295,6 +3386,24 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/create-jest/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "peer": true, + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/cross-spawn": { "version": "7.0.6", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", @@ -3803,6 +3912,23 @@ "concat-map": "0.0.1" } }, + "node_modules/eslint/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/eslint/node_modules/eslint-visitor-keys": { "version": "3.4.3", "resolved": "https://registry.npmjs.org/eslint-visitor-keys/-/eslint-visitor-keys-3.4.3.tgz", @@ -4887,6 +5013,23 @@ "concat-map": "0.0.1" } }, + "node_modules/jake/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/jake/node_modules/minimatch": { "version": "3.1.2", "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.1.2.tgz", @@ -4977,6 +5120,24 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/jest-circus/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "peer": true, + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/jest-cli": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/jest-cli/-/jest-cli-29.7.0.tgz", @@ -5012,6 +5173,24 @@ } } }, + "node_modules/jest-cli/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "peer": true, + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/jest-cli/node_modules/cliui": { "version": "8.0.1", "resolved": "https://registry.npmjs.org/cliui/-/cliui-8.0.1.tgz", @@ -5095,6 +5274,24 @@ } } }, + "node_modules/jest-config/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "peer": true, + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/jest-diff": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/jest-diff/-/jest-diff-29.7.0.tgz", @@ -5111,6 +5308,23 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/jest-diff/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/jest-docblock": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/jest-docblock/-/jest-docblock-29.7.0.tgz", @@ -5143,6 +5357,24 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/jest-each/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "peer": true, + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/jest-environment-node": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/jest-environment-node/-/jest-environment-node-29.7.0.tgz", @@ -5230,6 +5462,23 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/jest-matcher-utils/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/jest-message-util": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/jest-message-util/-/jest-message-util-29.7.0.tgz", @@ -5251,6 +5500,23 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/jest-message-util/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/jest-mock": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/jest-mock/-/jest-mock-29.7.0.tgz", @@ -5334,6 +5600,24 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/jest-resolve/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "peer": true, + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/jest-runner": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/jest-runner/-/jest-runner-29.7.0.tgz", @@ -5368,6 +5652,24 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/jest-runner/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "peer": true, + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/jest-runtime": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/jest-runtime/-/jest-runtime-29.7.0.tgz", @@ -5403,6 +5705,24 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/jest-runtime/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "peer": true, + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/jest-snapshot": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/jest-snapshot/-/jest-snapshot-29.7.0.tgz", @@ -5436,6 +5756,24 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/jest-snapshot/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "peer": true, + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/jest-util": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/jest-util/-/jest-util-29.7.0.tgz", @@ -5454,6 +5792,23 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/jest-util/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/jest-util/node_modules/picomatch": { "version": "2.3.1", "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-2.3.1.tgz", @@ -5500,6 +5855,24 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/jest-validate/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "peer": true, + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/jest-watcher": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/jest-watcher/-/jest-watcher-29.7.0.tgz", @@ -5521,6 +5894,24 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/jest-watcher/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "peer": true, + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/jest-worker": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/jest-worker/-/jest-worker-29.7.0.tgz", @@ -7863,6 +8254,18 @@ "funding": { "url": "https://github.com/sponsors/sindresorhus" } + }, + "node_modules/yoctocolors-cjs": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/yoctocolors-cjs/-/yoctocolors-cjs-2.1.2.tgz", + "integrity": "sha512-cYVsTjKl8b+FrnidjibDWskAv7UKOfcwaVZdp/it9n1s9fU3IkgDbhdIRKCW4JDsAlECJY0ytoVPT3sK6kideA==", + "license": "MIT", + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } } } } diff --git a/airbyte-local-cli-nodejs/package.json b/airbyte-local-cli-nodejs/package.json index f129699..f997d4f 100644 --- a/airbyte-local-cli-nodejs/package.json +++ b/airbyte-local-cli-nodejs/package.json @@ -54,7 +54,8 @@ "dockerode": "^4.0.2", "lodash": "^4.17.21", "pino": "^9.6.0", - "pino-pretty": "^13.0.0" + "pino-pretty": "^13.0.0", + "yoctocolors-cjs": "^2.1.2" }, "jest": { "silent": false, diff --git a/airbyte-local-cli-nodejs/src/utils.ts b/airbyte-local-cli-nodejs/src/utils.ts index b136f25..2d5b772 100644 --- a/airbyte-local-cli-nodejs/src/utils.ts +++ b/airbyte-local-cli-nodejs/src/utils.ts @@ -16,6 +16,7 @@ import {sep} from 'node:path'; import pino from 'pino'; import pretty from 'pino-pretty'; import readline from 'readline'; +import colors from 'yoctocolors-cjs'; import {AirbyteCliContext, AirbyteConfig, FarosConfig} from './types'; @@ -210,6 +211,8 @@ export function writeFile(file: string, data: any): void { /** * Process the source output. * + * jq_src_msg="\"${GREEN}[SRC]: \" + ${JQ_TIMESTAMP} + \" - \" + ." + * * Command line: * tee >( * jq -cR $jq_color_opt --unbuffered 'fromjson? | @@ -224,6 +227,12 @@ export function writeFile(file: string, data: any): void { * Note: `dst_stream_prefix` command option is dropped */ export async function processSrcData(cfg: FarosConfig): Promise { + // Colorize the JSON message + function formatSrcWithColor(json: any): string { + const TIMESTAMP = new Date().toISOString(); + return `${colors.green('[SRC]')}: ${TIMESTAMP} - ${JSON.stringify(json)}`; + } + // Processing the source line by line function processLine(line: string): void { // skip empty lines @@ -234,13 +243,18 @@ export async function processSrcData(cfg: FarosConfig): Promise { try { const data = JSON.parse(line); - // non RECORD and STATE type messages: print to stderr + // non RECORD and STATE type messages: print as debug log if (data.type !== 'RECORD' && data.type !== 'STATE') { - logger.info(line); // TODO: should we use logger instead? + const logMsg = cfg.rawMessages ? line : formatSrcWithColor(data); + logger.debug(logMsg); } // RECORD and STATE type messages: write to output file else { - outputStream.write(`${line}\n`); + if (cfg.srcOutputFile === OutputStream.STDOUT) { + outputStream.write(`${formatSrcWithColor(data)}\n`); + } else { + outputStream.write(`${line}\n`); + } } } catch (error: any) { throw new Error(`Line of data: '${line}'; Error: ${error.message}\n`); @@ -261,14 +275,10 @@ export async function processSrcData(cfg: FarosConfig): Promise { // create input and output streams: // - input stream: read from the data file user provided or the one the script created in the temporary directory - // - output stream: write to a file or stdout/stderr. Overwrite the file if it exists, otherwise create a new one + // - output stream: write to a file or stdout. Overwrite the file if it exists, otherwise create a new one const inputStream = createReadStream(srcInputFilePath); const outputStream: any = - cfg.srcOutputFile === OutputStream.STDOUT - ? process.stdout - : cfg.srcOutputFile === OutputStream.STDERR - ? process.stderr - : createWriteStream(srcOutputFilePath, {flags: 'w'}); + cfg.srcOutputFile === OutputStream.STDOUT ? process.stdout : createWriteStream(srcOutputFilePath, {flags: 'w'}); // Create readline interface const rl = readline.createInterface({ diff --git a/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap b/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap index 9132de1..1351b1e 100644 --- a/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap +++ b/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap @@ -34,13 +34,20 @@ exports[`parseConfigFile should pass 1`] = ` } `; -exports[`processSrcData should succeed writing to stdout 1`] = ` +exports[`processSrcData should succeed writing to an output file 1`] = ` "{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","redactedConfig":{"user":"chris"},"sourceType":"example","sourceVersion":"0.12.3"} {"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","sourceStatus":{"status":"SUCCESS"}} {"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","logs":[{"timestamp":1736891682696,"message":{"level":30,"msg":"Source version: 0.12.3"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"Config: {\\"user\\":\\"chris\\"}"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"Catalog: {}"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"State: {}"}},{"timestamp":1736891682700,"message":{"level":30,"msg":"Syncing ExampleSource"}},{"timestamp":1736891682704,"message":{"level":30,"msg":"Finished syncing ExampleSource"}}]} " `; +exports[`processSrcData should succeed writing to stdout 1`] = ` +"[SRC] {"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","redactedConfig":{"user":"chris"},"sourceType":"example","sourceVersion":"0.12.3"} +[SRC] {"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","sourceStatus":{"status":"SUCCESS"}} +[SRC] {"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","logs":[{"timestamp":1736891682696,"message":{"level":30,"msg":"Source version: 0.12.3"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"Config: {\\"user\\":\\"chris\\"}"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"Catalog: {}"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"State: {}"}},{"timestamp":1736891682700,"message":{"level":30,"msg":"Syncing ExampleSource"}},{"timestamp":1736891682704,"message":{"level":30,"msg":"Finished syncing ExampleSource"}}]} +" +`; + exports[`write files to temporary dir loadStateFile should pass with existing state file 1`] = ` "{"format":"base64/gzip","data":"dGVzdA=="} " diff --git a/airbyte-local-cli-nodejs/test/utils.it.test.ts b/airbyte-local-cli-nodejs/test/utils.it.test.ts index 795183e..b42fb0a 100644 --- a/airbyte-local-cli-nodejs/test/utils.it.test.ts +++ b/airbyte-local-cli-nodejs/test/utils.it.test.ts @@ -218,6 +218,10 @@ describe.only('processSrcData', () => { process.stdout.write = originalStdoutWrite; } - expect(stdoutData).toMatchSnapshot(); + // remove timestamp from the output so that it can be compared + const stdoutDataWithoutTS = stdoutData.split('\n').map((line) => { + return line.replace(/\[SRC\].*\s-/g, '[SRC]'); + }); + expect(stdoutDataWithoutTS.join('\n')).toMatchSnapshot(); }); }); From 7e1b8714c4b69be4ec10ec717d0631ec8edec083 Mon Sep 17 00:00:00 2001 From: Jennie Gao Date: Fri, 17 Jan 2025 12:44:14 -0800 Subject: [PATCH 3/9] add error tests --- airbyte-local-cli-nodejs/src/utils.ts | 139 +++++++++--------- .../resources/test_src_input_invalid_json | 1 + .../test/utils.it.test.ts | 25 +++- 3 files changed, 93 insertions(+), 72 deletions(-) create mode 100644 airbyte-local-cli-nodejs/test/resources/test_src_input_invalid_json diff --git a/airbyte-local-cli-nodejs/src/utils.ts b/airbyte-local-cli-nodejs/src/utils.ts index 2d5b772..990f383 100644 --- a/airbyte-local-cli-nodejs/src/utils.ts +++ b/airbyte-local-cli-nodejs/src/utils.ts @@ -1,5 +1,4 @@ import {spawnSync} from 'node:child_process'; -import {once} from 'node:events'; import { accessSync, constants, @@ -226,84 +225,82 @@ export function writeFile(file: string, data: any): void { * * Note: `dst_stream_prefix` command option is dropped */ -export async function processSrcData(cfg: FarosConfig): Promise { - // Colorize the JSON message - function formatSrcWithColor(json: any): string { - const TIMESTAMP = new Date().toISOString(); - return `${colors.green('[SRC]')}: ${TIMESTAMP} - ${JSON.stringify(json)}`; - } - - // Processing the source line by line - function processLine(line: string): void { - // skip empty lines - if (line.trim() === '') { - return; +export function processSrcData(cfg: FarosConfig): Promise { + return new Promise((resolve, reject) => { + // Colorize the JSON message + function formatSrcWithColor(json: any): string { + return `${colors.green('[SRC]')} - ${JSON.stringify(json)}`; } - try { - const data = JSON.parse(line); - - // non RECORD and STATE type messages: print as debug log - if (data.type !== 'RECORD' && data.type !== 'STATE') { - const logMsg = cfg.rawMessages ? line : formatSrcWithColor(data); - logger.debug(logMsg); + // Processing the source line by line + function processLine(line: string): void { + // skip empty lines + if (line.trim() === '') { + return; } - // RECORD and STATE type messages: write to output file - else { - if (cfg.srcOutputFile === OutputStream.STDOUT) { - outputStream.write(`${formatSrcWithColor(data)}\n`); - } else { - outputStream.write(`${line}\n`); + + try { + const data = JSON.parse(line); + + // non RECORD and STATE type messages: print as stdout + if (data.type !== 'RECORD' && data.type !== 'STATE') { + const logMsg = cfg.rawMessages ? line : formatSrcWithColor(data); + logger.info(logMsg); + } + // RECORD and STATE type messages: write to output file + else { + if (cfg.srcOutputFile === OutputStream.STDOUT) { + outputStream.write(`${formatSrcWithColor(data)}\n`); + } else { + outputStream.write(`${line}\n`); + } } + } catch (error: any) { + rl.emit('error', new Error(`Line of data: '${line}'; Error: ${error.message}`)); } - } catch (error: any) { - throw new Error(`Line of data: '${line}'; Error: ${error.message}\n`); } - } - // Close the output stream if it's a file - function closeOutputStream(): void { - if (!isOutputStream(cfg.srcOutputFile)) { - outputStream.end(); + // Close the output stream if it's a file + function closeOutputStream(): void { + if (!isOutputStream(cfg.srcOutputFile)) { + outputStream.end(); + } + logger.debug(`Closing the output stream file '${cfg.srcOutputFile}'.`); } - logger.debug(`Closing the output stream file '${cfg.srcOutputFile}'.`); - } - - // get source data file and output file paths - const srcInputFilePath = cfg.srcInputFile ?? SRC_INPUT_DATA_FILE; - const srcOutputFilePath = cfg.srcOutputFile ?? SRC_OUTPUT_DATA_FILE; - - // create input and output streams: - // - input stream: read from the data file user provided or the one the script created in the temporary directory - // - output stream: write to a file or stdout. Overwrite the file if it exists, otherwise create a new one - const inputStream = createReadStream(srcInputFilePath); - const outputStream: any = - cfg.srcOutputFile === OutputStream.STDOUT ? process.stdout : createWriteStream(srcOutputFilePath, {flags: 'w'}); - - // Create readline interface - const rl = readline.createInterface({ - input: inputStream, - crlfDelay: Infinity, - }); - - rl.on('line', (line) => { - processLine(line); - }); - rl.on('close', () => { - closeOutputStream(); + // get source data file and output file paths + const srcInputFilePath = cfg.srcInputFile ?? SRC_INPUT_DATA_FILE; + const srcOutputFilePath = cfg.srcOutputFile ?? SRC_OUTPUT_DATA_FILE; + + // create input and output streams: + // - input stream: read from the data file user provided or the one the script created in the temporary directory + // - output stream: write to a file or stdout. Overwrite the file if it exists, otherwise create a new one + const inputStream = createReadStream(srcInputFilePath); + const outputStream: any = + cfg.srcOutputFile === OutputStream.STDOUT ? process.stdout : createWriteStream(srcOutputFilePath); + + // create readline interface + const rl = readline.createInterface({ + input: inputStream, + crlfDelay: Infinity, + }); + + rl.on('line', (line) => { + processLine(line); + }) + .on('close', () => { + logger.debug('Finished processing the source output data.'); + closeOutputStream(); + resolve(); + }) + .on('error', (error) => { + closeOutputStream(); + reject(new Error(`Failed to process the source output data: ${error.message ?? JSON.stringify(error)}`)); + }); + + outputStream.on('error', (error: any) => { + closeOutputStream(); + reject(new Error(`Failed to write to the output file: ${error.message ?? JSON.stringify(error)}`)); + }); }); - - rl.on('error', (error) => { - closeOutputStream(); - throw new Error(`Failed to process the source output data: ${error.message ?? JSON.stringify(error)}`); - }); - - outputStream.on('error', (error: any) => { - throw new Error(`Failed to write to the output file: ${error.message ?? JSON.stringify(error)}`); - }); - - // wait for the processing to be done - await once(rl, 'close'); - logger.debug('Finished processing the source output data.'); } diff --git a/airbyte-local-cli-nodejs/test/resources/test_src_input_invalid_json b/airbyte-local-cli-nodejs/test/resources/test_src_input_invalid_json new file mode 100644 index 0000000..39a1b34 --- /dev/null +++ b/airbyte-local-cli-nodejs/test/resources/test_src_input_invalid_json @@ -0,0 +1 @@ +invalid json diff --git a/airbyte-local-cli-nodejs/test/utils.it.test.ts b/airbyte-local-cli-nodejs/test/utils.it.test.ts index b42fb0a..2d3db1a 100644 --- a/airbyte-local-cli-nodejs/test/utils.it.test.ts +++ b/airbyte-local-cli-nodejs/test/utils.it.test.ts @@ -1,4 +1,4 @@ -import {existsSync, mkdtempSync, readFileSync, rmSync} from 'node:fs'; +import {chmodSync, existsSync, mkdtempSync, readFileSync, rmSync, writeFileSync} from 'node:fs'; import {tmpdir} from 'node:os'; import {Writable} from 'node:stream'; @@ -224,4 +224,27 @@ describe.only('processSrcData', () => { }); expect(stdoutDataWithoutTS.join('\n')).toMatchSnapshot(); }); + + it('should fail with processing error', async () => { + const cfg: FarosConfig = { + ...testConfig, + srcInputFile: `${process.cwd()}/test/resources/test_src_input_invalid_json`, + srcOutputFile: '/dev/null', + }; + await expect(processSrcData(cfg)).rejects.toThrow( + `Failed to process the source output data: Line of data: ` + + `'invalid json'; Error: Unexpected token 'i', "invalid json" is not valid JSON`, + ); + }); + + it('should fail with outstream error', async () => { + writeFileSync(testSrcOutputFile, 'test'); + chmodSync(testSrcOutputFile, 0o544); + const cfg: FarosConfig = { + ...testConfig, + srcInputFile: testSrcInputFile, + srcOutputFile: testSrcOutputFile, + }; + await expect(processSrcData(cfg)).rejects.toThrow('Failed to write to the output file: EACCES: permission denied'); + }); }); From 86a28ef79e12666be1c7cca56bc81472cd1b2946 Mon Sep 17 00:00:00 2001 From: Jennie Gao Date: Fri, 17 Jan 2025 12:56:08 -0800 Subject: [PATCH 4/9] remove color and write to logger --- airbyte-local-cli-nodejs/src/utils.ts | 14 ++++----- .../test/__snapshots__/utils.it.test.ts.snap | 7 ----- airbyte-local-cli-nodejs/test/command.test.ts | 2 +- .../test/utils.it.test.ts | 29 ++----------------- 4 files changed, 10 insertions(+), 42 deletions(-) diff --git a/airbyte-local-cli-nodejs/src/utils.ts b/airbyte-local-cli-nodejs/src/utils.ts index 990f383..7a9423e 100644 --- a/airbyte-local-cli-nodejs/src/utils.ts +++ b/airbyte-local-cli-nodejs/src/utils.ts @@ -15,7 +15,6 @@ import {sep} from 'node:path'; import pino from 'pino'; import pretty from 'pino-pretty'; import readline from 'readline'; -import colors from 'yoctocolors-cjs'; import {AirbyteCliContext, AirbyteConfig, FarosConfig} from './types'; @@ -227,9 +226,9 @@ export function writeFile(file: string, data: any): void { */ export function processSrcData(cfg: FarosConfig): Promise { return new Promise((resolve, reject) => { - // Colorize the JSON message - function formatSrcWithColor(json: any): string { - return `${colors.green('[SRC]')} - ${JSON.stringify(json)}`; + // Reformat the JSON message + function formatSrcMsg(json: any): string { + return `[SRC] - ${JSON.stringify(json)}`; } // Processing the source line by line @@ -244,13 +243,12 @@ export function processSrcData(cfg: FarosConfig): Promise { // non RECORD and STATE type messages: print as stdout if (data.type !== 'RECORD' && data.type !== 'STATE') { - const logMsg = cfg.rawMessages ? line : formatSrcWithColor(data); - logger.info(logMsg); + logger.info(formatSrcMsg(data)); } - // RECORD and STATE type messages: write to output file + // RECORD and STATE type messages: logger or write to output file else { if (cfg.srcOutputFile === OutputStream.STDOUT) { - outputStream.write(`${formatSrcWithColor(data)}\n`); + logger.info(formatSrcMsg(data)); } else { outputStream.write(`${line}\n`); } diff --git a/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap b/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap index 1351b1e..10802a0 100644 --- a/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap +++ b/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap @@ -41,13 +41,6 @@ exports[`processSrcData should succeed writing to an output file 1`] = ` " `; -exports[`processSrcData should succeed writing to stdout 1`] = ` -"[SRC] {"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","redactedConfig":{"user":"chris"},"sourceType":"example","sourceVersion":"0.12.3"} -[SRC] {"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","sourceStatus":{"status":"SUCCESS"}} -[SRC] {"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","logs":[{"timestamp":1736891682696,"message":{"level":30,"msg":"Source version: 0.12.3"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"Config: {\\"user\\":\\"chris\\"}"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"Catalog: {}"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"State: {}"}},{"timestamp":1736891682700,"message":{"level":30,"msg":"Syncing ExampleSource"}},{"timestamp":1736891682704,"message":{"level":30,"msg":"Finished syncing ExampleSource"}}]} -" -`; - exports[`write files to temporary dir loadStateFile should pass with existing state file 1`] = ` "{"format":"base64/gzip","data":"dGVzdA=="} " diff --git a/airbyte-local-cli-nodejs/test/command.test.ts b/airbyte-local-cli-nodejs/test/command.test.ts index a50ee39..847cd39 100644 --- a/airbyte-local-cli-nodejs/test/command.test.ts +++ b/airbyte-local-cli-nodejs/test/command.test.ts @@ -220,7 +220,7 @@ describe('Check other options', () => { ...defaultConfig, src: {image: 'source-image', config: {}}, dst: {image: undefined, config: {}}, - srcOutputFile: '/dev/null', + srcOutputFile: 'STDOUT', dstPull: false, }); }); diff --git a/airbyte-local-cli-nodejs/test/utils.it.test.ts b/airbyte-local-cli-nodejs/test/utils.it.test.ts index 2d3db1a..af68735 100644 --- a/airbyte-local-cli-nodejs/test/utils.it.test.ts +++ b/airbyte-local-cli-nodejs/test/utils.it.test.ts @@ -1,6 +1,5 @@ import {chmodSync, existsSync, mkdtempSync, readFileSync, rmSync, writeFileSync} from 'node:fs'; import {tmpdir} from 'node:os'; -import {Writable} from 'node:stream'; import {FarosConfig} from '../src/types'; import { @@ -173,7 +172,7 @@ describe('write files to temporary dir', () => { }); }); -describe.only('processSrcData', () => { +describe('processSrcData', () => { const testSrcInputFile = `${process.cwd()}/test/resources/test_src_input`; const testSrcOutputFile = `${process.cwd()}/test/resources/test_src_output`; @@ -194,35 +193,13 @@ describe.only('processSrcData', () => { expect(output).toMatchSnapshot(); }); - it('should succeed writing to stdout', async () => { + it('should succeed writing to logger', async () => { const cfg: FarosConfig = { ...testConfig, srcInputFile: testSrcInputFile, srcOutputFile: OutputStream.STDOUT, }; - - // Capture process.stdout - let stdoutData = ''; - const originalStdoutWrite = process.stdout.write; - const stdoutStream = new Writable({ - write(chunk, _encoding, callback) { - stdoutData += chunk.toString(); - callback(); - }, - }); - process.stdout.write = stdoutStream.write.bind(stdoutStream) as any; - - try { - await expect(processSrcData(cfg)).resolves.not.toThrow(); - } finally { - process.stdout.write = originalStdoutWrite; - } - - // remove timestamp from the output so that it can be compared - const stdoutDataWithoutTS = stdoutData.split('\n').map((line) => { - return line.replace(/\[SRC\].*\s-/g, '[SRC]'); - }); - expect(stdoutDataWithoutTS.join('\n')).toMatchSnapshot(); + await expect(processSrcData(cfg)).resolves.not.toThrow(); }); it('should fail with processing error', async () => { From c3d29e2aafc5ada76715b189bdf19e8f4b223684 Mon Sep 17 00:00:00 2001 From: Jennie Gao Date: Tue, 21 Jan 2025 10:00:44 -0800 Subject: [PATCH 5/9] update tests --- airbyte-local-cli-nodejs/package-lock.json | 15 +-- airbyte-local-cli-nodejs/package.json | 3 +- airbyte-local-cli-nodejs/src/docker.ts | 40 ++++--- airbyte-local-cli-nodejs/src/index.ts | 7 +- airbyte-local-cli-nodejs/src/utils.ts | 105 ++++++++---------- .../test/__snapshots__/docker.it.test.ts.snap | 8 ++ .../test/__snapshots__/utils.it.test.ts.snap | 2 +- .../test/docker.it.test.ts | 23 +++- .../test/utils.it.test.ts | 28 ++--- 9 files changed, 116 insertions(+), 115 deletions(-) create mode 100644 airbyte-local-cli-nodejs/test/__snapshots__/docker.it.test.ts.snap diff --git a/airbyte-local-cli-nodejs/package-lock.json b/airbyte-local-cli-nodejs/package-lock.json index d09f2c4..0495397 100644 --- a/airbyte-local-cli-nodejs/package-lock.json +++ b/airbyte-local-cli-nodejs/package-lock.json @@ -12,8 +12,7 @@ "dockerode": "^4.0.2", "lodash": "^4.17.21", "pino": "^9.6.0", - "pino-pretty": "^13.0.0", - "yoctocolors-cjs": "^2.1.2" + "pino-pretty": "^13.0.0" }, "devDependencies": { "@tsconfig/node20": "^20.1.4", @@ -8254,18 +8253,6 @@ "funding": { "url": "https://github.com/sponsors/sindresorhus" } - }, - "node_modules/yoctocolors-cjs": { - "version": "2.1.2", - "resolved": "https://registry.npmjs.org/yoctocolors-cjs/-/yoctocolors-cjs-2.1.2.tgz", - "integrity": "sha512-cYVsTjKl8b+FrnidjibDWskAv7UKOfcwaVZdp/it9n1s9fU3IkgDbhdIRKCW4JDsAlECJY0ytoVPT3sK6kideA==", - "license": "MIT", - "engines": { - "node": ">=18" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } } } } diff --git a/airbyte-local-cli-nodejs/package.json b/airbyte-local-cli-nodejs/package.json index f997d4f..f129699 100644 --- a/airbyte-local-cli-nodejs/package.json +++ b/airbyte-local-cli-nodejs/package.json @@ -54,8 +54,7 @@ "dockerode": "^4.0.2", "lodash": "^4.17.21", "pino": "^9.6.0", - "pino-pretty": "^13.0.0", - "yoctocolors-cjs": "^2.1.2" + "pino-pretty": "^13.0.0" }, "jest": { "silent": false, diff --git a/airbyte-local-cli-nodejs/src/docker.ts b/airbyte-local-cli-nodejs/src/docker.ts index 5cca968..7c3b5e0 100644 --- a/airbyte-local-cli-nodejs/src/docker.ts +++ b/airbyte-local-cli-nodejs/src/docker.ts @@ -1,10 +1,18 @@ -import {writeFileSync} from 'node:fs'; +import {createWriteStream, writeFileSync} from 'node:fs'; import {Writable} from 'node:stream'; import Docker from 'dockerode'; import {AirbyteConnectionStatus, AirbyteConnectionStatusMessage, AirbyteMessageType, FarosConfig} from './types'; -import {DEFAULT_STATE_FILE, logger, SRC_CATALOG_FILENAME, SRC_CONFIG_FILENAME} from './utils'; +import { + DEFAULT_STATE_FILE, + logger, + OutputStream, + processSrcDataByLine, + SRC_CATALOG_FILENAME, + SRC_CONFIG_FILENAME, + SRC_OUTPUT_DATA_FILE, +} from './utils'; // Constants const DEFAULT_MAX_LOG_SIZE = '10m'; @@ -119,10 +127,8 @@ export async function checkSrcConnection(tmpDir: string, image: string, srcConfi * --config "/configs/$src_config_filename" \ * --catalog "/configs/$src_catalog_filename" \ * --state "/configs/$src_state_filename" - * - * @argument command - for testing purposes only */ -export async function runSrcSync(tmpDir: string, config: FarosConfig): Promise { +export async function runSrcSync(tmpDir: string, config: FarosConfig): Promise { logger.info('Running source connector...'); if (!config.src?.image) { @@ -180,18 +186,28 @@ export async function runSrcSync(tmpDir: string, config: FarosConfig): Promise { + processSrcDataByLine(line, srcOutputStream, config); + }); callback(); }, }); + // Attach the stderr to termincal stderr, and stdout to the output stream const stream = await container.attach({stream: true, stdout: true, stderr: true}); - container.modem.demuxStream(stream, stdoutStream, process.stderr); + container.modem.demuxStream(stream, containerOutputStream, process.stderr); // Start the container await container.start(); @@ -199,16 +215,12 @@ export async function runSrcSync(tmpDir: string, config: FarosConfig): Promise { const context: AirbyteCliContext = {}; @@ -28,10 +28,9 @@ async function main(): Promise { // Run airbyte source connector if (!cfg.srcInputFile) { await runSrcSync(context.tmpDir, cfg); + } else { + await processSrcInputFile(context.tmpDir, cfg); } - - // Process source data - await processSrcData(cfg); } catch (error: any) { logger.error(error.message, 'Error'); cleanUp(context); diff --git a/airbyte-local-cli-nodejs/src/utils.ts b/airbyte-local-cli-nodejs/src/utils.ts index 7a9423e..6aa43e9 100644 --- a/airbyte-local-cli-nodejs/src/utils.ts +++ b/airbyte-local-cli-nodejs/src/utils.ts @@ -11,10 +11,11 @@ import { } from 'node:fs'; import {tmpdir} from 'node:os'; import {sep} from 'node:path'; +import readline from 'node:readline'; +import {Writable} from 'node:stream'; import pino from 'pino'; import pretty from 'pino-pretty'; -import readline from 'readline'; import {AirbyteCliContext, AirbyteConfig, FarosConfig} from './types'; @@ -32,11 +33,6 @@ export const DEFAULT_STATE_FILE = 'state.json'; export const SRC_INPUT_DATA_FILE = `${FILENAME_PREFIX}_src_data`; export const SRC_OUTPUT_DATA_FILE = `${FILENAME_PREFIX}_src_output`; -// Check if the value is an OutputStream -function isOutputStream(value: any): value is OutputStream { - return Object.values(OutputStream).includes(value); -} - // Create a pino logger instance export const logger = pino(pretty({colorize: true})); @@ -209,8 +205,6 @@ export function writeFile(file: string, data: any): void { /** * Process the source output. * - * jq_src_msg="\"${GREEN}[SRC]: \" + ${JQ_TIMESTAMP} + \" - \" + ." - * * Command line: * tee >( * jq -cR $jq_color_opt --unbuffered 'fromjson? | @@ -220,62 +214,53 @@ export function writeFile(file: string, data: any): void { * jq -cR --unbuffered "fromjson? | * select(.type == \"RECORD\" or .type == \"STATE\") | * .record.stream |= \"${dst_stream_prefix}\" + ." | - * tee "$output_filepath" | + * tee "$output_filepath" | ... + * + * jq_src_msg="\"${GREEN}[SRC]: \" + ${JQ_TIMESTAMP} + \" - \" + ." + * * * Note: `dst_stream_prefix` command option is dropped */ -export function processSrcData(cfg: FarosConfig): Promise { - return new Promise((resolve, reject) => { - // Reformat the JSON message - function formatSrcMsg(json: any): string { - return `[SRC] - ${JSON.stringify(json)}`; - } - // Processing the source line by line - function processLine(line: string): void { - // skip empty lines - if (line.trim() === '') { - return; - } - - try { - const data = JSON.parse(line); +// Processing the source line by line +export function processSrcDataByLine(line: string, outputStream: Writable, cfg: FarosConfig): void { + // Reformat the JSON message + function formatSrcMsg(json: any): string { + return `[SRC] - ${JSON.stringify(json)}`; + } + // skip empty lines + if (line.trim() === '') { + return; + } - // non RECORD and STATE type messages: print as stdout - if (data.type !== 'RECORD' && data.type !== 'STATE') { - logger.info(formatSrcMsg(data)); - } - // RECORD and STATE type messages: logger or write to output file - else { - if (cfg.srcOutputFile === OutputStream.STDOUT) { - logger.info(formatSrcMsg(data)); - } else { - outputStream.write(`${line}\n`); - } - } - } catch (error: any) { - rl.emit('error', new Error(`Line of data: '${line}'; Error: ${error.message}`)); + try { + const data = JSON.parse(line); + + // non RECORD and STATE type messages: print as stdout + // RECORD and STATE type messages: when the output is set to stdout + if ((data.type !== 'RECORD' && data.type !== 'STATE') || cfg.srcOutputFile === OutputStream.STDOUT) { + if (cfg.rawMessages) { + process.stdout.write(`${line}\n`); + } else { + logger.info(formatSrcMsg(data)); } } - - // Close the output stream if it's a file - function closeOutputStream(): void { - if (!isOutputStream(cfg.srcOutputFile)) { - outputStream.end(); - } - logger.debug(`Closing the output stream file '${cfg.srcOutputFile}'.`); + // RECORD and STATE type messages: write to output file + else { + outputStream.write(`${line}\n`); } + } catch (error: any) { + throw new Error(`Line of data: '${line}'; Error: ${error.message}`); + } +} - // get source data file and output file paths - const srcInputFilePath = cfg.srcInputFile ?? SRC_INPUT_DATA_FILE; - const srcOutputFilePath = cfg.srcOutputFile ?? SRC_OUTPUT_DATA_FILE; - +export function processSrcInputFile(tmpDir: string, cfg: FarosConfig): Promise { + return new Promise((resolve, reject) => { // create input and output streams: - // - input stream: read from the data file user provided or the one the script created in the temporary directory - // - output stream: write to a file or stdout. Overwrite the file if it exists, otherwise create a new one - const inputStream = createReadStream(srcInputFilePath); - const outputStream: any = - cfg.srcOutputFile === OutputStream.STDOUT ? process.stdout : createWriteStream(srcOutputFilePath); + // - input stream: read from the data file user provided + // - output stream: write to an intermediate file. Overwrite the file if it exists, otherwise create a new one + const inputStream = createReadStream(cfg.srcInputFile!); + const outputStream = createWriteStream(`${tmpDir}/${SRC_OUTPUT_DATA_FILE}`); // create readline interface const rl = readline.createInterface({ @@ -284,20 +269,24 @@ export function processSrcData(cfg: FarosConfig): Promise { }); rl.on('line', (line) => { - processLine(line); + try { + processSrcDataByLine(line, outputStream, cfg); + } catch (error: any) { + rl.emit('error', error); + } }) .on('close', () => { logger.debug('Finished processing the source output data.'); - closeOutputStream(); + outputStream.end(); resolve(); }) .on('error', (error) => { - closeOutputStream(); + outputStream.end(); reject(new Error(`Failed to process the source output data: ${error.message ?? JSON.stringify(error)}`)); }); outputStream.on('error', (error: any) => { - closeOutputStream(); + outputStream.end(); reject(new Error(`Failed to write to the output file: ${error.message ?? JSON.stringify(error)}`)); }); }); diff --git a/airbyte-local-cli-nodejs/test/__snapshots__/docker.it.test.ts.snap b/airbyte-local-cli-nodejs/test/__snapshots__/docker.it.test.ts.snap new file mode 100644 index 0000000..10e8271 --- /dev/null +++ b/airbyte-local-cli-nodejs/test/__snapshots__/docker.it.test.ts.snap @@ -0,0 +1,8 @@ +// Jest Snapshot v1, https://goo.gl/fbAQLP + +exports[`runSrcSync should success 1`] = ` +"{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","redactedConfig":{"user":"chris"},"sourceType":"example","sourceVersion":"0.12.4"} +{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","sourceStatus":{"status":"SUCCESS"}} +{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","logs":[{"timestamp":***,"message":{"level":30,"msg":"Source version: 0.12.4"}},{"timestamp":***,"message":{"level":30,"msg":"Config: {\\"user\\":\\"chris\\"}"}},{"timestamp":***,"message":{"level":30,"msg":"Catalog: {}"}},{"timestamp":***,"message":{"level":30,"msg":"State: {}"}},{"timestamp":***,"message":{"level":30,"msg":"Syncing ExampleSource"}},{"timestamp":***,"message":{"level":30,"msg":"Finished syncing ExampleSource"}}]} +" +`; diff --git a/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap b/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap index 10802a0..a043c1c 100644 --- a/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap +++ b/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap @@ -34,7 +34,7 @@ exports[`parseConfigFile should pass 1`] = ` } `; -exports[`processSrcData should succeed writing to an output file 1`] = ` +exports[`processSrcInputFile should succeed writing to an output file 1`] = ` "{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","redactedConfig":{"user":"chris"},"sourceType":"example","sourceVersion":"0.12.3"} {"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","sourceStatus":{"status":"SUCCESS"}} {"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","logs":[{"timestamp":1736891682696,"message":{"level":30,"msg":"Source version: 0.12.3"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"Config: {\\"user\\":\\"chris\\"}"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"Catalog: {}"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"State: {}"}},{"timestamp":1736891682700,"message":{"level":30,"msg":"Syncing ExampleSource"}},{"timestamp":1736891682704,"message":{"level":30,"msg":"Finished syncing ExampleSource"}}]} diff --git a/airbyte-local-cli-nodejs/test/docker.it.test.ts b/airbyte-local-cli-nodejs/test/docker.it.test.ts index 7b04e87..4e51e93 100644 --- a/airbyte-local-cli-nodejs/test/docker.it.test.ts +++ b/airbyte-local-cli-nodejs/test/docker.it.test.ts @@ -1,9 +1,10 @@ -import {readdirSync, unlinkSync} from 'node:fs'; +import {readdirSync, readFileSync, rmSync, unlinkSync} from 'node:fs'; import path from 'node:path'; import {Writable} from 'node:stream'; import {checkSrcConnection, pullDockerImage, runSrcSync} from '../src/docker'; import {FarosConfig} from '../src/types'; +import {SRC_OUTPUT_DATA_FILE} from '../src/utils'; const defaultConfig: FarosConfig = { srcCheckConnection: false, @@ -48,7 +49,7 @@ describe('checkSrcConnection', () => { }); }); -describe('runSrcSync', () => { +describe.only('runSrcSync', () => { const testCfg: FarosConfig = { ...defaultConfig, src: { @@ -56,6 +57,13 @@ describe('runSrcSync', () => { }, }; + const testTmpDir = `${process.cwd()}/test/resources/dockerIt_runSrcSync`; + + // remove the intermediate output file + afterEach(() => { + rmSync(`${testTmpDir}/${SRC_OUTPUT_DATA_FILE}`, {force: true}); + }); + // Clean up files created by the test afterAll(() => { const pattern = /.*-src_cid$/; @@ -69,10 +77,15 @@ describe('runSrcSync', () => { }); it('should success', async () => { - await expect(runSrcSync(`${process.cwd()}/test/resources/dockerIt_runSrcSync`, testCfg)).resolves.not.toThrow(); + await expect(runSrcSync(testTmpDir, testCfg)).resolves.not.toThrow(); + + // Replace timestamp for comparison + const output = readFileSync(`${testTmpDir}/${SRC_OUTPUT_DATA_FILE}`, 'utf8'); + const outputWithoutTS = output.split('\n').map((line) => line.replace(/"timestamp":\d+/g, '"timestamp":***')); + expect(outputWithoutTS.join('\n')).toMatchSnapshot(); }); - // Check the error message is correctly redirect to process.stderr + // Check stderr message is correctly redirect to process.stderr it('should fail', async () => { // Capture process.stderr let stderrData = ''; @@ -87,7 +100,7 @@ describe('runSrcSync', () => { try { await expect( - runSrcSync(`${process.cwd()}/test/resources/dockerIt_runSrcSync`, { + runSrcSync(testTmpDir, { ...testCfg, src: {image: 'farosai/airbyte-faros-graphql-source'}, }), diff --git a/airbyte-local-cli-nodejs/test/utils.it.test.ts b/airbyte-local-cli-nodejs/test/utils.it.test.ts index af68735..c597787 100644 --- a/airbyte-local-cli-nodejs/test/utils.it.test.ts +++ b/airbyte-local-cli-nodejs/test/utils.it.test.ts @@ -8,9 +8,9 @@ import { createTmpDir, FILENAME_PREFIX, loadStateFile, - OutputStream, parseConfigFile, - processSrcData, + processSrcInputFile, + SRC_OUTPUT_DATA_FILE, writeConfig, } from '../src/utils'; @@ -172,9 +172,10 @@ describe('write files to temporary dir', () => { }); }); -describe('processSrcData', () => { - const testSrcInputFile = `${process.cwd()}/test/resources/test_src_input`; - const testSrcOutputFile = `${process.cwd()}/test/resources/test_src_output`; +describe('processSrcInputFile', () => { + const tmpDir = `${process.cwd()}/test/resources`; + const testSrcInputFile = `${tmpDir}/test_src_input`; + const testSrcOutputFile = `${tmpDir}/${SRC_OUTPUT_DATA_FILE}`; afterEach(() => { rmSync(testSrcOutputFile, {force: true}); @@ -187,28 +188,19 @@ describe('processSrcData', () => { srcOutputFile: testSrcOutputFile, }; - await expect(processSrcData(cfg)).resolves.not.toThrow(); + await expect(processSrcInputFile(tmpDir, cfg)).resolves.not.toThrow(); const output = readFileSync(testSrcOutputFile, 'utf8'); expect(output).toMatchSnapshot(); }); - it('should succeed writing to logger', async () => { - const cfg: FarosConfig = { - ...testConfig, - srcInputFile: testSrcInputFile, - srcOutputFile: OutputStream.STDOUT, - }; - await expect(processSrcData(cfg)).resolves.not.toThrow(); - }); - it('should fail with processing error', async () => { const cfg: FarosConfig = { ...testConfig, srcInputFile: `${process.cwd()}/test/resources/test_src_input_invalid_json`, srcOutputFile: '/dev/null', }; - await expect(processSrcData(cfg)).rejects.toThrow( + await expect(processSrcInputFile(tmpDir, cfg)).rejects.toThrow( `Failed to process the source output data: Line of data: ` + `'invalid json'; Error: Unexpected token 'i', "invalid json" is not valid JSON`, ); @@ -222,6 +214,8 @@ describe('processSrcData', () => { srcInputFile: testSrcInputFile, srcOutputFile: testSrcOutputFile, }; - await expect(processSrcData(cfg)).rejects.toThrow('Failed to write to the output file: EACCES: permission denied'); + await expect(processSrcInputFile(tmpDir, cfg)).rejects.toThrow( + 'Failed to write to the output file: EACCES: permission denied', + ); }); }); From c3578753aa9a8dbf3bc7984f895aa377488e79b5 Mon Sep 17 00:00:00 2001 From: Jennie Gao Date: Tue, 21 Jan 2025 10:04:10 -0800 Subject: [PATCH 6/9] fix snapshot --- .../test/__snapshots__/utils.it.test.ts.snap | 2 +- airbyte-local-cli-nodejs/test/utils.it.test.ts | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap b/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap index a043c1c..8677173 100644 --- a/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap +++ b/airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap @@ -37,7 +37,7 @@ exports[`parseConfigFile should pass 1`] = ` exports[`processSrcInputFile should succeed writing to an output file 1`] = ` "{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","redactedConfig":{"user":"chris"},"sourceType":"example","sourceVersion":"0.12.3"} {"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","sourceStatus":{"status":"SUCCESS"}} -{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","logs":[{"timestamp":1736891682696,"message":{"level":30,"msg":"Source version: 0.12.3"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"Config: {\\"user\\":\\"chris\\"}"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"Catalog: {}"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"State: {}"}},{"timestamp":1736891682700,"message":{"level":30,"msg":"Syncing ExampleSource"}},{"timestamp":1736891682704,"message":{"level":30,"msg":"Finished syncing ExampleSource"}}]} +{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","logs":[{"timestamp":***,"message":{"level":30,"msg":"Source version: 0.12.3"}},{"timestamp":***,"message":{"level":30,"msg":"Config: {\\"user\\":\\"chris\\"}"}},{"timestamp":***,"message":{"level":30,"msg":"Catalog: {}"}},{"timestamp":***,"message":{"level":30,"msg":"State: {}"}},{"timestamp":***,"message":{"level":30,"msg":"Syncing ExampleSource"}},{"timestamp":***,"message":{"level":30,"msg":"Finished syncing ExampleSource"}}]} " `; diff --git a/airbyte-local-cli-nodejs/test/utils.it.test.ts b/airbyte-local-cli-nodejs/test/utils.it.test.ts index c597787..8b37b5f 100644 --- a/airbyte-local-cli-nodejs/test/utils.it.test.ts +++ b/airbyte-local-cli-nodejs/test/utils.it.test.ts @@ -191,7 +191,8 @@ describe('processSrcInputFile', () => { await expect(processSrcInputFile(tmpDir, cfg)).resolves.not.toThrow(); const output = readFileSync(testSrcOutputFile, 'utf8'); - expect(output).toMatchSnapshot(); + const outputWithoutTS = output.split('\n').map((line) => line.replace(/"timestamp":\d+/g, '"timestamp":***')); + expect(outputWithoutTS.join('\n')).toMatchSnapshot(); }); it('should fail with processing error', async () => { From b8e28e113f9429572a1ec8eb62feaa9ab0826210 Mon Sep 17 00:00:00 2001 From: Jennie Gao Date: Tue, 21 Jan 2025 10:05:33 -0800 Subject: [PATCH 7/9] remove test only --- airbyte-local-cli-nodejs/test/docker.it.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-local-cli-nodejs/test/docker.it.test.ts b/airbyte-local-cli-nodejs/test/docker.it.test.ts index 4e51e93..eecdcc3 100644 --- a/airbyte-local-cli-nodejs/test/docker.it.test.ts +++ b/airbyte-local-cli-nodejs/test/docker.it.test.ts @@ -49,7 +49,7 @@ describe('checkSrcConnection', () => { }); }); -describe.only('runSrcSync', () => { +describe('runSrcSync', () => { const testCfg: FarosConfig = { ...defaultConfig, src: { From 3c227f02fe1f9dfa1e0b74d14bfc9c588d5e2f7d Mon Sep 17 00:00:00 2001 From: Jennie Gao Date: Tue, 21 Jan 2025 12:02:46 -0800 Subject: [PATCH 8/9] add e2e srcOnly test --- airbyte-local-cli-nodejs/.gitignore | 1 + airbyte-local-cli-nodejs/src/utils.ts | 3 +++ airbyte-local-cli-nodejs/test/exec/spec/index_spec.sh | 10 ++++++++++ .../test/resources/test_config_file_src_only.json | 8 ++++++++ 4 files changed, 22 insertions(+) create mode 100644 airbyte-local-cli-nodejs/test/resources/test_config_file_src_only.json diff --git a/airbyte-local-cli-nodejs/.gitignore b/airbyte-local-cli-nodejs/.gitignore index 947a0d0..2a705fd 100644 --- a/airbyte-local-cli-nodejs/.gitignore +++ b/airbyte-local-cli-nodejs/.gitignore @@ -14,3 +14,4 @@ sample_command.sh *airbyte-local test/exec/resources tmp* +faros_airbyte_cli_config.json diff --git a/airbyte-local-cli-nodejs/src/utils.ts b/airbyte-local-cli-nodejs/src/utils.ts index 6aa43e9..4a6bad7 100644 --- a/airbyte-local-cli-nodejs/src/utils.ts +++ b/airbyte-local-cli-nodejs/src/utils.ts @@ -51,6 +51,9 @@ export function parseConfigFile(configFilePath: string): {src: AirbyteConfig; ds }; const validateConfig = (cfg: AirbyteConfig) => { + if (!cfg) { + return true; + } const allowedKeys = ['image', 'config', 'catalog', 'dockerOptions']; return Object.keys(cfg).every((key) => allowedKeys.includes(key)); }; diff --git a/airbyte-local-cli-nodejs/test/exec/spec/index_spec.sh b/airbyte-local-cli-nodejs/test/exec/spec/index_spec.sh index 52a5407..010fc9f 100644 --- a/airbyte-local-cli-nodejs/test/exec/spec/index_spec.sh +++ b/airbyte-local-cli-nodejs/test/exec/spec/index_spec.sh @@ -122,6 +122,16 @@ Describe 'Run source sync' The output should include "Failed to run source connector: Failed to run source connector." The status should equal 1 End + It 'should succeed with srcOnly' + airbyte_local_test() { + ./airbyte-local \ + --config-file './resources/test_config_file_src_only.json' \ + --src-only + } + When call airbyte_local_test + The status should equal 0 + The output should include "Source connector ran successfully." + End End # Clean up temeporary test files diff --git a/airbyte-local-cli-nodejs/test/resources/test_config_file_src_only.json b/airbyte-local-cli-nodejs/test/resources/test_config_file_src_only.json new file mode 100644 index 0000000..d704b15 --- /dev/null +++ b/airbyte-local-cli-nodejs/test/resources/test_config_file_src_only.json @@ -0,0 +1,8 @@ +{ + "src": { + "image": "farosai/airbyte-example-source", + "config": { + "user": "chris" + } + } +} \ No newline at end of file From 6c2b52c4a45f83bc09effd9980f8ddd99f77e0d9 Mon Sep 17 00:00:00 2001 From: Jennie Gao Date: Tue, 21 Jan 2025 13:59:20 -0800 Subject: [PATCH 9/9] update comment --- airbyte-local-cli-nodejs/src/docker.ts | 1 + airbyte-local-cli-nodejs/src/utils.ts | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/airbyte-local-cli-nodejs/src/docker.ts b/airbyte-local-cli-nodejs/src/docker.ts index 7c3b5e0..f27b82b 100644 --- a/airbyte-local-cli-nodejs/src/docker.ts +++ b/airbyte-local-cli-nodejs/src/docker.ts @@ -187,6 +187,7 @@ export async function runSrcSync(tmpDir: string, config: FarosConfig): Promise { return new Promise((resolve, reject) => { - // create input and output streams: - // - input stream: read from the data file user provided - // - output stream: write to an intermediate file. Overwrite the file if it exists, otherwise create a new one + // create input and output streams const inputStream = createReadStream(cfg.srcInputFile!); const outputStream = createWriteStream(`${tmpDir}/${SRC_OUTPUT_DATA_FILE}`);