diff --git a/docs/source/public_server.rst b/docs/source/public_server.rst index 5bbce15429..edadbe3ffc 100644 --- a/docs/source/public_server.rst +++ b/docs/source/public_server.rst @@ -365,7 +365,7 @@ or in :file:`jupyter_notebook_config.py`: .. code-block:: python - c.Gateway.url = http://my-gateway-server:8888 + c.GatewayClient.url = http://my-gateway-server:8888 When provided, all kernel specifications will be retrieved from the specified Gateway server and all kernels will be managed by that server. This option enables the ability to target kernel processes diff --git a/notebook/gateway/handlers.py b/notebook/gateway/handlers.py index 1e28bfa779..8e09b10861 100644 --- a/notebook/gateway/handlers.py +++ b/notebook/gateway/handlers.py @@ -2,11 +2,9 @@ # Distributed under the terms of the Modified BSD License. import os -import json import logging -from socket import gaierror -from ..base.handlers import APIHandler, IPythonHandler +from ..base.handlers import IPythonHandler from ..utils import url_path_join from tornado import gen, web @@ -14,14 +12,13 @@ from tornado.ioloop import IOLoop from tornado.websocket import WebSocketHandler, websocket_connect from tornado.httpclient import HTTPRequest -from tornado.simple_httpclient import HTTPTimeoutError from tornado.escape import url_escape, json_decode, utf8 from ipython_genutils.py3compat import cast_unicode from jupyter_client.session import Session from traitlets.config.configurable import LoggingConfigurable -from .managers import Gateway +from .managers import GatewayClient class WebSocketChannelsHandler(WebSocketHandler, IPythonHandler): @@ -57,7 +54,7 @@ def authenticate(self): def initialize(self): self.log.debug("Initializing websocket connection %s", self.request.path) self.session = Session(config=self.config) - self.gateway = GatewayWebSocketClient(gateway_url=Gateway.instance().url) + self.gateway = GatewayWebSocketClient(gateway_url=GatewayClient.instance().url) @gen.coroutine def get(self, kernel_id, *args, **kwargs): @@ -124,12 +121,12 @@ def __init__(self, **kwargs): def _connect(self, kernel_id): self.kernel_id = kernel_id ws_url = url_path_join( - Gateway.instance().ws_url, - Gateway.instance().kernels_endpoint, url_escape(kernel_id), 'channels' + GatewayClient.instance().ws_url, + GatewayClient.instance().kernels_endpoint, url_escape(kernel_id), 'channels' ) self.log.info('Connecting to {}'.format(ws_url)) kwargs = {} - kwargs = Gateway.instance().load_connection_args(**kwargs) + kwargs = GatewayClient.instance().load_connection_args(**kwargs) request = HTTPRequest(ws_url, **kwargs) self.ws_future = websocket_connect(request) @@ -141,8 +138,8 @@ def _connection_done(self, fut): self.log.debug("Connection is ready: ws: {}".format(self.ws)) else: self.log.warning("Websocket connection has been cancelled via client disconnect before its establishment. " - "Kernel with ID '{}' may not be terminated on Gateway: {}". - format(self.kernel_id, Gateway.instance().url)) + "Kernel with ID '{}' may not be terminated on GatewayClient: {}". + format(self.kernel_id, GatewayClient.instance().url)) def _disconnect(self): if self.ws is not None: @@ -203,152 +200,8 @@ def on_close(self): self._disconnect() -# ----------------------------------------------------------------------------- -# kernel handlers -# ----------------------------------------------------------------------------- - -class MainKernelHandler(APIHandler): - """Replace default MainKernelHandler to enable async lookup of kernels.""" - - @web.authenticated - @gen.coroutine - def get(self): - km = self.kernel_manager - kernels = yield gen.maybe_future(km.list_kernels()) - self.finish(json.dumps(kernels)) - - @web.authenticated - @gen.coroutine - def post(self): - km = self.kernel_manager - model = self.get_json_body() - if model is None: - model = { - 'name': km.default_kernel_name - } - else: - model.setdefault('name', km.default_kernel_name) - - kernel_id = yield gen.maybe_future(km.start_kernel(kernel_name=model['name'])) - # This is now an async operation - model = yield gen.maybe_future(km.kernel_model(kernel_id)) - location = url_path_join(self.base_url, 'api', 'kernels', url_escape(kernel_id)) - self.set_header('Location', location) - self.set_status(201) - self.finish(json.dumps(model)) - - -class KernelHandler(APIHandler): - """Replace default KernelHandler to enable async lookup of kernels.""" - - @web.authenticated - @gen.coroutine - def get(self, kernel_id): - km = self.kernel_manager - # This is now an async operation - model = yield gen.maybe_future(km.kernel_model(kernel_id)) - if model is None: - raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id) - self.finish(json.dumps(model)) - - @web.authenticated - @gen.coroutine - def delete(self, kernel_id): - km = self.kernel_manager - yield gen.maybe_future(km.shutdown_kernel(kernel_id)) - self.set_status(204) - self.finish() - - -class KernelActionHandler(APIHandler): - """Replace default KernelActionHandler to enable async lookup of kernels.""" - - @web.authenticated - @gen.coroutine - def post(self, kernel_id, action): - km = self.kernel_manager - - if action == 'interrupt': - km.interrupt_kernel(kernel_id) - self.set_status(204) - - if action == 'restart': - try: - yield gen.maybe_future(km.restart_kernel(kernel_id)) - except Exception as e: - self.log.error("Exception restarting kernel", exc_info=True) - self.set_status(500) - else: - # This is now an async operation - model = yield gen.maybe_future(km.kernel_model(kernel_id)) - self.write(json.dumps(model)) - self.finish() - -# ----------------------------------------------------------------------------- -# kernel spec handlers -# ----------------------------------------------------------------------------- - - -class MainKernelSpecHandler(APIHandler): - @web.authenticated - @gen.coroutine - def get(self): - ksm = self.kernel_spec_manager - try: - kernel_specs = yield gen.maybe_future(ksm.list_kernel_specs()) - # TODO: Remove resources until we support them - for name, spec in kernel_specs['kernelspecs'].items(): - spec['resources'] = {} - self.set_header("Content-Type", 'application/json') - self.write(json.dumps(kernel_specs)) - - # Trap a set of common exceptions so that we can inform the user that their Gateway url is incorrect - # or the server is not running. - # NOTE: We do this here since this handler is called during the Notebook's startup and subsequent refreshes - # of the tree view. - except ConnectionRefusedError: - self.log.error("Connection refused from Gateway server url '{}'. " - "Check to be sure the Gateway instance is running.".format(Gateway.instance().url)) - except HTTPTimeoutError: - # This can occur if the host is valid (e.g., foo.com) but there's nothing there. - self.log.error("Timeout error attempting to connect to Gateway server url '{}'. " - "Ensure gateway url is valid and the Gateway instance is running.".format(Gateway.instance().url)) - except gaierror as e: - self.log.error("The Gateway server specified in the gateway_url '{}' doesn't appear to be valid. " - "Ensure gateway url is valid and the Gateway instance is running.".format(Gateway.instance().url)) - - self.finish() - - -class KernelSpecHandler(APIHandler): - @web.authenticated - @gen.coroutine - def get(self, kernel_name): - ksm = self.kernel_spec_manager - kernel_spec = yield ksm.get_kernel_spec(kernel_name) - if kernel_spec is None: - raise web.HTTPError(404, u'Kernel spec %s not found' % kernel_name) - # TODO: Remove resources until we support them - kernel_spec['resources'] = {} - self.set_header("Content-Type", 'application/json') - self.finish(json.dumps(kernel_spec)) - -# ----------------------------------------------------------------------------- -# URL to handler mappings -# ----------------------------------------------------------------------------- - - -from ..services.kernels.handlers import _kernel_id_regex, _kernel_action_regex -from ..services.kernelspecs.handlers import kernel_name_regex +from ..services.kernels.handlers import _kernel_id_regex default_handlers = [ - (r"/api/kernels", MainKernelHandler), - (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler), - (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler), (r"/api/kernels/%s/channels" % _kernel_id_regex, WebSocketChannelsHandler), - (r"/api/kernelspecs", MainKernelSpecHandler), - (r"/api/kernelspecs/%s" % kernel_name_regex, KernelSpecHandler), - # TODO: support kernel spec resources - # (r"/kernelspecs/%s/(?P.*)" % kernel_name_regex, KernelSpecResourceHandler), - ] diff --git a/notebook/gateway/managers.py b/notebook/gateway/managers.py index 42da1696fc..73af7d9799 100644 --- a/notebook/gateway/managers.py +++ b/notebook/gateway/managers.py @@ -4,9 +4,11 @@ import os import json -from tornado import gen +from socket import gaierror +from tornado import gen, web from tornado.escape import json_encode, json_decode, url_escape from tornado.httpclient import HTTPClient, AsyncHTTPClient, HTTPError +from tornado.simple_httpclient import HTTPTimeoutError from ..services.kernels.kernelmanager import MappingKernelManager from ..services.sessions.sessionmanager import SessionManager @@ -18,22 +20,10 @@ from traitlets.config import SingletonConfigurable -@gen.coroutine -def fetch_gateway(endpoint, **kwargs): - """Make an async request to kernel gateway endpoint.""" - client = AsyncHTTPClient() - - kwargs = Gateway.instance().load_connection_args(**kwargs) - - response = yield client.fetch(endpoint, **kwargs) - raise gen.Return(response) - - -class Gateway(SingletonConfigurable): - """This class manages the configuration. It's its own class so that we can avoid having command - line options of the likes `--GatewayKernelManager.connect_timeout` and use the shorter and more - applicable `--Gateway.connect_timeout`, etc. It also contains some helper methods to build - request arguments out of the various config options. +class GatewayClient(SingletonConfigurable): + """This class manages the configuration. It's its own singleton class so that we + can share these values across all objects. It also contains some helper methods + to build request arguments out of the various config options. """ @@ -56,7 +46,7 @@ def _url_validate(self, proposal): # Ensure value, if present, starts with 'http' if value is not None and len(value) > 0: if not str(value).lower().startswith('http'): - raise TraitError("Gateway url must start with 'http': '%r'" % value) + raise TraitError("GatewayClient url must start with 'http': '%r'" % value) return value ws_url = Unicode(default_value=None, allow_none=True, config=True, @@ -80,7 +70,7 @@ def _ws_url_validate(self, proposal): # Ensure value, if present, starts with 'ws' if value is not None and len(value) > 0: if not str(value).lower().startswith('ws'): - raise TraitError("Gateway ws_url must start with 'ws': '%r'" % value) + raise TraitError("GatewayClient ws_url must start with 'ws': '%r'" % value) return value kernels_endpoint_default_value = '/api/kernels' @@ -204,9 +194,21 @@ def validate_cert_default(self): return bool(os.environ.get(self.validate_cert_env, str(self.validate_cert_default_value)) not in ['no', 'false']) def __init__(self, **kwargs): - super(Gateway, self).__init__(**kwargs) + super(GatewayClient, self).__init__(**kwargs) self._static_args = {} # initialized on first use + env_whitelist_default_value = '' + env_whitelist_env = 'JUPYTER_GATEWAY_ENV_WHITELIST' + env_whitelist = Unicode(default_value=env_whitelist_default_value, config=True, + help="""A comma-separated list of environment variable names that will be included, along with + their values, in the kernel startup request. The corresponding `env_whitelist` configuration + value must also be set on the Gateway server - since that configuration value indicates which + environmental values to make available to the kernel. (JUPYTER_GATEWAY_ENV_WHITELIST env var)""") + + @default('env_whitelist') + def _env_whitelist_default(self): + return os.environ.get(self.env_whitelist_env, self.env_whitelist_default_value) + @property def gateway_enabled(self): return bool(self.url is not None and len(self.url) > 0) @@ -242,6 +244,34 @@ def load_connection_args(self, **kwargs): kwargs.update(self._static_args) return kwargs + +@gen.coroutine +def gateway_request(endpoint, **kwargs): + """Make an async request to kernel gateway endpoint, returns a response """ + client = AsyncHTTPClient() + kwargs = GatewayClient.instance().load_connection_args(**kwargs) + try: + response = yield client.fetch(endpoint, **kwargs) + # Trap a set of common exceptions so that we can inform the user that their Gateway url is incorrect + # or the server is not running. + # NOTE: We do this here since this handler is called during the Notebook's startup and subsequent refreshes + # of the tree view. + except ConnectionRefusedError: + raise web.HTTPError(503, "Connection refused from Gateway server url '{}'. " + "Check to be sure the Gateway instance is running.".format(GatewayClient.instance().url)) + except HTTPTimeoutError: + # This can occur if the host is valid (e.g., foo.com) but there's nothing there. + raise web.HTTPError(504, "Timeout error attempting to connect to Gateway server url '{}'. " \ + "Ensure gateway url is valid and the Gateway instance is running.".format( + GatewayClient.instance().url)) + except gaierror as e: + raise web.HTTPError(404, "The Gateway server specified in the gateway_url '{}' doesn't appear to be valid. " + "Ensure gateway url is valid and the Gateway instance is running.".format( + GatewayClient.instance().url)) + + raise gen.Return(response) + + class GatewayKernelManager(MappingKernelManager): """Kernel manager that supports remote kernels hosted by Jupyter Kernel or Enterprise Gateway.""" @@ -250,7 +280,7 @@ class GatewayKernelManager(MappingKernelManager): def __init__(self, **kwargs): super(GatewayKernelManager, self).__init__(**kwargs) - self.base_endpoint = url_path_join(Gateway.instance().url, Gateway.instance().kernels_endpoint) + self.base_endpoint = url_path_join(GatewayClient.instance().url, GatewayClient.instance().kernels_endpoint) def __contains__(self, kernel_id): return kernel_id in self._kernels @@ -291,18 +321,25 @@ def start_kernel(self, kernel_id=None, path=None, **kwargs): self.log.info('Request start kernel: kernel_id=%s, path="%s"', kernel_id, path) if kernel_id is None: + if path is not None: + kwargs['cwd'] = self.cwd_for_path(path) kernel_name = kwargs.get('kernel_name', 'python3') kernel_url = self._get_kernel_endpoint_url() self.log.debug("Request new kernel at: %s" % kernel_url) + # Let KERNEL_USERNAME take precedent over http_user config option. + if os.environ.get('KERNEL_USERNAME') is None and GatewayClient.instance().http_user: + os.environ['KERNEL_USERNAME'] = GatewayClient.instance().http_user + kernel_env = {k: v for (k, v) in dict(os.environ).items() if k.startswith('KERNEL_') - or k in os.environ.get('GATEWAY_ENV_WHITELIST', '').split(",")} + or k in GatewayClient.instance().env_whitelist.split(",")} json_body = json_encode({'name': kernel_name, 'env': kernel_env}) - response = yield fetch_gateway(kernel_url, method='POST', body=json_body) + response = yield gateway_request(kernel_url, method='POST', body=json_body) kernel = json_decode(response.body) kernel_id = kernel['id'] self.log.info("Kernel started: %s" % kernel_id) + self.log.debug("Kernel args: %r" % kwargs) else: kernel = yield self.get_kernel(kernel_id) kernel_id = kernel['id'] @@ -323,7 +360,7 @@ def get_kernel(self, kernel_id=None, **kwargs): kernel_url = self._get_kernel_endpoint_url(kernel_id) self.log.debug("Request kernel at: %s" % kernel_url) try: - response = yield fetch_gateway(kernel_url, method='GET') + response = yield gateway_request(kernel_url, method='GET') except HTTPError as error: if error.code == 404: self.log.warn("Kernel not found at: %s" % kernel_url) @@ -334,7 +371,7 @@ def get_kernel(self, kernel_id=None, **kwargs): else: kernel = json_decode(response.body) self._kernels[kernel_id] = kernel - self.log.info("Kernel retrieved: %s" % kernel) + self.log.debug("Kernel retrieved: %s" % kernel) raise gen.Return(kernel) @gen.coroutine @@ -356,13 +393,13 @@ def list_kernels(self, **kwargs): """Get a list of kernels.""" kernel_url = self._get_kernel_endpoint_url() self.log.debug("Request list kernels: %s", kernel_url) - response = yield fetch_gateway(kernel_url, method='GET') + response = yield gateway_request(kernel_url, method='GET') kernels = json_decode(response.body) self._kernels = {x['id']:x for x in kernels} raise gen.Return(kernels) @gen.coroutine - def shutdown_kernel(self, kernel_id): + def shutdown_kernel(self, kernel_id, now=False, restart=False): """Shutdown a kernel by its kernel uuid. Parameters @@ -372,7 +409,7 @@ def shutdown_kernel(self, kernel_id): """ kernel_url = self._get_kernel_endpoint_url(kernel_id) self.log.debug("Request shutdown kernel at: %s", kernel_url) - response = yield fetch_gateway(kernel_url, method='DELETE') + response = yield gateway_request(kernel_url, method='DELETE') self.log.debug("Shutdown kernel response: %d %s", response.code, response.reason) self.remove_kernel(kernel_id) @@ -387,7 +424,7 @@ def restart_kernel(self, kernel_id, now=False, **kwargs): """ kernel_url = self._get_kernel_endpoint_url(kernel_id) + '/restart' self.log.debug("Request restart kernel at: %s", kernel_url) - response = yield fetch_gateway(kernel_url, method='POST', body=json_encode({})) + response = yield gateway_request(kernel_url, method='POST', body=json_encode({})) self.log.debug("Restart kernel response: %d %s", response.code, response.reason) @gen.coroutine @@ -401,14 +438,15 @@ def interrupt_kernel(self, kernel_id, **kwargs): """ kernel_url = self._get_kernel_endpoint_url(kernel_id) + '/interrupt' self.log.debug("Request interrupt kernel at: %s", kernel_url) - response = yield fetch_gateway(kernel_url, method='POST', body=json_encode({})) + response = yield gateway_request(kernel_url, method='POST', body=json_encode({})) self.log.debug("Interrupt kernel response: %d %s", response.code, response.reason) - def shutdown_all(self): + def shutdown_all(self, now=False): """Shutdown all kernels.""" # Note: We have to make this sync because the NotebookApp does not wait for async. + shutdown_kernels = [] kwargs = {'method': 'DELETE'} - kwargs = Gateway.instance().load_connection_args(**kwargs) + kwargs = GatewayClient.instance().load_connection_args(**kwargs) client = HTTPClient() for kernel_id in self._kernels.keys(): kernel_url = self._get_kernel_endpoint_url(kernel_id) @@ -417,16 +455,19 @@ def shutdown_all(self): response = client.fetch(kernel_url, **kwargs) except HTTPError: pass - self.log.debug("Delete kernel response: %d %s", response.code, response.reason) - self.remove_kernel(kernel_id) + else: + self.log.debug("Delete kernel response: %d %s", response.code, response.reason) + shutdown_kernels.append(kernel_id) # avoid changing dict size during iteration client.close() + for kernel_id in shutdown_kernels: + self.remove_kernel(kernel_id) class GatewayKernelSpecManager(KernelSpecManager): def __init__(self, **kwargs): super(GatewayKernelSpecManager, self).__init__(**kwargs) - self.base_endpoint = url_path_join(Gateway.instance().url, Gateway.instance().kernelspecs_endpoint) + self.base_endpoint = url_path_join(GatewayClient.instance().url, GatewayClient.instance().kernelspecs_endpoint) def _get_kernelspecs_endpoint_url(self, kernel_name=None): """Builds a url for the kernels endpoint @@ -440,12 +481,39 @@ def _get_kernelspecs_endpoint_url(self, kernel_name=None): return self.base_endpoint + @gen.coroutine + def get_all_specs(self): + fetched_kspecs = yield self.list_kernel_specs() + + # get the default kernel name and compare to that of this server. + # If different log a warning and reset the default. However, the + # caller of this method will still return this server's value until + # the next fetch of kernelspecs - at which time they'll match. + km = self.parent.kernel_manager + remote_default_kernel_name = fetched_kspecs.get('default') + if remote_default_kernel_name != km.default_kernel_name: + self.log.info("Default kernel name on Gateway server ({gateway_default}) differs from " + "Notebook server ({notebook_default}). Updating to Gateway server's value.". + format(gateway_default=remote_default_kernel_name, + notebook_default=km.default_kernel_name)) + km.default_kernel_name = remote_default_kernel_name + + # gateway doesn't support resources (requires transfer for use by NB client) + # so add `resource_dir` to each kernelspec and value of 'not supported in gateway mode' + remote_kspecs = fetched_kspecs.get('kernelspecs') + for kernel_name, kspec_info in remote_kspecs.items(): + if not kspec_info.get('resource_dir'): + kspec_info['resource_dir'] = 'not supported in gateway mode' + remote_kspecs[kernel_name].update(kspec_info) + + raise gen.Return(remote_kspecs) + @gen.coroutine def list_kernel_specs(self): """Get a list of kernel specs.""" kernel_spec_url = self._get_kernelspecs_endpoint_url() self.log.debug("Request list kernel specs at: %s", kernel_spec_url) - response = yield fetch_gateway(kernel_spec_url, method='GET') + response = yield gateway_request(kernel_spec_url, method='GET') kernel_specs = json_decode(response.body) raise gen.Return(kernel_specs) @@ -461,189 +529,27 @@ def get_kernel_spec(self, kernel_name, **kwargs): kernel_spec_url = self._get_kernelspecs_endpoint_url(kernel_name=str(kernel_name)) self.log.debug("Request kernel spec at: %s" % kernel_spec_url) try: - response = yield fetch_gateway(kernel_spec_url, method='GET') + response = yield gateway_request(kernel_spec_url, method='GET') except HTTPError as error: if error.code == 404: - self.log.warn("Kernel spec not found at: %s" % kernel_spec_url) - kernel_spec = None + # Convert not found to KeyError since that's what the Notebook handler expects + # message is not used, but might as well make it useful for troubleshooting + raise KeyError('kernelspec {kernel_name} not found on Gateway server at: {gateway_url}'. + format(kernel_name=kernel_name, gateway_url=GatewayClient.instance().url)) else: raise else: kernel_spec = json_decode(response.body) - raise gen.Return(kernel_spec) + # Convert to instance of Kernelspec + kspec_instance = self.kernel_spec_class(resource_dir=u'', **kernel_spec['spec']) + raise gen.Return(kspec_instance) class GatewaySessionManager(SessionManager): kernel_manager = Instance('notebook.gateway.managers.GatewayKernelManager') @gen.coroutine - def create_session(self, path=None, name=None, type=None, - kernel_name=None, kernel_id=None): - """Creates a session and returns its model. - - Overrides base class method to turn into an async operation. - """ - session_id = self.new_session_id() - - kernel = None - if kernel_id is not None: - # This is now an async operation - kernel = yield self.kernel_manager.get_kernel(kernel_id) - - if kernel is not None: - pass - else: - kernel_id = yield self.start_kernel_for_session( - session_id, path, name, type, kernel_name, - ) - - result = yield self.save_session( - session_id, path=path, name=name, type=type, kernel_id=kernel_id, - ) - raise gen.Return(result) - - @gen.coroutine - def save_session(self, session_id, path=None, name=None, type=None, - kernel_id=None): - """Saves the items for the session with the given session_id - - Given a session_id (and any other of the arguments), this method - creates a row in the sqlite session database that holds the information - for a session. - - Parameters - ---------- - session_id : str - uuid for the session; this method must be given a session_id - path : str - the path for the given notebook - kernel_id : str - a uuid for the kernel associated with this session - - Returns - ------- - model : dict - a dictionary of the session model - """ - # This is now an async operation - session = yield super(GatewaySessionManager, self).save_session( - session_id, path=path, name=name, type=type, kernel_id=kernel_id - ) - raise gen.Return(session) - - @gen.coroutine - def get_session(self, **kwargs): - """Returns the model for a particular session. - - Takes a keyword argument and searches for the value in the session - database, then returns the rest of the session's info. - - Overrides base class method to turn into an async operation. - - Parameters - ---------- - **kwargs : keyword argument - must be given one of the keywords and values from the session database - (i.e. session_id, path, kernel_id) - - Returns - ------- - model : dict - returns a dictionary that includes all the information from the - session described by the kwarg. - """ - # This is now an async operation - session = yield super(GatewaySessionManager, self).get_session(**kwargs) - raise gen.Return(session) - - @gen.coroutine - def update_session(self, session_id, **kwargs): - """Updates the values in the session database. - - Changes the values of the session with the given session_id - with the values from the keyword arguments. - - Overrides base class method to turn into an async operation. - - Parameters - ---------- - session_id : str - a uuid that identifies a session in the sqlite3 database - **kwargs : str - the key must correspond to a column title in session database, - and the value replaces the current value in the session - with session_id. - """ - # This is now an async operation - session = yield self.get_session(session_id=session_id) - - if not kwargs: - # no changes - return - - sets = [] - for column in kwargs.keys(): - if column not in self._columns: - raise TypeError("No such column: %r" % column) - sets.append("%s=?" % column) - query = "UPDATE session SET %s WHERE session_id=?" % (', '.join(sets)) - self.cursor.execute(query, list(kwargs.values()) + [session_id]) - - @gen.coroutine - def row_to_model(self, row): - """Takes sqlite database session row and turns it into a dictionary. - - Overrides base class method to turn into an async operation. - """ - # Retrieve kernel for session, which is now an async operation - kernel = yield self.kernel_manager.get_kernel(row['kernel_id']) - if kernel is None: - # The kernel was killed or died without deleting the session. - # We can't use delete_session here because that tries to find - # and shut down the kernel. - self.cursor.execute("DELETE FROM session WHERE session_id=?", - (row['session_id'],)) - raise KeyError - - model = { - 'id': row['session_id'], - 'path': row['path'], - 'name': row['name'], - 'type': row['type'], - 'kernel': kernel - } - if row['type'] == 'notebook': # Provide the deprecated API. - model['notebook'] = {'path': row['path'], 'name': row['name']} - - raise gen.Return(model) - - @gen.coroutine - def list_sessions(self): - """Returns a list of dictionaries containing all the information from - the session database. - - Overrides base class method to turn into an async operation. - """ - c = self.cursor.execute("SELECT * FROM session") - result = [] - # We need to use fetchall() here, because row_to_model can delete rows, - # which messes up the cursor if we're iterating over rows. - for row in c.fetchall(): - try: - # This is now an async operation - model = yield self.row_to_model(row) - result.append(model) - except KeyError: - pass - raise gen.Return(result) - - @gen.coroutine - def delete_session(self, session_id): - """Deletes the row in the session database with given session_id. - - Overrides base class method to turn into an async operation. - """ - # This is now an async operation - session = yield self.get_session(session_id=session_id) - yield gen.maybe_future(self.kernel_manager.shutdown_kernel(session['kernel']['id'])) - self.cursor.execute("DELETE FROM session WHERE session_id=?", (session_id,)) + def kernel_culled(self, kernel_id): + """Checks if the kernel is still considered alive and returns true if its not found. """ + kernel = yield self.kernel_manager.get_kernel(kernel_id) + raise gen.Return(kernel is None) diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index 559e775794..2639b4faa8 100755 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -84,7 +84,7 @@ from .services.contents.filemanager import FileContentsManager from .services.contents.largefilemanager import LargeFileManager from .services.sessions.sessionmanager import SessionManager -from .gateway.managers import GatewayKernelManager, GatewayKernelSpecManager, GatewaySessionManager, Gateway +from .gateway.managers import GatewayKernelManager, GatewayKernelSpecManager, GatewaySessionManager, GatewayClient from .auth.login import LoginHandler from .auth.logout import LogoutHandler @@ -311,16 +311,22 @@ def init_handlers(self, settings): handlers.extend(load_handlers('notebook.services.nbconvert.handlers')) handlers.extend(load_handlers('notebook.services.security.handlers')) handlers.extend(load_handlers('notebook.services.shutdown')) - - # If gateway server is configured, replace appropriate handlers to perform redirection - if Gateway.instance().gateway_enabled: - handlers.extend(load_handlers('notebook.gateway.handlers')) - else: - handlers.extend(load_handlers('notebook.services.kernels.handlers')) - handlers.extend(load_handlers('notebook.services.kernelspecs.handlers')) + handlers.extend(load_handlers('notebook.services.kernels.handlers')) + handlers.extend(load_handlers('notebook.services.kernelspecs.handlers')) handlers.extend(settings['contents_manager'].get_extra_handlers()) + # If gateway mode is enabled, replace appropriate handlers to perform redirection + if GatewayClient.instance().gateway_enabled: + # for each handler required for gateway, locate its pattern + # in the current list and replace that entry... + gateway_handlers = load_handlers('notebook.gateway.handlers') + for i, gwh in enumerate(gateway_handlers): + for j, h in enumerate(handlers): + if gwh[0] == h[0]: + handlers[j] = (gwh[0], gwh[1]) + break + handlers.append( (r"/nbextensions/(.*)", FileFindHandler, { 'path': settings['nbextensions_path'], @@ -554,7 +560,7 @@ def start(self): 'notebook-dir': 'NotebookApp.notebook_dir', 'browser': 'NotebookApp.browser', 'pylab': 'NotebookApp.pylab', - 'gateway-url': 'Gateway.url', + 'gateway-url': 'GatewayClient.url', }) #----------------------------------------------------------------------------- @@ -575,7 +581,7 @@ class NotebookApp(JupyterApp): classes = [ KernelManager, Session, MappingKernelManager, KernelSpecManager, ContentsManager, FileContentsManager, NotebookNotary, - GatewayKernelManager, GatewayKernelSpecManager, GatewaySessionManager, Gateway, + GatewayKernelManager, GatewayKernelSpecManager, GatewaySessionManager, GatewayClient, ] flags = Dict(flags) aliases = Dict(aliases) @@ -1325,8 +1331,9 @@ def parse_command_line(self, argv=None): def init_configurables(self): - # If gateway server is configured, replace appropriate managers to perform redirection - self.gateway_config = Gateway.instance(parent=self) + # If gateway server is configured, replace appropriate managers to perform redirection. To make + # this determination, instantiate the GatewayClient config singleton. + self.gateway_config = GatewayClient.instance(parent=self) if self.gateway_config.gateway_enabled: self.kernel_manager_class = 'notebook.gateway.managers.GatewayKernelManager' diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index cfef2a4a0e..897fa51db2 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -45,7 +45,7 @@ def post(self): model.setdefault('name', km.default_kernel_name) kernel_id = yield gen.maybe_future(km.start_kernel(kernel_name=model['name'])) - model = km.kernel_model(kernel_id) + model = yield gen.maybe_future(km.kernel_model(kernel_id)) location = url_path_join(self.base_url, 'api', 'kernels', url_escape(kernel_id)) self.set_header('Location', location) self.set_status(201) @@ -57,7 +57,6 @@ class KernelHandler(APIHandler): @web.authenticated def get(self, kernel_id): km = self.kernel_manager - km._check_kernel_id(kernel_id) model = km.kernel_model(kernel_id) self.finish(json.dumps(model, default=date_default)) @@ -87,7 +86,7 @@ def post(self, kernel_id, action): self.log.error("Exception restarting kernel", exc_info=True) self.set_status(500) else: - model = km.kernel_model(kernel_id) + model = yield gen.maybe_future(km.kernel_model(kernel_id)) self.write(json.dumps(model, default=date_default)) self.finish() diff --git a/notebook/services/kernelspecs/handlers.py b/notebook/services/kernelspecs/handlers.py index d272db2f71..c0157e4c57 100644 --- a/notebook/services/kernelspecs/handlers.py +++ b/notebook/services/kernelspecs/handlers.py @@ -11,7 +11,7 @@ import os pjoin = os.path.join -from tornado import web +from tornado import web, gen from ...base.handlers import APIHandler from ...utils import url_path_join, url_unescape @@ -48,13 +48,15 @@ def kernelspec_model(handler, name, spec_dict, resource_dir): class MainKernelSpecHandler(APIHandler): @web.authenticated + @gen.coroutine def get(self): ksm = self.kernel_spec_manager km = self.kernel_manager model = {} model['default'] = km.default_kernel_name model['kernelspecs'] = specs = {} - for kernel_name, kernel_info in ksm.get_all_specs().items(): + kspecs = yield gen.maybe_future(ksm.get_all_specs()) + for kernel_name, kernel_info in kspecs.items(): try: d = kernelspec_model(self, kernel_name, kernel_info['spec'], kernel_info['resource_dir']) @@ -69,11 +71,12 @@ def get(self): class KernelSpecHandler(APIHandler): @web.authenticated + @gen.coroutine def get(self, kernel_name): ksm = self.kernel_spec_manager kernel_name = url_unescape(kernel_name) try: - spec = ksm.get_kernel_spec(kernel_name) + spec = yield gen.maybe_future(ksm.get_kernel_spec(kernel_name)) except KeyError: raise web.HTTPError(404, u'Kernel spec %s not found' % kernel_name) model = kernelspec_model(self, kernel_name, spec.to_dict(), spec.resource_dir) diff --git a/notebook/services/sessions/sessionmanager.py b/notebook/services/sessions/sessionmanager.py index ee70eb0810..4497cfbc33 100644 --- a/notebook/services/sessions/sessionmanager.py +++ b/notebook/services/sessions/sessionmanager.py @@ -56,21 +56,22 @@ def __del__(self): """Close connection once SessionManager closes""" self.close() + @gen.coroutine def session_exists(self, path): """Check to see if the session of a given name exists""" + exists = False self.cursor.execute("SELECT * FROM session WHERE path=?", (path,)) row = self.cursor.fetchone() - if row is None: - return False - else: + if row is not None: # Note, although we found a row for the session, the associated kernel may have # been culled or died unexpectedly. If that's the case, we should delete the # row, thereby terminating the session. This can be done via a call to # row_to_model that tolerates that condition. If row_to_model returns None, # we'll return false, since, at that point, the session doesn't exist anyway. - if self.row_to_model(row, tolerate_culled=True) is None: - return False - return True + model = yield gen.maybe_future(self.row_to_model(row, tolerate_culled=True)) + if model is not None: + exists = True + raise gen.Return(exists) def new_session_id(self): "Create a uuid for a new session" @@ -101,6 +102,7 @@ def start_kernel_for_session(self, session_id, path, name, type, kernel_name): # py2-compat raise gen.Return(kernel_id) + @gen.coroutine def save_session(self, session_id, path=None, name=None, type=None, kernel_id=None): """Saves the items for the session with the given session_id @@ -129,8 +131,10 @@ def save_session(self, session_id, path=None, name=None, type=None, kernel_id=No self.cursor.execute("INSERT INTO session VALUES (?,?,?,?,?)", (session_id, path, name, type, kernel_id) ) - return self.get_session(session_id=session_id) + result = yield gen.maybe_future(self.get_session(session_id=session_id)) + raise gen.Return(result) + @gen.coroutine def get_session(self, **kwargs): """Returns the model for a particular session. @@ -174,8 +178,10 @@ def get_session(self, **kwargs): raise web.HTTPError(404, u'Session not found: %s' % (', '.join(q))) - return self.row_to_model(row) + model = yield gen.maybe_future(self.row_to_model(row)) + raise gen.Return(model) + @gen.coroutine def update_session(self, session_id, **kwargs): """Updates the values in the session database. @@ -191,7 +197,7 @@ def update_session(self, session_id, **kwargs): and the value replaces the current value in the session with session_id. """ - self.get_session(session_id=session_id) + yield gen.maybe_future(self.get_session(session_id=session_id)) if not kwargs: # no changes @@ -205,9 +211,15 @@ def update_session(self, session_id, **kwargs): query = "UPDATE session SET %s WHERE session_id=?" % (', '.join(sets)) self.cursor.execute(query, list(kwargs.values()) + [session_id]) + def kernel_culled(self, kernel_id): + """Checks if the kernel is still considered alive and returns true if its not found. """ + return kernel_id not in self.kernel_manager + + @gen.coroutine def row_to_model(self, row, tolerate_culled=False): """Takes sqlite database session row and turns it into a dictionary""" - if row['kernel_id'] not in self.kernel_manager: + kernel_culled = yield gen.maybe_future(self.kernel_culled(row['kernel_id'])) + if kernel_culled: # The kernel was culled or died without deleting the session. # We can't use delete_session here because that tries to find # and shut down the kernel - so we'll delete the row directly. @@ -222,21 +234,23 @@ def row_to_model(self, row, tolerate_culled=False): format(kernel_id=row['kernel_id'],session_id=row['session_id']) if tolerate_culled: self.log.warning(msg + " Continuing...") - return None + raise gen.Return(None) raise KeyError(msg) + kernel_model = yield gen.maybe_future(self.kernel_manager.kernel_model(row['kernel_id'])) model = { 'id': row['session_id'], 'path': row['path'], 'name': row['name'], 'type': row['type'], - 'kernel': self.kernel_manager.kernel_model(row['kernel_id']) + 'kernel': kernel_model } if row['type'] == 'notebook': # Provide the deprecated API. model['notebook'] = {'path': row['path'], 'name': row['name']} - return model + raise gen.Return(model) + @gen.coroutine def list_sessions(self): """Returns a list of dictionaries containing all the information from the session database""" @@ -246,14 +260,15 @@ def list_sessions(self): # which messes up the cursor if we're iterating over rows. for row in c.fetchall(): try: - result.append(self.row_to_model(row)) + model = yield gen.maybe_future(self.row_to_model(row)) + result.append(model) except KeyError: pass - return result + raise gen.Return(result) @gen.coroutine def delete_session(self, session_id): """Deletes the row in the session database with given session_id""" - session = self.get_session(session_id=session_id) + session = yield gen.maybe_future(self.get_session(session_id=session_id)) yield gen.maybe_future(self.kernel_manager.shutdown_kernel(session['kernel']['id'])) self.cursor.execute("DELETE FROM session WHERE session_id=?", (session_id,)) diff --git a/notebook/services/sessions/tests/test_sessionmanager.py b/notebook/services/sessions/tests/test_sessionmanager.py index 96847a868a..97331ebf9b 100644 --- a/notebook/services/sessions/tests/test_sessionmanager.py +++ b/notebook/services/sessions/tests/test_sessionmanager.py @@ -62,11 +62,11 @@ def co_add(): def create_session(self, **kwargs): return self.create_sessions(kwargs)[0] - + def test_get_session(self): sm = self.sm session_id = self.create_session(path='/path/to/test.ipynb', kernel_name='bar')['id'] - model = sm.get_session(session_id=session_id) + model = self.loop.run_sync(lambda: sm.get_session(session_id=session_id)) expected = {'id':session_id, 'path': u'/path/to/test.ipynb', 'notebook': {'path': u'/path/to/test.ipynb', 'name': None}, @@ -86,7 +86,8 @@ def test_bad_get_session(self): sm = self.sm session_id = self.create_session(path='/path/to/test.ipynb', kernel_name='foo')['id'] - self.assertRaises(TypeError, sm.get_session, bad_id=session_id) # Bad keyword + with self.assertRaises(TypeError): + self.loop.run_sync(lambda: sm.get_session(bad_id=session_id)) # Bad keyword def test_get_session_dead_kernel(self): sm = self.sm @@ -94,9 +95,9 @@ def test_get_session_dead_kernel(self): # kill the kernel sm.kernel_manager.shutdown_kernel(session['kernel']['id']) with self.assertRaises(KeyError): - sm.get_session(session_id=session['id']) + self.loop.run_sync(lambda: sm.get_session(session_id=session['id'])) # no sessions left - listed = sm.list_sessions() + listed = self.loop.run_sync(lambda: sm.list_sessions()) self.assertEqual(listed, []) def test_list_sessions(self): @@ -107,7 +108,7 @@ def test_list_sessions(self): dict(path='/path/to/3', name='foo', type='console', kernel_name='python'), ) - sessions = sm.list_sessions() + sessions = self.loop.run_sync(lambda: sm.list_sessions()) expected = [ { 'id':sessions[0]['id'], @@ -158,7 +159,7 @@ def test_list_sessions_dead_kernel(self): ) # kill one of the kernels sm.kernel_manager.shutdown_kernel(sessions[0]['kernel']['id']) - listed = sm.list_sessions() + listed = self.loop.run_sync(lambda: sm.list_sessions()) expected = [ { 'id': sessions[1]['id'], @@ -181,8 +182,8 @@ def test_update_session(self): sm = self.sm session_id = self.create_session(path='/path/to/test.ipynb', kernel_name='julia')['id'] - sm.update_session(session_id, path='/path/to/new_name.ipynb') - model = sm.get_session(session_id=session_id) + self.loop.run_sync(lambda: sm.update_session(session_id, path='/path/to/new_name.ipynb')) + model = self.loop.run_sync(lambda: sm.get_session(session_id=session_id)) expected = {'id':session_id, 'path': u'/path/to/new_name.ipynb', 'type': 'notebook', @@ -203,7 +204,8 @@ def test_bad_update_session(self): sm = self.sm session_id = self.create_session(path='/path/to/test.ipynb', kernel_name='ir')['id'] - self.assertRaises(TypeError, sm.update_session, session_id=session_id, bad_kw='test.ipynb') # Bad keyword + with self.assertRaises(TypeError): + self.loop.run_sync(lambda: sm.update_session(session_id=session_id, bad_kw='test.ipynb')) # Bad keyword def test_delete_session(self): sm = self.sm @@ -212,8 +214,8 @@ def test_delete_session(self): dict(path='/path/to/2/test2.ipynb', kernel_name='python'), dict(path='/path/to/3', name='foo', type='console', kernel_name='python'), ) - sm.delete_session(sessions[1]['id']) - new_sessions = sm.list_sessions() + self.loop.run_sync(lambda: sm.delete_session(sessions[1]['id'])) + new_sessions = self.loop.run_sync(lambda: sm.list_sessions()) expected = [{ 'id': sessions[0]['id'], 'path': u'/path/to/1/test1.ipynb', diff --git a/notebook/tests/test_gateway.py b/notebook/tests/test_gateway.py index a007046966..ef3cd7ef56 100644 --- a/notebook/tests/test_gateway.py +++ b/notebook/tests/test_gateway.py @@ -1,4 +1,4 @@ -"""Test Gateway""" +"""Test GatewayClient""" import os import json import uuid @@ -7,7 +7,7 @@ from tornado.httpclient import HTTPRequest, HTTPResponse, HTTPError from traitlets.config import Config from .launchnotebook import NotebookTestBase -from notebook.gateway.managers import Gateway +from notebook.gateway.managers import GatewayClient try: from unittest.mock import patch, Mock @@ -25,12 +25,12 @@ def generate_kernelspec(name): argv_stanza = ['python', '-m', 'ipykernel_launcher', '-f', '{connection_file}'] spec_stanza = {'spec': {'argv': argv_stanza, 'env': {}, 'display_name': name, 'language': 'python', 'interrupt_mode': 'signal', 'metadata': {}}} - kernelspec_stanza = {name: {'name': name, 'spec': spec_stanza, 'resources': {}}} + kernelspec_stanza = {'name': name, 'spec': spec_stanza, 'resources': {}} return kernelspec_stanza # We'll mock up two kernelspecs - kspec_foo and kspec_bar -kernelspecs = {'kernelspecs': {'kspec_foo': generate_kernelspec('kspec_foo'), 'kspec_bar': generate_kernelspec('kspec_bar')}} +kernelspecs = {'default': 'kspec_foo', 'kernelspecs': {'kspec_foo': generate_kernelspec('kspec_foo'), 'kspec_bar': generate_kernelspec('kspec_bar')}} # maintain a dictionary of expected running kernels. Key = kernel_id, Value = model. @@ -46,7 +46,7 @@ def generate_model(name): @gen.coroutine -def mock_fetch_gateway(url, **kwargs): +def mock_gateway_request(url, **kwargs): method = 'GET' if kwargs['method']: method = kwargs['method'] @@ -133,7 +133,7 @@ def mock_fetch_gateway(url, **kwargs): raise HTTPError(404, message='Kernel does not exist: %s' % requested_kernel_id) -mocked_gateway = patch('notebook.gateway.managers.fetch_gateway', mock_fetch_gateway) +mocked_gateway = patch('notebook.gateway.managers.gateway_request', mock_gateway_request) class TestGateway(NotebookTestBase): @@ -143,12 +143,12 @@ class TestGateway(NotebookTestBase): @classmethod def setup_class(cls): - Gateway.clear_instance() + GatewayClient.clear_instance() super(TestGateway, cls).setup_class() @classmethod def teardown_class(cls): - Gateway.clear_instance() + GatewayClient.clear_instance() super(TestGateway, cls).teardown_class() @classmethod @@ -161,7 +161,7 @@ def get_patch_env(cls): @classmethod def get_argv(cls): argv = super(TestGateway, cls).get_argv() - argv.extend(['--Gateway.connect_timeout=44.4', '--Gateway.http_user=' + TestGateway.mock_http_user]) + argv.extend(['--GatewayClient.connect_timeout=44.4', '--GatewayClient.http_user=' + TestGateway.mock_http_user]) return argv def test_gateway_options(self): @@ -185,15 +185,14 @@ def test_gateway_get_kernelspecs(self): content = json.loads(response.content.decode('utf-8'), encoding='utf-8') kspecs = content.get('kernelspecs') self.assertEqual(len(kspecs), 2) - self.assertEqual(kspecs.get('kspec_bar').get('kspec_bar').get('name'), 'kspec_bar') + self.assertEqual(kspecs.get('kspec_bar').get('name'), 'kspec_bar') def test_gateway_get_named_kernelspec(self): # Validate that a specific kernelspec can be retrieved from gateway. with mocked_gateway: response = self.request('GET', '/api/kernelspecs/kspec_foo') self.assertEqual(response.status_code, 200) - content = json.loads(response.content.decode('utf-8'), encoding='utf-8') - kspec_foo = content.get('kspec_foo') + kspec_foo = json.loads(response.content.decode('utf-8'), encoding='utf-8') self.assertEqual(kspec_foo.get('name'), 'kspec_foo') response = self.request('GET', '/api/kernelspecs/no_such_spec')