Large asset Ex OOM fix when in s3 asset mode #3598

Merged · 7 commits · Apr 30, 2024
115 changes: 109 additions & 6 deletions e2e/test/cases/assets/simple-spec.ts
@@ -1,11 +1,17 @@
import 'jest-extended';
import { createReadStream } from 'node:fs';
import fs from 'node:fs';
import os from 'os';
import path from 'path';
import decompress from 'decompress';
import archiver from 'archiver';
import {
createS3Client,
getS3Object,
S3Client,
} from '@terascope/file-asset-apis';
import { Teraslice } from '@terascope/types';
import { pWhile } from '@terascope/utils';
import crypto from 'crypto';
import { TerasliceHarness, JobFixtureNames } from '../../teraslice-harness.js';
import {
ASSET_STORAGE_CONNECTION_TYPE, MINIO_ACCESS_KEY, MINIO_HOST, MINIO_SECRET_KEY, TEST_PLATFORM
@@ -32,7 +38,7 @@ describe('assets', () => {
* @param {string} assetPath the relative path to the asset file
*/
async function submitAndValidateAssetJob(jobSpecName: JobFixtureNames, assetPath: string) {
const fileStream = createReadStream(assetPath);
const fileStream = fs.createReadStream(assetPath);
const jobSpec = terasliceHarness.newJob(jobSpecName);
// Set resource constraints on workers within CI
if (TEST_PLATFORM === 'kubernetes') {
@@ -57,7 +63,7 @@ describe('assets', () => {
}

it('after uploading an asset, it can be deleted', async () => {
const testStream = createReadStream('test/fixtures/assets/example_asset_1.zip');
const testStream = fs.createReadStream('test/fixtures/assets/example_asset_1.zip');

const result = await terasliceHarness.teraslice.assets.upload(
testStream,
@@ -79,7 +85,7 @@ describe('assets', () => {
// {"error":"asset.json was not found in root directory of asset bundle
// nor any immediate sub directory"}
it('uploading a bad asset returns an error', async () => {
const testStream = createReadStream('test/fixtures/assets/example_bad_asset_1.zip');
const testStream = fs.createReadStream('test/fixtures/assets/example_bad_asset_1.zip');

try {
await terasliceHarness.teraslice.assets.upload(testStream, { blocking: true });
@@ -113,7 +119,7 @@ describe('assets', () => {
it('can update an asset bundle and use the new asset', async () => {
const assetPath = 'test/fixtures/assets/example_asset_1updated.zip';

const fileStream = createReadStream(assetPath);
const fileStream = fs.createReadStream(assetPath);
// the asset on this job already points to 'ex1' so it should use the latest available asset
const jobSpec = terasliceHarness.newJob('generator-asset');
// Set resource constraints on workers within CI
@@ -173,6 +179,8 @@ describe('assets', () => {
describe('s3 asset storage', () => {
// If the connection type is S3, run tests to ensure assets are stored in S3
if (ASSET_STORAGE_CONNECTION_TYPE === 's3') {
/// keep 'largeAssetPath' in outer scope so afterAll can clean up even on failure
const largeAssetPath = fs.mkdtempSync(path.join(os.tmpdir(), 'example_large_asset_top'));
let terasliceInfo: Teraslice.ApiRootResponse;
let terasliceHarness: TerasliceHarness;
let s3client: S3Client;
@@ -197,9 +205,14 @@ describe('s3 asset storage', () => {
bucketName = `ts-assets-${terasliceInfo.name}`.replaceAll('_', '-');
});

afterAll(async () => {
/// cleanup
fs.rmSync(largeAssetPath, { recursive: true, force: true });
});

it('stores assets in s3', async () => {
const assetPath = 'test/fixtures/assets/example_asset_1updated.zip';
const fileStream = createReadStream(assetPath);
const fileStream = fs.createReadStream(assetPath);
const assetResponse = await terasliceHarness.teraslice.assets.upload(fileStream, {
blocking: true
});
@@ -221,5 +234,95 @@ describe('s3 asset storage', () => {
expect(record._source?.blob).toBeUndefined();
}
});

it('can upload and use large asset', async () => {
/// Create a large asset within the test so we don't have to upload a
/// large binary file to the repo
const assetPath = 'test/fixtures/assets/example_asset_1updated.zip';
if (!fs.existsSync(largeAssetPath)) {
fs.mkdirSync(largeAssetPath, { recursive: true });
}
const largeAssetPathSub = path.join(largeAssetPath, 'example_large_asset_sub');
if (!fs.existsSync(largeAssetPathSub)) {
fs.mkdirSync(largeAssetPathSub, { recursive: true });
}
const assetBuffer = fs.readFileSync(assetPath);
await decompress(assetBuffer, largeAssetPathSub);
fs.mkdirSync(path.join(largeAssetPathSub, '__static_assets'), { recursive: true });
const largeDocumentPath = path.join(largeAssetPathSub, '__static_assets', 'data.txt');
fs.writeFileSync(largeDocumentPath, '');
const writer = fs.createWriteStream(largeDocumentPath);
let generateComplete = false;

/// TODO: This functionality could be moved to utils at some point.
/// Writes a chunk of random data to data.txt
/// The data must be random so compression can't shrink the archive
function writeData() {
/// chunk size in bytes
/// 5mb per chunk
const chunkSize = 5242880;
const stringChunk = crypto.randomBytes(chunkSize);
writer.write(stringChunk, writerCB);
}

/// Once the previous chunk is processed,
/// write another chunk until the bytes written are >= 60mb
/// This is so we don't hold all 60mb in memory
function writerCB(error: Error | null | void) {
if (error) {
throw new Error(error.message);
}
const totalBytes = writer.bytesWritten;
if (totalBytes >= 62914560) {
writer.end();
generateComplete = true;
} else {
writeData();
}
}
/// Once the write stream is ready start writing data to the file
writer.on('ready', () => {
writeData();
});

writer.on('error', (err) => {
throw new Error(err.message);
});
/// Wait for all data to be written to file
await pWhile(async () => generateComplete);

/// Change name in asset.json
const assetJSON = JSON.parse(fs.readFileSync(path.join(largeAssetPathSub, 'asset.json'), 'utf8'));
assetJSON.name = 'large-example-asset';
fs.writeFileSync(path.join(largeAssetPathSub, 'asset.json'), JSON.stringify(assetJSON, null, 2));

/// Zip the large asset
const zippedFile = fs.createWriteStream(path.join(largeAssetPath, 'example_large_asset.zip'));
const zipper = archiver('zip');
zipper.pipe(zippedFile);
zipper.on('error', (err) => {
throw new Error(err.message);
});
zipper.directory(largeAssetPathSub, false);
await zipper.finalize();

const fileStream = fs.createReadStream(path.join(largeAssetPath, 'example_large_asset.zip'));

/// Will throw error if unable to upload
await terasliceHarness.teraslice.assets.upload(fileStream, {
blocking: true
});

const jobSpec = terasliceHarness.newJob('generator-large-asset');
// Set resource constraints on workers within CI
if (TEST_PLATFORM === 'kubernetes') {
jobSpec.resources_requests_cpu = 0.1;
}

const ex = await terasliceHarness.submitAndStart(jobSpec);
const status = await ex.waitForStatus('completed');

expect(status).toBe('completed');
});
}
});
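
A note on the data-generation approach above: the test drives itself through the write callback and polls a completion flag with pWhile. A minimal standalone sketch of the same idea, using only Node core APIs and awaiting backpressure directly (the helper name is illustrative, not part of this PR):

import crypto from 'node:crypto';
import fs from 'node:fs';
import { once } from 'node:events';

/// Sketch: write ~60 MiB of random (hence incompressible) data in
/// 5 MiB chunks, so the zipped asset stays large without the test
/// ever holding the whole payload in memory.
async function writeRandomFile(filePath: string, totalBytes = 60 * 1024 * 1024): Promise<void> {
    const writer = fs.createWriteStream(filePath);
    const chunkSize = 5 * 1024 * 1024;
    let written = 0;
    while (written < totalBytes) {
        const chunk = crypto.randomBytes(chunkSize);
        written += chunk.length;
        /// write() returns false once the internal buffer is full;
        /// wait for 'drain' before producing the next chunk
        if (!writer.write(chunk)) {
            await once(writer, 'drain');
        }
    }
    writer.end();
    await once(writer, 'finish');
}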
19 changes: 19 additions & 0 deletions e2e/test/fixtures/jobs/generator-large-asset.json
@@ -0,0 +1,19 @@
{
"name": "generator",
"slicers": 1,
"lifecycle": "once",
"workers": 3,
"analytics": false,
"assets": ["standard", "large-example-asset"],
"max_retries": 0,
"operations": [
{
"_op": "data_generator",
"size": 1000,
"stress_test": false
},
{
"_op": "noop"
}
]
}
2 changes: 2 additions & 0 deletions e2e/test/teraslice-harness.ts
@@ -18,6 +18,7 @@ import { scaleWorkers, getElapsed } from './docker-helpers.js';
import signale from './signale.js';
import generatorToESJob from './fixtures/jobs/generate-to-es.json' assert { type: 'json' };
import generatorAssetJob from './fixtures/jobs/generator-asset.json' assert { type: 'json' };
import generatorLargeAssetJob from './fixtures/jobs/generator-large-asset.json' assert { type: 'json' };
import generatorJob from './fixtures/jobs/generator.json' assert { type: 'json' };
import idJob from './fixtures/jobs/id.json' assert { type: 'json' };
import kafkaReaderJob from './fixtures/jobs/kafka-reader.json' assert { type: 'json' };
@@ -29,6 +30,7 @@ import { defaultAssetBundles } from './download-assets.js';
const JobDict = Object.freeze({
'generate-to-es': generatorToESJob,
'generator-asset': generatorAssetJob,
'generator-large-asset': generatorLargeAssetJob,
generator: generatorJob,
id: idJob,
'kafka-reader': kafkaReaderJob,
2 changes: 1 addition & 1 deletion package.json
@@ -1,7 +1,7 @@
{
"name": "teraslice-workspace",
"displayName": "Teraslice",
"version": "1.3.0",
"version": "1.3.1",
"private": true,
"homepage": "https://github.com/terascope/teraslice",
"bugs": {
2 changes: 1 addition & 1 deletion packages/teraslice/package.json
@@ -1,7 +1,7 @@
{
"name": "teraslice",
"displayName": "Teraslice",
"version": "1.3.0",
"version": "1.3.1",
"description": "Distributed computing platform for processing JSON data",
"homepage": "https://github.com/terascope/teraslice#readme",
"bugs": {
27 changes: 21 additions & 6 deletions packages/teraslice/src/lib/storage/backends/s3_store.ts
@@ -89,24 +89,39 @@ export class S3Store {

// TODO: if we want to use the S3 store more generically we can't
// assume the key will have a '.zip' extension
async get(recordId: string) {
async get(recordId: string): Promise<Buffer> {
const command = {
Bucket: this.bucket,
Key: `${recordId}.zip`
};
try {
this.logger.debug(`getting record with id: ${recordId} from s3 ${this.connection} connection, ${this.bucket} bucket.`);
const client = this.api;
const bufferArray: Buffer[] = [];
let triggerReturn = false;
const response = await s3RequestWithRetry({
client,
func: getS3Object,
params: command
});
const s3File = await response.Body?.transformToString('base64');
if (typeof s3File !== 'string') {
throw new TSError(`Unable to get recordId ${recordId} from s3 ${this.connection} connection, ${this.bucket} bucket.`);
}
return s3File;
/// Convert the response body to a Node read stream
const s3Stream = response.Body as NodeJS.ReadableStream;

/// Store the data coming from s3 in a buffer array
s3Stream.on('data', (chunk: Buffer) => {
bufferArray.push(chunk);
});
s3Stream.on('end', () => {
triggerReturn = true;
});
s3Stream.on('error', (err) => {
throw new TSError(`Unable to get recordId ${recordId} from s3 ${this.connection} connection, ${this.bucket} bucket.
Reason: ${err.message}`);
});

await pWhile(async () => triggerReturn);

return Buffer.concat(bufferArray);
} catch (err) {
if (err instanceof S3ClientResponse.NoSuchKey) {
throw new TSError(`recordId ${recordId} does not exist in s3 ${this.connection} connection, ${this.bucket} bucket.`, {
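
One property of the event-listener shape in the new get(): the error thrown inside the 'error' handler runs outside the enclosing try/catch. Node readables are also async iterables, so an equivalent buffering sketch (assuming response.Body is a Node Readable; the helper name is illustrative) would let stream errors reject into the caller:

import type { Readable } from 'node:stream';

/// Sketch: concatenate an S3 response body into a single Buffer.
/// A stream error rejects the for-await loop, so it surfaces in
/// the caller's try/catch rather than in an event handler.
async function bodyToBuffer(body: Readable): Promise<Buffer> {
    const chunks: Buffer[] = [];
    for await (const chunk of body) {
        chunks.push(chunk as Buffer);
    }
    return Buffer.concat(chunks);
}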
8 changes: 7 additions & 1 deletion packages/teraslice/src/lib/workers/assets/loader.ts
@@ -53,8 +53,14 @@ export class AssetLoader {

const assetRecord = await this.assetsStorage.get(assetIdentifier);
this.logger.info(`loading assets: ${assetIdentifier}`);
let buff: Buffer;

if (this.context.sysconfig.terafoundation.asset_storage_connection_type === 's3') {
buff = assetRecord.blob as Buffer;
} else {
buff = Buffer.from(assetRecord.blob as string, 'base64');
}

const buff = Buffer.from(assetRecord.blob as string, 'base64');
const saveResult = await saveAsset(
this.logger,
this.assetsDirectory,
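
The branch reflects the two storage encodings: the S3 backend now returns the blob as a raw Buffer, while the elasticsearch backend still stores it as a base64 string. A hypothetical one-line normalization capturing the same intent (not part of this PR):

/// Sketch: normalize an asset blob to a Buffer regardless of backend.
function blobToBuffer(blob: Buffer | string): Buffer {
    return Buffer.isBuffer(blob) ? blob : Buffer.from(blob, 'base64');
}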
3 changes: 2 additions & 1 deletion packages/teraslice/src/lib/workers/assets/spawn.ts
@@ -61,7 +61,8 @@ export async function spawnAssetLoader(

if (!isSuccess) {
const errMsg = get(message, 'error', `exit code ${code}`);
const error = new Error(`Failure to get assets, caused by ${errMsg}`);
const errOOM = 'If running out of memory, consider increasing the memory allocation for the process by adding/modifying the "memory_execution_controller" or "resources_limits_memory" (for workers) field in the job file.';
const error = new Error(`Failure to get assets, caused by ${errMsg}\n${errOOM}`);
reject(error);
} else {
resolve(get(message, 'assetIds', []));
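
For context, a job file that sets the fields named in the hint might look like the sketch below. The field names come from the error message above; the values are illustrative assumptions (bytes), not recommendations:

{
    "name": "generator",
    "lifecycle": "once",
    "workers": 3,
    "memory_execution_controller": 1073741824,
    "resources_limits_memory": 536870912,
    "operations": [
        { "_op": "data_generator", "size": 1000 },
        { "_op": "noop" }
    ]
}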
5 changes: 4 additions & 1 deletion packages/teraslice/test/storage/assets_storage-spec.ts
@@ -72,8 +72,11 @@ describe('AssetsStorage using S3 backend', () => {
});

it('can get an asset from S3', async () => {
/// create a buffer copy of example_asset_1.zip to test if it equals what s3 sends back
const filePath = 'e2e/test/fixtures/assets/example_asset_1.zip';
const buffer = fs.readFileSync(filePath);
const assetRecord = await storage.get('2909ec5fd38466cf6276cc14ede25096f1f34ee9');
expect(assetRecord.blob).toStartWith('UEsDBAoAAAAAANxV');
expect(buffer.equals(assetRecord.blob as Buffer)).toBe(true);
expect(assetRecord.name).toBe('ex1');
});

5 changes: 3 additions & 2 deletions packages/teraslice/test/storage/s3_store-spec.ts
@@ -110,11 +110,12 @@ describe('S3 backend test', () => {

it('should be able to download asset', async () => {
const filePath = 'e2e/test/fixtures/assets/example_asset_2.zip';
await s3Backend.save('ex2', fse.readFileSync(filePath), 30000);
const fileBuffer = fse.readFileSync(filePath);
await s3Backend.save('ex2', fileBuffer, 30000);

const result = await s3Backend.get('ex2');

expect(result).toStartWith('UEsDBAo');
expect(result.equals(fileBuffer)).toBe(true);
await s3Backend.remove('ex2');
});
});