Skip to content

Commit

Permalink
Merge pull request #3275 from terascope/es-client-facade
Browse files Browse the repository at this point in the history
wrapper for es / opensearch client
  • Loading branch information
ciorg authored Nov 29, 2022
2 parents 76c370a + 8d97f48 commit 860afc0
Show file tree
Hide file tree
Showing 176 changed files with 12,466 additions and 1,842 deletions.
10 changes: 8 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,17 @@ jobs:
if: branch = master AND type IN (pull_request, cron)
script: yarn --silent test --suite unit-a . -- -- --shard=4/4

- name: 'Unit Test Suite (node 14)'
- name: 'Unit Test Suite (node 14) shard 1'
node_js: '14.17'
# run only on pull-requests or cron
if: branch = master AND type IN (pull_request, cron)
script: yarn --silent test --suite unit-b
script: yarn --silent test --suite unit-b . -- -- --shard=1/2

- name: 'Unit Test Suite (node 14) shard 2'
node_js: '14.17'
# run only on pull-requests or cron
if: branch = master AND type IN (pull_request, cron)
script: yarn --silent test --suite unit-b . -- -- --shard=2/2

- name: 'ES Test Suite (elasticsearch 6) (node 16)'
node_js: '16.17.1'
Expand Down
4 changes: 2 additions & 2 deletions e2e/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "e2e",
"displayName": "E2E Tests",
"version": "0.1.1",
"version": "0.2.0",
"private": true,
"description": "Teraslice integration test suite",
"keywords": [
Expand Down Expand Up @@ -33,7 +33,7 @@
},
"devDependencies": {
"bunyan": "^1.8.15",
"elasticsearch": "^15.4.1",
"elasticsearch-store": "^0.64.0",
"fs-extra": "^10.1.0",
"ms": "^2.1.3",
"nanoid": "^3.3.4",
Expand Down
48 changes: 28 additions & 20 deletions e2e/test/cases/assets/simple-spec.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
'use strict';

const fs = require('fs');
const misc = require('../../misc');
const wait = require('../../wait');
const { resetState, submitAndStart } = require('../../helpers');
const TerasliceHarness = require('../../teraslice-harness');

describe('assets', () => {
beforeAll(() => resetState());
let terasliceHarness;

const teraslice = misc.teraslice();
beforeAll(async () => {
terasliceHarness = new TerasliceHarness();
await terasliceHarness.init();
await terasliceHarness.resetState();
});

/**
* Uploads the specified asset file and then submits the specified job config
Expand All @@ -23,17 +25,20 @@ describe('assets', () => {
*/
async function submitAndValidateAssetJob(jobSpecName, assetPath) {
const fileStream = fs.createReadStream(assetPath);
const jobSpec = misc.newJob(jobSpecName);
const jobSpec = terasliceHarness.newJob(jobSpecName);
const { workers } = jobSpec; // save for comparison

const result = await teraslice.assets.upload(fileStream, { blocking: true });
const result = await terasliceHarness.teraslice.assets.upload(
fileStream,
{ blocking: true }
);
// NOTE: In this case, the asset is referenced by the ID
// assigned by teraslice and not it's name.
jobSpec.assets = [result._id, 'standard', 'elasticsearch'];

const ex = await submitAndStart(jobSpec);
const ex = await terasliceHarness.submitAndStart(jobSpec);

const r = await wait.forWorkersJoined(ex.id(), workers, 25);
const r = await terasliceHarness.forWorkersJoined(ex.id(), workers, 25);
expect(r).toEqual(workers);

await ex.stop({ blocking: true });
Expand All @@ -42,11 +47,14 @@ describe('assets', () => {
it('after uploading an asset, it can be deleted', async () => {
const testStream = fs.createReadStream('test/fixtures/assets/example_asset_1.zip');

const result = await teraslice.assets.upload(testStream, { blocking: true });
const result = await terasliceHarness.teraslice.assets.upload(
testStream,
{ blocking: true }
);

// save the asset ID that was submitted to terslice
const assetId = result._id;
const response = await teraslice.assets.remove(assetId);
const response = await terasliceHarness.teraslice.assets.remove(assetId);

// ensure the deleted asset's ID matches that of
// the saved asset
Expand All @@ -62,7 +70,7 @@ describe('assets', () => {
const testStream = fs.createReadStream('test/fixtures/assets/example_bad_asset_1.zip');

try {
await teraslice.assets.upload(testStream, { blocking: true });
await terasliceHarness.teraslice.assets.upload(testStream, { blocking: true });
} catch (err) {
expect(err.message).toInclude('asset.json was not found');
expect(err.code).toEqual(422);
Expand Down Expand Up @@ -95,17 +103,17 @@ describe('assets', () => {

const fileStream = fs.createReadStream(assetPath);
// the asset on this job already points to 'ex1' so it should use the latest available asset
const jobSpec = misc.newJob('generator-asset');
const jobSpec = terasliceHarness.newJob('generator-asset');
const { workers } = jobSpec;

const assetResponse = await teraslice.assets.upload(fileStream, {
const assetResponse = await terasliceHarness.teraslice.assets.upload(fileStream, {
blocking: true
});
const assetId = assetResponse._id;

const ex = await submitAndStart(jobSpec);
const ex = await terasliceHarness.submitAndStart(jobSpec);

const waitResponse = await wait.forWorkersJoined(ex.id(), workers, 25);
const waitResponse = await terasliceHarness.forWorkersJoined(ex.id(), workers, 25);
expect(waitResponse).toEqual(workers);

const execution = await ex.config();
Expand All @@ -115,16 +123,16 @@ describe('assets', () => {
});

it('can directly ask for the new asset to be used', async () => {
const jobSpec = misc.newJob('generator-asset');
const jobSpec = terasliceHarness.newJob('generator-asset');
jobSpec.assets = ['ex1:0.1.1', 'standard', 'elasticsearch'];
const { workers } = jobSpec;

const assetResponse = await teraslice.assets.getAsset('ex1', '0.1.1');
const assetResponse = await terasliceHarness.teraslice.assets.getAsset('ex1', '0.1.1');
const assetId = assetResponse[0].id;

const ex = await submitAndStart(jobSpec);
const ex = await terasliceHarness.submitAndStart(jobSpec);

const waitResponse = await wait.forWorkersJoined(ex.id(), workers, 25);
const waitResponse = await terasliceHarness.forWorkersJoined(ex.id(), workers, 25);
expect(waitResponse).toEqual(workers);

const execution = await ex.config();
Expand Down
61 changes: 31 additions & 30 deletions e2e/test/cases/cluster/api-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,40 @@

const fs = require('fs');
const { cloneDeep } = require('@terascope/utils');
const misc = require('../../misc');
const { resetState } = require('../../helpers');

const { waitForExStatus } = require('../../wait');
const TerasliceHarness = require('../../teraslice-harness');

describe('cluster api', () => {
beforeAll(() => resetState());
let terasliceHarness;

const teraslice = misc.teraslice();
beforeAll(async () => {
terasliceHarness = new TerasliceHarness();
await terasliceHarness.init();
await terasliceHarness.resetState();
});

it('submitted jobs are not saved in validated form', async () => {
const assetPath = 'test/fixtures/assets/example_asset_1.zip';
const testStream = fs.createReadStream(assetPath);
const jobSpec = misc.newJob('generator-asset');
const jobSpec = terasliceHarness.newJob('generator-asset');

await teraslice.assets.upload(testStream, {
await terasliceHarness.teraslice.assets.upload(testStream, {
blocking: true
});
const job = await teraslice.jobs.submit(jobSpec, true);
const job = await terasliceHarness.teraslice.jobs.submit(jobSpec, true);
const jobConfig = await job.config();

expect(jobConfig).toMatchObject(jobSpec);
});

it('should update job config', async () => {
// NOTE that this relies on the asset loaded in the test above
const jobSpec = misc.newJob('generator-asset');
const jobSpec = terasliceHarness.newJob('generator-asset');
const { workers, slicers } = jobSpec;
const alteredJob = cloneDeep(jobSpec);
alteredJob.workers = 3;
delete alteredJob.slicers;

const job = await teraslice.jobs.submit(jobSpec, true);
const job = await terasliceHarness.teraslice.jobs.submit(jobSpec, true);

const jobId = job.id();

Expand All @@ -43,7 +44,7 @@ describe('cluster api', () => {
expect(jobConfig.slicers).toEqual(slicers);
expect(jobConfig.workers).toEqual(workers);

await teraslice.cluster.put(`/jobs/${jobId}`, alteredJob);
await terasliceHarness.teraslice.cluster.put(`/jobs/${jobId}`, alteredJob);

jobConfig = await job.config();

Expand All @@ -53,10 +54,10 @@ describe('cluster api', () => {
});

it('will not send lifecycle changes to executions that are not active', async () => {
const jobSpec = misc.newJob('reindex');
const specIndex = misc.newSpecIndex('api');
const jobSpec = terasliceHarness.newJob('reindex');
const specIndex = terasliceHarness.newSpecIndex('api');
jobSpec.name = 'basic reindex for lifecycle';
jobSpec.operations[0].index = misc.getExampleIndex(100);
jobSpec.operations[0].index = terasliceHarness.getExampleIndex(100);
jobSpec.operations[1].index = specIndex;

async function didError(p) {
Expand All @@ -68,28 +69,28 @@ describe('cluster api', () => {
}
}

const job = await teraslice.jobs.submit(jobSpec);
const job = await terasliceHarness.teraslice.jobs.submit(jobSpec);
const jobId = job.id();

const { ex_id: exId } = await job.execution();
const ex = teraslice.executions.wrap(exId);
const ex = terasliceHarness.teraslice.executions.wrap(exId);

await waitForExStatus(ex, 'completed', 100, 1000);
await terasliceHarness.waitForExStatus(ex, 'completed', 100, 1000);

const result = await Promise.all([
didError(teraslice.cluster.post(`/jobs/${jobId}/_stop`)),
didError(teraslice.cluster.post(`/jobs/${jobId}/_resume`)),
didError(teraslice.cluster.post(`/jobs/${jobId}/_pause`)),
didError(teraslice.cluster.post(`/ex/${exId}/_stop`)),
didError(teraslice.cluster.post(`/ex/${exId}/_resume`)),
didError(teraslice.cluster.post(`/ex/${exId}/_pause`))
didError(terasliceHarness.teraslice.cluster.post(`/jobs/${jobId}/_stop`)),
didError(terasliceHarness.teraslice.cluster.post(`/jobs/${jobId}/_resume`)),
didError(terasliceHarness.teraslice.cluster.post(`/jobs/${jobId}/_pause`)),
didError(terasliceHarness.teraslice.cluster.post(`/ex/${exId}/_stop`)),
didError(terasliceHarness.teraslice.cluster.post(`/ex/${exId}/_resume`)),
didError(terasliceHarness.teraslice.cluster.post(`/ex/${exId}/_pause`))
]);

expect(result).toEqual([true, true, true, true, true, true]);
});

it('api end point /assets should return an array of json objects of asset metadata', async () => {
const response = await teraslice.cluster.get('/assets');
const response = await terasliceHarness.teraslice.cluster.get('/assets');

expect(response).toBeArray();
expect(response[0]).toHaveProperty('_created');
Expand All @@ -99,7 +100,7 @@ describe('cluster api', () => {
});

it('api end point /assets/assetName should return an array of json objects of asset metadata', async () => {
const response = await teraslice.cluster.get('/assets/ex1');
const response = await terasliceHarness.teraslice.cluster.get('/assets/ex1');

expect(response).toBeArray();
expect(response[0]).toHaveProperty('_created');
Expand All @@ -109,7 +110,7 @@ describe('cluster api', () => {
});

it('api end point /assets/assetName/version should return an array of json objects of asset metadata', async () => {
const response = await teraslice.cluster.get('/assets/ex1/0.0.1');
const response = await terasliceHarness.teraslice.cluster.get('/assets/ex1/0.0.1');

expect(response).toBeArray();
expect(response[0]).toHaveProperty('_created');
Expand All @@ -119,17 +120,17 @@ describe('cluster api', () => {
});

it('api end point /txt/assets should return a text table', async () => {
const response = await teraslice.cluster.txt('assets');
const response = await terasliceHarness.teraslice.cluster.txt('assets');
expect(response).toBeString();
});

it('api end point /txt/assets/assetName should return a text table', async () => {
const response = await teraslice.cluster.txt('assets/ex1');
const response = await terasliceHarness.teraslice.cluster.txt('assets/ex1');
expect(response).toBeString();
});

it('api end point /txt/assets/assetName/version should return a text table', async () => {
const response = await teraslice.cluster.txt('assets/ex1/0.0.1');
const response = await terasliceHarness.teraslice.cluster.txt('assets/ex1/0.0.1');
expect(response).toBeString();
});
});
26 changes: 14 additions & 12 deletions e2e/test/cases/cluster/job-state-spec.js
Original file line number Diff line number Diff line change
@@ -1,29 +1,31 @@
'use strict';

const { waitForExStatus } = require('../../wait');
const { resetState } = require('../../helpers');
const misc = require('../../misc');
const TerasliceHarness = require('../../teraslice-harness');

describe('job state', () => {
beforeAll(() => resetState());
let terasliceHarness;

const teraslice = misc.teraslice();
beforeAll(async () => {
terasliceHarness = new TerasliceHarness();
await terasliceHarness.init();
await terasliceHarness.resetState();
});

it('should cycle through after state changes with other jobs running', async () => {
const jobSpec1 = misc.newJob('generator');
const jobSpec2 = misc.newJob('generator');
const jobSpec1 = terasliceHarness.newJob('generator');
const jobSpec2 = terasliceHarness.newJob('generator');
jobSpec2.operations[1].name = 'second_generator';

const [ex1, ex2] = await Promise.all([
teraslice.executions.submit(jobSpec1),
teraslice.executions.submit(jobSpec2)
terasliceHarness.teraslice.executions.submit(jobSpec1),
terasliceHarness.teraslice.executions.submit(jobSpec2)
]);

await waitForExStatus(ex1, 'running');
await terasliceHarness.waitForExStatus(ex1, 'running');
await ex1.pause();
await waitForExStatus(ex1, 'paused');
await terasliceHarness.waitForExStatus(ex1, 'paused');
await ex1.resume();
await waitForExStatus(ex1, 'running');
await terasliceHarness.waitForExStatus(ex1, 'running');

await Promise.all([
ex1.stop({ blocking: true }),
Expand Down
Loading

0 comments on commit 860afc0

Please sign in to comment.