Can I load data asynchronously in Panel?

I’m now trying to make the above example work with Bokeh figures, but I’m running into some issues. I modified the example to include a Bokeh figure, and I want the outputs of the async tasks to be plotted in that figure once they complete.

To update the Bokeh document safely (see "Updating From Threads" and "Unlocked Callbacks" in the Bokeh docs), the callback that modifies the document should be scheduled via add_next_tick_callback.

If I don't do this and instead call the callback cb directly, I get this error:

raise RuntimeError("_pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes")
RuntimeError: _pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes

However, when I do use add_next_tick_callback, nothing happens; the callback never seems to be called.
Is this because Panel overrides Bokeh behaviour related to the event loop? Is there a workaround for this?

To run the code below, you need a running dask LocalCluster, and you must update the ip:port value for cluster.

import panel as pn
import param
import time
import numpy as np
from panel.io import unlocked
from tornado.ioloop import IOLoop
from dask.distributed import Client, as_completed
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource
from functools import partial

class ASyncProgressBar(param.Parameterized):
    completed = param.Integer(default=0)
    num_tasks = param.Integer(default=10, bounds=(1, None))

    async def run(self, futures):
        async for task in as_completed(futures):
            with unlocked():
                self.completed += 1

    @property
    def value(self):
        return int(100 * (self.completed / self.num_tasks))

    def reset(self):
        self.completed = 0

    @param.depends('completed', 'num_tasks')
    def view(self):
        if self.value:
            return pn.widgets.Progress(active=True, value=self.value, align="center", sizing_mode="stretch_width")
        else:
            return None


def task_func(arg):
    """This is a normal blocking function which we want to call multiple times"""
    time.sleep(np.random.randint(1, 2))
    return arg


class ASyncExample(param.Parameterized):
    select = param.Selector(objects=range(10))
    slider = param.Number(2, bounds=(0, 10))
    text = param.String()

    do_stuff = param.Action(lambda self: self._do_calc())
    normal_stuff = param.Action(lambda self: self._do_normal())
    result = param.Number(0)

    def __init__(self, cluster, **params):
        super(ASyncExample, self).__init__(**params)
        self.async_pbar = ASyncProgressBar()
        self.cluster = cluster

        self.doc = curdoc()
        self.source = ColumnDataSource({'x': range(10), 'y': range(10)})
        self.figure = figure()
        self.figure.line(x='x', y='y', source=self.source)
        self.bk_pane = pn.pane.Bokeh(self.figure)

        self.col = pn.Column(
            pn.pane.Markdown("## Starts async background process"),
            pn.Param(
                self,
                parameters=["do_stuff", 'normal_stuff', "result"],
                widgets={"result": {"disabled": True}, "do_stuff": {"button_type": "success"}},
                show_name=False,
            ),
            self.async_pbar.view,
            pn.layout.Divider(),
            pn.pane.Markdown("## Works while background process is running"),
            pn.Param(
                self,
                parameters=["select", "slider", "text"],
                widgets={"text": {"disabled": True}},
                show_name=False,
            ),
            self.bk_pane,
            max_width=500,
        )

    @param.depends("slider", "select", watch=True)
    def _on_slider_change(self):
        # This function runs some other Python code which we want to keep responsive
        if self.select:
            select = self.select
        else:
            select = 0
        self.text = str(self.slider + select)

    def _do_normal(self):
        """Updates the Bokeh graph normally (synchronously)."""
        self.update_source(list(np.random.randint(0, 2**31, 10)))

    def update_source(self, data):
        self.source.data.update({'y': data})

    async def overall_func(self, num_tasks):
        """
        This function asynchronously sends out num_tasks tasks and passes their futures to the progress bar, which reports on their completion.
        Next, in a second step, the futures are combined in the final calculation and the result is sent to the result field.
        """
        client = await Client(self.cluster, asynchronous=True)

        futures = client.map(task_func, np.random.randint(0, 2**31, num_tasks))
        await self.async_pbar.run(futures)
        result = await client.submit(np.mean, futures)
        data = await client.gather(futures)
        cb = partial(self.update_source, data)
        self.doc.add_next_tick_callback(cb)

        with unlocked():
            self.result = result
            self.param['do_stuff'].constant = False
            self.async_pbar.reset()

    def _do_calc(self):
        self.doc = curdoc() # save the doc state here in the main thread
        self.param['do_stuff'].constant = True
        num_tasks = 10
        self.async_pbar.num_tasks = num_tasks
        loop = IOLoop.current()
        loop.add_callback(self.overall_func, num_tasks)

    def panel(self):
        return self.col


# Address of the dask LocalCluster, which can be started from a separate Python console:
# >>> from dask.distributed import LocalCluster
# >>> cluster = LocalCluster()
# >>> cluster
# This returns the IP:port to use

cluster = '<ip:port for cluster>'

pn.config.sizing_mode = "stretch_width"
async_example = ASyncExample(cluster)
pn.serve(async_example.panel())

Update0: In Jupyter notebooks, I can call the callback directly (cb()), and everything works fine.
However, when I run the app with .servable() and panel serve from the console, dask has problems pickling the functions and distributing them.

Update1: I’ve updated my example above, and it now works.
The secret was to save a reference to the Bokeh document right before starting the async coroutines in another thread, rather than in __init__.

Update2: Regarding serving the app with panel serve from the console: in this case Bokeh creates a new module object every time a user session is initiated, so pickle is unable to find the module where task_func is defined. If you want to use panel serve with this example, you need to move task_func to a separate module.
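For anyone who wants to see the pickling problem in isolation, here is a minimal sketch using only the standard library. It is not Dask's actual serialization path, just plain pickle. The module name session_module_demo is made up and stands in for the per-session module that panel serve creates. The point is that pickle stores a function as a reference (module name plus function name), so loading fails as soon as that module can no longer be imported.

```python
import pickle
import sys
import types

# Hypothetical stand-in for the per-session module that `panel serve` creates.
MODULE_NAME = "session_module_demo"

# Build a throwaway module and define task_func inside it.
module = types.ModuleType(MODULE_NAME)
exec("def task_func(arg):\n    return arg\n", module.__dict__)
sys.modules[MODULE_NAME] = module

# pickle stores only a reference (module name + function name), not the code.
payload = pickle.dumps(module.task_func)
stored_by_reference = MODULE_NAME.encode() in payload and b"task_func" in payload

# While the module is importable, the reference resolves to the same function.
restored = pickle.loads(payload)
round_trip_ok = restored is module.task_func

# Simulate a worker process that cannot import the session module.
del sys.modules[MODULE_NAME]
try:
    pickle.loads(payload)
    load_error = None
except ImportError as exc:  # ModuleNotFoundError is a subclass of ImportError
    load_error = exc

print("stored by reference:", stored_by_reference)
print("round trip ok:", round_trip_ok)
print("error after module disappears:", type(load_error).__name__)
```

Moving task_func into a regular, importable module gives the workers a name they can resolve, which fits the fix described in Update2.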
See also here: