Panel webapp with dask/bokeh/tornado and asynchronous tasks

Hello!
I am developing a web app using panel/bokeh/tornado with dask/xarray.
This app is hosted on a webserver and several users could use it at the same time.
The aim is to visualize very big netcdf files with a friendly interface: time-series, lat/lon maps, etc…
The app is launched with “panel serve”.

Sometimes it takes quite a long time (i.e. several minutes) to load the data from big NetCDF files, using dask.compute or dask.load.
During these calls, the whole app is frozen for all users.
I would like the app to keep working for all users during loading tasks (with the possibility of starting other loading tasks, different plotting tasks, etc.).

I found that one way to handle this is to start ‘panel serve’ with several processes (--num-procs X, where X is the number of concurrent users).
However, I am looking for a better solution that loads the data asynchronously.
I have read several topics on this (e.g. can-i-load-data-asynchronously-in-panel), but I could not understand what the best way to do it is.

I am lost between the possibility :

  • tornado.gen and ThreadPoolExecutor, @without_document_lock (roughly sketched below)
  • dask client and await/async
  • using add_next_tick_callback or not
  • using panel.io.unlocked or not
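
For reference, here is roughly what I understand the first option to look like, adapted from the Bokeh documentation pattern for unlocked callbacks. This is only a sketch (blocking_load stands in for my real dask.compute call), not code from my app:

from concurrent.futures import ThreadPoolExecutor
from functools import partial
import time

import panel as pn
from bokeh.document import without_document_lock
from bokeh.io import curdoc
from tornado import gen

pn.extension()

executor = ThreadPoolExecutor(max_workers=2)
doc = curdoc()  # this session's Bokeh document

text = pn.widgets.StaticText(value='waiting for data...')


def blocking_load():
    time.sleep(5)        # stands in for dask.compute on a big NetCDF file
    return 'data loaded'


def locked_update(value):
    text.value = value   # runs under the document lock, safe to touch the GUI


@gen.coroutine
@without_document_lock
def unlocked_task():
    # runs without the document lock, so other sessions are not blocked
    result = yield executor.submit(blocking_load)
    doc.add_next_tick_callback(partial(locked_update, result))


button = pn.widgets.Button(name='Load')
button.on_click(lambda event: doc.add_next_tick_callback(unlocked_task))

pn.Column(button, text).servable()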

What is the right way to do this?

Thanks a lot for any help!
Tivincent


I don’t know the right way, but I do know a couple of things.

The documentation now contains Async and Concurrency — Panel 0.11.3 documentation (holoviz.org).

If it were me, I would start out by trying the dask client and await/async. Then I would use the Loading indicator — Panel 0.11.3 documentation (holoviz.org).
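
Very roughly, I imagine something along these lines. This is an untested sketch: the scheduler address, file path and load_data function are placeholders, and it assumes a Panel/param version recent enough to support async callbacks and the loading parameter.

import panel as pn
import xarray as xr
from dask.distributed import Client

pn.extension()

SCHEDULER_ADDRESS = 'tcp://127.0.0.1:8786'  # placeholder: your Dask scheduler
DATA_FILE = 'data.nc'                       # placeholder: a big NetCDF file


def load_data(path):
    # placeholder for the expensive part: open the file and load it into memory
    with xr.open_dataset(path) as ds:
        return ds.load()


result_pane = pn.pane.Str('No data loaded yet')
button = pn.widgets.Button(name='Load data')


async def on_click(event):
    result_pane.loading = True   # show Panel's built-in loading indicator
    client = await Client(SCHEDULER_ADDRESS, asynchronous=True)
    try:
        future = client.submit(load_data, DATA_FILE)
        result = await future    # yields control, so other sessions stay responsive
        result_pane.object = str(result)
    finally:
        result_pane.loading = False
        await client.close()


button.on_click(on_click)
pn.Column(button, result_pane).servable()

With asynchronous=True the Dask futures become awaitable, so the await hands control back to the event loop while the workers do the heavy lifting, and the loading parameter shows the spinner described in the Loading indicator documentation.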

Hi @tivincent

Would it be possible to create a very small reference example? Maybe something that downloads a NetCDF file from the internet and displays it. Then different approaches could be tried out and a recommendation found.

Hello @Marc!

Thanks for the answer!
Well, I would like to share the app, but for the moment it is not possible (I am using a Python module that is still in development). However, I could create a small and light app; I will try to do it quickly!


Hi @tivincent, I’ve had this problem again recently where I want to submit tasks to dask and update the GUI when the results come in.
In the topic you mention I posted an example where I said “this should work with Dask as well”, but that doesn’t seem to be the case, since the futures returned by Dask’s client.submit are different from Tornado/asyncio futures, so you cannot use loop.add_future. One workaround I’ve tried is to wrap the future with asyncio.ensure_future, but I didn’t get very far with that.

I’ve made a new example with Dask, which is what I’m using right now. Instead of using add_next_tick_callback or pn.io.unlocked, I’m using a sort of queue of futures which are checked for completion from a periodic callback. If they are done, the result is sent to a callback which updates the graphs. The advantage of this approach is that the graphs are updated from the same thread as the one running the main GUI, which avoids the need for document locks and prevents major headaches. The disadvantage is that you have a periodic callback running permanently, which I’ve seen cause high CPU usage in some cases.

Here is the example code. There are two files, one starts the Dask scheduler, the other the panel app.
In the panel app I’ve made two examples of how to stream data to graphs: either via Bokeh with a ColumnDataSource, or via HoloViews using a DynamicMap and a Buffer.

In cluster.py:


from dask.distributed import LocalCluster
import numpy as np
import time

SCHEDULER_PORT = 64719


def blocking_function(uuid):
    """Execute some kind of long calculation on a Dask cluster.

    The `uuid` argument is unused; it just makes every submitted task unique
    so that Dask does not deduplicate repeated submissions.
    """
    # simulate a slow computation
    duration = np.random.uniform(0.5, 2.)
    time.sleep(duration)

    # random cluster of points: size, centre and spread
    N = np.random.randint(200, 1000)
    x0 = np.random.random() * 5
    xs = 0.1  # alternatively: np.random.poisson()
    y0 = np.random.random() * 5
    ys = 0.1  # alternatively: np.random.poisson()

    x = np.random.normal(x0, xs, N)
    y = np.random.normal(y0, ys, N)

    return x, y



if __name__ == '__main__':
    cluster = LocalCluster(scheduler_port=SCHEDULER_PORT, n_workers=4)
    print(cluster.scheduler_address)
    input()  # keep the process (and thus the scheduler) alive until Enter is pressed

Run this first (and keep it running) to start the Dask scheduler. I’ve also put blocking_function there, since it cannot live in the same file as the web app/panel code.

To create the app (dask_example.py):


import holoviews as hv
from holoviews.streams import Buffer
import pandas as pd
import panel as pn
import numpy as np
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from matplotlib.cm import get_cmap
from matplotlib.colors import to_hex
import param
from dask.distributed import Client
import uuid

from cluster import blocking_function, SCHEDULER_PORT
pn.extension(sizing_mode="stretch_width")


class DaskExample(param.Parameterized):
    num_jobs = param.Integer(default=5, bounds=(1, None), label='Number of jobs')
    submit = param.Action(lambda self: self._action_submit())

    def __init__(self, client, **params):
        super().__init__(**params)
        self.client = client
        self.future_queue = []  # (future, callback) pairs waiting for results
        self.progress = pn.widgets.Progress(sizing_mode='stretch_width', height=20, active=False, value=0)

        # a holoviews Buffer stream and a bokeh ColumnDataSource, one per plotting approach
        data = pd.DataFrame({'x': [], 'y': [], 'count': []}, columns=['x', 'y', 'count'])
        self.buffer = Buffer(data=data, length=int(1e6), index=False)
        self.source = ColumnDataSource(data={'x': [], 'y': [], 'color': []})

    def check_futures(self):
        """Poll the queue and dispatch finished futures to their callbacks."""
        if self.future_queue:
            for future, callback in self.future_queue[:]:
                if future.status == 'finished':
                    callback(future)
                    self.future_queue.remove((future, callback))

    def start(self):
        # poll the future queue every 25 ms from the GUI thread
        refresh_rate = 25
        pn.state.add_periodic_callback(
            self.check_futures, refresh_rate
        )

    def callback(self, future):
        """Receive the result of one finished job and stream it to both plots."""
        x, y = future.result()
        cmap = get_cmap('jet')
        color = to_hex(cmap(np.random.random()))
        colors = [color] * len(x)
        count = np.ones_like(x) * self.progress.value

        data = pd.DataFrame({'x': x, 'y': y, 'count': count}, columns=['x', 'y', 'count'])
        self.buffer.send(data)
        self.source.stream({'x': x, 'y': y, 'color': colors})

        self.progress.value += 1

        if self.progress.value == self.progress.max:
            self.progress.value = 0
            self.progress.active = False

    @property
    def hv_stream(self):
        scatter = hv.DynamicMap(hv.Scatter, streams=[self.buffer])
        scatter.opts(color='count', xlim=(0, 5), ylim=(0, 5), cmap='jet', responsive=True)

        return scatter

    @property
    def bk_stream(self):
        fig = figure()
        fig.scatter(x='x', y='y', color='color', source=self.source, line_color=None)

        return fig

    def _action_submit(self):
        # clear previous results and submit num_jobs new tasks to the Dask cluster
        self.source.data = {'x': [], 'y': [], 'color': []}
        self.buffer.clear()
        self.progress.value = 0
        self.progress.max = self.num_jobs
        self.progress.active = True
        for i in range(self.num_jobs):
            future = self.client.submit(blocking_function, uuid.uuid4())
            entry = (future, self.callback)
            self.future_queue.append(entry)


client = Client(f'tcp://127.0.0.1:{SCHEDULER_PORT}')
dask = DaskExample(client=client)

sidebar = pn.Param(dask)
col = pn.Column(*sidebar, dask.progress)

template = pn.template.FastGridTemplate(
    title='FastGridTemplate',
    sidebar=col)

template.main[:2, :6] = pn.pane.HoloViews(dask.hv_stream, sizing_mode='stretch_both')
template.main[:2, 6:] = pn.pane.Bokeh(dask.bk_stream, sizing_mode='stretch_both')

pn.state.onload(dask.start)

template.servable()

Run with panel serve dask_example.py


I wonder if there’s a way to have a built-in Panel decorator, something like @pn.state.run_background, tied into the underlying Tornado/asyncio machinery, so that when the decorated function is triggered it submits the job as a background task (like updating WMTS tiles) and updates the GUI upon completion.
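
As a rough idea of what I mean, here is a user-land sketch built on concurrent.futures and Bokeh’s add_next_tick_callback; the name run_background and the on_done hook are made up for illustration, not an existing Panel API:

from concurrent.futures import ThreadPoolExecutor
from functools import wraps

import panel as pn

_executor = ThreadPoolExecutor(max_workers=4)


def run_background(on_done):
    """Run the decorated function in a worker thread and call `on_done(result)`
    on the session's event loop once it finishes."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            doc = pn.state.curdoc  # capture this session's Bokeh document
            future = _executor.submit(func, *args, **kwargs)

            def _finished(fut):
                result = fut.result()
                # schedule the GUI update on the document's own event loop
                doc.add_next_tick_callback(lambda: on_done(result))

            future.add_done_callback(_finished)
            return future
        return wrapper
    return decorator

Calling the decorated function would then return immediately, and on_done would run on the GUI side only once the background result is ready.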
