Understanding pn.io.with_lock with coroutines and buffers

I’m trying to understand how to use coroutines together with an executor (either a dask asynchronous client or a ThreadPoolExecutor) to execute compute jobs in parallel in the background, and update graphs/progress bars as the jobs complete.

I’ve modified an example I used previously here: Panel webapp with dask/bokeh/tornado and asynchronous tasks - #5 by Jhsmit

As in that example, I have an hv.DynamicMap with an hv.streams.Buffer (buffer) and a bokeh figure with a ColumnDataSource (source).

The relevant part of the code is (full code below):

class DaskASyncExample(param.Parameterized):
    # ... definitions of graphs, buffers, etc. here

    # @pn.io.with_lock  # Adding this wrapper breaks updating the DynamicMap
    async def stream(self, _id):
        future = self.executor.submit(blocking_function, _id)
        if isinstance(future, concurrent.futures.Future):
            future = wrap_future(future)
        result = await future

        x, y = result
        cmap = get_cmap('jet')
        c = np.random.random()
        color = to_hex(cmap(c))
        colors = [color] * len(x)
        value = np.ones_like(x)*c

        data = pd.DataFrame({'x': x, 'y': y, 'value': value}, columns=['x', 'y', 'value'])

        # Update the hv DynamicMap
        self.buffer.send(data)
        self.progress.value += 1

        if self.progress.value == self.progress.max:
            self.progress.value = 0
            self.progress.active = False

        # Update the bokeh graph (this part doesn't work; it needs the document lock)
        # d = {'x': x, 'y': y, 'color': colors}
        # self.source.stream(d)
 
    def _action_submit_async(self):
        self._reset()

        for i in range(self.num_jobs):
            _id = uuid.uuid4()
            func = partial(self.stream, _id)
            async_executor(func)

            print(i, _id)

I’m executing the stream coroutine with param/panel’s async_executor. This all works great and I prefer it over my previous approach:

  • I don’t need to run a periodic callback to check whether any futures have finished.
  • I can easily swap the executor from a ThreadPoolExecutor to a dask asynchronous client, which makes local / GitHub Actions testing of my app easier (see the sketch after this list).
  • It uses async/await coroutines, which I don’t fully understand yet, but they are newer and seem to integrate better with tornado / dask.
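
For illustration, this is roughly how I swap executors; a sketch, assuming dask.distributed is available (the Client line is only a stand-in for however the asynchronous client is actually created):

from concurrent.futures import ThreadPoolExecutor
# from dask.distributed import Client

# A ThreadPoolExecutor's submit() returns a concurrent.futures.Future, which the
# stream coroutine wraps with asyncio.wrap_future before awaiting it.
executor = ThreadPoolExecutor()

# An asynchronous dask client's submit() returns a dask future that can be awaited
# directly, so the isinstance check in stream() skips the wrapping step.
# executor = Client(asynchronous=True)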

However, the problem is updating the bokeh figure. Updating the bokeh figure directly with source.stream(data) requires the document lock. When I add the @pn.io.with_lock decorator to the stream coroutine, the bokeh figure does update perfectly with each finished task, but now the progress bar no longer updates and the DynamicMap only shows the last result and ignores all the others.

I don’t really understand what pn.io.with_lock does that breaks things. Is it possible to get both a DynamicMap and a plain bokeh figure to play along nicely in one coroutine? I’ve tried wrapping the update in with pn.io.unlocked() (roughly as sketched below) and splitting the bokeh update off into a separate function, but so far I haven’t gotten anything working.
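
For reference, this is roughly what I tried with the unlocked context manager (a simplified, non-working sketch of that attempt):

    async def stream(self, _id):
        ...  # submit the job, await the result, build x, y, colors as above

        # attempt: bypass Panel's lock handling while streaming to the ColumnDataSource
        with pn.io.unlocked():
            self.source.stream({'x': x, 'y': y, 'color': colors})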

Perhaps the solution is to only use DynamicMaps? For my use case this is certainly possible, but I’d like to understand what is happening a bit better before updating my code.
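
For completeness, the DynamicMap-only alternative I have in mind would look something like this sketch (a hypothetical second Buffer/DynamicMap pair inside the class, reusing the same value-based colouring as the existing plot, so everything is updated through buffer.send() and no bokeh model is touched directly):

    # in __init__: a second buffer that would replace the ColumnDataSource
    empty = pd.DataFrame({'x': [], 'y': [], 'value': []}, columns=['x', 'y', 'value'])
    self.buffer2 = Buffer(data=empty, length=int(1e6), index=False)

    # second DynamicMap, styled like the existing one
    scatter2 = hv.DynamicMap(hv.Scatter, streams=[self.buffer2]).opts(
        color='value', cmap='jet', xlim=(0, 5), ylim=(0, 5), responsive=True)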

The full code:

Main script:

from concurrent.futures.thread import ThreadPoolExecutor
from functools import partial

import holoviews as hv
from holoviews.streams import Buffer
import pandas as pd
import panel as pn
import numpy as np
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from matplotlib.cm import get_cmap
from matplotlib.colors import to_hex
import param
import uuid
import concurrent

from blocking import blocking_function

from asyncio import wrap_future
from functools import partial, wraps

pn.extension(sizing_mode="stretch_width")

from param.parameterized import async_executor


class DaskASyncExample(param.Parameterized):
    num_jobs = param.Integer(default=5, bounds=(1, None), label='Number of jobs')
    submit_async = param.Action(lambda self: self._action_submit_async(), label='Submit async')
    submit = param.Action(lambda self: self._action_submit(), label='Submit')

    data = param.ClassSelector(pd.DataFrame, precedence=-1)

    def __init__(self, executor, **params):
        super().__init__(**params)

        self.executor = executor
        self.progress = pn.widgets.Progress(sizing_mode='stretch_width', height=20, active=False, value=0)

        data = pd.DataFrame({'x': [], 'y': [], 'value': []}, columns=['x', 'y', 'value'])
        self.buffer = Buffer(data=data, length=int(1e6), index=False)
        self.source = ColumnDataSource(data={'x': [], 'y': [], 'color': []})

    @property
    def hv_stream(self):
        scatter = hv.DynamicMap(hv.Scatter, streams=[self.buffer])
        scatter.opts(color='value', xlim=(0, 5), ylim=(0, 5), cmap='jet', responsive=True)

        return scatter

    @property
    def bk_stream(self):
        fig = figure()
        fig.scatter(x='x', y='y', color='color', source=self.source, line_color=None)

        return fig

    def normal_stream(self, _id):
        result = blocking_function(_id)

        x, y = result
        cmap = get_cmap('jet')
        c = np.random.random()
        color = to_hex(cmap(c))
        colors = [color] * len(x)
        value = np.ones_like(x)*c

        data = pd.DataFrame({'x': x, 'y': y, 'value': value}, columns=['x', 'y', 'value'])

        # Update the hv DynamicMap
        self.buffer.send(data)
        self.progress.value += 1

        if self.progress.value == self.progress.max:
            self.progress.value = 0
            self.progress.active = False

        # Update the bokeh graph
        d = {'x': x, 'y': y, 'color': colors}
        self.source.stream(d)

    async def stream(self, _id):
        future = self.executor.submit(blocking_function, _id)
        if isinstance(future, concurrent.futures.Future):
            future = wrap_future(future)
        result = await future

        x, y = result
        cmap = get_cmap('jet')
        c = np.random.random()
        color = to_hex(cmap(c))
        colors = [color] * len(x)
        value = np.ones_like(x)*c

        data = pd.DataFrame({'x': x, 'y': y, 'value': value}, columns=['x', 'y', 'value'])

        # Update the hv DynamicMap
        self.buffer.send(data)
        self.progress.value += 1

        if self.progress.value == self.progress.max:
            self.progress.value = 0
            self.progress.active = False

        # Update the bokeh graph (doesn't work)
        # d = {'x': x, 'y': y, 'color': colors}
        # self.source.stream(d)

    def _reset(self):
        self.source.data = {'x': [], 'y': [], 'color': []}
        self.buffer.clear()
        self.progress.value = 0
        self.progress.max = self.num_jobs
        self.progress.active = True

    def _action_submit(self):
        self._reset()

        for i in range(self.num_jobs):
            _id = uuid.uuid4()
            self.normal_stream(_id)
            print(i, _id)

    def _action_submit_async(self):
        self._reset()

        for i in range(self.num_jobs):
            _id = uuid.uuid4()
            func = partial(self.stream, _id)
            async_executor(func)

            print(i, _id)


executor = ThreadPoolExecutor()
# executor = default_client(asynchronous=True)

dask = DaskASyncExample(executor=executor)

sidebar = pn.Param(dask)
col = pn.Column(*sidebar, dask.progress)

template = pn.template.FastGridTemplate(
    title='FastGridTemplate',
    sidebar=col)

template.main[:2, :6] = pn.pane.HoloViews(dask.hv_stream, sizing_mode='stretch_both')
template.main[:2, 6:] = pn.pane.Bokeh(dask.bk_stream, sizing_mode='stretch_both')

print('Loaded')
template.servable()

In blocking.py:

import numpy as np
import time



def blocking_function(uuid):
    """This function executes some kind of long calculation on a Dask cluster"""

    duration = np.random.uniform(0.5, 2.)
    time.sleep(duration)

    N = np.random.randint(200, 1000)
    x0 = np.random.random() * 5
    xs = 0.1  # np.random.poisson()
    y0 = np.random.random() * 5
    ys = 0.1  # np.random.poisson()

    x = np.random.normal(x0, xs, N)
    y = np.random.normal(y0, ys, N)

    return x, y

Sorry for the short answer here, but I think you should never try to mess with the document lock when interacting with bokeh models directly. The better thing to do is to schedule your update to the bokeh model (in this case the ColumnDataSource) as a task using pn.state.curdoc.add_next_tick_callback; this ensures the update acquires the proper lock. The unlocking that Panel does is a bit of a hack, but it allows Panel to generally dispatch things without waiting to acquire the document lock. If you try to mess with the lock yourself you are likely to force Panel to respect the lock, which means various updates aren’t sent until the lock can safely be lifted.
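
Very roughly, something like this inside the coroutine instead of calling source.stream directly (an untested sketch):

    from functools import partial

    d = {'x': x, 'y': y, 'color': colors}
    # schedule the ColumnDataSource update so it runs with the document lock held
    pn.state.curdoc.add_next_tick_callback(partial(self.source.stream, d))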

1 Like

Ok thanks, that helps a lot; I’m trying to avoid a hacky solution that will bite me later.

I’ve changed the code so that I now pass the document to the coroutine:

    async def stream(self, _id, doc):
        # ...
        cb = partial(self.source.stream, d)
        doc.add_next_tick_callback(cb)

This way I have access to the document inside the coroutine (calling pn.state.curdoc there returns None).

Updated full code:

from concurrent.futures.thread import ThreadPoolExecutor
from functools import partial

import holoviews as hv
from holoviews.streams import Buffer
import pandas as pd
import panel as pn
import numpy as np
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from matplotlib.cm import get_cmap
from matplotlib.colors import to_hex
import param
import uuid
import concurrent

from blocking import blocking_function

from asyncio import wrap_future
from functools import partial, wraps

pn.extension(sizing_mode="stretch_width")

from param.parameterized import async_executor


class DaskASyncExample(param.Parameterized):
    num_jobs = param.Integer(default=5, bounds=(1, None), label='Number of jobs')
    submit_async = param.Action(lambda self: self._action_submit_async(), label='Submit async')
    submit = param.Action(lambda self: self._action_submit(), label='Submit')

    data = param.ClassSelector(pd.DataFrame, precedence=-1)

    def __init__(self, executor, **params):
        super().__init__(**params)

        self.executor = executor
        self.progress = pn.widgets.Progress(sizing_mode='stretch_width', height=20, active=False, value=0)

        data = pd.DataFrame({'x': [], 'y': [], 'value': []}, columns=['x', 'y', 'value'])
        self.buffer = Buffer(data=data, length=int(1e6), index=False)
        self.source = ColumnDataSource(data={'x': [], 'y': [], 'color': []})

    @property
    def hv_stream(self):
        scatter = hv.DynamicMap(hv.Scatter, streams=[self.buffer])
        scatter.opts(color='value', xlim=(0, 5), ylim=(0, 5), cmap='jet', responsive=True)

        return scatter

    @property
    def bk_stream(self):
        fig = figure()
        fig.scatter(x='x', y='y', color='color', source=self.source, line_color=None)

        return fig

    def normal_stream(self, _id):
        result = blocking_function(_id)

        x, y = result
        cmap = get_cmap('jet')
        c = np.random.random()
        color = to_hex(cmap(c))
        colors = [color] * len(x)
        value = np.ones_like(x)*c

        data = pd.DataFrame({'x': x, 'y': y, 'value': value}, columns=['x', 'y', 'value'])

        # Update the hv DynamicMap
        self.buffer.send(data)
        self.progress.value += 1

        if self.progress.value == self.progress.max:
            self.progress.value = 0
            self.progress.active = False

        # Update the bokeh graph
        d = {'x': x, 'y': y, 'color': colors}
        self.source.stream(d)

    async def stream(self, _id, doc):
        future = self.executor.submit(blocking_function, _id)
        if isinstance(future, concurrent.futures.Future):
            future = wrap_future(future)
        result = await future

        x, y = result
        cmap = get_cmap('jet')
        c = np.random.random()
        color = to_hex(cmap(c))
        colors = [color] * len(x)
        value = np.ones_like(x)*c

        data = pd.DataFrame({'x': x, 'y': y, 'value': value}, columns=['x', 'y', 'value'])

        # Update the hv DynamicMap
        self.buffer.send(data)
        self.progress.value += 1

        if self.progress.value == self.progress.max:
            self.progress.value = 0
            self.progress.active = False

        # Update the bokeh graph by scheduling the update on the document (this now works)
        d = {'x': x, 'y': y, 'color': colors}
        cb = partial(self.source.stream, d)
        doc.add_next_tick_callback(cb)

    def _reset(self):
        self.source.data = {'x': [], 'y': [], 'color': []}
        self.buffer.clear()
        self.progress.value = 0
        self.progress.max = self.num_jobs
        self.progress.active = True

    def _action_submit(self):
        self._reset()

        for i in range(self.num_jobs):
            _id = uuid.uuid4()
            self.normal_stream(_id)
            print(i, _id)

    def _action_submit_async(self):
        self._reset()

        for i in range(self.num_jobs):
            _id = uuid.uuid4()
            func = partial(self.stream, _id, pn.state.curdoc)
            async_executor(func)

            print(i, _id)


executor = ThreadPoolExecutor()
# executor = default_client(asynchronous=True)

dask = DaskASyncExample(executor=executor)

sidebar = pn.Param(dask)
col = pn.Column(*sidebar, dask.progress)

template = pn.template.FastGridTemplate(
    title='FastGridTemplate',
    sidebar=col)

template.main[:2, :6] = pn.pane.HoloViews(dask.hv_stream, sizing_mode='stretch_both')
template.main[:2, 6:] = pn.pane.Bokeh(dask.bk_stream, sizing_mode='stretch_both')

print('Loaded')
template.servable()

3 Likes

Having a best-practice reference example of using async, ThreadPoolExecutor and/or dask in the Gallery would be awesome.

1 Like

Indeed! There is currently one that uses futures, but not async yet.
I can try to get a PR in for a new example based on async.
I have an application in mind where several large files need to be loaded and processed in parallel, but it’ll probably take me a while.

4 Likes