I’m now trying to make the above example work with Bokeh figures, but I’m running into some issues. I modified the example above to include a bokeh figure and I want the outputs of the async tasks to be plotted in the bokeh figure upon completion.
To update the Bokeh document safely (see the Bokeh docs on "Updating From Threads" and "Unlocked Callbacks"), the callback that updates the document should be scheduled via `add_next_tick_callback`.
If I don't do this, and instead call the callback `cb` directly, I get the error:
raise RuntimeError("_pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes")
RuntimeError: _pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes
However, when I do use `add_next_tick_callback`, nothing happens, and it seems that the callback isn't called.
Is this because panel overrides bokeh behaviour related to the event loop? Is there a workaround for this?
If you want to run the code below, you need to start a dask `LocalCluster` and update the ip:port value of `cluster`.
import panel as pn
import param
import time
import numpy as np
from panel.io import unlocked
from tornado.ioloop import IOLoop
from dask.distributed import Client, as_completed
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource
from functools import partial
class ASyncProgressBar(param.Parameterized):
    """Progress indicator that counts finished futures as a percentage.

    ``completed`` is incremented once per finished future by :meth:`run`;
    ``num_tasks`` is the total the percentage is computed against.
    """

    completed = param.Integer(default=0)

    num_tasks = param.Integer(default=10, bounds=(1, None))

    async def run(self, futures):
        """Await ``futures`` in completion order, bumping ``completed`` for each one."""
        async for _ in as_completed(futures):
            # The counter is changed inside panel's ``unlocked`` context so the
            # update can be pushed from this coroutine.
            with unlocked():
                self.completed = self.completed + 1

    @property
    def value(self):
        """Integer percentage (truncated) of tasks that have completed."""
        fraction = self.completed / self.num_tasks
        return int(100 * fraction)

    def reset(self):
        """Set the completed-task counter back to zero."""
        self.completed = 0

    @param.depends('completed', 'num_tasks')
    def view(self):
        """Return a Progress widget, or ``None`` while the percentage is zero."""
        percent = self.value
        if not percent:
            return None
        return pn.widgets.Progress(
            active=True,
            value=percent,
            align="center",
            sizing_mode="stretch_width",
        )
def task_func(arg, delay=None):
    """Simulate a normal blocking function that is called many times.

    Parameters
    ----------
    arg
        Any value; it is returned unchanged after the delay.
    delay
        Seconds to block before returning. When ``None`` (the default) the
        original behaviour is kept: the delay is drawn from
        ``np.random.randint(1, 2)``.

    Returns
    -------
    The ``arg`` that was passed in.
    """
    if delay is None:
        # NOTE: the upper bound of ``np.random.randint`` is exclusive, so this
        # draw always yields 1 (a constant one-second sleep).
        delay = np.random.randint(1, 2)
    time.sleep(delay)
    return arg
