Skip to content

Commit

Permalink
Translate progress events to progress promises
Browse files Browse the repository at this point in the history
  • Loading branch information
Kartik Raj committed May 12, 2022
1 parent fb3a4c2 commit 4f32c50
Showing 1 changed file with 70 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

import { Event, EventEmitter } from 'vscode';
import '../../../../common/extensions';
import { createDeferred } from '../../../../common/utils/async';
import { createDeferred, Deferred } from '../../../../common/utils/async';
import { traceError } from '../../../../logging';
import { normalizePath } from '../../../common/externalDependencies';
import { PythonEnvInfo } from '../../info';
import {
GetRefreshEnvironmentsOptions,
IDiscoveryAPI,
IResolvingLocator,
isProgressEvent,
Expand All @@ -24,21 +25,22 @@ import { IEnvsCollectionCache } from './envsCollectionCache';
*/
export class EnvsCollectionService extends PythonEnvsWatcher<PythonEnvCollectionChangedEvent> implements IDiscoveryAPI {
/** Keeps track of ongoing refreshes for various queries. */
private refreshPromises = new Map<PythonLocatorQuery | undefined, Promise<void>>();
private refreshDeferreds = new Map<PythonLocatorQuery | undefined, Deferred<void>>();

/** Keeps track of scheduled refreshes other than the ongoing one for various queries. */
private scheduledRefreshes = new Map<PythonLocatorQuery | undefined, Promise<void>>();

private refreshStageDeferreds = new Map<ProgressReportStage, Deferred<void>>();

private readonly progress = new EventEmitter<ProgressNotificationEvent>();

public get onProgress(): Event<ProgressNotificationEvent> {
return this.progress.event;
}

public getRefreshPromise(): Promise<void> | undefined {
return this.refreshPromises.size > 0
? Promise.all(Array.from(this.refreshPromises.values())).then()
: undefined;
public getRefreshPromise(options?: GetRefreshEnvironmentsOptions): Promise<void> | undefined {
const stage = options?.stage ?? ProgressReportStage.discoveryFinished;
return this.refreshStageDeferreds.get(stage)?.promise;
}

constructor(private readonly cache: IEnvsCollectionCache, private readonly locator: IResolvingLocator) {
Expand All @@ -58,6 +60,10 @@ export class EnvsCollectionService extends PythonEnvsWatcher<PythonEnvCollection
this.cache.onChanged((e) => {
this.fire(e);
});
this.onProgress((event) => {
this.refreshStageDeferreds.get(event.stage)?.resolve();
this.refreshStageDeferreds.delete(event.stage);
});
}

public async resolveEnv(path: string): Promise<PythonEnvInfo | undefined> {
Expand All @@ -81,7 +87,7 @@ export class EnvsCollectionService extends PythonEnvsWatcher<PythonEnvCollection

public getEnvs(query?: PythonLocatorQuery): PythonEnvInfo[] {
const cachedEnvs = this.cache.getAllEnvs();
if (cachedEnvs.length === 0 && this.refreshPromises.size === 0) {
if (cachedEnvs.length === 0 && this.refreshDeferreds.size === 0) {
// We expect a refresh to already be triggered when activating discovery component.
traceError('No python is installed or a refresh has not already been triggered');
this.triggerRefresh().ignoreErrors();
Expand All @@ -98,20 +104,18 @@ export class EnvsCollectionService extends PythonEnvsWatcher<PythonEnvCollection
}

private startRefresh(query: (PythonLocatorQuery & { clearCache?: boolean }) | undefined): Promise<void> {
const deferred = createDeferred<void>();
if (query?.clearCache) {
this.cache.clearCache();
}
// Ensure we set this before we trigger the promise to accurately track when a refresh has started.
this.refreshPromises.set(query, deferred.promise);
this.createProgressStates(query);
const promise = this.addEnvsToCacheForQuery(query);
return promise
.then(async () => {
// Ensure we delete this before we resolve the promise to accurately track when a refresh finishes.
this.refreshPromises.delete(query);
deferred.resolve();
this.resolveProgressStates(query);
})
.catch((ex) => deferred.reject(ex));
.catch((ex) => {
this.rejectProgressStates(query, ex);
});
}

private async addEnvsToCacheForQuery(query: PythonLocatorQuery | undefined) {
Expand All @@ -126,11 +130,19 @@ export class EnvsCollectionService extends PythonEnvsWatcher<PythonEnvCollection
if (iterator.onUpdated !== undefined) {
const listener = iterator.onUpdated(async (event) => {
if (isProgressEvent(event)) {
if (event.stage === ProgressReportStage.discoveryFinished) {
state.done = true;
listener.dispose();
} else if (event.stage === ProgressReportStage.allPathsDiscovered && !query) {
this.progress.fire(event);
switch (event.stage) {
case ProgressReportStage.discoveryFinished:
state.done = true;
listener.dispose();
break;
case ProgressReportStage.allPathsDiscovered:
if (!query) {
// Only mark as all paths discovered when querying for all envs.
this.progress.fire(event);
}
break;
default:
this.progress.fire(event);
}
} else {
state.pending += 1;
Expand Down Expand Up @@ -166,7 +178,7 @@ export class EnvsCollectionService extends PythonEnvsWatcher<PythonEnvCollection
// Even if no refresh is running for this exact query, there might be other
// refreshes running for a superset of this query. For eg. the `undefined` query
// is a superset for every other query, only consider that for simplicity.
return this.refreshPromises.get(query) ?? this.refreshPromises.get(undefined);
return this.refreshDeferreds.get(query)?.promise ?? this.refreshDeferreds.get(undefined)?.promise;
}

/**
Expand All @@ -187,4 +199,42 @@ export class EnvsCollectionService extends PythonEnvsWatcher<PythonEnvCollection
}
return nextRefreshPromise;
}

private createProgressStates(query: PythonLocatorQuery | undefined) {
this.refreshDeferreds.set(query, createDeferred<void>());
Object.values(ProgressReportStage).forEach((stage) => {
const deferred = createDeferred<void>();
this.refreshStageDeferreds.set(stage, deferred);
});
if (ProgressReportStage.allPathsDiscovered && query) {
// This stage is only applicable when no scope is provided.
this.refreshStageDeferreds.delete(ProgressReportStage.allPathsDiscovered);
}
}

private rejectProgressStates(query: PythonLocatorQuery | undefined, ex: Error) {
this.refreshDeferreds.get(query)?.reject(ex);
this.refreshDeferreds.delete(query);
Object.values(ProgressReportStage).forEach((stage) => {
this.refreshStageDeferreds.get(stage)?.reject(ex);
this.refreshStageDeferreds.delete(stage);
});
}

private resolveProgressStates(query: PythonLocatorQuery | undefined) {
this.refreshDeferreds.get(query)?.resolve();
this.refreshDeferreds.delete(query);
Object.values(ProgressReportStage).forEach((stage) => {
this.refreshStageDeferreds.get(stage)?.resolve();
this.refreshStageDeferreds.delete(stage);
});
this.checkIfFinishedAndNotify();
}

private checkIfFinishedAndNotify() {
const isRefreshComplete = Array.from(this.refreshDeferreds.values()).every((d) => d.completed);
if (isRefreshComplete) {
this.progress.fire({ stage: ProgressReportStage.discoveryFinished });
}
}
}

0 comments on commit 4f32c50

Please sign in to comment.