Skip to content

Commit

Permalink
[Task Manager] Monitors the Task Manager Poller and automatically rec…
Browse files Browse the repository at this point in the history
…overs from failure (#75420)

Introduces a monitor around the Task Manager poller which pipes through all values emitted by the poller and recovers from poller failures or stalls.
This monitor does the following:
1. Catches the poller thrown errors and recovers by proxying the error to a handler and continues listening to the poller.
2. Reacts to the poller `error` (caused by uncaught errors) and `completion` events, by starting a new poller and piping its event through to any previous subscribers (in our case, Task Manager itself).
3. Tracks the rate at which the poller emits events (these can be both work events and `No Task` events, so polling and finding no work still counts as an emitted event), times out when the gap between events gets too long (suggesting the poller has hung), and replaces the poller with a new one.

We're not aware of any clear cases where Task Manager should actually get restarted by the monitor - this is definitely an error case and we have addressed all known cases.
The goal of introducing this monitor is as an insurance policy in case an unexpected error case breaks the poller in a long running production environment.
  • Loading branch information
gmmorris authored Aug 20, 2020
1 parent bcca933 commit 5308cc7
Show file tree
Hide file tree
Showing 8 changed files with 302 additions and 18 deletions.
9 changes: 9 additions & 0 deletions x-pack/plugins/task_manager/server/polling/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

export { createObservableMonitor } from './observable_monitor';
export { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
export { timeoutPromiseAfter } from './timeout_promise_after';
170 changes: 170 additions & 0 deletions x-pack/plugins/task_manager/server/polling/observable_monitor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { interval, from, Subject } from 'rxjs';
import { map, concatMap, takeWhile, take } from 'rxjs/operators';

import { createObservableMonitor } from './observable_monitor';
import { times } from 'lodash';

// Test suite for `createObservableMonitor`: verifies that the monitor is cold, pipes values
// through from the observable produced by the factory, stops once unsubscribed, and
// re-instantiates the monitored observable after it either throws or stops emitting.
describe('Poll Monitor', () => {
// creating the monitor must not call the factory - only subscribing should
test('returns a cold observable so that the monitored Observable is only created on demand', async () => {
const instantiator = jest.fn(() => new Subject());

createObservableMonitor(instantiator);

expect(instantiator).not.toHaveBeenCalled();
});

test('subscribing to the observable instantiates a new observable and pipes its results through', async () => {
const instantiator = jest.fn(() => from([0, 1, 2]));
const monitoredObservable = createObservableMonitor(instantiator);

expect(instantiator).not.toHaveBeenCalled();

return new Promise((resolve) => {
const next = jest.fn();
// `take(3)` completes the subscription once the three source values have been received
monitoredObservable.pipe(take(3)).subscribe({
next,
complete: () => {
expect(instantiator).toHaveBeenCalled();
expect(next).toHaveBeenCalledWith(0);
expect(next).toHaveBeenCalledWith(1);
expect(next).toHaveBeenCalledWith(2);
resolve();
},
});
});
});

test('unsubscribing from the monitor prevents the monitor from resubscribing to the observable', async () => {
const heartbeatInterval = 1000;
const instantiator = jest.fn(() => interval(100));
const monitoredObservable = createObservableMonitor(instantiator, { heartbeatInterval });

return new Promise((resolve) => {
const next = jest.fn();
monitoredObservable.pipe(take(3)).subscribe({
next,
complete: () => {
expect(instantiator).toHaveBeenCalledTimes(1);
// wait for two further heartbeat periods to confirm that the factory isn't called
// again after `take(3)` has completed the subscription
setTimeout(() => {
expect(instantiator).toHaveBeenCalledTimes(1);
resolve();
}, heartbeatInterval * 2);
},
});
});
});

test(`ensures the observable subscription hasn't closed at a fixed interval and reinstantiates if it has`, async () => {
// counts how many times the factory has been called; used to tag each emitted value
let iteration = 0;
const instantiator = jest.fn(() => {
iteration++;
return interval(100).pipe(
map((index) => `${iteration}:${index}`),
// throw on the 4th value (index 3) of the first iteration, i.e. after `1:0`, `1:1` and `1:2`
// have been emitted
map((value, index) => {
if (iteration === 1 && index === 3) {
throw new Error('Source threw an error!');
}
return value;
})
);
});

const onError = jest.fn();
const monitoredObservable = createObservableMonitor(instantiator, { onError });

return new Promise((resolve) => {
const next = jest.fn();
const error = jest.fn();
monitoredObservable
.pipe(
// unsubscribe once we confirm we have successfully recovered from an error in the source
// the predicate returns `true` (keep going) for as long as any expectation fails, and
// `false` (complete) once `next` has seen the 3 values of the first iteration and
// the first 5 values of the second iteration
takeWhile(function validateExpectation() {
try {
[...times(3, (index) => `1:${index}`), ...times(5, (index) => `2:${index}`)].forEach(
(expecteArg) => {
expect(next).toHaveBeenCalledWith(expecteArg);
}
);
return false;
} catch {
return true;
}
})
)
.subscribe({
next,
error,
complete: () => {
// the source error must be routed to `onError` and never reach the subscriber
expect(error).not.toHaveBeenCalled();
expect(onError).toHaveBeenCalledWith(new Error('Source threw an error!'));
resolve();
},
});
});
});

test(`ensures the observable subscription hasn't hung at a fixed interval and reinstantiates if it has`, async () => {
// counts how many times the factory has been called; used to tag each emitted value
let iteration = 0;
const instantiator = jest.fn(() => {
iteration++;
return interval(100).pipe(
map((index) => `${iteration}:${index}`),
// hang on the 4th value (index 3) of the first iteration, i.e. after `1:0`, `1:1` and `1:2`
// have been emitted
concatMap((value, index) => {
if (iteration === 1 && index === 3) {
return new Promise(() => {
// never resolve or reject, just hang for EVER
});
}
return Promise.resolve(value);
})
);
});

const onError = jest.fn();
const monitoredObservable = createObservableMonitor(instantiator, {
onError,
heartbeatInterval: 100,
inactivityTimeout: 500,
});

return new Promise((resolve) => {
const next = jest.fn();
const error = jest.fn();
monitoredObservable
.pipe(
// unsubscribe once we confirm we have successfully recovered from a hang in the source
// the predicate returns `true` (keep going) for as long as any expectation fails, and
// `false` (complete) once `next` has seen the 3 values of the first iteration and
// the first 5 values of the second iteration
takeWhile(function validateExpectation() {
try {
[...times(3, (index) => `1:${index}`), ...times(5, (index) => `2:${index}`)].forEach(
(expecteArg) => {
expect(next).toHaveBeenCalledWith(expecteArg);
}
);
return false;
} catch {
return true;
}
})
)
.subscribe({
next,
error,
complete: () => {
// the inactivity error must be routed to `onError` and never reach the subscriber
expect(error).not.toHaveBeenCalled();
expect(onError).toHaveBeenCalledWith(
new Error(`Observable Monitor: Hung Observable restarted after 500ms of inactivity`)
);
resolve();
},
});
});
});
});
80 changes: 80 additions & 0 deletions x-pack/plugins/task_manager/server/polling/observable_monitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { Subject, Observable, throwError, interval, timer, Subscription } from 'rxjs';
import { exhaustMap, tap, takeUntil, switchMap, switchMapTo, catchError } from 'rxjs/operators';
import { noop } from 'lodash';

// default period (in milliseconds) of the heartbeat which drives (re)instantiation
// of the monitored observable
const DEFAULT_HEARTBEAT_INTERVAL = 1000;

// by default don't monitor inactivity as not all observables are expected
// to emit at any kind of fixed interval
// (a falsy value, such as 0, disables inactivity monitoring altogether)
const DEFAULT_INACTIVITY_TIMEOUT = 0;

/**
 * Options accepted by `createObservableMonitor`.
 */
export interface ObservableMonitorOptions<E> {
/** Period, in milliseconds, of the heartbeat on which a new observable is instantiated when none is active. */
heartbeatInterval?: number;
/** Maximum time, in milliseconds, allowed between two emitted values; a falsy value (such as 0) disables the check. */
inactivityTimeout?: number;
/** Called with every error caught by the monitor, before it tries to recover. */
onError?: (err: E) => void;
}

/**
 * Wraps an observable factory in a cold, self-healing observable.
 *
 * Nothing is instantiated until the returned observable is subscribed to. Once subscribed,
 * a heartbeat ticks every `heartbeatInterval` milliseconds; on a tick, if no monitored
 * observable is currently active, `observableFactory` is called and the values of the
 * observable it returns are piped through to the subscriber.
 *
 * Any error raised by the pipeline (including the inactivity error raised when
 * `inactivityTimeout` elapses without a value) is handed to `onError` rather than to the
 * subscriber, after which the pipeline is subscribed to again.
 *
 * @param observableFactory creates the observable which should be monitored
 * @param options heartbeat period, inactivity timeout and error handler
 * @returns an observable emitting the values of the monitored observable(s)
 */
export function createObservableMonitor<T, E>(
  observableFactory: () => Observable<T>,
  {
    heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL,
    inactivityTimeout = DEFAULT_INACTIVITY_TIMEOUT,
    onError = noop,
  }: ObservableMonitorOptions<E> = {}
): Observable<T> {
  return new Observable((subscriber) => {
    // creates a fresh instance of the monitored observable, guarded against inactivity
    const instantiateMonitored = () =>
      takeUntilDurationOfInactivity(observableFactory(), inactivityTimeout);

    // notifies the error handler and then hands back the caught pipeline, which makes
    // `catchError` subscribe to it again so that values keep flowing after a failure
    const notifyAndRecover = (err: E, source$: Observable<T>) => {
      onError(err);
      return source$;
    };

    const monitored$ = interval(heartbeatInterval).pipe(
      // heartbeat ticks are ignored for as long as the current instance is still active
      exhaustMap(instantiateMonitored),
      catchError(notifyAndRecover)
    );

    const subscription: Subscription = monitored$.subscribe(subscriber);
    // tear down the heartbeat (and any active instance) when the subscriber unsubscribes
    return () => {
      subscription.unsubscribe();
    };
  });
}

/**
 * Mirrors `source$`, but errors if more than `inactivityTimeout` milliseconds elapse
 * between two consecutive values.
 *
 * NOTE: the inactivity timer is only armed by an emitted value, so a source which never
 * emits its first value is not detected as inactive.
 *
 * @param source$ the observable to guard
 * @param inactivityTimeout maximum time (ms) between values; a falsy value disables the guard
 * @returns `source$` itself when the guard is disabled, otherwise the guarded observable
 */
function takeUntilDurationOfInactivity<T>(source$: Observable<T>, inactivityTimeout: number) {
  // without a maximum duration of inactivity there is nothing to guard against
  if (!inactivityTimeout) {
    return source$;
  }

  // poked every time the source emits a value
  const activity$ = new Subject<void>();

  const inactivityError$ = throwError(
    new Error(
      `Observable Monitor: Hung Observable restarted after ${inactivityTimeout}ms of inactivity`
    )
  );

  const inactivityExceeded$ = activity$.pipe(
    // each value (re)starts the timer, discarding the one started by the previous value
    switchMap(() => timer(inactivityTimeout)),
    // a timer which gets to expire means no value arrived in time to replace it, so
    // raise an error, forcing the monitor to instantiate a new observable
    switchMapTo(inactivityError$)
  );

  return source$.pipe(
    takeUntil(inactivityExceeded$),
    // poke `activity$` so that the timer is restarted
    tap(() => activity$.next())
  );
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import { Subject } from 'rxjs';
import { Option, none, some } from 'fp-ts/lib/Option';
import { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
import { fakeSchedulers } from 'rxjs-marbles/jest';
import { sleep, resolvable, Resolvable } from './test_utils';
import { asOk, asErr } from './lib/result_type';
import { sleep, resolvable, Resolvable } from '../test_utils';
import { asOk, asErr } from '../lib/result_type';

describe('TaskPoller', () => {
beforeEach(() => jest.useFakeTimers());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { mapTo, filter, scan, concatMap, tap, catchError } from 'rxjs/operators'

import { pipe } from 'fp-ts/lib/pipeable';
import { Option, none, map as mapOptional, getOrElse } from 'fp-ts/lib/Option';
import { pullFromSet } from './lib/pull_from_set';
import { pullFromSet } from '../lib/pull_from_set';
import {
Result,
Err,
Expand All @@ -24,8 +24,8 @@ import {
asOk,
asErr,
promiseResult,
} from './lib/result_type';
import { timeoutPromiseAfter } from './lib/timeout_promise_after';
} from '../lib/result_type';
import { timeoutPromiseAfter } from './timeout_promise_after';

type WorkFn<T, H> = (...params: T[]) => Promise<H>;

Expand Down
51 changes: 38 additions & 13 deletions x-pack/plugins/task_manager/server/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ import {
TaskStatus,
ElasticJs,
} from './task';
import { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
import {
createTaskPoller,
PollingError,
PollingErrorType,
createObservableMonitor,
} from './polling';
import { TaskPool } from './task_pool';
import { TaskManagerRunner, TaskRunner } from './task_runner';
import {
Expand Down Expand Up @@ -154,18 +159,38 @@ export class TaskManager {
maxWorkers: opts.config.max_workers,
});

this.poller$ = createTaskPoller<string, FillPoolResult>({
pollInterval: opts.config.poll_interval,
bufferCapacity: opts.config.request_capacity,
getCapacity: () => this.pool.availableWorkers,
pollRequests$: this.claimRequests$,
work: this.pollForWork,
// Time out the `work` phase if it takes longer than a certain number of polling cycles
// The `work` phase includes the prework needed *before* executing a task
// (such as polling for new work, marking tasks as running etc.) but does not
// include the time of actually running the task
workTimeout: opts.config.poll_interval * opts.config.max_poll_inactivity_cycles,
});
const {
max_poll_inactivity_cycles: maxPollInactivityCycles,
poll_interval: pollInterval,
} = opts.config;
this.poller$ = createObservableMonitor<Result<FillPoolResult, PollingError<string>>, Error>(
() =>
createTaskPoller<string, FillPoolResult>({
pollInterval,
bufferCapacity: opts.config.request_capacity,
getCapacity: () => this.pool.availableWorkers,
pollRequests$: this.claimRequests$,
work: this.pollForWork,
// Time out the `work` phase if it takes longer than a certain number of polling cycles
// The `work` phase includes the prework needed *before* executing a task
// (such as polling for new work, marking tasks as running etc.) but does not
// include the time of actually running the task
workTimeout: pollInterval * maxPollInactivityCycles,
}),
{
heartbeatInterval: pollInterval,
// Time out the poller itself if it has failed to complete the entire stream for a certain amount of time.
// This is different than the `work` timeout above, as the poller could enter an invalid state where
// it fails to complete a cycle even though `work` is completing quickly.
// We grant it a single cycle longer than the time allotted to `work` so that timing out the `work`
// doesn't get short circuited by the monitor reinstantiating the poller altogether (a far more expensive
// operation than just timing out the `work` internally)
inactivityTimeout: pollInterval * (maxPollInactivityCycles + 1),
onError: (error) => {
this.logger.error(`[Task Poller Monitor]: ${error.message}`);
},
}
);
}

private emitEvent = (event: TaskLifecycleEvent) => {
Expand Down

0 comments on commit 5308cc7

Please sign in to comment.