class ASyncExample(param.Parameterized):
    """Panel demo: run dask tasks in the background and plot the results in Bokeh.

    The ``do_stuff`` action starts an asynchronous batch of ``task_func`` calls
    on the dask cluster, reports progress through an :class:`ASyncProgressBar`
    and finally writes the gathered values into the Bokeh figure. The
    ``select``/``slider``/``text`` parameters stay usable while that batch runs.
    """

    select = param.Selector(objects=range(10))
    slider = param.Number(2, bounds=(0, 10))
    text = param.String()

    do_stuff = param.Action(lambda self: self._do_calc())
    normal_stuff = param.Action(lambda self: self._do_normal())
    result = param.Number(0)

    def __init__(self, cluster, **params):
        """Build the widgets, the Bokeh figure and the layout.

        Parameters
        ----------
        cluster
            Address of the dask scheduler; passed to ``Client`` when a
            calculation is started.
        **params
            Forwarded to ``param.Parameterized``.
        """
        super().__init__(**params)
        self.async_pbar = ASyncProgressBar()
        self.cluster = cluster
        # Initial document reference; it is refreshed in ``_do_calc`` right
        # before the background coroutine is scheduled.
        self.doc = curdoc()

        self.source = ColumnDataSource({'x': range(10), 'y': range(10)})
        self.figure = figure()
        self.figure.line(x='x', y='y', source=self.source)
        self.bk_pane = pn.pane.Bokeh(self.figure)

        self.col = pn.Column(
            pn.pane.Markdown("## Starts async background process"),
            pn.Param(
                self,
                parameters=["do_stuff", 'normal_stuff', "result"],
                widgets={"result": {"disabled": True}, "do_stuff": {"button_type": "success"}},
                show_name=False,
            ),
            self.async_pbar.view,
            pn.layout.Divider(),
            pn.pane.Markdown("## Works while background process is running"),
            pn.Param(
                self,
                parameters=["select", "slider", "text"],
                widgets={"text": {"disabled": True}},
                show_name=False,
            ),
            self.bk_pane,
            max_width=500,
        )

    @param.depends("slider", "select", watch=True)
    def _on_slider_change(self):
        """Write ``slider + select`` into ``text``; a falsy ``select`` counts as 0."""
        # This function does some other python code which we want to keep responsive
        self.text = str(self.slider + (self.select or 0))

    def _do_normal(self):
        """Update the bokeh graph normally (synchronously) with random data."""
        self.update_source(list(np.random.randint(0, 2**31, 10)))

    def update_source(self, data):
        """Replace the ``y`` column of the figure's data source with ``data``."""
        self.source.data.update({'y': data})

    async def overall_func(self, num_tasks):
        """Run ``num_tasks`` dask tasks and publish their results.

        This function asynchronously sends out the tasks, which are handed to
        the progress bar to report on their completion. Next, in a second step,
        the futures are combined in the final calculation and the result is
        sent to the ``result`` field, while the gathered values are plotted.

        The dask client is closed when the work is done, and the ``do_stuff``
        button and the progress bar are restored even if a step raises, so the
        app cannot get stuck in the "running" state.
        """
        try:
            # ``async with`` closes the client on exit instead of leaking one
            # connection per button press.
            async with Client(self.cluster, asynchronous=True) as client:
                futures = client.map(task_func, np.random.randint(0, 2**31, num_tasks))
                await self.async_pbar.run(futures)
                result = await client.submit(np.mean, futures)
                data = await client.gather(futures)

                # The Bokeh document must be modified from a next-tick callback
                # rather than directly from this coroutine.
                self.doc.add_next_tick_callback(partial(self.update_source, data))
                with unlocked():
                    self.result = result
        finally:
            with unlocked():
                self.param['do_stuff'].constant = False
                self.async_pbar.reset()

    def _do_calc(self):
        """Disable the button and schedule :meth:`overall_func` on the IOLoop."""
        self.doc = curdoc()  # save the doc state here in the main thread
        self.param['do_stuff'].constant = True
        num_tasks = 10
        self.async_pbar.num_tasks = num_tasks
        IOLoop.current().add_callback(self.overall_func, num_tasks)

    def panel(self):
        """Return the top-level layout of the app."""
        return self.col
# Address of the dask LocalCluster to connect to. One can be started from a
# separate python console:
# >>> from dask.distributed import LocalCluster
# >>> cluster = LocalCluster()
# >>> cluster
# The last command returns the IP:port to fill in below.
pn.config.sizing_mode = "stretch_width"

cluster = '<ip:port for cluster>'
async_example = ASyncExample(cluster=cluster)
pn.serve(async_example.panel())
Update0: In Jupyter notebooks, I can call the callback directly (`cb()`), and everything works fine.
However, when I run the app with `.servable()` and `panel serve` from the console, dask has problems properly pickling the functions and distributing them.
Update1: I’ve updated my example above, which now does work.
The secret was to save a reference to the Bokeh document right before starting the `async` coroutines in another thread, rather than in `__init__`.
Update2: With respect to serving an app using `panel serve` from the console: in this case Bokeh creates new module objects every time a user session is initiated, so `pickle` is unable to find the module where `task_func` is located. If you want to use `panel serve` with this example, you need to move `task_func` to a separate module.
See also here: