Skip to content

Commit

Permalink
[FAI-13687] Set up stream prefix and connection name (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeniii authored Jan 23, 2025
1 parent 6512a95 commit d07708e
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 7 deletions.
20 changes: 20 additions & 0 deletions airbyte-local-cli-nodejs/src/docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,26 @@ export async function pullDockerImage(image: string): Promise<void> {
}
}

export async function inspectDockerImage(image: string): Promise<{digest: string; version: string}> {
logger.debug(`Inspecting docker image: ${image}`);

try {
const imageInfo = await _docker.getImage(image).inspect();
logger.debug(`Docker image inspected: ${image}`);

const digest = imageInfo.RepoDigests[0];
const version = imageInfo.Config.Labels['io.airbyte.version'];

if (!digest || !version) {
throw new Error('RepoDigests or airbyte version label is missing.');
}
return {digest, version};
} catch (error: any) {
logger.error(`Failed to inspect docker image: ${image}`);
throw error;
}
}

/**
* Spinning up a docker container to check the source connection.
* `docker run --rm -v "$tempdir:/configs" $src_docker_options "$src_docker_image"
Expand Down
16 changes: 14 additions & 2 deletions airbyte-local-cli-nodejs/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
import {parseAndValidateInputs} from './command';
import {checkDockerInstalled, checkSrcConnection, pullDockerImage, runSrcSync} from './docker';
import {AirbyteCliContext} from './types';
import {cleanUp, createTmpDir, loadStateFile, logger, processSrcInputFile, writeConfig} from './utils';
import {
cleanUp,
createTmpDir,
generateDstStreamPrefix,
ImageType,
loadStateFile,
logger,
logImageVersion,
processSrcInputFile,
writeConfig,
} from './utils';

async function main(): Promise<void> {
const context: AirbyteCliContext = {};
Expand All @@ -12,7 +22,6 @@ async function main(): Promise<void> {

// Create temporary directory, load state file, write config to files
context.tmpDir = createTmpDir();
loadStateFile(context.tmpDir, cfg?.stateFile, cfg?.connectionName);
writeConfig(context.tmpDir, cfg);

// Pull source docker image
Expand All @@ -27,6 +36,9 @@ async function main(): Promise<void> {

// Run airbyte source connector
if (!cfg.srcInputFile) {
await logImageVersion(ImageType.SRC, cfg.src?.image);
generateDstStreamPrefix(cfg);
loadStateFile(context.tmpDir, cfg?.stateFile, cfg?.connectionName);
await runSrcSync(context.tmpDir, cfg);
} else {
await processSrcInputFile(context.tmpDir, cfg);
Expand Down
3 changes: 3 additions & 0 deletions airbyte-local-cli-nodejs/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ export interface FarosConfig {
keepContainers: boolean;
logLevel: string;
debug: boolean;

// internal use
dstStreamPrefix?: string;
}

/**
Expand Down
52 changes: 47 additions & 5 deletions airbyte-local-cli-nodejs/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ import {Writable} from 'node:stream';
import pino from 'pino';
import pretty from 'pino-pretty';

import {inspectDockerImage} from './docker';
import {AirbyteCliContext, AirbyteConfig, FarosConfig} from './types';

// constants
export enum OutputStream {
STDERR = 'STDERR',
STDOUT = 'STDOUT',
}
export enum ImageType {
SRC = 'source',
DST = 'destination',
}
export const FILENAME_PREFIX = 'faros_airbyte_cli';
export const SRC_CONFIG_FILENAME = `${FILENAME_PREFIX}_src_config.json`;
export const DST_CONFIG_FILENAME = `${FILENAME_PREFIX}_dst_config.json`;
Expand All @@ -40,6 +45,16 @@ export function updateLogLevel(debug: boolean | undefined): void {
logger.level = debug ? 'debug' : 'info';
}

// Log the docker image digest and version
export async function logImageVersion(type: ImageType, image: string | undefined): Promise<void> {
if (image === undefined) {
return;
}
const {digest, version} = await inspectDockerImage(image);
logger.info(`Using ${type} image digest ${digest}`);
logger.info(`Using ${type} image version ${version}`);
}

// Read the config file and covert to AirbyteConfig
export function parseConfigFile(configFilePath: string): {src: AirbyteConfig; dst: AirbyteConfig} {
try {
Expand Down Expand Up @@ -104,6 +119,7 @@ export function createTmpDir(absTmpDir?: string): string {
// Load the existing state file and write to the temporary folder
export function loadStateFile(tempDir: string, filePath?: string, connectionName?: string): void {
const path = filePath ?? (connectionName ? `${connectionName}__state.json` : DEFAULT_STATE_FILE);
logger.info(`Using state file: '${path}'`);

// Read the state file and write to temp folder
// Write an empty state file if the state file hasn't existed yet
Expand All @@ -121,7 +137,7 @@ export function loadStateFile(tempDir: string, filePath?: string, connectionName
);
}
writeFileSync(`${tempDir}/${DEFAULT_STATE_FILE}`, '{}');
logger.debug(`State file '${DEFAULT_STATE_FILE}' not found. An empty state file is created.`);
logger.debug(`State file '${path}' not found. An empty state file will be created.`);
}
}

Expand Down Expand Up @@ -221,8 +237,6 @@ export function writeFile(file: string, data: any): void {
*
* jq_src_msg="\"${GREEN}[SRC]: \" + ${JQ_TIMESTAMP} + \" - \" + ."
*
*
* Note: `dst_stream_prefix` command option is dropped
*/

