Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add readiness probe to k8s job in KubernetesV2 backend #3721

Merged
merged 16 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e/test/cases/kafka/kafka-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ describe('kafka', () => {
terasliceHarness.waitForExStatus(sender, 'completed')
]);

await terasliceHarness.waitForIndexCount(specIndex, total);
await terasliceHarness.waitForIndexCount(specIndex, total, 45 * 1000);
await reader.stop();

await terasliceHarness.waitForExStatus(reader, 'stopped');
Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice-messaging/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@terascope/teraslice-messaging",
"displayName": "Teraslice Messaging",
"version": "1.2.0",
"version": "1.3.0",
"description": "An internal teraslice messaging library using socket.io",
"homepage": "https://github.com/terascope/teraslice/tree/master/packages/teraslice-messaging#readme",
"bugs": {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Logger } from '@terascope/utils';
import http from 'node:http';

export interface ClientOptions {
executionControllerUrl: string;
Expand All @@ -16,9 +17,13 @@ export interface ServerOptions {
workerDisconnectTimeout: number;
networkLatencyBuffer?: number;
actionTimeout: number;
requestListener?: RequestListener;
logger?: Logger;
}

export interface RequestListener {
(request: http.IncomingMessage, response: http.ServerResponse): void;
}
export interface Worker {
workerId: string;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ const { Available, Unavailable } = core.ClientState;
export class Server extends core.Server {
private _activeWorkers: i.ActiveWorkers;
queue: Queue<EnqueuedWorker>;
executionReady: boolean;

constructor(opts: i.ServerOptions) {
const {
port, actionTimeout, networkLatencyBuffer, workerDisconnectTimeout, logger
port, actionTimeout, networkLatencyBuffer,
workerDisconnectTimeout, logger, requestListener
} = opts;

if (!isNumber(workerDisconnectTimeout)) {
Expand All @@ -21,14 +23,16 @@ export class Server extends core.Server {
super({
port,
actionTimeout,
requestListener,
networkLatencyBuffer,
clientDisconnectTimeout: workerDisconnectTimeout,
serverName: 'ExecutionController',
logger,
logger
});

this.queue = new Queue();
this._activeWorkers = {};
this.executionReady = false;
}

async start(): Promise<void> {
Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"dependencies": {
"@terascope/elasticsearch-api": "^4.0.0",
"@terascope/job-components": "^1.2.0",
"@terascope/teraslice-messaging": "^1.2.0",
"@terascope/teraslice-messaging": "^1.3.0",
"@terascope/types": "^1.0.0",
"@terascope/utils": "^1.0.0",
"async-mutex": "^0.5.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ export class KubernetesClusterBackend {

const pod = await this.k8s.waitForSelectedPod(
`${controllerLabel}=${controllerUid}`,
'pod-status',
undefined,
this.context.sysconfig.teraslice.slicer_timeout
);
Expand Down Expand Up @@ -162,6 +163,14 @@ export class KubernetesClusterBackend {
// @ts-expect-error
execution.k8sUid = jobs.items[0].metadata.uid;

/// Wait for ex readiness probe to return 'Ready'
await this.k8s.waitForSelectedPod(
selector,
'readiness-probe',
undefined,
this.context.sysconfig.teraslice.slicer_timeout
);

const kr = new K8sResource(
'deployments',
'worker',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@
"image": "{{dockerImage}}",
"name": "{{name}}",
"ports": [{ "containerPort": 45680 }],
"readinessProbe": {
"httpGet": {
"path": "/health",
"port": 45680
},
"initialDelaySeconds": 5,
"periodSeconds": 10,
"timeoutSeconds": 5,
"successThreshold": 1,
"failureThreshold": 3
},
"volumeMounts": [{
"mountPath": "/app/config",
"name": "config"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,15 @@ export class K8s {
* NOTE: If your selector will return multiple pods, this method probably
* won't work for you.
* @param {String} selector kubernetes selector, like 'controller-uid=XXX'
* @param {String} statusType type of status to check, either pod status or readiness probe
* @param {String} ns namespace to search, this will override the default
* @param {Number} timeout time, in ms, to wait for pod to start
* @return {Object} pod
*
* TODO: Should this use the cluster state that gets polled periodically,
* rather than making it's own k8s API calls
*/
async waitForSelectedPod(selector: string, ns?: string, timeout = 10000) {
async waitForSelectedPod(selector: string, statusType: string, ns?: string, timeout = 10000) {
const namespace = ns || this.defaultNamespace;
let now = Date.now();
const end = now + timeout;
Expand All @@ -112,7 +113,20 @@ export class K8s {
}

if (typeof pod !== 'undefined' && pod) {
if (get(pod, 'status.phase') === 'Running') return pod;
if (statusType === 'readiness-probe') {
if (pod.status?.conditions) {
for (const condition of pod.status.conditions) {
if (
condition.type === 'ContainersReady'
&& condition.status === 'True'
) {
return pod;
}
}
}
} else if (statusType === 'pod-status') {
if (get(pod, 'status.phase') === 'Running') return pod;
}
}
if (now > end) throw new Error(`Timeout waiting for pod matching: ${selector}`);
this.logger.debug(`waiting for pod matching: ${selector}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,9 @@ export class KubernetesClusterBackendV2 {

const controllerUid = jobResult.spec?.selector?.matchLabels?.[controllerLabel];

// Right now this is waiting for the selected pod to come up in a "running"
// state. It may be better to check for a readiness probe instead
const pod = await this.k8s.waitForSelectedPod(
`${controllerLabel}=${controllerUid}`,
'pod-status',
undefined,
this.context.sysconfig.teraslice.slicer_timeout
);
Expand Down Expand Up @@ -191,6 +190,14 @@ export class KubernetesClusterBackendV2 {
// @ts-expect-error
execution.k8sUid = jobs.items[0].metadata.uid;

/// Wait for ex readiness probe to return 'Ready'
await this.k8s.waitForSelectedPod(
selector,
'readiness-probe',
undefined,
this.context.sysconfig.teraslice.slicer_timeout
);

const kr = new K8sResource(
'deployments',
'worker',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@
"image": "{{dockerImage}}",
"name": "{{name}}",
"ports": [{ "containerPort": 45680 }],
"readinessProbe": {
"httpGet": {
"path": "/health",
"port": 45680
},
"initialDelaySeconds": 5,
"periodSeconds": 10,
"timeoutSeconds": 5,
"successThreshold": 1,
"failureThreshold": 3
},
"volumeMounts": [{
"mountPath": "/app/config",
"name": "config"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export class K8s {
* TODO: Should this use the cluster state that gets polled periodically,
* rather than making it's own k8s API calls
*/
async waitForSelectedPod(selector: string, ns?: string, timeout = 10000) {
async waitForSelectedPod(selector: string, statusType: string, ns?: string, timeout = 10000) {
const namespace = ns || this.defaultNamespace;
let now = Date.now();
const end = now + timeout;
Expand All @@ -104,7 +104,20 @@ export class K8s {
}

if (pod) {
if (get(pod, 'status.phase') === 'Running') return pod;
if (statusType === 'readiness-probe') {
if (pod.status?.conditions) {
for (const condition of pod.status.conditions) {
if (
condition.type === 'ContainersReady'
&& condition.status === 'True'
) {
return pod;
}
}
}
} else if (statusType === 'pod-status') {
if (get(pod, 'status.phase') === 'Running') return pod;
}
}
if (now > end) throw new Error(`Timeout waiting for pod matching: ${selector}`);
this.logger.debug(`waiting for pod matching: ${selector}`);
Expand Down
20 changes: 19 additions & 1 deletion packages/teraslice/src/lib/workers/execution-controller/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import ms from 'ms';
import _ from 'lodash';
import http from 'node:http';
import {
formatURL, ExecutionController as ExController, ClusterMaster
} from '@terascope/teraslice-messaging';
Expand Down Expand Up @@ -79,10 +80,10 @@ export class ExecutionController {
const workerDisconnectTimeout = get(config, 'worker_disconnect_timeout');
const nodeDisconnectTimeout = get(config, 'node_disconnect_timeout');
const shutdownTimeout = get(config, 'shutdown_timeout');

this.server = new ExController.Server({
port: slicerPort,
networkLatencyBuffer,
requestListener: this.requestListener.bind(this),
actionTimeout,
workerDisconnectTimeout,
logger
Expand Down Expand Up @@ -311,6 +312,8 @@ export class ExecutionController {
this.logger.info(`execution: ${this.exId} initialized execution_controller`);

this.isInitialized = true;
/// This will change the '/ready' endpoint to Ready
this.server.executionReady = true;
}

async run() {
Expand Down Expand Up @@ -1176,4 +1179,19 @@ export class ExecutionController {
);
}
}

requestListener(req: http.IncomingMessage, res: http.ServerResponse) {
if (req.url === '/health') {
if (this.server.executionReady) {
res.writeHead(200);
res.end('Ready');
} else {
res.writeHead(503);
res.end('Service Unavailable');
}
} else {
res.writeHead(501);
res.end('Not Implemented');
}
}
}
Loading