Skip to content

Commit

Permalink
Allow running onload and defer_load tasks on threads (#5865)
Browse files Browse the repository at this point in the history
  • Loading branch information
philippjfr authored Nov 13, 2023
1 parent 8128ca2 commit 7ab311b
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 25 deletions.
12 changes: 5 additions & 7 deletions doc/how_to/callbacks/defer_load.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ import panel as pn

pn.extension(template="bootstrap")


def some_long_running_task():
time.sleep(5)
return "# Wow. That took some time. Are you still here?"


pn.panel(some_long_running_task).servable()
```

Expand All @@ -39,8 +37,7 @@ Now lets learn how to defer long running tasks to after the application has load

## Defer all Tasks

Its easy defer the execution of all bound and displayed functions with
`pn.extension(..., defer_load=True)`.
Its easy defer the execution of all bound and displayed functions with `pn.extension(..., defer_load=True)`.

```python
import time
Expand All @@ -67,20 +64,21 @@ import panel as pn

pn.extension(loading_indicator=True, template="bootstrap")


def short_running_task():
return "# I'm shown on load"


def long_running_task():
time.sleep(3)
return "# I'm deferred and shown after load"


pn.Column(
short_running_task,
pn.panel(long_running_task, defer_load=True, min_height=50, min_width=200),
).servable()
```

![panel-defer-specific-example](https://assets.holoviz.org/panel/gifs/defer_specific_task.gif)

```{note}
If you [enable threading](../concurrency/threading.md) by setting `config.nthreads` or `--num-threads` on the commandline deferred callbacks will be executed concurrently on separate threads.
```
2 changes: 1 addition & 1 deletion doc/how_to/callbacks/load.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ layout.servable()

![panel-onload-example](https://assets.holoviz.org/panel/gifs/onload_callback.gif)

Note that `pn.state.onload` accepts both *sync* and *async* functions.
Note that `pn.state.onload` accepts both *sync* and *async* functions and also accepts a `threaded` argument, which, when combined with [enabling `config.nthreads`](../concurrency/threading.md) will run the callbacks concurrently on separate threads.

This example could also be implemented using a *bound and displayed function*. We recommend using that method together with `defer_load` when possible. See the [Defer Bound and Displayed Functions Guide](defer_load.md).

Expand Down
4 changes: 4 additions & 0 deletions doc/how_to/concurrency/threading.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ In a threaded context on the other hand the two clicks will be processed concurr
> Finished processing 1th click.
> Finished processing 2th click.
```

```{note}
Note that the global ThreadPool is used to dispatch events triggered by changes in parameters, events (such as click events), [`defer_load`](../callbacks/defer_load.md) callbacks and optionally [`onload` callbacks](../callbacks/load.md).
```
47 changes: 31 additions & 16 deletions panel/io/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import asyncio
import datetime as dt
import functools
import inspect
import logging
import shutil
Expand Down Expand Up @@ -377,24 +376,26 @@ def _schedule_on_load(self, doc: Document, event) -> None:

def _on_load(self, doc: Optional[Document] = None) -> None:
doc = doc or self.curdoc
self._loaded[doc] = True
callbacks = self._onload.pop(doc, [])
if not callbacks:
self._loaded[doc] = True
return

from ..config import config
from .profile import profile_ctx
with set_curdoc(doc):
if (doc and doc in self._launching) or not config.profiler:
for cb in callbacks:
self.execute(cb, schedule=False)
for cb, threaded in callbacks:
self.execute(cb, schedule='thread' if threaded else False)
return

with profile_ctx(config.profiler) as sessions:
for cb in callbacks:
self.execute(cb, schedule=False)
for cb, threaded in callbacks:
self.execute(cb, schedule='thread' if threaded else False)
path = doc.session_context.request.path
self._profiles[(path+':on_load', config.profiler)] += sessions
self.param.trigger('_profiles')
self._loaded[doc] = True

async def _scheduled_cb(self, name: str) -> None:
if name not in self._scheduled:
Expand Down Expand Up @@ -573,10 +574,17 @@ def clear_caches(self):
pass
self._memoize_cache.clear()

def _execute_on_thread(self, doc, callback):
with set_curdoc(doc):
if param.parameterized.iscoroutinefunction(callback):
param.parameterized.async_executor(callback)
else:
self.execute(callback, schedule=False)

def execute(
self,
callback: Callable([], None),
schedule: bool | Literal['auto'] = 'auto'
schedule: bool | Literal['auto', 'thread'] = 'auto'
) -> None:
"""
Executes both synchronous and asynchronous callbacks
Expand All @@ -589,15 +597,20 @@ def execute(
---------
callback: Callable[[], None]
Callback to execute
schedule: boolean | Literal['auto']
Whether to schedule synchronous callback on the event loop
or execute it immediately.
schedule: boolean | Literal['auto', 'thread']
Whether to schedule the callback on the event loop, on a thread
or execute them immediately.
"""
cb = callback
while isinstance(cb, functools.partial):
cb = cb.func
doc = self.curdoc
if param.parameterized.iscoroutinefunction(cb):
if schedule == 'thread':
if not state._thread_pool:
raise RuntimeError(
'Cannot execute callback on thread. Ensure you have '
'enabled threading setting `config.nthreads`.'
)
future = state._thread_pool.submit(partial(self._execute_on_thread, doc, callback))
future.add_done_callback(self._handle_future_exception)
elif param.parameterized.iscoroutinefunction(callback):
param.parameterized.async_executor(callback)
elif doc and doc.session_context and (schedule == True or (schedule == 'auto' and not self._unblocked(doc))):
doc.add_next_tick_callback(self._handle_exception_wrapper(callback))
Expand Down Expand Up @@ -658,14 +671,16 @@ def log(self, msg: str, level: str = 'info') -> None:
msg = LOG_USER_MSG.format(msg=msg)
getattr(_state_logger, level.lower())(msg, *args)

def onload(self, callback: Callable[[], None | Awaitable[None]] | Coroutine[Any, Any, None]):
def onload(self, callback: Callable[[], None | Awaitable[None]] | Coroutine[Any, Any, None], threaded: bool = False):
"""
Callback that is triggered when a session has been served.
Arguments
---------
callback: Callable[[], None] | Coroutine[Any, Any, None]
Callback that is executed when the application is loaded
threaded: bool
Whether the onload callback can be threaded
"""
if self.curdoc is None or self._is_pyodide:
if self._thread_pool:
Expand All @@ -680,7 +695,7 @@ def onload(self, callback: Callable[[], None | Awaitable[None]] | Coroutine[Any,
self.curdoc.on_event('document_ready', partial(self._schedule_on_load, self.curdoc))
except AttributeError:
pass # Document already cleaned up
self._onload[self.curdoc].append(callback)
self._onload[self.curdoc].append((callback, threaded))

def on_session_created(self, callback: Callable[[BokehSessionContext], None]) -> None:
"""
Expand Down
5 changes: 4 additions & 1 deletion panel/param.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,10 @@ def _get_model(
if not self._evaled:
deferred = self.defer_load and not state.loaded
if deferred:
state.onload(partial(self._replace_pane, force=True))
state.onload(
partial(self._replace_pane, force=True),
threaded=bool(state._thread_pool)
)
self._replace_pane(force=not deferred)
return super()._get_model(doc, root, parent, comm)

Expand Down
75 changes: 75 additions & 0 deletions panel/tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from panel.models import HTML as BkHTML
from panel.models.tabulator import TableEditEvent
from panel.pane import Markdown
from panel.param import ParamFunction
from panel.reactive import ReactiveHTML
from panel.template import BootstrapTemplate
from panel.tests.util import serve_and_request, serve_and_wait, wait_until
Expand Down Expand Up @@ -461,6 +462,80 @@ def test_serve_can_serve_bokeh_app_from_file():
assert "/bk-app" in server._tornado.applications


def test_server_thread_pool_on_load(threads, port):
counts = []

def cb(count=[0]):
count[0] += 1
counts.append(count[0])
time.sleep(0.5)
count[0] -= 1

def app():
state.onload(cb, threaded=True)
state.onload(cb, threaded=True)

# Simulate rendering
def loaded():
state._schedule_on_load(state.curdoc, None)
state.execute(loaded, schedule=True)

return 'App'

serve_and_request(app)

# Checks whether onload callback was executed concurrently
wait_until(lambda: len(counts) > 0 and max(counts) > 1)


def test_server_thread_pool_execute(threads, port):
counts = []

def cb(count=[0]):
count[0] += 1
counts.append(count[0])
time.sleep(0.5)
count[0] -= 1

def app():
state.execute(cb, schedule='thread')
state.execute(cb, schedule='thread')
return 'App'

serve_and_request(app)

# Checks whether execute was executed concurrently
wait_until(lambda: len(counts) > 0 and max(counts) > 1)


def test_server_thread_pool_defer_load(threads, port):
counts = []

def cb(count=[0]):
count[0] += 1
counts.append(count[0])
time.sleep(0.5)
value = counts[-1]
count[0] -= 1
return value

def app():
# Simulate rendering
def loaded():
state._schedule_on_load(state.curdoc, None)
state.execute(loaded, schedule=True)

return Row(
ParamFunction(cb, defer_load=True),
ParamFunction(cb, defer_load=True),
)

serve_and_request(app)

# Checks whether defer_load callback was executed concurrently
wait_until(lambda: len(counts) > 0 and max(counts) > 1)


def test_server_thread_pool_change_event(threads, port):
button = Button(name='Click')
button2 = Button(name='Click')
Expand Down

0 comments on commit 7ab311b

Please sign in to comment.