Skip to content

Commit

Permalink
Merge pull request #2074 from terascope/add-waitForNumPods
Browse files Browse the repository at this point in the history
Add wait for num pods
  • Loading branch information
godber authored Jul 31, 2020
2 parents 10d9911 + fa4fa66 commit 52f42df
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 58 deletions.
6 changes: 3 additions & 3 deletions examples/k8s/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ rebuild: destroy build setup ## destroys then re-runs things
make register

register: ## creates asset and registers job
earl assets deploy ${TERASLICE_ALIAS} terascope/elasticsearch-assets
earl assets deploy ${TERASLICE_ALIAS} --build --replace --src-dir asset/
earl assets deploy ${TERASLICE_ALIAS} --blocking terascope/elasticsearch-assets
earl assets deploy ${TERASLICE_ALIAS} --blocking --build --replace --src-dir asset/
earl tjm register ${TERASLICE_ALIAS} example-job.json
earl tjm register ${TERASLICE_ALIAS} example-job-labels.json
earl tjm register ${TERASLICE_ALIAS} example-job-resource.json
Expand All @@ -195,7 +195,7 @@ deregister: ## resets jobs

start: ## starts example job
# yes | tjm asset --replace -c $(TERASLICE_MASTER_URL) || echo '* it is okay'
earl assets deploy ${TERASLICE_ALIAS} --build --replace --src-dir asset/
earl assets deploy ${TERASLICE_ALIAS} --blocking --build --replace --src-dir asset/
earl tjm start example-job.json

