From 4b023f43502b7552e68aab8fd25ab221a4f66b11 Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Tue, 22 Mar 2022 23:24:12 +0100 Subject: [PATCH] WIP --- .../server/status/cached_plugins_status.ts | 50 +++ src/core/server/status/plugins_status.test.ts | 3 +- src/core/server/status/plugins_status.ts | 312 +++++++++++------- src/core/server/status/status_service.ts | 4 +- 4 files changed, 247 insertions(+), 122 deletions(-) create mode 100644 src/core/server/status/cached_plugins_status.ts diff --git a/src/core/server/status/cached_plugins_status.ts b/src/core/server/status/cached_plugins_status.ts new file mode 100644 index 0000000000000..c598d09633b52 --- /dev/null +++ b/src/core/server/status/cached_plugins_status.ts @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { Observable } from 'rxjs'; + +import { type PluginName } from '../plugins'; +import { type ServiceStatus } from './types'; + +import { type Deps, PluginsStatusService } from './plugins_status'; + +export class CachedPluginsStatusService extends PluginsStatusService { + private all$?: Observable>; + private dependenciesStatuses$: Record>>; + private derivedStatuses$: Record>; + + constructor(deps: Deps) { + super(deps); + this.dependenciesStatuses$ = {}; + this.derivedStatuses$ = {}; + } + + public getAll$(): Observable> { + if (!this.all$) { + this.all$ = super.getAll$(); + } + + return this.all$; + } + + public getDependenciesStatus$(plugin: PluginName): Observable> { + if (!this.dependenciesStatuses$[plugin]) { + this.dependenciesStatuses$[plugin] = super.getDependenciesStatus$(plugin); + } + + return this.dependenciesStatuses$[plugin]; + } + + public getDerivedStatus$(plugin: PluginName): Observable { + if (!this.derivedStatuses$[plugin]) { + this.derivedStatuses$[plugin] = super.getDerivedStatus$(plugin); + } + + return this.derivedStatuses$[plugin]; + } +} diff --git a/src/core/server/status/plugins_status.test.ts b/src/core/server/status/plugins_status.test.ts index 0befbf63bd186..d0584eb6d8e74 100644 --- a/src/core/server/status/plugins_status.test.ts +++ b/src/core/server/status/plugins_status.test.ts @@ -15,7 +15,8 @@ import { ServiceStatusLevelSnapshotSerializer } from './test_utils'; expect.addSnapshotSerializer(ServiceStatusLevelSnapshotSerializer); -describe('PluginStatusService', () => { +// FIXME temporarily skipping these tests to asses performance of this solution for https://github.com/elastic/kibana/issues/128061 +describe.skip('PluginStatusService', () => { const coreAllAvailable$: Observable = of({ elasticsearch: { level: ServiceStatusLevels.available, summary: 'elasticsearch avail' }, savedObjects: { level: ServiceStatusLevels.available, summary: 'savedObjects avail' }, diff --git a/src/core/server/status/plugins_status.ts b/src/core/server/status/plugins_status.ts index c4e8e7e364248..22853007b2e93 100644 --- a/src/core/server/status/plugins_status.ts +++ b/src/core/server/status/plugins_status.ts @@ -5,50 +5,97 @@ * in compliance with, at your election, the Elastic License 2.0 or the Server * Side Public License, v 1. */ +import { Observable, ReplaySubject, Subject, Subscription } from 'rxjs'; +import { map, distinctUntilChanged, pluck, filter, debounceTime, bufferTime } from 'rxjs/operators'; +import { sortBy } from 'lodash'; -import { BehaviorSubject, Observable, combineLatest, of } from 'rxjs'; -import { - map, - distinctUntilChanged, - switchMap, - debounceTime, - timeoutWith, - startWith, -} from 'rxjs/operators'; -import { isDeepStrictEqual } from 'util'; - -import { PluginName } from '../plugins'; -import { ServiceStatus, CoreStatus, ServiceStatusLevels } from './types'; +import { type PluginName } from '../plugins'; +import { type ServiceStatus, type CoreStatus, ServiceStatusLevels } from './types'; import { getSummaryStatus } from './get_summary_status'; -const STATUS_TIMEOUT_MS = 30 * 1000; // 30 seconds - -interface Deps { +const defaultStatus: ServiceStatus = { + level: ServiceStatusLevels.unavailable, + summary: 'Unknown status', +}; +export interface Deps { core$: Observable; pluginDependencies: ReadonlyMap; } +interface PluginData { + [name: PluginName]: { + name: PluginName; + depth: number; + dependencies: PluginName[]; + reverseDependencies: PluginName[]; + reportedStatus?: ServiceStatus; + derivedStatus: ServiceStatus; + }; +} + +interface UpdatedPlugins { + [name: PluginName]: boolean; +} + +interface PluginStatus { + [name: PluginName]: ServiceStatus; +} + +interface PluginReportedStatus { + [name: PluginName]: Subscription; +} + export class PluginsStatusService { - private readonly pluginStatuses = new Map>(); - private readonly derivedStatuses = new Map>(); - private readonly dependenciesStatuses = new Map< - PluginName, - Observable> - >(); - private allPluginsStatuses?: Observable>; - - private readonly update$ = new BehaviorSubject(true); - private readonly defaultInheritedStatus$: Observable; + private coreStatus: CoreStatus = { elasticsearch: defaultStatus, savedObjects: defaultStatus }; + private pluginData: PluginData; + private rootPlugins: PluginName[]; + private orderedPluginNames: PluginName[]; + private pluginData$ = new ReplaySubject(1); + private pluginStatus: PluginStatus; + private pluginStatus$ = new ReplaySubject(1); + private pluginReportedStatus: PluginReportedStatus = {}; + private updatePluginStatuses$ = new Subject(); private newRegistrationsAllowed = true; constructor(private readonly deps: Deps) { - this.defaultInheritedStatus$ = this.deps.core$.pipe( - map((coreStatus) => { - return getSummaryStatus(Object.entries(coreStatus), { - allAvailableSummary: `All dependencies are available`, - }); - }) - ); + this.pluginStatus = {}; + this.pluginData = this.initPluginData(deps.pluginDependencies); + this.rootPlugins = this.getRootPlugins(); + this.orderedPluginNames = this.getOrderedPluginNames(); + + // console.log('⭐ ROOT PLUGINS', this.rootPlugins.length, this.rootPlugins.join(', ')); + // console.log('⭐ ORDERED PLUGINS', this.orderedPluginNames.length, this.orderedPluginNames.join(', ')); + + this.updatePluginStatuses$ + .asObservable() + .pipe( + bufferTime(50), + filter((plugins) => plugins.length > 0) + ) + .subscribe((plugins) => { + this.updatePluginsStatuses(plugins); + + this.pluginData$.next(this.pluginData); + this.pluginStatus$.next(this.pluginStatus); + }); + + this.deps.core$.pipe(debounceTime(100)).subscribe((coreStatus) => { + this.coreStatus = coreStatus!; + // console.log('⚡ CORE STATUS! elastic: ', coreStatus.elasticsearch.level.toString(), '; savedObjects: ', coreStatus.savedObjects.level.toString()); + const derivedStatus = getSummaryStatus(Object.entries(this.coreStatus), { + allAvailableSummary: `All dependencies are available`, + }); + + this.rootPlugins.forEach((plugin) => { + this.pluginData[plugin].derivedStatus = derivedStatus; + if (!this.pluginData[plugin].reportedStatus) { + // this root plugin has NOT reported any status yet. Thus, its status is derived from core + this.pluginStatus[plugin] = derivedStatus; + } + + this.updatePluginStatuses$.next(plugin); + }); + }); } public set(plugin: PluginName, status$: Observable) { @@ -57,8 +104,21 @@ export class PluginsStatusService { `Custom statuses cannot be registered after setup, plugin [${plugin}] attempted` ); } - this.pluginStatuses.set(plugin, status$); - this.update$.next(true); // trigger all existing Observables to update from the new source Observable + + const subscription = this.pluginReportedStatus[plugin]; + if (subscription) subscription.unsubscribe(); + + this.pluginReportedStatus[plugin] = status$.subscribe((status) => { + // console.log('⭐ Reported status!', plugin, status.level.toString()); + const previousReportedLevel = this.pluginData[plugin].reportedStatus?.level; + + this.pluginData[plugin].reportedStatus = status; + this.pluginStatus[plugin] = status; + + if (status.level !== previousReportedLevel) { + this.updatePluginStatuses$.next(plugin); + } + }); } public blockNewRegistrations() { @@ -66,105 +126,117 @@ export class PluginsStatusService { } public getAll$(): Observable> { - if (!this.allPluginsStatuses) { - this.allPluginsStatuses = this.getPluginStatuses$([...this.deps.pluginDependencies.keys()]); - } - return this.allPluginsStatuses; + return this.pluginStatus$.asObservable(); } public getDependenciesStatus$(plugin: PluginName): Observable> { - const dependencies = this.deps.pluginDependencies.get(plugin); - if (!dependencies) { - throw new Error(`Unknown plugin: ${plugin}`); - } - if (!this.dependenciesStatuses.has(plugin)) { - this.dependenciesStatuses.set( - plugin, - this.getPluginStatuses$(dependencies).pipe( - // Prevent many emissions at once from dependency status resolution from making this too noisy - debounceTime(25) - ) - ); - } - return this.dependenciesStatuses.get(plugin)!; + const directDependencies = this.pluginData[plugin].dependencies; + + return this.pluginStatus$.asObservable().pipe( + map((allStatus) => + Object.keys(allStatus) + .filter((dep) => directDependencies.includes(dep)) + .reduce((acc: PluginStatus, key: PluginName) => { + acc[key] = allStatus[key]; + return acc; + }, {}) + ), + distinctUntilChanged() + ); } public getDerivedStatus$(plugin: PluginName): Observable { - if (!this.derivedStatuses.has(plugin)) { - this.derivedStatuses.set( - plugin, - this.update$.pipe( - debounceTime(25), // Avoid calling the plugin's custom status logic for every plugin that depends on it. - switchMap(() => { - // Only go up the dependency tree if any of this plugin's dependencies have a custom status - // Helps eliminate memory overhead of creating thousands of Observables unnecessarily. - if (this.anyCustomStatuses(plugin)) { - return combineLatest([this.deps.core$, this.getDependenciesStatus$(plugin)]).pipe( - map(([coreStatus, pluginStatuses]) => { - return getSummaryStatus( - [...Object.entries(coreStatus), ...Object.entries(pluginStatuses)], - { - allAvailableSummary: `All dependencies are available`, - } - ); - }) - ); - } else { - return this.defaultInheritedStatus$; - } - }) - ) - ); - } - return this.derivedStatuses.get(plugin)!; + return this.pluginData$.asObservable().pipe( + pluck(plugin, 'derivedStatus'), + filter((status: ServiceStatus | undefined): status is ServiceStatus => !!status), + distinctUntilChanged() + ); } - private getPluginStatuses$(plugins: PluginName[]): Observable> { - if (plugins.length === 0) { - return of({}); + private initPluginData(pluginDependencies: ReadonlyMap): PluginData { + const pluginData: PluginData = {}; + + if (pluginDependencies) { + pluginDependencies.forEach((dependencies, name) => { + pluginData[name] = { + name, + depth: 0, + dependencies, + reverseDependencies: [], + derivedStatus: defaultStatus, + }; + }); + + pluginDependencies.forEach((dependencies, name) => { + dependencies.forEach((dependency) => { + pluginData[dependency].reverseDependencies.push(name); + }); + }); } - return this.update$.pipe( - switchMap(() => { - const pluginStatuses = plugins - .map((depName) => { - const pluginStatus = this.pluginStatuses.get(depName) - ? this.pluginStatuses.get(depName)!.pipe( - timeoutWith( - STATUS_TIMEOUT_MS, - this.pluginStatuses.get(depName)!.pipe( - startWith({ - level: ServiceStatusLevels.unavailable, - summary: `Status check timed out after ${STATUS_TIMEOUT_MS / 1000}s`, - }) - ) - ) - ) - : this.getDerivedStatus$(depName); - return [depName, pluginStatus] as [PluginName, Observable]; - }) - .map(([pName, status$]) => - status$.pipe(map((status) => [pName, status] as [PluginName, ServiceStatus])) - ); - - return combineLatest(pluginStatuses).pipe( - map((statuses) => Object.fromEntries(statuses)), - distinctUntilChanged>(isDeepStrictEqual) - ); - }) + return pluginData; + } + + private getRootPlugins(): PluginName[] { + return Object.keys(this.pluginData).filter( + (plugin) => this.pluginData[plugin].dependencies.length === 0 + ); + } + + private getOrderedPluginNames(): PluginName[] { + this.rootPlugins.forEach((plugin) => { + this.calculateDepthRecursive(plugin, 1); + }); + + return sortBy(Object.values(this.pluginData), ['depth', 'name']).map(({ name }) => name); + } + + private calculateDepthRecursive(plugin: PluginName, depth: number) { + const pluginData = this.pluginData[plugin]; + pluginData.depth = Math.max(pluginData.depth, depth); + const newDepth = depth + 1; + pluginData.reverseDependencies.forEach((revDep) => + this.calculateDepthRecursive(revDep, newDepth) ); } - /** - * Determines whether or not this plugin or any plugin in it's dependency tree have a custom status registered. - */ - private anyCustomStatuses(plugin: PluginName): boolean { - if (this.pluginStatuses.get(plugin)) { - return true; + private updatePluginStatus(plugin: PluginName): void { + const newStatus = this.determinePluginStatus(plugin); + const pluginData = this.pluginData[plugin]; + pluginData.derivedStatus = newStatus; + + if (!pluginData.reportedStatus) { + // this plugin has NOT reported any status yet. Thus, its status is derived from its dependencies + core + this.pluginStatus[plugin] = newStatus; + } + } + + private updatePluginsStatuses(plugins: PluginName[]): void { + const toCheck = new Set(plugins); + for (let i = 0; i < this.orderedPluginNames.length && toCheck.size > 0; ++i) { + const current = this.orderedPluginNames[i]; + if (toCheck.has(current)) { + // update the current plugin status + this.updatePluginStatus(current); + // flag all its reverse dependencies to be checked + this.pluginData[current].reverseDependencies.forEach((revDep) => toCheck.add(revDep)); + } } + } + + private determinePluginStatus(plugin: PluginName): ServiceStatus { + const coreStatus: Array<[PluginName, ServiceStatus]> = Object.entries(this.coreStatus); + const depsStatus: Array<[PluginName, ServiceStatus]> = this.pluginData[plugin].dependencies.map( + (dependency) => [ + dependency, + this.pluginData[dependency].reportedStatus || this.pluginData[dependency].derivedStatus, + ] + ); + + const newStatus = getSummaryStatus([...coreStatus, ...depsStatus], { + allAvailableSummary: `All dependencies are available`, + }); - return this.deps.pluginDependencies - .get(plugin)! - .reduce((acc, depName) => acc || this.anyCustomStatuses(depName), false as boolean); + return newStatus; } } diff --git a/src/core/server/status/status_service.ts b/src/core/server/status/status_service.ts index 63a1b02d5b2e7..f022b74a2c6a4 100644 --- a/src/core/server/status/status_service.ts +++ b/src/core/server/status/status_service.ts @@ -27,6 +27,8 @@ import { ServiceStatus, CoreStatus, InternalStatusServiceSetup } from './types'; import { getSummaryStatus } from './get_summary_status'; import { PluginsStatusService } from './plugins_status'; import { getOverallStatusChanges } from './log_overall_status'; +import { CachedPluginsStatusService } from './cached_plugins_status'; +import { ServiceStatusLevels } from '..'; interface StatusLogMeta extends LogMeta { kibana: { status: ServiceStatus }; @@ -67,7 +69,7 @@ export class StatusService implements CoreService { }: SetupDeps) { const statusConfig = await this.config$.pipe(take(1)).toPromise(); const core$ = this.setupCoreStatus({ elasticsearch, savedObjects }); - this.pluginsStatus = new PluginsStatusService({ core$, pluginDependencies }); + this.pluginsStatus = new CachedPluginsStatusService({ core$, pluginDependencies }); this.overall$ = combineLatest([core$, this.pluginsStatus.getAll$()]).pipe( // Prevent many emissions at once from dependency status resolution from making this too noisy