Skip to content

Commit

Permalink
Add readiness probe to k8s job in KubernetesV2 backend (#3721)
Browse files Browse the repository at this point in the history
This PR makes the following changes:

- Adds readiness probe to `execution controller` job (_**Only applies to
k8s V2 backend**_)
- Worker deployments will now wait for the ex readiness probe to come
back as `ready` before creation.
- Adds `/ready` endpoint to execution controller server 
  - Will return with status code `200` when initializing is complete 
- Will return with status code `503` when initializing either fails or
hasn't yet completed
- Bumps **@terascope/teraslice-messaging** from `v1.2.0` to `v1.3.0`

Ref to issue #3720

---------

Co-authored-by: Austin Godber <godber@terascope.io>
  • Loading branch information
sotojn and godber authored Aug 22, 2024
1 parent e3af988 commit b9e3fa8
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 10 deletions.
2 changes: 1 addition & 1 deletion e2e/test/cases/kafka/kafka-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ describe('kafka', () => {
terasliceHarness.waitForExStatus(sender, 'completed')
]);

await terasliceHarness.waitForIndexCount(specIndex, total);
await terasliceHarness.waitForIndexCount(specIndex, total, 45 * 1000);
await reader.stop();

await terasliceHarness.waitForExStatus(reader, 'stopped');
Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice-messaging/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@terascope/teraslice-messaging",
"displayName": "Teraslice Messaging",
"version": "1.2.0",
"version": "1.3.0",
"description": "An internal teraslice messaging library using socket.io",
"homepage": "https://github.com/terascope/teraslice/tree/master/packages/teraslice-messaging#readme",
"bugs": {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Logger } from '@terascope/utils';
import http from 'node:http';

export interface ClientOptions {
executionControllerUrl: string;
Expand All @@ -16,9 +17,13 @@ export interface ServerOptions {
workerDisconnectTimeout: number;
networkLatencyBuffer?: number;
actionTimeout: number;
requestListener?: RequestListener;
logger?: Logger;
}

export interface RequestListener {
(request: http.IncomingMessage, response: http.ServerResponse): void;
}
export interface Worker {
workerId: string;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ const { Available, Unavailable } = core.ClientState;
export class Server extends core.Server {
private _activeWorkers: i.ActiveWorkers;
queue: Queue<EnqueuedWorker>;
executionReady: boolean;

constructor(opts: i.ServerOptions) {
const {
port, actionTimeout, networkLatencyBuffer, workerDisconnectTimeout, logger
port, actionTimeout, networkLatencyBuffer,
workerDisconnectTimeout, logger, requestListener
} = opts;

if (!isNumber(workerDisconnectTimeout)) {
Expand All @@ -21,14 +23,16 @@ export class Server extends core.Server {
super({
port,
actionTimeout,
requestListener,
networkLatencyBuffer,
clientDisconnectTimeout: workerDisconnectTimeout,
serverName: 'ExecutionController',
logger,
logger
});

this.queue = new Queue();
this._activeWorkers = {};
this.executionReady = false;
}

async start(): Promise<void> {
Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"dependencies": {
"@terascope/elasticsearch-api": "^4.0.0",
"@terascope/job-components": "^1.2.0",
"@terascope/teraslice-messaging": "^1.2.0",
"@terascope/teraslice-messaging": "^1.3.0",
"@terascope/types": "^1.0.0",
"@terascope/utils": "^1.0.0",
"async-mutex": "^0.5.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,9 @@ export class KubernetesClusterBackendV2 {

const controllerUid = jobResult.spec?.selector?.matchLabels?.[controllerLabel];

// Right now this is waiting for the selected pod to come up in a "running"
// state. It may be better to check for a readiness probe instead
const pod = await this.k8s.waitForSelectedPod(
`${controllerLabel}=${controllerUid}`,
'pod-status',
undefined,
this.context.sysconfig.teraslice.slicer_timeout
);
Expand Down Expand Up @@ -191,6 +190,14 @@ export class KubernetesClusterBackendV2 {
// @ts-expect-error
execution.k8sUid = jobs.items[0].metadata.uid;

/// Wait for ex readiness probe to return 'Ready'
await this.k8s.waitForSelectedPod(
selector,
'readiness-probe',
undefined,
this.context.sysconfig.teraslice.slicer_timeout
);

const kr = new K8sResource(
'deployments',
'worker',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@
"image": "{{dockerImage}}",
"name": "{{name}}",
"ports": [{ "containerPort": 45680 }],
"readinessProbe": {
"httpGet": {
"path": "/health",
"port": 45680
},
"initialDelaySeconds": 3,
"periodSeconds": 10,
"timeoutSeconds": 1,
"successThreshold": 1,
"failureThreshold": 3
},
"volumeMounts": [{
"mountPath": "/app/config",
"name": "config"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export class K8s {
* TODO: Should this use the cluster state that gets polled periodically,
* rather than making it's own k8s API calls
*/
async waitForSelectedPod(selector: string, ns?: string, timeout = 10000) {
async waitForSelectedPod(selector: string, statusType: string, ns?: string, timeout = 10000) {
const namespace = ns || this.defaultNamespace;
let now = Date.now();
const end = now + timeout;
Expand All @@ -104,7 +104,20 @@ export class K8s {
}

if (pod) {
if (get(pod, 'status.phase') === 'Running') return pod;
if (statusType === 'readiness-probe') {
if (pod.status?.conditions) {
for (const condition of pod.status.conditions) {
if (
condition.type === 'ContainersReady'
&& condition.status === 'True'
) {
return pod;
}
}
}
} else if (statusType === 'pod-status') {
if (get(pod, 'status.phase') === 'Running') return pod;
}
}
if (now > end) throw new Error(`Timeout waiting for pod matching: ${selector}`);
this.logger.debug(`waiting for pod matching: ${selector}`);
Expand Down
20 changes: 19 additions & 1 deletion packages/teraslice/src/lib/workers/execution-controller/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import ms from 'ms';
import _ from 'lodash';
import http from 'node:http';
import {
formatURL, ExecutionController as ExController, ClusterMaster
} from '@terascope/teraslice-messaging';
Expand Down Expand Up @@ -79,10 +80,10 @@ export class ExecutionController {
const workerDisconnectTimeout = get(config, 'worker_disconnect_timeout');
const nodeDisconnectTimeout = get(config, 'node_disconnect_timeout');
const shutdownTimeout = get(config, 'shutdown_timeout');

this.server = new ExController.Server({
port: slicerPort,
networkLatencyBuffer,
requestListener: this.requestListener.bind(this),
actionTimeout,
workerDisconnectTimeout,
logger
Expand Down Expand Up @@ -311,6 +312,8 @@ export class ExecutionController {
this.logger.info(`execution: ${this.exId} initialized execution_controller`);

this.isInitialized = true;
/// This will change the '/ready' endpoint to Ready
this.server.executionReady = true;
}

async run() {
Expand Down Expand Up @@ -1176,4 +1179,19 @@ export class ExecutionController {
);
}
}

requestListener(req: http.IncomingMessage, res: http.ServerResponse) {
if (req.url === '/health') {
if (this.server.executionReady) {
res.writeHead(200);
res.end('Ready');
} else {
res.writeHead(503);
res.end('Service Unavailable');
}
} else {
res.writeHead(501);
res.end('Not Implemented');
}
}
}

0 comments on commit b9e3fa8

Please sign in to comment.