Panel webapp with dask/bokeh/tornado and asynchronous tasks

Hi @tivincent, I’ve had this problem again recently where I want to submit tasks to dask and update the GUI when the results come in.
In the topic you mention I posted an example where I said “this should work with Dask as well”, but that doesnt seem to be the case since the futures returned from dask’ client.submit are different from tornano/asyncio futures and thus you cannot use loop.add_future. One workaround I’ve tried is to wrap the future in asyncio.ensure_future but I didn’t get very far in this.

I’ve made a new example with Dask which is what I’m using right now. Instead of using add_next_tick_callback or pn.io.unlocked I’m using a sort of queue for futures which are checked from a periodic callback for completion. If they are done the results is sent to a callback which updates the graphs. The advantage of this approach is that updating the graphs is done from the same thread as the one which runs the main GUI, and this avoids the needs of documents locks and will prevent major headaches. The disadvantage is that you have a periodic callback running permanently which I’ve seen in some cases can cause high CPU usage.

Here is the example code. There are two files, one starts the Dask scheduler, the other the panel app.
In the panel app I’ve made two examples of how to stream data to graphs, either via bokeh and ColumnDataSource or holoviews using DynamicMap and and Buffer.

In cluster.py:


from dask.distributed import LocalCluster
import numpy as np
import time

SCHEDULER_PORT = 64719


def blocking_function(uuid):
    """This function executes some kind of long calculation on a Dask cluster"""

    duration = np.random.uniform(0.5, 2.)
    time.sleep(duration)

    N = np.random.randint(200, 1000)
    x0 = np.random.random() * 5
    xs = 0.1#np.random.poisson()
    y0 = np.random.random() * 5
    ys = 0.1#np.random.poisson()

    x = np.random.normal(x0, xs, N)
    y = np.random.normal(y0, ys, N)

    return x, y



if __name__ == '__main__':
    cluster = LocalCluster(scheduler_port=SCHEDULER_PORT, n_workers=4)
    print(cluster.scheduler_address)
    input()

Run this first (and keep running) to start the dask scheduler. I’ve also put the blocking_function there which cannot be in the same file as the web app/panel code.

To create the app (dask_example.py):


import holoviews as hv
from holoviews.streams import Buffer
import pandas as pd
import panel as pn
import numpy as np
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from matplotlib.cm import get_cmap
from matplotlib.colors import to_hex
import param
from dask.distributed import Client
import uuid

from cluster import blocking_function, SCHEDULER_PORT
pn.extension(sizing_mode="stretch_width")


class DaskExample(param.Parameterized):
    num_jobs = param.Integer(default=5, bounds=(1, None), label='Number of jobs')
    submit = param.Action(lambda self: self._action_submit())

    def __init__(self, client, **params):
        super().__init__(**params)
        self.client = client
        self.future_queue = []
        self.progress = pn.widgets.Progress(sizing_mode='stretch_width', height=20, active=False, value=0)

        data = pd.DataFrame({'x': [], 'y': [], 'count': []}, columns=['x', 'y', 'count'])
        self.buffer = Buffer(data=data, length=int(1e6), index=False)
        self.source = ColumnDataSource(data={'x': [], 'y': [], 'color': []})

    def check_futures(self):
        if self.future_queue:
            for future, callback in self.future_queue[:]:
                if future.status == 'finished':
                    callback(future)
                    self.future_queue.remove((future, callback))

    def start(self):
        refresh_rate = 25
        pn.state.add_periodic_callback(
            self.check_futures, refresh_rate
        )

    def callback(self, future):
        x, y = future.result()
        cmap = get_cmap('jet')
        color = to_hex(cmap(np.random.random()))
        colors = [color] * len(x)
        count = np.ones_like(x) * self.progress.value

        data = pd.DataFrame({'x': x, 'y': y, 'count': count}, columns=['x', 'y', 'count'])
        self.buffer.send(data)
        self.source.stream({'x': x, 'y': y, 'color': colors})

        self.progress.value += 1

        if self.progress.value == self.progress.max:
            self.progress.value = 0
            self.progress.active = False

    @property
    def hv_stream(self):
        scatter = hv.DynamicMap(hv.Scatter, streams=[self.buffer])
        scatter.opts(color='count', xlim=(0, 5), ylim=(0, 5), cmap='jet', responsive=True)

        return scatter

    @property
    def bk_stream(self):
        fig = figure()
        fig.scatter(x='x', y='y', color='color', source=self.source, line_color=None)

        return fig

    def _action_submit(self):
        self.source.data = {'x': [], 'y': [], 'color': []}
        self.buffer.clear()
        self.progress.value = 0
        self.progress.max = self.num_jobs
        self.progress.active = True
        for i in range(self.num_jobs):
            future = self.client.submit(blocking_function, uuid.uuid4())
            entry = (future, self.callback)
            self.future_queue.append(entry)


client = Client(f'tcp://127.0.0.1:{SCHEDULER_PORT}')
dask = DaskExample(client=client)

sidebar = pn.Param(dask)
col = pn.Column(*sidebar, dask.progress)

template = pn.template.FastGridTemplate(
    title='FastGridTemplate',
    sidebar=col)

template.main[:2, :6] = pn.pane.HoloViews(dask.hv_stream, sizing_mode='stretch_both')
template.main[:2, 6:] = pn.pane.Bokeh(dask.bk_stream, sizing_mode='stretch_both')

pn.state.onload(dask.start)

template.servable()

Run with panel serve dask_example.py

3 Likes