I’m trying to understand how to use coroutines together with an executor (either a dask asynchronous client or a ThreadPoolExecutor) to execute computing jobs in parallel in the background, and to update graphs/progress bars as the jobs complete.
I’ve modified an example I used previously here: Panel webapp with dask/bokeh/tornado and asynchronous tasks - #5 by Jhsmit
As in that example, I have a hv.DynamicMap with a hv.streams.Buffer (buffer) and a bokeh figure with a ColumnDataSource (source).
The relevant part of the code is (full code below):
class DaskASyncExample(param.Parameterized):

    # ... definition of graphs, buffers etc. here

    # @pn.io.with_lock  # Adding this wrapper breaks updating the DynamicMap
    async def stream(self, _id):
        future = self.executor.submit(blocking_function, _id)
        if isinstance(future, concurrent.futures.Future):
            future = wrap_future(future)
        result = await future
        x, y = result

        cmap = get_cmap('jet')
        c = np.random.random()
        color = to_hex(cmap(c))
        colors = [color] * len(x)
        value = np.ones_like(x) * c

        data = pd.DataFrame({'x': x, 'y': y, 'value': value}, columns=['x', 'y', 'value'])

        # Update the hv DynamicMap
        self.buffer.send(data)

        self.progress.value += 1
        if self.progress.value == self.progress.max:
            self.progress.value = 0
            self.progress.active = False

        # Update the bokeh graph (this part doesn't work and needs pn.io.unlocked)
        # d = {'x': x, 'y': y, 'color': colors}
        # self.source.stream(d)

    def _action_submit_async(self):
        self._reset()
        for i in range(self.num_jobs):
            _id = uuid.uuid4()
            func = partial(self.stream, _id)
            async_executor(func)
            print(i, _id)
I’m executing the stream coroutine with param/panel’s async_executor. This all works great and I prefer it over my previous approach:
- I don’t need to run a periodic callback to check if any futures have finished.
- I can easily exchange the executor from a ThreadPoolExecutor to a dask asynchronous client (roughly as sketched below), which makes local / GitHub Actions testing of my app easier.
- It uses async/await coroutines, which I don’t really understand, but I think they’re newer and better and potentially integrate better with tornado/dask.
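For reference, the swap in that second point amounts to something like this (a minimal sketch; distributed.Client(asynchronous=True) is just a stand-in for however the asynchronous client is created, and both objects expose the .submit() that DaskASyncExample relies on):

from concurrent.futures.thread import ThreadPoolExecutor
from distributed import Client

# Local / CI testing: plain thread pool, .submit() returns a concurrent.futures.Future
executor = ThreadPoolExecutor()

# Dask variant: asynchronous client, .submit() returns an awaitable distributed future
# executor = Client(asynchronous=True)

The isinstance / wrap_future check in stream() is what lets the same coroutine await either kind of future.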
However, the problem is with updating the bokeh figure. Updating the bokeh figure directly with source.stream(data) requires the document lock. When I add the @pn.io.with_lock decorator around the stream coroutine, the bokeh figure does update perfectly with each finished task, but now the progress bar no longer updates and the DynamicMap only updates with the last result and ignores all the others.
I don’t really understand what pn.io.with_lock does that breaks things. Is it possible to get both the DynamicMap and normal bokeh figures to play along nicely in one coroutine? I’ve tried putting in with pn.io.unlocked, or splitting the stream coroutine into a separate function that updates the bokeh figure, but so far I haven’t gotten anything working; one of the variations I tried is sketched below.
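For example, the pn.io.unlocked variation looked roughly like this (only the ColumnDataSource update inside the context manager, everything else as in the coroutine above):

# inside DaskASyncExample:
async def stream(self, _id):
    future = self.executor.submit(blocking_function, _id)
    if isinstance(future, concurrent.futures.Future):
        future = wrap_future(future)
    x, y = await future

    c = np.random.random()
    colors = [to_hex(get_cmap('jet')(c))] * len(x)
    data = pd.DataFrame({'x': x, 'y': y, 'value': np.ones_like(x) * c},
                        columns=['x', 'y', 'value'])

    self.buffer.send(data)     # DynamicMap update, as before
    self.progress.value += 1   # progress bar update, as before

    # Only the raw bokeh update is guarded by the unlocked context manager
    with pn.io.unlocked():
        self.source.stream({'x': x, 'y': y, 'color': colors})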
Perhaps the solution to this is to only use DynamicMaps? For my use case this is certainly possible (a rough sketch of what I mean is below), but I’d like to understand what is happening a bit better before updating my code.
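By “only use DynamicMaps” I mean something like this (untested sketch, names buffer2/hv_stream2 are just placeholders): drop the bokeh figure and ColumnDataSource, mirror hv_stream with a second Buffer, and send every update through buffer.send():

# in __init__, next to self.buffer:
data2 = pd.DataFrame({'x': [], 'y': [], 'value': []}, columns=['x', 'y', 'value'])
self.buffer2 = Buffer(data=data2, length=int(1e6), index=False)

# in the class body, mirroring hv_stream:
@property
def hv_stream2(self):
    scatter = hv.DynamicMap(hv.Scatter, streams=[self.buffer2])
    scatter.opts(color='value', xlim=(0, 5), ylim=(0, 5), cmap='jet', responsive=True)
    return scatter

# and in stream(), instead of self.source.stream(d):
# self.buffer2.send(data)

Both panes would then be pn.pane.HoloViews and the coroutine would never touch a bokeh model directly, so no document-lock handling should be needed.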
The full code:
Main script:
from concurrent.futures.thread import ThreadPoolExecutor
from functools import partial

import holoviews as hv
from holoviews.streams import Buffer
import pandas as pd
import panel as pn
import numpy as np
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from matplotlib.cm import get_cmap
from matplotlib.colors import to_hex
import param
import uuid
import concurrent

from blocking import blocking_function
from asyncio import wrap_future

pn.extension(sizing_mode="stretch_width")

from param.parameterized import async_executor


class DaskASyncExample(param.Parameterized):
    num_jobs = param.Integer(default=5, bounds=(1, None), label='Number of jobs')

    dev = param.Action(lambda self: self._action_dev())
    submit_async = param.Action(lambda self: self._action_submit_async(), label='Submit async')
    submit = param.Action(lambda self: self._action_submit(), label='Submit')

    data = param.ClassSelector(pd.DataFrame, precedence=-1)

    def __init__(self, executor, **params):
        super().__init__(**params)
        self.executor = executor
        self.progress = pn.widgets.Progress(sizing_mode='stretch_width', height=20, active=False, value=0)

        data = pd.DataFrame({'x': [], 'y': [], 'value': []}, columns=['x', 'y', 'value'])
        self.buffer = Buffer(data=data, length=int(1e6), index=False)
        self.source = ColumnDataSource(data={'x': [], 'y': [], 'color': []})

    @property
    def hv_stream(self):
        scatter = hv.DynamicMap(hv.Scatter, streams=[self.buffer])
        scatter.opts(color='value', xlim=(0, 5), ylim=(0, 5), cmap='jet', responsive=True)
        return scatter

    @property
    def bk_stream(self):
        fig = figure()
        fig.scatter(x='x', y='y', color='color', source=self.source, line_color=None)
        return fig

    def normal_stream(self, _id):
        result = blocking_function(_id)
        x, y = result

        cmap = get_cmap('jet')
        c = np.random.random()
        color = to_hex(cmap(c))
        colors = [color] * len(x)
        value = np.ones_like(x) * c

        data = pd.DataFrame({'x': x, 'y': y, 'value': value}, columns=['x', 'y', 'value'])

        # Update the hv DynamicMap
        self.buffer.send(data)

        self.progress.value += 1
        if self.progress.value == self.progress.max:
            self.progress.value = 0
            self.progress.active = False

        # Update the bokeh graph
        d = {'x': x, 'y': y, 'color': colors}
        self.source.stream(d)

    async def stream(self, _id):
        future = self.executor.submit(blocking_function, _id)
        if isinstance(future, concurrent.futures.Future):
            future = wrap_future(future)
        result = await future
        x, y = result

        cmap = get_cmap('jet')
        c = np.random.random()
        color = to_hex(cmap(c))
        colors = [color] * len(x)
        value = np.ones_like(x) * c

        data = pd.DataFrame({'x': x, 'y': y, 'value': value}, columns=['x', 'y', 'value'])

        # Update the hv DynamicMap
        self.buffer.send(data)

        self.progress.value += 1
        if self.progress.value == self.progress.max:
            self.progress.value = 0
            self.progress.active = False

        # Update the bokeh graph (doesn't work)
        # d = {'x': x, 'y': y, 'color': colors}
        # self.source.stream(d)

    def _reset(self):
        self.source.data = {'x': [], 'y': [], 'color': []}
        self.buffer.clear()
        self.progress.value = 0
        self.progress.max = self.num_jobs
        self.progress.active = True

    def _action_submit(self):
        self._reset()
        for i in range(self.num_jobs):
            _id = uuid.uuid4()
            self.normal_stream(_id)
            print(i, _id)

    def _action_submit_async(self):
        self._reset()
        for i in range(self.num_jobs):
            _id = uuid.uuid4()
            func = partial(self.stream, _id)
            async_executor(func)
            print(i, _id)


executor = ThreadPoolExecutor()
# executor = default_client(asynchronous=True)

dask = DaskASyncExample(executor=executor)
sidebar = pn.Param(dask)
col = pn.Column(*sidebar, dask.progress)

template = pn.template.FastGridTemplate(
    title='FastGridTemplate',
    sidebar=col)

template.main[:2, :6] = pn.pane.HoloViews(dask.hv_stream, sizing_mode='stretch_both')
template.main[:2, 6:] = pn.pane.Bokeh(dask.bk_stream, sizing_mode='stretch_both')
print('Loaded')

template.servable()
In blocking.py:
import numpy as np
import time


def blocking_function(uuid):
    """This function executes some kind of long calculation on a Dask cluster"""
    duration = np.random.uniform(0.5, 2.)
    time.sleep(duration)

    N = np.random.randint(200, 1000)
    x0 = np.random.random() * 5
    xs = 0.1  # np.random.poisson()
    y0 = np.random.random() * 5
    ys = 0.1  # np.random.poisson()

    x = np.random.normal(x0, xs, N)
    y = np.random.normal(y0, ys, N)

    return x, y