forked from DonJayamanne/pythonVSCode
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathserver.ts
333 lines (301 loc) · 14.7 KB
/
server.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
import * as net from 'net';
import * as crypto from 'crypto';
import { Disposable, Event, EventEmitter, TestRun } from 'vscode';
import * as path from 'path';
import { ChildProcess } from 'child_process';
import {
ExecutionFactoryCreateWithEnvironmentOptions,
ExecutionResult,
IPythonExecutionFactory,
SpawnOptions,
} from '../../../common/process/types';
import { traceError, traceInfo, traceLog, traceVerbose } from '../../../logging';
import { DataReceivedEvent, ITestServer, TestCommandOptions } from './types';
import { ITestDebugLauncher, LaunchOptions } from '../../common/types';
import { UNITTEST_PROVIDER } from '../../common/constants';
import {
MESSAGE_ON_TESTING_OUTPUT_MOVE,
createDiscoveryErrorPayload,
createEOTPayload,
createExecutionErrorPayload,
extractJsonPayload,
fixLogLinesNoTrailing,
} from './utils';
import { createDeferred } from '../../../common/utils/async';
import { EnvironmentVariables } from '../../../api/types';
export class PythonTestServer implements ITestServer, Disposable {
private _onDataReceived: EventEmitter<DataReceivedEvent> = new EventEmitter<DataReceivedEvent>();
private uuids: Array<string> = [];
private server: net.Server;
private ready: Promise<void>;
private _onRunDataReceived: EventEmitter<DataReceivedEvent> = new EventEmitter<DataReceivedEvent>();
private _onDiscoveryDataReceived: EventEmitter<DataReceivedEvent> = new EventEmitter<DataReceivedEvent>();
constructor(private executionFactory: IPythonExecutionFactory, private debugLauncher: ITestDebugLauncher) {
this.server = net.createServer((socket: net.Socket) => {
let buffer: Buffer = Buffer.alloc(0); // Buffer to accumulate received data
socket.on('data', (data: Buffer) => {
traceVerbose('data received from python server: ', data.toString());
buffer = Buffer.concat([buffer, data]); // get the new data and add it to the buffer
while (buffer.length > 0) {
try {
// try to resolve data, returned unresolved data
const remainingBuffer = this._resolveData(buffer);
if (remainingBuffer.length === buffer.length) {
// if the remaining buffer is exactly the same as the buffer before processing,
// then there is no more data to process so loop should be exited.
break;
}
buffer = remainingBuffer;
} catch (ex) {
traceError(`Error reading data from buffer: ${ex} observed.`);
buffer = Buffer.alloc(0);
this._onDataReceived.fire({ uuid: '', data: '' });
}
}
});
});
this.ready = new Promise((resolve, _reject) => {
this.server.listen(undefined, 'localhost', () => {
resolve();
});
});
this.server.on('error', (ex) => {
traceLog(`Error starting test server: ${ex}`);
});
this.server.on('close', () => {
traceLog('Test server closed.');
});
this.server.on('listening', () => {
traceLog('Test server listening.');
});
this.server.on('connection', () => {
traceLog('Test server connected to a client.');
});
}
savedBuffer = '';
public _resolveData(buffer: Buffer): Buffer {
try {
const extractedJsonPayload = extractJsonPayload(buffer.toString(), this.uuids);
// what payload is so small it doesn't include the whole UUID think got this
if (extractedJsonPayload.uuid !== undefined && extractedJsonPayload.cleanedJsonData !== undefined) {
// if a full json was found in the buffer, fire the data received event then keep cycling with the remaining raw data.
traceVerbose(`Firing data received event, ${extractedJsonPayload.cleanedJsonData}`);
this._fireDataReceived(extractedJsonPayload.uuid, extractedJsonPayload.cleanedJsonData);
} else {
traceVerbose(
`extract json payload incomplete, uuid= ${extractedJsonPayload.uuid} and cleanedJsonData= ${extractedJsonPayload.cleanedJsonData}`,
);
}
buffer = Buffer.from(extractedJsonPayload.remainingRawData);
if (buffer.length === 0) {
// if the buffer is empty, then there is no more data to process so buffer should be cleared.
buffer = Buffer.alloc(0);
}
} catch (ex) {
traceError(`Error attempting to resolve data: ${ex}`);
this._onDataReceived.fire({ uuid: '', data: '' });
}
return buffer;
}
private _fireDataReceived(uuid: string, extractedJSON: string): void {
if (extractedJSON.includes(`"tests":`) || extractedJSON.includes(`"command_type": "discovery"`)) {
this._onDiscoveryDataReceived.fire({
uuid,
data: extractedJSON,
});
// if the rawData includes result then this is a run request
} else if (extractedJSON.includes(`"result":`) || extractedJSON.includes(`"command_type": "execution"`)) {
this._onRunDataReceived.fire({
uuid,
data: extractedJSON,
});
} else {
traceError(`Error processing test server request: request is not recognized as discovery or run.`);
this._onDataReceived.fire({ uuid: '', data: '' });
}
}
public serverReady(): Promise<void> {
return this.ready;
}
public getPort(): number {
return (this.server.address() as net.AddressInfo).port;
}
public createUUID(): string {
const uuid = crypto.randomUUID();
this.uuids.push(uuid);
return uuid;
}
public deleteUUID(uuid: string): void {
this.uuids = this.uuids.filter((u) => u !== uuid);
}
public get onRunDataReceived(): Event<DataReceivedEvent> {
return this._onRunDataReceived.event;
}
public get onDiscoveryDataReceived(): Event<DataReceivedEvent> {
return this._onDiscoveryDataReceived.event;
}
public triggerRunDataReceivedEvent(payload: DataReceivedEvent): void {
this._onRunDataReceived.fire(payload);
}
public triggerDiscoveryDataReceivedEvent(payload: DataReceivedEvent): void {
this._onDiscoveryDataReceived.fire(payload);
}
public dispose(): void {
this.server.close();
this._onDataReceived.dispose();
}
public get onDataReceived(): Event<DataReceivedEvent> {
return this._onDataReceived.event;
}
async sendCommand(
options: TestCommandOptions,
env: EnvironmentVariables,
runTestIdPort?: string,
runInstance?: TestRun,
testIds?: string[],
callback?: () => void,
): Promise<void> {
const { uuid } = options;
const isDiscovery = (testIds === undefined || testIds.length === 0) && runTestIdPort === undefined;
const mutableEnv = { ...env };
// get python path from mutable env, it contains process.env as well
const pythonPathParts: string[] = mutableEnv.PYTHONPATH?.split(path.delimiter) ?? [];
const pythonPathCommand = [options.cwd, ...pythonPathParts].join(path.delimiter);
mutableEnv.PYTHONPATH = pythonPathCommand;
mutableEnv.TEST_UUID = uuid.toString();
mutableEnv.TEST_PORT = this.getPort().toString();
mutableEnv.RUN_TEST_IDS_PORT = runTestIdPort;
const spawnOptions: SpawnOptions = {
token: options.token,
cwd: options.cwd,
throwOnStdErr: true,
outputChannel: options.outChannel,
env: mutableEnv,
};
const isRun = runTestIdPort !== undefined;
// Create the Python environment in which to execute the command.
const creationOptions: ExecutionFactoryCreateWithEnvironmentOptions = {
allowEnvironmentFetchExceptions: false,
resource: options.workspaceFolder,
};
const execService = await this.executionFactory.createActivatedEnvironment(creationOptions);
const args = [options.command.script].concat(options.command.args);
if (options.outChannel) {
options.outChannel.appendLine(`python ${args.join(' ')}`);
}
try {
if (options.debugBool) {
const launchOptions: LaunchOptions = {
cwd: options.cwd,
args,
token: options.token,
testProvider: UNITTEST_PROVIDER,
runTestIdsPort: runTestIdPort,
pytestUUID: uuid.toString(),
pytestPort: this.getPort().toString(),
};
traceInfo(`Running DEBUG unittest for workspace ${options.cwd} with arguments: ${args}\r\n`);
await this.debugLauncher!.launchDebugger(launchOptions, () => {
callback?.();
});
} else {
if (isRun) {
// This means it is running the test
traceInfo(`Running unittests for workspace ${options.cwd} with arguments: ${args}\r\n`);
} else {
// This means it is running discovery
traceLog(`Discovering unittest tests for workspace ${options.cwd} with arguments: ${args}\r\n`);
}
const deferredTillExecClose = createDeferred<ExecutionResult<string>>();
let resultProc: ChildProcess | undefined;
runInstance?.token.onCancellationRequested(() => {
traceInfo(`Test run cancelled, killing unittest subprocess for workspace ${options.cwd}.`);
// if the resultProc exists just call kill on it which will handle resolving the ExecClose deferred, otherwise resolve the deferred here.
if (resultProc) {
resultProc?.kill();
} else {
deferredTillExecClose?.resolve();
}
});
const result = execService?.execObservable(args, spawnOptions);
resultProc = result?.proc;
// Displays output to user and ensure the subprocess doesn't run into buffer overflow.
// TODO: after a release, remove discovery output from the "Python Test Log" channel and send it to the "Python" channel instead.
// TODO: after a release, remove run output from the "Python Test Log" channel and send it to the "Test Result" channel instead.
if (isDiscovery) {
result?.proc?.stdout?.on('data', (data) => {
const out = fixLogLinesNoTrailing(data.toString());
spawnOptions?.outputChannel?.append(`${out}`);
traceInfo(out);
});
result?.proc?.stderr?.on('data', (data) => {
const out = fixLogLinesNoTrailing(data.toString());
spawnOptions?.outputChannel?.append(`${out}`);
traceError(out);
});
} else {
result?.proc?.stdout?.on('data', (data) => {
const out = fixLogLinesNoTrailing(data.toString());
runInstance?.appendOutput(`${out}`);
spawnOptions?.outputChannel?.append(out);
});
result?.proc?.stderr?.on('data', (data) => {
const out = fixLogLinesNoTrailing(data.toString());
runInstance?.appendOutput(`${out}`);
spawnOptions?.outputChannel?.append(out);
});
}
result?.proc?.on('exit', (code, signal) => {
// if the child has testIds then this is a run request
spawnOptions?.outputChannel?.append(MESSAGE_ON_TESTING_OUTPUT_MOVE);
if (isDiscovery) {
if (code !== 0) {
// This occurs when we are running discovery
traceError(
`Subprocess exited unsuccessfully with exit code ${code} and signal ${signal} on workspace ${options.cwd}. Creating and sending error discovery payload \n`,
);
this._onDiscoveryDataReceived.fire({
uuid,
data: JSON.stringify(createDiscoveryErrorPayload(code, signal, options.cwd)),
});
// then send a EOT payload
this._onDiscoveryDataReceived.fire({
uuid,
data: JSON.stringify(createEOTPayload(true)),
});
}
} else if (code !== 0 && testIds) {
// This occurs when we are running the test and there is an error which occurs.
traceError(
`Subprocess exited unsuccessfully with exit code ${code} and signal ${signal} for workspace ${options.cwd}. Creating and sending error execution payload \n`,
);
// if the child process exited with a non-zero exit code, then we need to send the error payload.
this._onRunDataReceived.fire({
uuid,
data: JSON.stringify(createExecutionErrorPayload(code, signal, testIds, options.cwd)),
});
// then send a EOT payload
this._onRunDataReceived.fire({
uuid,
data: JSON.stringify(createEOTPayload(true)),
});
}
deferredTillExecClose.resolve();
});
await deferredTillExecClose.promise;
}
} catch (ex) {
traceError(`Error while server attempting to run unittest command for workspace ${options.cwd}: ${ex}`);
this.uuids = this.uuids.filter((u) => u !== uuid);
this._onDataReceived.fire({
uuid,
data: JSON.stringify({
status: 'error',
errors: [(ex as Error).message],
}),
});
}
}
}