diff --git a/pkg/nuclide-react-native-node-executor/lib/Child.js b/pkg/nuclide-react-native-node-executor/lib/Child.js deleted file mode 100644 index c6013ec5e9..0000000000 --- a/pkg/nuclide-react-native-node-executor/lib/Child.js +++ /dev/null @@ -1,145 +0,0 @@ -'use babel'; -/* @flow */ - -/* - * Copyright (c) 2015-present, Facebook, Inc. - * All rights reserved. - * - * This source code is licensed under the license found in the LICENSE file in - * the root directory of this source tree. - */ - -import type { - ExecutorError, - ExecutorResponse, - ExecutorResult, - ExecutorRequest, - RnRequest, - ServerReplyCallback, -} from './types'; -import type {EventEmitter} from 'events'; - -import featureConfig from '../../nuclide-feature-config'; -import {forkWithExecEnvironment, getOutputStream} from '../../nuclide-commons/lib/process'; -import {getLogger} from '../../nuclide-logging'; -import invariant from 'assert'; -import path from 'path'; -import Rx from 'rx'; - -const logger = getLogger(); - -export default class Child { - - _closed: Promise; - _execScriptMessageId: number; - _process$: Rx.Observable; - _input$: Rx.Subject; - - constructor(onReply: ServerReplyCallback, emitter: EventEmitter) { - this._execScriptMessageId = -1; - - const execPath = featureConfig.get('nuclide-react-native.pathToNode'); - const process$ = this._process$ = Rx.Observable.fromPromise( - // TODO: The node location/path needs to be more configurable. We need to figure out a way to - // handle this across the board. - forkWithExecEnvironment( - path.join(__dirname, 'executor.js'), - [], - {execPath, silent: true}, - ) - ); - - // Pipe output from forked process. This just makes things easier to debug for us. - process$ - .flatMapLatest(process => getOutputStream(process)) - .subscribe(message => { - switch (message.kind) { - case 'error': - logger.error(message.error.message); - return; - case 'stderr': - case 'stdout': - logger.info(message.data.toString()); - return; - } - }); - - this._closed = process$.flatMap(process => Rx.Observable.fromEvent(process, 'close')) - .first() - .toPromise(); - - // A stream of messages we're sending to the executor. - this._input$ = new Rx.Subject(); - - // The messages we're receiving from the executor. - const output$ = ( - process$.flatMap( - process => Rx.Observable.fromEvent(process, 'message') - ): Rx.Observable - ); - - const result$ = ( - (output$.filter(message => message.kind === 'result'): any): Rx.Observable - ); - const error$ = ( - (output$.filter(message => message.kind === 'error'): any): Rx.Observable - ); - - // Emit the eval_application_script event when we get the message that corresponds to it. - result$ - .filter( - message => message.kind === 'result' && message.replyId === this._execScriptMessageId - ) - .first() - .combineLatest(process$) - .map(([, process]) => process.pid) - .subscribe(pid => { - emitter.emit('eval_application_script', pid); - }); - - // Forward the output we get from the process to subscribers - result$.subscribe(message => { onReply(message.replyId, message.result); }); - - // Log the errors. - error$.subscribe(error => { logger.error(error.message); }); - - // Buffer the messages until we have a process to send them to, then send them. - const bufferedMessage$ = this._input$.takeUntil(process$).buffer(process$).flatMap(x => x); - const remainingMessages = this._input$.skipUntil(process$); - bufferedMessage$.concat(remainingMessages) - .combineLatest(process$) - .subscribe(([message, process]) => { - process.send(message); - }); - } - - async kill(): Promise { - // Kill the process once we have one. - this._process$.subscribe(process => { process.kill(); }); - await this._closed; - } - - executeApplicationScript(script: string, inject: string, id: number) { - this._execScriptMessageId = id; - this._input$.onNext({ - id, - op: 'executeApplicationScript', - data: { - script, - inject, - }, - }); - } - - execCall(payload: RnRequest, id: number) { - invariant(payload.method != null); - this._input$.onNext({ - id, - op: 'call', - data: { - method: payload.method, - arguments: payload.arguments, - }, - }); - } -} diff --git a/pkg/nuclide-react-native-node-executor/lib/ChildManager.js b/pkg/nuclide-react-native-node-executor/lib/ChildManager.js index 2f4fc202c4..7d8c1e79ee 100644 --- a/pkg/nuclide-react-native-node-executor/lib/ChildManager.js +++ b/pkg/nuclide-react-native-node-executor/lib/ChildManager.js @@ -9,17 +9,16 @@ * the root directory of this source tree. */ -import invariant from 'assert'; -import http from 'http'; -import url from 'url'; - -import Child from './Child'; import type { RnRequest, + ExecutorResponse, ServerReplyCallback, } from './types'; import type {EventEmitter} from 'events'; +import {executeRnRequests} from './executeRnRequests'; +import {Observable, Subject} from 'rx'; + let logger; function getLogger() { if (!logger) { @@ -30,95 +29,58 @@ function getLogger() { export default class ChildManager { - _child: ?Child; _onReply: ServerReplyCallback; _emitter: EventEmitter; + _executorSubscription: ?IDisposable; + _executorResponses: Observable; + _rnRequests: Subject; + constructor(onReply: ServerReplyCallback, emitter: EventEmitter) { this._onReply = onReply; this._emitter = emitter; + this._rnRequests = new Subject(); + this._executorResponses = executeRnRequests(this._rnRequests); } _createChild(): void { - if (this._child == null) { - this._child = new Child(this._onReply, this._emitter); + if (this._executorSubscription != null) { + return; } + + this._executorSubscription = this._executorResponses.subscribe(response => { + switch (response.kind) { + case 'result': + this._onReply(response.replyId, response.result); + return; + case 'error': + getLogger().error(response.message); + return; + case 'pid': + this._emitter.emit('eval_application_script', response.pid); + return; + } + }); } - async killChild(): Promise { - if (!this._child) { + killChild(): void { + if (!this._executorSubscription) { return; } - await this._child.kill(); - this._child = null; + this._executorSubscription.dispose(); + this._executorSubscription = null; } - handleMessage(message: RnRequest): void { - if (message.replyID) { + handleMessage(request: RnRequest): void { + if (request.replyID) { // getting cross-talk from another executor (probably Chrome) return; } - switch (message.method) { - case 'prepareJSRuntime': - return this._prepareJSRuntime(message); - case 'executeApplicationScript': - return this._executeApplicationScript(message); - default: - return this._executeJSCall(message); - } - } - - _prepareJSRuntime(message: RnRequest): void { + // Make sure we have a worker to run the JS. this._createChild(); - this._onReply(message.id); - } - - _executeApplicationScript(message: RnRequest): void { - (async () => { - if (!this._child) { - // Warn Child not initialized; - return; - } - - const {id: messageId, url: messageUrl, inject} = message; - invariant(messageId != null); - invariant(messageUrl != null); - invariant(inject != null); - - const parsedUrl = url.parse(messageUrl, /* parseQueryString */ true); - invariant(parsedUrl.query); - parsedUrl.query.inlineSourceMap = true; - delete parsedUrl.search; - // $FlowIssue url.format() does not accept what url.parse() returns. - const scriptUrl = url.format(parsedUrl); - const script = await getScriptContents(scriptUrl); - invariant(this._child); - this._child.executeApplicationScript(script, inject, messageId); - })(); - } - _executeJSCall(message: RnRequest): void { - if (!this._child) { - // Warn Child not initialized; - return; - } - this._child.execCall(message, message.id); + this._rnRequests.onNext(request); } -} -function getScriptContents(src): Promise { - return new Promise((resolve, reject) => { - http.get(src, res => { - res.setEncoding('utf8'); - let buff = ''; - res.on('data', chunk => buff += chunk); - res.on('end', () => { - resolve(buff); - }); - }).on('error', err => { - getLogger().error('Failed to get script from packager.'); - reject(err); - }); - }); } diff --git a/pkg/nuclide-react-native-node-executor/lib/executeRnRequests.js b/pkg/nuclide-react-native-node-executor/lib/executeRnRequests.js new file mode 100644 index 0000000000..3894aff512 --- /dev/null +++ b/pkg/nuclide-react-native-node-executor/lib/executeRnRequests.js @@ -0,0 +1,80 @@ +'use babel'; +/* @flow */ + +/* + * Copyright (c) 2015-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the license found in the LICENSE file in + * the root directory of this source tree. + */ + +import type {ExecutorResponse, RnRequest} from './types'; + +import featureConfig from '../../nuclide-feature-config'; +import { + createProcessStream, + forkWithExecEnvironment, + getOutputStream, +} from '../../nuclide-commons/lib/process'; +import {getLogger} from '../../nuclide-logging'; +import {CompositeDisposable} from 'atom'; +import path from 'path'; +import {Observable} from 'rx'; + +const logger = getLogger(); + +export function executeRnRequests(rnRequests: Observable): Observable { + const workerProcess = createProcessStream(() => ( + // TODO: The node location/path needs to be more configurable. We need to figure out a way to + // handle this across the board. + forkWithExecEnvironment( + path.join(__dirname, 'executor.js'), + [], + { + execArgv: ['--debug-brk'], + execPath: featureConfig.get('nuclide-react-native.pathToNode'), + silent: true, + }, + ) + )); + + return Observable.merge( + workerProcess.map(process => ({ + kind: 'pid', + pid: process.pid, + })), + + // The messages we're receiving from the worker process. + ( + workerProcess.flatMap( + process => Observable.fromEvent(process, 'message') + ): Observable + ), + + Observable.create(() => ( + new CompositeDisposable( + // Send the incoming requests to the worker process for evaluation. + rnRequests + .withLatestFrom(workerProcess, (r, p) => ([r, p])) + .subscribe(([request, process]) => { process.send(request); }), + + // Pipe output from forked process. This just makes things easier to debug for us. + workerProcess + .flatMapLatest(process => getOutputStream(process)) + .subscribe(message => { + switch (message.kind) { + case 'error': + logger.error(message.error.message); + return; + case 'stderr': + case 'stdout': + logger.info(message.data.toString()); + return; + } + }), + ) + )), + + ).share(); +} diff --git a/pkg/nuclide-react-native-node-executor/lib/executor.js b/pkg/nuclide-react-native-node-executor/lib/executor.js index 7f88e3ba84..8d6bff8322 100644 --- a/pkg/nuclide-react-native-node-executor/lib/executor.js +++ b/pkg/nuclide-react-native-node-executor/lib/executor.js @@ -10,53 +10,72 @@ /* eslint-disable no-var, no-console */ +var http = require('http'); +var invariant = require('assert'); +var url = require('url'); var vm = require('vm'); var currentContext = null; -// respond to ops -var ops = { - executeApplicationScript: function(id, data) { - var globals = { - console: console, - }; - if (data.inject) { - for (var name in data.inject) { - globals[name] = JSON.parse(data.inject[name]); - } - } - currentContext = vm.createContext(globals); - - try { - // The file name is dummy here. Without a file name, the source map is not used. - vm.runInContext(data.script, currentContext, '/tmp/react-native.js'); - } catch (e) { - sendError('Failed to exec script: ' + e); - } - - sendResult(id); - }, - - call: function(id, data) { - var returnValue = [[], [], [], [], []]; - try { - if (currentContext != null && typeof currentContext.__fbBatchedBridge === 'object') { - returnValue = currentContext.__fbBatchedBridge[data.method].apply(null, data.arguments); +process.on('message', function(request) { + switch (request.method) { + case 'prepareJSRuntime': + currentContext = vm.createContext({console: console}); + sendResult(request.id); + return; + + case 'executeApplicationScript': + // Modify the URL to make sure we get the inline source map. + var parsedUrl = url.parse(request.url, /* parseQueryString */ true); + invariant(parsedUrl.query); + parsedUrl.query.inlineSourceMap = true; + delete parsedUrl.search; + // $FlowIssue url.format() does not accept what url.parse() returns. + var scriptUrl = url.format(parsedUrl); + + getScriptContents(scriptUrl, function(err, script) { + if (err != null) { + sendError('Failed to get script from packager:' + err.message); + return; + } + + if (currentContext == null) { + sendError('JS runtime not prepared'); + return; + } + + if (request.inject) { + for (var name in request.inject) { + currentContext[name] = JSON.parse(request.inject[name]); + } + } + + try { + // The file name is dummy here. Without a file name, the source map is not used. + vm.runInContext(script, currentContext, '/tmp/react-native.js'); + } catch (e) { + sendError('Failed to exec script: ' + e); + } + sendResult(request.id); + }); + + return; + + default: + var returnValue = [[], [], [], [], []]; + try { + if (currentContext != null && typeof currentContext.__fbBatchedBridge === 'object') { + returnValue = + currentContext.__fbBatchedBridge[request.method].apply(null, request.arguments); + } + } catch (e) { + sendError('Failed while making a call ' + request.method + ':::' + e); + } finally { + sendResult(request.id, JSON.stringify(returnValue)); } - } catch (e) { - sendError('Failed while making a call ' + data.method + ':::' + e); - } finally { - sendResult(id, JSON.stringify(returnValue)); - } - }, -}; - -process.on('message', function(payload) { - if (!ops[payload.op]) { - sendError('Unknown op ' + payload.op + ' ' + JSON.stringify(payload)); - return; + + return; } - ops[payload.op](payload.id, payload.data); }); function sendResult(replyId, result) { @@ -73,3 +92,16 @@ function sendError(message) { message: message, }); } + +function getScriptContents(src, callback) { + http + .get(src, function(res) { + res.setEncoding('utf8'); + var buff = ''; + res.on('data', function(chunk) { buff += chunk; }); + res.on('end', () => { + callback(null, buff); + }); + }) + .on('error', function(err) { callback(err); }); +} diff --git a/pkg/nuclide-react-native-node-executor/lib/types.js b/pkg/nuclide-react-native-node-executor/lib/types.js index a2ee822dbb..62681a52da 100644 --- a/pkg/nuclide-react-native-node-executor/lib/types.js +++ b/pkg/nuclide-react-native-node-executor/lib/types.js @@ -24,31 +24,12 @@ export type ExecutorError = { message: string; }; -export type ExecutorResponse = ExecutorResult | ExecutorError; - -// Input to executor.js - -type EvaluateApplicationScriptExecutorRequest = { - id: number; - op: 'executeApplicationScript'; - data: { - script: string; - inject: string; - }; -}; - -type CallExecutorRequest = { - id: number; - op: 'call'; - data: { - method: string; - arguments: ?Array; - }; +export type ExecutorPid = { + kind: 'pid'; + pid: number; }; -export type ExecutorRequest = - EvaluateApplicationScriptExecutorRequest - | CallExecutorRequest; +export type ExecutorResponse = ExecutorResult | ExecutorError | ExecutorPid; // Requests coming from React Native diff --git a/pkg/nuclide-react-native/lib/debugging/ReactNativeDebuggerInstance.js b/pkg/nuclide-react-native/lib/debugging/ReactNativeDebuggerInstance.js index 26ce4eb870..e08664dc3a 100644 --- a/pkg/nuclide-react-native/lib/debugging/ReactNativeDebuggerInstance.js +++ b/pkg/nuclide-react-native/lib/debugging/ReactNativeDebuggerInstance.js @@ -15,7 +15,7 @@ import {Session} from '../../../nuclide-debugger-node/lib/Session'; import { DebuggerProxyClient, } from '../../../nuclide-react-native-node-executor/lib/DebuggerProxyClient'; -import {CompositeDisposable} from 'atom'; +import {CompositeDisposable, Disposable} from 'atom'; import Rx from 'rx'; import {Server as WebSocketServer} from 'ws'; @@ -39,26 +39,20 @@ export class ReactNativeDebuggerInstance extends DebuggerInstance { let didConnect; this._connected = new Promise(resolve => { didConnect = resolve; }); - // Once we have a connection from Nuclide (Chrome UI) and a pid, create a new debugging session. - const session$ = uiConnection$ - .combineLatest(pid$) - .flatMapLatest(([ws, pid]) => { - const config = { - debugPort, - preload: false, // This makes the node inspector not load all the source files on startup. - }; - - return Rx.Observable.create(observer => { - // Creating a new Session is actually side-effecty. - const session = new Session(config, debugPort, ws); - observer.onNext(session); - return Rx.Disposable.create(() => { session.close(); }); - }); - }) - .share(); + const session$ = Rx.Observable.create(observer => ( + // `Session` is particular about what order everything is closed in, so we manage it carefully + // here. + new CompositeDisposable( + uiConnection$ + .combineLatest(pid$) + .flatMapLatest(([ws, pid]) => createSessionStream(ws, debugPort)) + .subscribe(observer), + uiConnection$.connect(), + pid$.connect(), + ) + )); this._disposables = new CompositeDisposable( - // Tell the user if we can't connect to the debugger UI. uiConnection$.subscribeOnError(err => { atom.notifications.addError( @@ -73,18 +67,9 @@ export class ReactNativeDebuggerInstance extends DebuggerInstance { this.dispose(); }), - // Enable debugging in the process whenever we get a new pid. - // See and - // - pid$.subscribe(pid => { - // $FlowIgnore This is an undocumented API. It's an alternative to the UNIX SIGUSR1 signal. - process._debugProcess(pid); - }), - - session$.subscribe(), - pid$.first().subscribe(() => { didConnect(); }), + session$.subscribe(), ); } @@ -116,7 +101,7 @@ const pid$ = Rx.Observable.using( }, ({client}) => observableFromSubscribeFunction(client.onDidEvalApplicationScript.bind(client)), ) -.share(); +.publish(); /** * Connections from the Chrome UI. There will only be one connection at a time. This stream won't @@ -133,10 +118,25 @@ const uiConnection$ = Rx.Observable.using( }, ({server}) => ( Rx.Observable.merge( - Rx.Observable.fromEvent(server, 'close').flatMap(Rx.Observable.throw), Rx.Observable.fromEvent(server, 'error').flatMap(Rx.Observable.throw), Rx.Observable.fromEvent(server, 'connection'), ) + .takeUntil(Rx.Observable.fromEvent(server, 'close')) ), ) -.share(); +.publish(); + +function createSessionStream(ws: WebSocket, debugPort: number): Rx.Observable { + const config = { + debugPort, + // This makes the node inspector not load all the source files on startup: + preload: false, + }; + + return Rx.Observable.create(observer => { + // Creating a new Session is actually side-effecty. + const session = new Session(config, debugPort, ws); + observer.onNext(session); + return new Disposable(() => { session.close(); }); + }); +}