stop: ## stops example job
Expand Down
5 changes: 4 additions & 1 deletion examples/k8s/asset/asset/asset.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
{
"name": "example",
"version": "1.0.0"
"version": "1.0.0",
"node_version": false,
"platform": false,
"arch": false
}
4 changes: 4 additions & 0 deletions examples/k8s/example-job.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
{
"_op": "example-op"
},
{
"_op": "delay",
"ms": 30000
},
{
"_op": "elasticsearch_index_selector",
"index": "terak8s-example-data",
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "teraslice-workspace",
"displayName": "Teraslice",
"version": "0.69.0",
"version": "0.69.1",
"private": true,
"homepage": "https://github.com/terascope/teraslice",
"bugs": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,17 @@
"app.kubernetes.io/instance": "{{clusterNameLabel}}"
},
"name": "{{name}}",
"namespace": "{{namespace}}"
"namespace": "{{namespace}}",
"ownerReferences": [
{
"apiVersion": "batch/v1",
"controller": false,
"blockOwnerDeletion": false,
"kind": "Job",
"name": "{{exName}}",
"uid": "{{exUid}}"
}
]
},
"spec": {
"replicas": {{replicas}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@ module.exports = function kubernetesClusterBackend(context, clusterMasterServer)
const clusterState = {};
let clusterStateInterval = null;

const k8s = new K8s(logger, null, kubernetesNamespace,
context.sysconfig.teraslice.kubernetes_api_poll_delay);
const k8s = new K8s(
logger,
null,
kubernetesNamespace,
context.sysconfig.teraslice.kubernetes_api_poll_delay,
context.sysconfig.teraslice.shutdown_timeout
);

clusterMasterServer.onClientOnline((exId) => {
logger.info(`execution ${exId} is connected`);
Expand Down Expand Up @@ -116,7 +121,16 @@ module.exports = function kubernetesClusterBackend(context, clusterMasterServer)
* @param {Object} execution Object that contains information of Execution
* @return {Promise} [description]
*/
function allocateWorkers(execution) {
async function allocateWorkers(execution) {
// NOTE: I tried to set these on the execution inside allocateSlicer
// but these properties were gone by the time this was called, perhaps
// because they are not on the schema. So I do this k8s API call
// instead.
const selector = `app.kubernetes.io/component=execution_controller,teraslice.terascope.io/jobId=${execution.job_id}`;
const jobs = await k8s.list(selector, 'jobs');
execution.k8sName = jobs.items[0].metadata.name;
execution.k8sUid = jobs.items[0].metadata.uid;

const kr = new K8sResource(
'deployments', 'worker', context.sysconfig.teraslice, execution
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
'use strict';

const {
TSError, get, isEmpty, pDelay
TSError, get, isEmpty, pDelay, pRetry
} = require('@terascope/utils');
const { Client, KubeConfig } = require('kubernetes-client');
const Request = require('kubernetes-client/backends/request');
const { getRetryConfig } = require('./utils');

class K8s {
constructor(logger, clientConfig, defaultNamespace = 'default', apiPollDelay) {
constructor(logger, clientConfig, defaultNamespace = 'default',
apiPollDelay, shutdownTimeout) {
this.apiPollDelay = apiPollDelay;
this.logger = logger;
this.defaultNamespace = defaultNamespace;
this.logger = logger;
this.shutdownTimeout = shutdownTimeout; // this is in milliseconds

if (clientConfig) {
this.client = new Client({
Expand Down Expand Up @@ -50,7 +53,8 @@ class K8s {
async getNamespaces() {
let namespaces;
try {
namespaces = await this.client.api.v1.namespaces.get();
namespaces = await pRetry(() => this.client
.api.v1.namespaces.get(), getRetryConfig());
} catch (err) {
const error = new TSError(err, {
reason: 'Failure getting in namespaces'
Expand All @@ -64,11 +68,14 @@ class K8s {
* Returns the first pod matching the provided selector after it has
* entered the `Running` state.
*
* TODO: Make more generic to search for different statuses
*
* NOTE: If your selector will return multiple pods, this method probably
* won't work for you.
* @param {String} selector kubernetes selector, like 'controller-uid=XXX'
* @param {String} ns namespace to search, this will override the default
* @param {Number} timeout time, in ms, to wait for pod to start
* @return {Object} pod
*/
async waitForSelectedPod(selector, ns, timeout = 10000) {
const namespace = ns || this.defaultNamespace;
Expand All @@ -77,8 +84,9 @@ class K8s {

// eslint-disable-next-line no-constant-condition
while (true) {
const result = await this.client.api.v1.namespaces(namespace)
.pods().get({ qs: { labelSelector: selector } });
const result = await pRetry(() => this.client
.api.v1.namespaces(namespace).pods()
.get({ qs: { labelSelector: selector } }), getRetryConfig());

let pod;
if (typeof result !== 'undefined' && result) {
Expand All @@ -97,6 +105,42 @@ class K8s {
}
}

/**
* Waits for the number of pods to equal number.
* @param {Number} number Number of pods to wait for, e.g.: 0, 10
* @param {String} selector kubernetes selector, like 'controller-uid=XXX'
* @param {String} ns namespace to search, this will override the default
* @param {Number} timeout time, in ms, to wait for pod to start
* @return {Array} Array of pod objects
*/
async waitForNumPods(number, selector, ns, timeout = 10000) {
const namespace = ns || this.defaultNamespace;
let now = Date.now();
const end = now + timeout;

// eslint-disable-next-line no-constant-condition
while (true) {
const result = await pRetry(() => this.client
.api.v1.namespaces(namespace).pods()
.get({ qs: { labelSelector: selector } }), getRetryConfig());

let podList;
if (typeof result !== 'undefined' && result) {
podList = get(result, 'body.items');
}

if (typeof podList !== 'undefined' && podList) {
if (podList.length === number) return podList;
}
const msg = `Waiting: pods matching ${selector} is ${podList.length}/${number}`;
if (now > end) throw new Error(`Timeout ${msg}`);
this.logger.debug(msg);

await pDelay(this.apiPollDelay);
now = Date.now();
}
}

/**
* returns list of k8s objects matching provided selector
* @param {String} selector kubernetes selector, like 'app=teraslice'
Expand All @@ -111,17 +155,21 @@ class K8s {

try {
if (objType === 'pods') {
response = await this.client.api.v1.namespaces(namespace)
.pods().get({ qs: { labelSelector: selector } });
response = await pRetry(() => this.client
.api.v1.namespaces(namespace).pods()
.get({ qs: { labelSelector: selector } }), getRetryConfig());
} else if (objType === 'deployments') {
response = await this.client.apis.apps.v1.namespaces(namespace)
.deployments().get({ qs: { labelSelector: selector } });
response = await pRetry(() => this.client
.apis.apps.v1.namespaces(namespace).deployments()
.get({ qs: { labelSelector: selector } }), getRetryConfig());
} else if (objType === 'services') {
response = await this.client.api.v1.namespaces(namespace)
.services().get({ qs: { labelSelector: selector } });
response = await pRetry(() => this.client
.api.v1.namespaces(namespace).services()
.get({ qs: { labelSelector: selector } }), getRetryConfig());
} else if (objType === 'jobs') {
response = await this.client.apis.batch.v1.namespaces(namespace)
.jobs().get({ qs: { labelSelector: selector } });
response = await pRetry(() => this.client
.apis.batch.v1.namespaces(namespace).jobs()
.get({ qs: { labelSelector: selector } }), getRetryConfig());
} else {
const error = new Error(`Wrong objType provided to get: ${objType}`);
this.logger.error(error);
Expand Down Expand Up @@ -194,8 +242,9 @@ class K8s {
let response;

try {
response = await this.client.apis.apps.v1.namespaces(this.defaultNamespace)
.deployments(name).patch({ body: record });
response = await pRetry(() => this.client
.apis.apps.v1.namespaces(this.defaultNamespace).deployments(name)
.patch({ body: record }), getRetryConfig());
} catch (e) {
const err = new Error(`Request k8s.patch with ${name} failed with: ${e}`);
this.logger.error(err);
Expand Down Expand Up @@ -224,22 +273,25 @@ class K8s {

try {
if (objType === 'services') {
response = await this.client.api.v1.namespaces(this.defaultNamespace)
.services(name).delete();
response = await pRetry(() => this.client
.api.v1.namespaces(this.defaultNamespace).services(name)
.delete(), getRetryConfig(), getRetryConfig());
} else if (objType === 'deployments') {
response = await this.client.apis.apps.v1.namespaces(this.defaultNamespace)
.deployments(name).delete();
response = await pRetry(() => this.client
.apis.apps.v1.namespaces(this.defaultNamespace).deployments(name)
.delete(), getRetryConfig());
} else if (objType === 'jobs') {
// To get a Job to remove the associated pods you have to
// include a body like the one below with the delete request
response = await this.client.apis.batch.v1.namespaces(this.defaultNamespace)
.jobs(name).delete({
response = await pRetry(() => this.client
.apis.batch.v1.namespaces(this.defaultNamespace).jobs(name)
.delete({
body: {
apiVersion: 'v1',
kind: 'DeleteOptions',
propagationPolicy: 'Background'
}
});
}), getRetryConfig());
} else {
throw new Error(`Invalid objType: ${objType}`);
}
Expand All @@ -261,18 +313,49 @@ class K8s {

/**
* Delete all of the deployments and services related to the specified exId
*
* The process here waits for the worker pods to completely exit before
* terminating the execution controller pod. The intent is to avoid having
* a worker timeout when it tries to tell the execution controller it is
* exiting.
*
* @param {String} exId ID of the execution
* @return {Promise}
*/
async deleteExecution(exId) {
const r = [];
if (!exId) {
throw new Error('deleteExecution requires an executionId');
}

return Promise.all([
this._deleteObjByExId(exId, 'worker', 'deployments'),
this._deleteObjByExId(exId, 'execution_controller', 'jobs'),
]);
try {
this.logger.info(`Deleting worker deployment for ex_id: ${exId}`);
r.push(await this._deleteObjByExId(exId, 'worker', 'deployments'));

await this.waitForNumPods(
0,
`app.kubernetes.io/component=worker,teraslice.terascope.io/exId=${exId}`,
null,
this.shutdownTimeout + 15000 // shutdown_timeout + 15s
);
} catch (e) {
// deliberately ignore errors, k8s will clean up workers when
// execution controller gets deleted.
const err = new Error(`Error encountered deleting pod deployment, continuing execution controller shutdown: ${e}`);
this.logger.error(err);
}

try {
this.logger.info(`Deleting execution controller job for ex_id: ${exId}`);
r.push(await this._deleteObjByExId(exId, 'execution_controller', 'jobs'));
} catch (e) {
const err = new Error(`Error deleting execution controller: ${e}`);
this.logger.error(err);
return Promise.reject(err);
}

this.logger.debug(`Deleted Resources:\n\n${r.map((x) => JSON.stringify(x, null, 2))}`);
return r;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ class K8sResource {
dockerImage,
execution: safeEncode(this.execution),
exId: this.execution.ex_id,
exName: this.execution.k8sName,
exUid: this.execution.k8sUid,
jobId: this.execution.job_id,
jobNameLabel,
name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ const fs = require('fs');
const path = require('path');
const barbe = require('barbe');

const { isTest } = require('@terascope/utils');

function makeTemplate(folder, fileName) {
const filePath = path.join(__dirname, folder, `${fileName}.hbs`);
const templateData = fs.readFileSync(filePath, 'utf-8');
Expand Down Expand Up @@ -38,4 +40,19 @@ function setMaxOldSpaceViaEnv(envArr, jobEnv, memory) {
});
}

module.exports = { setMaxOldSpaceViaEnv, makeTemplate, getMaxOldSpace };
// Retry tuning shared by the kubernetes API calls. When `isTest` is truthy
// fewer retries and a much shorter delay are used.
const MAX_RETRIES = isTest ? 2 : 3;
// delay between attempts, in milliseconds
const RETRY_DELAY = isTest ? 50 : 1000;

/**
 * Builds the options object handed to `pRetry`.
 *
 * A new object is created on every call.
 *
 * @return {{retries: Number, delay: Number}} retry count and delay in ms
 */
function getRetryConfig() {
    const retryConfig = { retries: MAX_RETRIES, delay: RETRY_DELAY };
    return retryConfig;
}

module.exports = {
getMaxOldSpace,
getRetryConfig,
makeTemplate,
setMaxOldSpaceViaEnv
};
2 changes: 1 addition & 1 deletion packages/teraslice/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "teraslice",
"displayName": "Teraslice",
"version": "0.69.0",
"version": "0.69.1",
"description": "Distributed computing platform for processing JSON data",
"homepage": "https://github.com/terascope/teraslice#readme",
"bugs": {
Expand Down
Loading

0 comments on commit 52f42df

Please sign in to comment.