Commit: Clean up file

philippjfr committed Mar 7, 2023
1 parent 01759f1 commit eb74896
Showing 2 changed files with 18 additions and 217 deletions.
231 changes: 16 additions & 215 deletions doc/how_to/performance/dask.md
# Scaling Panel apps with Dask

This guide demonstrates how you can use Panel together with Dask to **scale your apps to bigger datasets, bigger calculations and more users**.

---

Compared to other data app frameworks, Panel is novel in that it supports `async`. This means you can easily **offload large computations to your Dask cluster and keep your app responsive while you `await` the results**. This makes Panel and Dask a great combination.
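As a minimal sketch of that pattern (the full, runnable app is developed step by step below; the scheduler address and the `fibonacci` placeholder here are stand-ins, not part of the guide's files):

```python
# A sketch of the async offloading pattern used throughout this guide
from dask.distributed import Client


def fibonacci(n):  # placeholder task; the guide defines the real one in tasks.py below
    return n if n < 2 else fibonacci(n - 1) + fibonacci(n - 2)


async def compute_on_cluster(n):
    # Connecting asynchronously lets the Panel server keep serving while we wait
    client = await Client("tcp://127.0.0.1:8786", asynchronous=True)  # hypothetical scheduler address
    return await client.submit(fibonacci, n)
```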

Also note that some of the tools you can use with Panel have built-in support for Dask:

- You can use `hvPlot` with Dask DataFrames simply by running `import hvplot.dask` (see the sketch below).
- You can use `HoloViews` with Dask DataFrames and Arrays.

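For example, here is a minimal sketch of plotting a Dask DataFrame with hvPlot (the file name and column are hypothetical):

```python
# A sketch: hvPlot on a Dask DataFrame (assumes a "data.csv" with a numeric "value" column)
import dask.dataframe as dd
import hvplot.dask  # noqa: F401 -- registers the .hvplot accessor on Dask objects

ddf = dd.read_csv("data.csv")    # lazy Dask DataFrame
plot = ddf.hvplot.hist("value")  # the Dask computation only runs when the plot is rendered
```
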
Also, some of the built-in diagnostic tools in Dask are built on Bokeh and can therefore easily be embedded in your Panel app.

## Installation

Let's start by installing *Panel*, *hvPlot* and *Dask Distributed*:

```bash
pip install panel hvplot dask[distributed]
```

## Local Configuration (Optional)

When you import the Dask `Client` in your Panel app it will change the log level of the Bokeh/Panel server. Thus, if you want to see logs, you will have to configure the log levels yourself.

One way to configure the log levels is by adding or updating the file `~/.config/dask/config.yaml`:

```yaml
# config.yaml
logging:
  distributed: info
  distributed.client: info
  bokeh: info
```

If you are behind a reverse proxy you will also have to set the *[Dask Dashboard](https://docs.dask.org/en/stable/dashboard.html) link*. For example, if you are working on a JupyterHub you may need to add:
```yaml
distributed:
  dashboard:
    # This updates the Dask dashboard links to proxy through jupyter-server-proxy
    link: /NAMESPACE/user/{JUPYTERHUB_USER}/proxy/{port}/status
```
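
If you prefer to keep this configuration in Python, roughly the same settings can be applied with `dask.config.set` — a sketch only, and note that it has to run early, before `distributed` is imported and the `Client` is created:

```python
# A sketch: programmatic Dask configuration instead of config.yaml
import dask

dask.config.set({
    "logging": {"distributed": "info", "distributed.client": "info", "bokeh": "info"},
    "distributed.dashboard.link": "/NAMESPACE/user/{JUPYTERHUB_USER}/proxy/{port}/status",
})
```
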
## Tasks

The Dask `Client` will serialize any *tasks* and send them to the Dask `Cluster` for execution. This means that the `Client` and `Cluster` must be able to import the same versions of all *tasks* and Python package dependencies.

Let's start by defining some *fibonacci* tasks in a shared `tasks.py` file:

```python
# tasks.py
from datetime import datetime as dt

import numpy as np


def _fib(n):
    if n < 2:
        return n
    else:
        return _fib(n - 1) + _fib(n - 2)


def fibonacci(n):
    start = dt.now()
    print(start, "start", n)
    result = _fib(n)
    end = dt.now()
    print(end, "end", (end - start).seconds, n, result)
    return result
```
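
Before involving the cluster you can sanity-check the task locally; this is a hypothetical one-off check, not one of the files used below:

```python
# A hypothetical sanity check for tasks.py, run with plain Python
from tasks import fibonacci

assert fibonacci(20) == 6765  # also prints the start/end timing lines from tasks.py
```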

## Local Dask Cluster

For development, testing and simple use cases a `LocalCluster` is more than fine and will allow you to leverage all the CPUs on your machine. When you want to scale out to an entire cluster you can switch to a non-local cluster. To avoid any issues when combining Panel and Dask we recommend starting the `LocalCluster` separately from the Dask `Client` and your Panel app.
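
Switching to a non-local cluster later is then mostly a matter of pointing the app at a different scheduler address, e.g. (purely a sketch, with a hypothetical address):

```python
# Hypothetical: connect to an existing remote Dask scheduler instead of the LocalCluster below
DASK_SCHEDULER_ADDRESS = "tcp://dask-scheduler.example.com:8786"
```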

You can start the `LocalCluster` separately by creating a file `cluster.py` that looks like this:

```python
# cluster.py
from dask.distributed import LocalCluster

DASK_SCHEDULER_PORT = 64719
DASK_SCHEDULER_ADDRESS = f"tcp://127.0.0.1:{DASK_SCHEDULER_PORT}"
N_WORKERS = 4

if __name__ == '__main__':
    cluster = LocalCluster(scheduler_port=DASK_SCHEDULER_PORT, n_workers=N_WORKERS)
    print(cluster.scheduler_address)
    input()  # Block until the user hits <Enter>, keeping the cluster alive
```

and running:

```bash
$ python cluster.py
tcp://127.0.0.1:64719
```
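
With the cluster running, you can verify from a separate Python session that the scheduler is reachable — a sketch, using a plain synchronous client outside the Panel app:

```python
# A sketch: verify the LocalCluster is reachable (run while `python cluster.py` is still running)
from dask.distributed import Client

from cluster import DASK_SCHEDULER_ADDRESS

client = Client(DASK_SCHEDULER_ADDRESS)
print(list(client.scheduler_info()["workers"]))  # the addresses of the four local workers
client.close()
```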

You can now open the [Dask Dashboard](https://docs.dask.org/en/stable/dashboard.html) at http://localhost:8787/status.

If you are behind a reverse proxy on a JupyterHub you can open it at https://DOMAIN/NAMESPACE/user/USER_NAME/proxy/8787/status instead.

So far there is not a lot to see here:

![Empty Dask Dashboard](https://user-images.githubusercontent.com/42288570/209457963-1e62cfe0-135a-45b6-844f-18a9a94f3ca4.jpg)

It can be very useful to include some of the live [Dask endpoints](https://dist…) in your app.

In the `dashboard.py` file we define the `DaskViewer` component that can be used to explore the *individual Dask plots*:

```python
# dashboard.py
import os

import param

import panel as pn

DASK_DASHBOARD_ADDRESS = os.getenv("DASK_DASHBOARD", "http://localhost:8787/status")

VIEWS = {
    "aggregate-time-per-action": "individual-aggregate-time-per-action",
    "bandwidth-types": "individual-bandwidth-types",
    "bandwidth-workers": "individual-bandwidth-workers",
    "cluster-memory": "individual-cluster-memory",
    "compute-time-per-key": "individual-compute-time-per-key",
    "cpu": "individual-cpu",
    "exceptions": "individual-exceptions",
    "gpu-memory": "individual-gpu-memory",
    "gpu-utilization": "individual-gpu-utilization",
    "graph": "individual-graph",
    "groups": "individual-groups",
    "memory-by-key": "individual-memory-by-key",
    "nprocessing": "individual-nprocessing",
    "occupancy": "individual-occupancy",
    "profile-server": "individual-profile-server",
    "profile": "individual-profile",
    "progress": "individual-progress",
    "scheduler-system": "individual-scheduler-system",
    "task-stream": "individual-task-stream",
    "workers-cpu-timeseries": "individual-workers-cpu-timeseries",
    "workers-disk-timeseries": "individual-workers-disk-timeseries",
    "workers-disk": "individual-workers-disk",
    "workers-memory-timeseries": "individual-workers-memory-timeseries",
    "workers-memory": "individual-workers-memory",
    "workers-network-timeseries": "individual-workers-network-timeseries",
    "workers-network": "individual-workers-network",
    "workers": "individual-workers",
}

VIEWER_PARAMETERS = ["url", "path"]


def to_iframe(url):
    return f"""<iframe src="{url}" frameBorder="0" style="height:100%;width:100%"></iframe>"""


class DaskViewer(pn.viewable.Viewer):
    url = param.String(DASK_DASHBOARD_ADDRESS, doc="The url of the Dask status dashboard")
    path = param.Selector(default="individual-cpu", objects=VIEWS, doc="the endpoint", label="View")

    def __init__(self, size=20, **params):
        viewer_params = {k: v for k, v in params.items() if k in VIEWER_PARAMETERS}
        layout_params = {k: v for k, v in params.items() if k not in VIEWER_PARAMETERS}

        self._iframe = pn.pane.HTML(sizing_mode="stretch_both")

        super().__init__(**viewer_params)

        self._select = pn.widgets.Select.from_param(self.param.path, size=size, width=300, sizing_mode="fixed", margin=(20, 5, 10, 5))
        self._link = pn.panel(f"""<a href="{DASK_DASHBOARD_ADDRESS}" target="_blank">Dask Dashboard</a>""", height=50, margin=(0, 20))
        self._panel = pn.Column(pn.Row(self._iframe, self._select, sizing_mode="stretch_both"), self._link, **layout_params)

    @pn.depends("url", "path", watch=True, on_init=True)
    def _update_panel(self):
        url = self.url.replace("/status", "/") + self.path
        self._iframe.object = to_iframe(url)

    def __panel__(self):
        return self._panel


if __name__.startswith("bokeh"):
    pn.extension(sizing_mode="stretch_width")

    DaskViewer(height=500, size=25).servable()
```
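
Since `DaskViewer` is an ordinary Panel `Viewer`, you can also embed it in your own layouts and pre-select one of the plots — a sketch (the `path` value must be one of the endpoint names in `VIEWS`, and `dashboard.py` must be importable):

```python
# A sketch: embedding DaskViewer in another app
import panel as pn

from dashboard import DaskViewer

pn.extension(sizing_mode="stretch_width")

pn.Column(
    "## Cluster load",
    DaskViewer(path="individual-task-stream", height=400),
).servable()
```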

Try running `panel serve dashboard.py`. If your Dask cluster is working, you will see the live Dask dashboard plots embedded in the app.

In your Panel app you can now offload the expensive computation to the cluster and `await` the result without blocking the server, using the asynchronous Dask `client` defined in `app.py` below:

```python
fib_n = await client.submit(fibonacci, n)
```

Let's now define the full `app.py` file:

```python
# app.py
from datetime import datetime as dt

import param

from dashboard import DASK_DASHBOARD_ADDRESS, DaskViewer
from dask.distributed import Client

import panel as pn

from cluster import DASK_SCHEDULER_ADDRESS
from tasks import fibonacci

QUEUE = []

pn.extension("terminal", sizing_mode="stretch_width")


async def get_client():
    # Cache a single asynchronous Dask client per Panel server process
    if "dask-client" not in pn.state.cache:
        pn.state.cache["dask-client"] = await Client(
            DASK_SCHEDULER_ADDRESS, asynchronous=True
        )
    return pn.state.cache["dask-client"]


n_input = pn.widgets.IntInput(value=0, width=100, sizing_mode="fixed", name="n")
submit_button = pn.widgets.Button(name="Submit", button_type="primary", align="end")
queue_widget = pn.widgets.LiteralInput(value=QUEUE, name="In Progress", disabled=True)
terminal_widget = pn.widgets.Terminal(height=200)
dask_viewer = DaskViewer(height=500)


@pn.depends(submit_button, watch=True)
async def _handle_click(_):
    n = n_input.value
    n_input.value += 1
    start = dt.now()
    QUEUE.append(n)
    queue_widget.value = QUEUE.copy()
    client = await get_client()
    fib_n = await client.submit(fibonacci, n)
    end = dt.now()
    QUEUE.pop(QUEUE.index(n))
    queue_widget.value = QUEUE.copy()
    duration = (end - start).seconds
    terminal_widget.write(f"fibonacci({n})={fib_n} in {duration}sec\n")


component = pn.Column(
    "## Tasks",
    pn.Row(n_input, submit_button),
    queue_widget,
    "## Results",
    terminal_widget,
    "## Dask Dashboard",
    dask_viewer,
)

pn.template.FastListTemplate(
    site="Panel + Dask", title="Async background tasks", main=[component]
).servable()
```

You can now run `panel serve app.py` and the app will look like this:

![Panel Dask Async Tasks](https://user-images.githubusercontent.com/42288570/209510282-e98f048a-0ad3-4ede-a6c4-c6eef9d11fa6.gif).

You might find additional inspiration in [Panel - Async and Concurrency](../../user_guide/Async_and_Concurrency.ipynb), [Dask - Async/Await and Non-Blocking Execution Documentation](https://examples.dask.org/applications/async-await.html#Async/Await-and-Non-Blocking-Execution) and [Dask - Async Web Server](https://examples.dask.org/applications/async-web-server.html).
4 changes: 2 additions & 2 deletions doc/how_to/performance/dask/dashboard.py
```
@@ -51,7 +51,7 @@ def __init__(self, size=20, **params):
        layout_params = {k: v for k, v in params.items() if k not in VIEWER_PARAMETERS}

        self._iframe = pn.pane.HTML(sizing_mode="stretch_both")

        super().__init__(**viewer_params)

        self._select = pn.widgets.Select.from_param(self.param.path, size=size, width=300, sizing_mode="fixed", margin=(20, 5, 10, 5))
@@ -70,4 +70,4 @@ def __panel__(self):
if __name__.startswith("bokeh"):
    pn.extension(sizing_mode="stretch_width")

    DaskViewer(height=500, size=25).servable()
```
