Ok thanks, that helps a lot, I’m trying to avoid a hacky solution that will bite me later.
I’ve changed the code where now I pass the document to the coroutine:
async def stream(self, _id, doc):
# ...
cb = partial(self.source.stream, d)
doc.add_next_tick_callback(cb)
Such that I have access to the document (calling pn.state.curdoc
there returns None
)
Updated full code:
from concurrent.futures.thread import ThreadPoolExecutor
from functools import partial
import holoviews as hv
from holoviews.streams import Buffer
import pandas as pd
import panel as pn
import numpy as np
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from matplotlib.cm import get_cmap
from matplotlib.colors import to_hex
import param
import uuid
import concurrent
from blocking import blocking_function
from asyncio import wrap_future
from functools import partial, wraps
pn.extension(sizing_mode="stretch_width")
from param.parameterized import async_executor
class DaskASyncExample(param.Parameterized):
num_jobs = param.Integer(default=5, bounds=(1, None), label='Number of jobs')
dev = param.Action(lambda self: self._action_dev())
submit_async = param.Action(lambda self: self._action_submit_async(), label='Submit async')
submit = param.Action(lambda self: self._action_submit(), label='Submit')
data = param.ClassSelector(pd.DataFrame, precedence=-1)
def __init__(self, executor, **params):
super().__init__(**params)
self.executor = executor
self.progress = pn.widgets.Progress(sizing_mode='stretch_width', height=20, active=False, value=0)
data = pd.DataFrame({'x': [], 'y': [], 'value': []}, columns=['x', 'y', 'value'])
self.buffer = Buffer(data=data, length=int(1e6), index=False)
self.source = ColumnDataSource(data={'x': [], 'y': [], 'color': []})
@property
def hv_stream(self):
scatter = hv.DynamicMap(hv.Scatter, streams=[self.buffer])
scatter.opts(color='value', xlim=(0, 5), ylim=(0, 5), cmap='jet', responsive=True)
return scatter
@property
def bk_stream(self):
fig = figure()
fig.scatter(x='x', y='y', color='color', source=self.source, line_color=None)
return fig
def normal_stream(self, _id):
result = blocking_function(_id)
x, y = result
cmap = get_cmap('jet')
c = np.random.random()
color = to_hex(cmap(c))
colors = [color] * len(x)
value = np.ones_like(x)*c
data = pd.DataFrame({'x': x, 'y': y, 'value': value}, columns=['x', 'y', 'value'])
# Update the hv DynamicMap
self.buffer.send(data)
self.progress.value += 1
if self.progress.value == self.progress.max:
self.progress.value = 0
self.progress.active = False
# Update the bokeh graph
d = {'x': x, 'y': y, 'color': colors}
self.source.stream(d)
async def stream(self, _id, doc):
future = self.executor.submit(blocking_function, _id)
if isinstance(future, concurrent.futures.Future):
future = wrap_future(future)
result = await future
x, y = result
cmap = get_cmap('jet')
c = np.random.random()
color = to_hex(cmap(c))
colors = [color] * len(x)
value = np.ones_like(x)*c
data = pd.DataFrame({'x': x, 'y': y, 'value': value}, columns=['x', 'y', 'value'])
# Update the hv DynamicMap
self.buffer.send(data)
self.progress.value += 1
if self.progress.value == self.progress.max:
self.progress.value = 0
self.progress.active = False
# Update the bokeh graph (doesnt work)
d = {'x': x, 'y': y, 'color': colors}
cb = partial(self.source.stream, d)
doc.add_next_tick_callback(cb)
def _reset(self):
self.source.data = {'x': [], 'y': [], 'color': []}
self.buffer.clear()
self.progress.value = 0
self.progress.max = self.num_jobs
self.progress.active = True
def _action_submit(self):
self._reset()
for i in range(self.num_jobs):
_id = uuid.uuid4()
self.normal_stream(_id)
print(i, _id)
def _action_submit_async(self):
self._reset()
for i in range(self.num_jobs):
_id = uuid.uuid4()
func = partial(self.stream, _id, pn.state.curdoc)
async_executor(func)
print(i, _id)
executor = ThreadPoolExecutor()
# executor = default_client(asynchronous=True)
dask = DaskASyncExample(executor=executor)
sidebar = pn.Param(dask)
col = pn.Column(*sidebar, dask.progress)
template = pn.template.FastGridTemplate(
title='FastGridTemplate',
sidebar=col)
template.main[:2, :6] = pn.pane.HoloViews(dask.hv_stream, sizing_mode='stretch_both')
template.main[:2, 6:] = pn.pane.Bokeh(dask.bk_stream, sizing_mode='stretch_both')
print('Loaded')
template.servable()