Skip to content

Commit

Permalink
update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jeniii committed Jan 21, 2025
1 parent 86a28ef commit c3d29e2
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 115 deletions.
15 changes: 1 addition & 14 deletions airbyte-local-cli-nodejs/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions airbyte-local-cli-nodejs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@
"dockerode": "^4.0.2",
"lodash": "^4.17.21",
"pino": "^9.6.0",
"pino-pretty": "^13.0.0",
"yoctocolors-cjs": "^2.1.2"
"pino-pretty": "^13.0.0"
},
"jest": {
"silent": false,
Expand Down
40 changes: 26 additions & 14 deletions airbyte-local-cli-nodejs/src/docker.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import {writeFileSync} from 'node:fs';
import {createWriteStream, writeFileSync} from 'node:fs';
import {Writable} from 'node:stream';

import Docker from 'dockerode';

import {AirbyteConnectionStatus, AirbyteConnectionStatusMessage, AirbyteMessageType, FarosConfig} from './types';
import {DEFAULT_STATE_FILE, logger, SRC_CATALOG_FILENAME, SRC_CONFIG_FILENAME} from './utils';
import {
DEFAULT_STATE_FILE,
logger,
OutputStream,
processSrcDataByLine,
SRC_CATALOG_FILENAME,
SRC_CONFIG_FILENAME,
SRC_OUTPUT_DATA_FILE,
} from './utils';

// Constants
const DEFAULT_MAX_LOG_SIZE = '10m';
Expand Down Expand Up @@ -119,10 +127,8 @@ export async function checkSrcConnection(tmpDir: string, image: string, srcConfi
* --config "/configs/$src_config_filename" \
* --catalog "/configs/$src_catalog_filename" \
* --state "/configs/$src_state_filename"
*
* @argument command - for testing purposes only
*/
export async function runSrcSync(tmpDir: string, config: FarosConfig): Promise<string> {
export async function runSrcSync(tmpDir: string, config: FarosConfig): Promise<void> {
logger.info('Running source connector...');

if (!config.src?.image) {
Expand Down Expand Up @@ -180,35 +186,41 @@ export async function runSrcSync(tmpDir: string, config: FarosConfig): Promise<s
const cidfilePath = `tmp-${timestamp}-src_cid`;
writeFileSync(cidfilePath, container.id);

// Create a writable stream for the processed output data
const srcOutputFilePath = config.srcOutputFile ?? `${tmpDir}/${SRC_OUTPUT_DATA_FILE}`;
const srcOutputStream =
config.srcOutputFile === OutputStream.STDOUT ? process.stdout : createWriteStream(srcOutputFilePath);

// create a writable stream to capture the stdout
// TODO: write to a file instead of memory
let data = '';
const stdoutStream = new Writable({
let buffer = '';
const containerOutputStream = new Writable({
write(chunk, _encoding, callback) {
data += chunk.toString();
buffer += chunk.toString();
const lines = buffer.split('\n');
buffer = lines.pop() ?? '';
lines.forEach((line: string) => {
processSrcDataByLine(line, srcOutputStream, config);
});
callback();
},
});

// Attach the stderr to termincal stderr, and stdout to the output stream
const stream = await container.attach({stream: true, stdout: true, stderr: true});
container.modem.demuxStream(stream, stdoutStream, process.stderr);
container.modem.demuxStream(stream, containerOutputStream, process.stderr);

// Start the container
await container.start();

// Wait for the container to finish
const res = await container.wait();
logger.debug(res);
logger.debug(data);

if (res.StatusCode === 0) {
logger.info('Source connector ran successfully.');
} else {
throw new Error('Failed to run source connector.');
}

// return the stdout data
return data;
} catch (error: any) {
throw new Error(`Failed to run source connector: ${error.message ?? JSON.stringify(error)}`);
}
Expand Down
7 changes: 3 additions & 4 deletions airbyte-local-cli-nodejs/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {parseAndValidateInputs} from './command';
import {checkDockerInstalled, checkSrcConnection, pullDockerImage, runSrcSync} from './docker';
import {AirbyteCliContext} from './types';
import {cleanUp, createTmpDir, loadStateFile, logger, processSrcData, writeConfig} from './utils';
import {cleanUp, createTmpDir, loadStateFile, logger, processSrcInputFile, writeConfig} from './utils';

async function main(): Promise<void> {
const context: AirbyteCliContext = {};
Expand All @@ -28,10 +28,9 @@ async function main(): Promise<void> {
// Run airbyte source connector
if (!cfg.srcInputFile) {
await runSrcSync(context.tmpDir, cfg);
} else {
await processSrcInputFile(context.tmpDir, cfg);
}

// Process source data
await processSrcData(cfg);
} catch (error: any) {
logger.error(error.message, 'Error');
cleanUp(context);
Expand Down
105 changes: 47 additions & 58 deletions airbyte-local-cli-nodejs/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ import {
} from 'node:fs';
import {tmpdir} from 'node:os';
import {sep} from 'node:path';
import readline from 'node:readline';
import {Writable} from 'node:stream';

import pino from 'pino';
import pretty from 'pino-pretty';
import readline from 'readline';

import {AirbyteCliContext, AirbyteConfig, FarosConfig} from './types';

Expand All @@ -32,11 +33,6 @@ export const DEFAULT_STATE_FILE = 'state.json';
export const SRC_INPUT_DATA_FILE = `${FILENAME_PREFIX}_src_data`;
export const SRC_OUTPUT_DATA_FILE = `${FILENAME_PREFIX}_src_output`;

// Check if the value is an OutputStream
function isOutputStream(value: any): value is OutputStream {
return Object.values(OutputStream).includes(value);
}

// Create a pino logger instance
export const logger = pino(pretty({colorize: true}));

Expand Down Expand Up @@ -209,8 +205,6 @@ export function writeFile(file: string, data: any): void {
/**
* Process the source output.
*
* jq_src_msg="\"${GREEN}[SRC]: \" + ${JQ_TIMESTAMP} + \" - \" + ."
*
* Command line:
* tee >(
* jq -cR $jq_color_opt --unbuffered 'fromjson? |
Expand All @@ -220,62 +214,53 @@ export function writeFile(file: string, data: any): void {
* jq -cR --unbuffered "fromjson? |
* select(.type == \"RECORD\" or .type == \"STATE\") |
* .record.stream |= \"${dst_stream_prefix}\" + ." |
* tee "$output_filepath" |
* tee "$output_filepath" | ...
*
* jq_src_msg="\"${GREEN}[SRC]: \" + ${JQ_TIMESTAMP} + \" - \" + ."
*
*
* Note: `dst_stream_prefix` command option is dropped
*/
export function processSrcData(cfg: FarosConfig): Promise<void> {
return new Promise((resolve, reject) => {
// Reformat the JSON message
function formatSrcMsg(json: any): string {
return `[SRC] - ${JSON.stringify(json)}`;
}

// Processing the source line by line
function processLine(line: string): void {
// skip empty lines
if (line.trim() === '') {
return;
}

try {
const data = JSON.parse(line);
// Processing the source line by line
export function processSrcDataByLine(line: string, outputStream: Writable, cfg: FarosConfig): void {
// Reformat the JSON message
function formatSrcMsg(json: any): string {
return `[SRC] - ${JSON.stringify(json)}`;
}
// skip empty lines
if (line.trim() === '') {
return;
}

// non RECORD and STATE type messages: print as stdout
if (data.type !== 'RECORD' && data.type !== 'STATE') {
logger.info(formatSrcMsg(data));
}
// RECORD and STATE type messages: logger or write to output file
else {
if (cfg.srcOutputFile === OutputStream.STDOUT) {
logger.info(formatSrcMsg(data));
} else {
outputStream.write(`${line}\n`);
}
}
} catch (error: any) {
rl.emit('error', new Error(`Line of data: '${line}'; Error: ${error.message}`));
try {
const data = JSON.parse(line);

// non RECORD and STATE type messages: print as stdout
// RECORD and STATE type messages: when the output is set to stdout
if ((data.type !== 'RECORD' && data.type !== 'STATE') || cfg.srcOutputFile === OutputStream.STDOUT) {
if (cfg.rawMessages) {
process.stdout.write(`${line}\n`);
} else {
logger.info(formatSrcMsg(data));
}
}

// Close the output stream if it's a file
function closeOutputStream(): void {
if (!isOutputStream(cfg.srcOutputFile)) {
outputStream.end();
}
logger.debug(`Closing the output stream file '${cfg.srcOutputFile}'.`);
// RECORD and STATE type messages: write to output file
else {
outputStream.write(`${line}\n`);
}
} catch (error: any) {
throw new Error(`Line of data: '${line}'; Error: ${error.message}`);
}
}

// get source data file and output file paths
const srcInputFilePath = cfg.srcInputFile ?? SRC_INPUT_DATA_FILE;
const srcOutputFilePath = cfg.srcOutputFile ?? SRC_OUTPUT_DATA_FILE;

export function processSrcInputFile(tmpDir: string, cfg: FarosConfig): Promise<void> {
return new Promise((resolve, reject) => {
// create input and output streams:
// - input stream: read from the data file user provided or the one the script created in the temporary directory
// - output stream: write to a file or stdout. Overwrite the file if it exists, otherwise create a new one
const inputStream = createReadStream(srcInputFilePath);
const outputStream: any =
cfg.srcOutputFile === OutputStream.STDOUT ? process.stdout : createWriteStream(srcOutputFilePath);
// - input stream: read from the data file user provided
// - output stream: write to an intermediate file. Overwrite the file if it exists, otherwise create a new one
const inputStream = createReadStream(cfg.srcInputFile!);
const outputStream = createWriteStream(`${tmpDir}/${SRC_OUTPUT_DATA_FILE}`);

// create readline interface
const rl = readline.createInterface({
Expand All @@ -284,20 +269,24 @@ export function processSrcData(cfg: FarosConfig): Promise<void> {
});

rl.on('line', (line) => {
processLine(line);
try {
processSrcDataByLine(line, outputStream, cfg);
} catch (error: any) {
rl.emit('error', error);
}
})
.on('close', () => {
logger.debug('Finished processing the source output data.');
closeOutputStream();
outputStream.end();
resolve();
})
.on('error', (error) => {
closeOutputStream();
outputStream.end();
reject(new Error(`Failed to process the source output data: ${error.message ?? JSON.stringify(error)}`));
});

outputStream.on('error', (error: any) => {
closeOutputStream();
outputStream.end();
reject(new Error(`Failed to write to the output file: ${error.message ?? JSON.stringify(error)}`));
});
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`runSrcSync should success 1`] = `
"{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","redactedConfig":{"user":"chris"},"sourceType":"example","sourceVersion":"0.12.4"}
{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","sourceStatus":{"status":"SUCCESS"}}
{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","logs":[{"timestamp":***,"message":{"level":30,"msg":"Source version: 0.12.4"}},{"timestamp":***,"message":{"level":30,"msg":"Config: {\\"user\\":\\"chris\\"}"}},{"timestamp":***,"message":{"level":30,"msg":"Catalog: {}"}},{"timestamp":***,"message":{"level":30,"msg":"State: {}"}},{"timestamp":***,"message":{"level":30,"msg":"Syncing ExampleSource"}},{"timestamp":***,"message":{"level":30,"msg":"Finished syncing ExampleSource"}}]}
"
`;
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ exports[`parseConfigFile should pass 1`] = `
}
`;

exports[`processSrcData should succeed writing to an output file 1`] = `
exports[`processSrcInputFile should succeed writing to an output file 1`] = `
"{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","redactedConfig":{"user":"chris"},"sourceType":"example","sourceVersion":"0.12.3"}
{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","sourceStatus":{"status":"SUCCESS"}}
{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","logs":[{"timestamp":1736891682696,"message":{"level":30,"msg":"Source version: 0.12.3"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"Config: {\\"user\\":\\"chris\\"}"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"Catalog: {}"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"State: {}"}},{"timestamp":1736891682700,"message":{"level":30,"msg":"Syncing ExampleSource"}},{"timestamp":1736891682704,"message":{"level":30,"msg":"Finished syncing ExampleSource"}}]}
Expand Down
23 changes: 18 additions & 5 deletions airbyte-local-cli-nodejs/test/docker.it.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import {readdirSync, unlinkSync} from 'node:fs';
import {readdirSync, readFileSync, rmSync, unlinkSync} from 'node:fs';
import path from 'node:path';
import {Writable} from 'node:stream';

import {checkSrcConnection, pullDockerImage, runSrcSync} from '../src/docker';
import {FarosConfig} from '../src/types';
import {SRC_OUTPUT_DATA_FILE} from '../src/utils';

const defaultConfig: FarosConfig = {
srcCheckConnection: false,
Expand Down Expand Up @@ -48,14 +49,21 @@ describe('checkSrcConnection', () => {
});
});

describe('runSrcSync', () => {
describe.only('runSrcSync', () => {
const testCfg: FarosConfig = {
...defaultConfig,
src: {
image: 'farosai/airbyte-example-source',
},
};

const testTmpDir = `${process.cwd()}/test/resources/dockerIt_runSrcSync`;

// remove the intermediate output file
afterEach(() => {
rmSync(`${testTmpDir}/${SRC_OUTPUT_DATA_FILE}`, {force: true});
});

// Clean up files created by the test
afterAll(() => {
const pattern = /.*-src_cid$/;
Expand All @@ -69,10 +77,15 @@ describe('runSrcSync', () => {
});

it('should success', async () => {
await expect(runSrcSync(`${process.cwd()}/test/resources/dockerIt_runSrcSync`, testCfg)).resolves.not.toThrow();
await expect(runSrcSync(testTmpDir, testCfg)).resolves.not.toThrow();

// Replace timestamp for comparison
const output = readFileSync(`${testTmpDir}/${SRC_OUTPUT_DATA_FILE}`, 'utf8');
const outputWithoutTS = output.split('\n').map((line) => line.replace(/"timestamp":\d+/g, '"timestamp":***'));
expect(outputWithoutTS.join('\n')).toMatchSnapshot();
});

// Check the error message is correctly redirect to process.stderr
// Check stderr message is correctly redirect to process.stderr
it('should fail', async () => {
// Capture process.stderr
let stderrData = '';
Expand All @@ -87,7 +100,7 @@ describe('runSrcSync', () => {

try {
await expect(
runSrcSync(`${process.cwd()}/test/resources/dockerIt_runSrcSync`, {
runSrcSync(testTmpDir, {
...testCfg,
src: {image: 'farosai/airbyte-faros-graphql-source'},
}),
Expand Down
Loading

0 comments on commit c3d29e2

Please sign in to comment.