Understanding pn.io.with_lock with coroutines and buffers

OK, thanks, that helps a lot. I’m trying to avoid a hacky solution that will bite me later.

I’ve changed the code so that I now pass the document to the coroutine:

    async def stream(self, _id, doc):
        #  ...
        cb = partial(self.source.stream, d)
        doc.add_next_tick_callback(cb)

That way I have access to the document inside the coroutine (calling pn.state.curdoc there returns None).

Updated full code:

from concurrent.futures.thread import ThreadPoolExecutor
from functools import partial

import holoviews as hv
from holoviews.streams import Buffer
import pandas as pd
import panel as pn
import numpy as np
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from matplotlib.cm import get_cmap
from matplotlib.colors import to_hex
import param
import uuid
import concurrent

from blocking import blocking_function

from asyncio import wrap_future
from functools import partial, wraps

pn.extension(sizing_mode="stretch_width")

from param.parameterized import async_executor


class DaskASyncExample(param.Parameterized):
    """Demo that runs ``blocking_function`` jobs and streams results to plots.

    Each finished job yields ``(x, y)`` values. They are pushed into a
    HoloViews ``Buffer`` (read by ``hv_stream``) and a Bokeh
    ``ColumnDataSource`` (read by ``bk_stream``), and a progress bar is
    advanced. Jobs can run synchronously (``submit``) or through the
    executor passed to the constructor (``submit_async``).
    """

    num_jobs = param.Integer(default=5, bounds=(1, None), label='Number of jobs')
    dev = param.Action(lambda self: self._action_dev())
    submit_async = param.Action(lambda self: self._action_submit_async(), label='Submit async')
    submit = param.Action(lambda self: self._action_submit(), label='Submit')

    # ``class_`` is passed by keyword: newer param releases deprecate
    # passing it positionally, and the keyword form works in older ones too.
    data = param.ClassSelector(class_=pd.DataFrame, precedence=-1)

    def __init__(self, executor, **params):
        """Create the widgets and the (initially empty) data containers.

        Args:
            executor: Object with a ``submit(fn, *args)`` method returning
                either a ``concurrent.futures.Future`` or an awaitable.
            **params: Forwarded to ``param.Parameterized``.
        """
        super().__init__(**params)

        self.executor = executor
        self.progress = pn.widgets.Progress(sizing_mode='stretch_width', height=20, active=False, value=0)

        data = pd.DataFrame({'x': [], 'y': [], 'value': []}, columns=['x', 'y', 'value'])
        self.buffer = Buffer(data=data, length=int(1e6), index=False)
        self.source = ColumnDataSource(data={'x': [], 'y': [], 'color': []})

    @property
    def hv_stream(self):
        """Return a new HoloViews ``DynamicMap`` scatter fed by ``self.buffer``."""
        scatter = hv.DynamicMap(hv.Scatter, streams=[self.buffer])
        scatter.opts(color='value', xlim=(0, 5), ylim=(0, 5), cmap='jet', responsive=True)

        return scatter

    @property
    def bk_stream(self):
        """Return a new Bokeh figure whose scatter glyph reads ``self.source``."""
        fig = figure()
        fig.scatter(x='x', y='y', color='color', source=self.source, line_color=None)

        return fig

    @staticmethod
    def _build_payload(result):
        """Turn one job result into the HoloViews frame and the Bokeh patch.

        Args:
            result: Pair ``(x, y)`` as returned by ``blocking_function``.

        Returns:
            ``(frame, patch)``: a DataFrame with columns ``x``, ``y``,
            ``value`` and a dict with keys ``x``, ``y``, ``color``. All
            points of one job share one random value / 'jet' colour.
        """
        x, y = result
        cmap = get_cmap('jet')
        c = np.random.random()
        color = to_hex(cmap(c))

        frame = pd.DataFrame(
            {'x': x, 'y': y, 'value': np.ones_like(x) * c},
            columns=['x', 'y', 'value'],
        )
        patch = {'x': x, 'y': y, 'color': [color] * len(x)}
        return frame, patch

    def _publish(self, frame):
        """Send ``frame`` to the HoloViews buffer and advance the progress bar."""
        self.buffer.send(frame)
        self.progress.value += 1

        # Once every job has reported, reset the bar to its idle state.
        if self.progress.value == self.progress.max:
            self.progress.value = 0
            self.progress.active = False

    def normal_stream(self, _id):
        """Run one job synchronously and push its result to both plots."""
        frame, patch = self._build_payload(blocking_function(_id))

        # Update the hv DynamicMap
        self._publish(frame)

        # Update the bokeh graph
        self.source.stream(patch)

    async def stream(self, _id, doc):
        """Run one job on the executor and push its result to both plots.

        Args:
            _id: Identifier forwarded to ``blocking_function``.
            doc: Bokeh document on which the ``ColumnDataSource`` update is
                scheduled. If ``None``, the source is updated directly
                instead of raising ``AttributeError``.
        """
        future = self.executor.submit(blocking_function, _id)
        # A plain concurrent future is not awaitable; wrap it for asyncio.
        if isinstance(future, concurrent.futures.Future):
            future = wrap_future(future)
        result = await future

        frame, patch = self._build_payload(result)

        # Update the hv DynamicMap
        self._publish(frame)

        # Update the bokeh graph on the document's next tick.
        # NOTE(review): the original author marked this path as not working;
        # the scheduling itself is unchanged here.
        cb = partial(self.source.stream, patch)
        if doc is None:
            cb()
        else:
            doc.add_next_tick_callback(cb)

    def _reset(self):
        """Clear both data containers and arm the progress bar for a new run."""
        self.source.data = {'x': [], 'y': [], 'color': []}
        self.buffer.clear()
        self.progress.value = 0
        self.progress.max = self.num_jobs
        self.progress.active = True

    def _action_dev(self):
        """No-op handler for the ``dev`` action.

        The ``dev`` parameter calls this method; without it, triggering the
        action raised ``AttributeError``.
        """

    def _action_submit(self):
        """Run ``num_jobs`` jobs one after another on the calling thread."""
        self._reset()

        for i in range(self.num_jobs):
            _id = uuid.uuid4()
            self.normal_stream(_id)
            print(i, _id)

    def _action_submit_async(self):
        """Schedule ``num_jobs`` ``stream`` coroutines via ``async_executor``."""
        self._reset()

        # Captured here because, per the author, pn.state.curdoc is None
        # inside the coroutine.
        doc = pn.state.curdoc
        for i in range(self.num_jobs):
            _id = uuid.uuid4()
            func = partial(self.stream, _id, doc)
            async_executor(func)

            print(i, _id)


# Pool that the async path submits blocking_function to.
executor = ThreadPoolExecutor()
# Alternative executor the author left disabled:
# executor = default_client(asynchronous=True)

dask = DaskASyncExample(executor=executor)

# Sidebar contents: the panes from pn.Param(dask), then the progress bar.
sidebar = pn.Param(dask)
col = pn.Column(*sidebar, dask.progress)

template = pn.template.FastGridTemplate(title='FastGridTemplate', sidebar=col)

# Rows 0-1, columns 0-5: the HoloViews scatter.
hv_pane = pn.pane.HoloViews(dask.hv_stream, sizing_mode='stretch_both')
template.main[:2, :6] = hv_pane

# Rows 0-1, columns 6 onward: the Bokeh scatter.
bk_pane = pn.pane.Bokeh(dask.bk_stream, sizing_mode='stretch_both')
template.main[:2, 6:] = bk_pane

print('Loaded')
template.servable()

3 Likes