// Processing the source line by line
Expand All @@ -241,7 +255,7 @@ export function processSrcDataByLine(line: string, outputStream: Writable, cfg:

// non RECORD and STATE type messages: print as stdout
// RECORD and STATE type messages: when the output is set to stdout
if ((data.type !== 'RECORD' && data.type !== 'STATE') || cfg.srcOutputFile === OutputStream.STDOUT) {
if ((data?.type !== 'RECORD' && data?.type !== 'STATE') || cfg.srcOutputFile === OutputStream.STDOUT) {
if (cfg.rawMessages) {
process.stdout.write(`${line}\n`);
} else {
Expand All @@ -250,7 +264,10 @@ export function processSrcDataByLine(line: string, outputStream: Writable, cfg:
}
// RECORD and STATE type messages: write to output file
else {
outputStream.write(`${line}\n`);
if (data?.record?.stream && cfg.dstStreamPrefix) {
data.record.stream = `${cfg.dstStreamPrefix ?? ''}${data.record.stream}`;
}
outputStream.write(`${JSON.stringify(data)}\n`);
}
} catch (error: any) {
throw new Error(`Line of data: '${line}'; Error: ${error.message}`);
Expand Down Expand Up @@ -296,3 +313,28 @@ export function processSrcInputFile(tmpDir: string, cfg: FarosConfig): Promise<v
});
});
}

export function generateDstStreamPrefix(cfg: FarosConfig): void {
const srcImage = cfg.src?.image;
const dstImage = cfg.dst?.image;
if (dstImage?.startsWith('farosai/airbyte-faros-destination')) {
// if source image is a faros feed image
if (
!cfg.connectionName &&
srcImage?.startsWith('farosai/airbyte-faros-feeds-source') &&
(cfg.src?.config as any)?.feed_cfg?.feed_name
) {
cfg.connectionName = `${(cfg.src?.config as any)?.feed_cfg?.feed_name}-feed`;
logger.debug(`Using connection name: ${cfg.connectionName}`);
}
// if image is an airbyte image
if (srcImage?.startsWith('farosai/airbyte')) {
const [imageName] = srcImage.split(':');
const imageParts = imageName?.split('-').slice(1, -1);
cfg.connectionName = `my${imageParts?.join('') ?? ''}src`;
cfg.dstStreamPrefix = `${cfg.connectionName}_${imageParts?.join('_') ?? ''}__`;
logger.debug(`Using connection name: ${cfg.connectionName}`);
logger.debug(`Using destination stream prefix: ${cfg.dstStreamPrefix}`);
}
}
}

0 comments on commit d07708e

Please sign in to comment.