Can I load data asynchronously in Panel?

I have been wondering for some time whether it is possible to load data and do other work asynchronously in Panel, and if so, how. Tornado is, after all, an asynchronous web server.

For example, I could populate a select widget with a single option in code and fetch only the associated data for a dataframe or a plot, so that something can quickly be shown to the user. More options could then be loaded asynchronously in the background.

The purpose is to use async to make my application load fast, appear faster and be more responsive.

Where do I find some examples or documentation?

I don’t think there are any examples about async execution yet, except for a periodic callback example updating random numbers. We should really expose the add_next_tick_callback function from bokeh. Currently you should be able to schedule such a callback on the server with pn.state.curdoc.add_next_tick_callback(), which schedules the callback to be executed asynchronously. It should accept proper async functions, functions decorated with tornado.gen.coroutine, and regular synchronous functions.
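The "show something first, load the rest later" idea from the question can be sketched with plain asyncio. This is only an illustration of the scheduling pattern, not Panel code: fetch_more_options and load_in_background are hypothetical names, and the sleep stands in for a slow data source. In a Panel app the background coroutine would presumably be handed to pn.state.curdoc.add_next_tick_callback() as described above.

```python
import asyncio


async def fetch_more_options():
    """Hypothetical slow loader; the sleep stands in for a database or HTTP call."""
    await asyncio.sleep(0.05)
    return ["b", "c", "d"]


async def main():
    options = ["a"]              # one hard-coded option, available immediately
    snapshots = [list(options)]  # what a select widget could show at startup

    async def load_in_background():
        options.extend(await fetch_more_options())

    # Schedule the slow part; control returns to the caller straight away.
    task = asyncio.create_task(load_in_background())
    snapshots.append(list(options))  # the slow load has not finished yet

    # A real app would not wait here; the loop would simply keep serving.
    await task
    snapshots.append(list(options))
    return snapshots


snapshots = asyncio.run(main())
print(snapshots)
```

The first two snapshots contain only the initial option; the remaining options appear once the background task has finished.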


Adding some examples of that would be good. Eventually I’d also like to rewrite large parts of panel itself to be asynchronous.


There is a small example to start with here, I believe.

I’ve been playing around today with await and async (which are new to me) and after a long and confusing day I’ve come up with:

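import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager

import numpy as np
import panel as pn
import param
from panel.io.server import unlocked
from tornado.ioloop import IOLoop

executor = ThreadPoolExecutor(max_workers=2)
progress = ProgressExtMod()  # defined further down in this post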

class TestClass(param.Parameterized):
    do_stuff = param.Action(lambda self: self._do_calc())
    select = param.Selector(objects=range(10))
    result = param.Number(0)
    slider = param.Number(2, bounds=(0, 10))
    text = param.String()
    
    def __init__(self, **params):
        super(TestClass, self).__init__(**params)
        self.widgets = pn.Param(self.param)
        self.col = pn.Column(*self.widgets)
        self.col.append(progress.view)
    
    @param.depends('slider', watch=True)
    def _on_slider_change(self):
        # This function runs other Python code which we want to keep responsive
        self.text = str(self.slider)
    
    def _do_calc(self):
        num_tasks = 10
        progress.num_tasks = num_tasks
        loop = IOLoop.current()
        for i in range(num_tasks):
            future = executor.submit(self.blocking_task)
            loop.add_future(future, self.update)

    @progress.increment()
    def update(self, future):
        number = future.result()
        with unlocked():
            # progress is tracked by the increment() decorator, so only the result is updated here
            self.result += number
    
    def blocking_task(self):
        time.sleep(np.random.randint(1, 5))
        return 5
        
    def panel(self):
        return self.col
    
tc = TestClass()
tc.panel()

This works quite well I think: the slider keeps responding while the blocking task is running. What I like about it is that you can swap out the executor for a dask.distributed.Client and the code should work the same, so the heavy lifting can easily be done in parallel or on a remote machine/server.

I’m not really sure how everything works, but here is how I think it works, so please correct me:
panel and bokeh run on tornado, whose main loop you can access with IOLoop.current(). I then use the add_future method on the loop, which calls the supplied callback with the future as its argument as soon as the future has completed.

In the callback itself I’m using with unlocked() which is a panel equivalent to bokeh’s add_next_tick_callback?
This is needed because you should only update the GUI from the main thread itself, and not from any other thread, and the callback supplied to add_future will be from a different thread.
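The question of which thread the completion callback runs on can be checked with a stdlib-only sketch. It uses asyncio.wrap_future and add_done_callback as a stand-in for Tornado's IOLoop.add_future, which is an assumption of equivalence rather than the same API. Tornado's documentation describes add_future as scheduling the callback on the IOLoop, so the behaviour shown here (blocking work in worker threads, callback on the loop thread) is likely what happens in the example above too.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_task():
    """Stand-in for slow work; runs in a worker thread."""
    time.sleep(0.05)
    return 5


async def main(num_tasks=4):
    loop = asyncio.get_running_loop()
    loop_thread = threading.get_ident()
    state = {"result": 0, "on_loop_thread": []}

    def update(future):
        # Called once per finished task; records which thread it runs on.
        state["result"] += future.result()
        state["on_loop_thread"].append(threading.get_ident() == loop_thread)

    with ThreadPoolExecutor(max_workers=2) as executor:
        wrapped = []
        for _ in range(num_tasks):
            # Bridge the concurrent.futures future into the event loop.
            fut = asyncio.wrap_future(executor.submit(blocking_task), loop=loop)
            fut.add_done_callback(update)
            wrapped.append(fut)
        await asyncio.gather(*wrapped)
    return state


state = asyncio.run(main())
print(state)
```

Every entry in on_loop_thread comes out True in this sketch, meaning the callbacks are executed by the event loop and not by the worker threads.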


The progress bar is a modification of the one from awesome-panel:

class ProgressExtMod(param.Parameterized):
    """
    adaptation of: https://github.com/MarcSkovMadsen/awesome-panel/blob/master/package/awesome_panel/express/widgets/progress_ext.py
    """

    completed = param.Integer(default=0)
    bar_color = param.String(default="info")
    num_tasks = param.Integer(default=100, bounds=(1, None))
        
    #@param.depends('completed', 'num_tasks')
    @property
    def value(self):
        return int(100*(self.completed/self.num_tasks))
    
    def reset(self):
        """Resets the completed counter"""
        self.completed = 0

    # "message" is not a parameter of this class, so the view depends on num_tasks instead
    @param.depends("completed", "num_tasks", "bar_color")
    def view(self):
        """View the widget
        Returns:
            pn.viewable.Viewable: Add this to your app to see the progress reported
        """
        if self.value:
            return pn.widgets.Progress(active=True, value=self.value, align="center", sizing_mode="stretch_width")
        else:
            return None

    @contextmanager
    def increment(self):
        """Increment the value 
        Can be used as context manager or decorator?
        Yields:
            None: Nothing is yielded
        """
        self.completed += 1
        yield
        if self.completed == self.num_tasks:
            self.reset()

@Marc Why is the yield needed in the increment? If I don’t put it there the progress bar starts at 1.

Hi @Jhsmit

Such a nice example. I’ve been wanting to learn how to do this for so long. It would be nice if the Panel docs or Gallery included a reference example of the right way to do this.

Regarding the increment and why yield is needed: yield is required by @contextmanager. But I don’t know why the progress bar starts at 1 without the yield. I can only see that, if the yield is not there, the increment function gets called on instantiation, which sets self.completed to 1.
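A possible explanation, sketched with the standard library only: @contextmanager calls the wrapped function as soon as increment() is evaluated in the decorator line. If the function contains a yield, that call merely creates a generator object and nothing runs yet. Without the yield it is an ordinary function, so its body runs once at decoration time. The Counter class below is a hypothetical stand-in for ProgressExtMod.

```python
from contextlib import contextmanager


class Counter:
    def __init__(self):
        self.completed = 0

    @contextmanager
    def increment(self):
        # Generator function: calling it only creates a generator object.
        self.completed += 1
        yield

    @contextmanager
    def increment_no_yield(self):
        # Plain function: the body runs as soon as it is called.
        self.completed += 1


with_yield = Counter()
without_yield = Counter()


@with_yield.increment()
def update_a():
    return "done"


@without_yield.increment_no_yield()
def update_b():
    return "done"


# Nothing has been called yet, only decorated.
after_decoration = (with_yield.completed, without_yield.completed)

update_a()
after_one_call = with_yield.completed
print(after_decoration, after_one_call)
```

With the yield the counter stays at 0 until update_a is actually called; without it the counter is already 1 right after decoration, which matches the progress bar starting at 1. Calling update_b is left out on purpose, since entering a context manager built from a non-generator function fails.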

I played around with the code and created a working version:

import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from panel.io.server import unlocked

import numpy as np
import panel as pn
import param
from tornado.ioloop import IOLoop


class ProgressExtMod(param.Parameterized):
    """
    adaptation of: https://github.com/MarcSkovMadsen/awesome-panel/blob/master/package/awesome_panel/express/widgets/progress_ext.py
    """

    completed = param.Integer(default=0)
    bar_color = param.String(default="info")
    num_tasks = param.Integer(default=100, bounds=(1, None))

    # @param.depends('completed', 'num_tasks')
    @property
    def value(self):
        return int(100 * (self.completed / self.num_tasks))

    def reset(self):
        """Resets the completed counter"""
        self.completed = 0

    # "message" is not a parameter of this class, so the view depends on num_tasks instead
    @param.depends("completed", "num_tasks", "bar_color")
    def view(self):
        """View the widget
        Returns:
            pn.viewable.Viewable: Add this to your app to see the progress reported
        """
        if self.value:
            return pn.widgets.Progress(active=True, value=self.value, align="center")
        else:
            return None

    @contextmanager
    def increment(self):
        """Increment the value
        Can be used as context manager or decorator?
        Yields:
            None: Nothing is yielded
        """
        self.completed += 1
        yield
        if self.completed == self.num_tasks:
            self.reset()


executor = ThreadPoolExecutor(max_workers=2)
progress = ProgressExtMod()


class TestClass(param.Parameterized):
    select = param.Selector(objects=range(10))
    slider = param.Number(2, bounds=(0, 10))
    text = param.String()

    do_stuff = param.Action(lambda self: self._do_calc())
    result = param.Number(0)

    def __init__(self, **params):
        super(TestClass, self).__init__(**params)
        self.col = pn.Column(
            pn.pane.Markdown("## Starts async background process"),
            pn.Param(
                self,
                parameters=["do_stuff", "result"],
                widgets={"result": {"disabled": True}, "do_stuff": {"button_type": "success"}},
                show_name=False,
            ),
            progress.view,
            pn.layout.Divider(),
            pn.pane.Markdown("## Works while background process is running"),
            pn.Param(
                self,
                parameters=["select", "slider", "text"],
                widgets={"text": {"disabled": True}},
                show_name=False,
            ),
            max_width=500,
        )

    @param.depends("slider", "select", watch=True)
    def _on_slider_change(self):
        # This function runs other Python code which we want to keep responsive
        if self.select:
            select = self.select
        else:
            select = 0
        self.text = str(self.slider + select)

    def _do_calc(self):
        num_tasks = 10
        progress.num_tasks = num_tasks
        loop = IOLoop.current()
        for _ in range(num_tasks):
            future = executor.submit(self.blocking_task)
            loop.add_future(future, self.update)

    @progress.increment()
    def update(self, future):
        number = future.result()
        with unlocked():
            self.result += number

    def blocking_task(self):
        time.sleep(np.random.randint(1, 5))
        return 5

    def panel(self):
        return self.col


pn.config.sizing_mode = "stretch_width"
tc = TestClass()
tc.panel().servable()


I’ve published the example at awesome-panel.org

If anyone has a good working knowledge of Python async I’d love to talk to you at some point. All the Panel internals should eventually be rewritten on top of the native async primitives rather than using tornado co-routines, which should then also lead to better support for user async callbacks.


I’ve made another example which uses python 3 async coroutines and a dask Client to schedule tasks. In this example there are 10 tasks which need to be completed before combining them in one final asynchronous task. The ‘Do stuff’ button locks when the calculation is running and becomes active again afterwards.


import panel as pn
import param
import time
import numpy as np
from panel.io import unlocked
from tornado.ioloop import IOLoop
from dask.distributed import Client, as_completed
pn.extension()

class ASyncProgressBar(param.Parameterized):
    completed = param.Integer(default=0)
    num_tasks = param.Integer(default=10, bounds=(1, None))
    
    async def run(self, futures):
        async for task in as_completed(futures):
            with unlocked():
                self.completed +=1
        
    @property
    def value(self):
        return int(100*(self.completed/self.num_tasks))    
    
    def reset(self):
        self.completed = 0
    
    @param.depends('completed', 'num_tasks')
    def view(self):
        if self.value:
            return pn.widgets.Progress(active=True, value=self.value, align="center",sizing_mode="stretch_width")
        else:
            return None

def task_func(arg):
    """This is a normal blocking function which we want to call multiple times"""
    time.sleep(np.random.randint(1, 4))
    return arg

class ASyncExample(param.Parameterized):
    select = param.Selector(objects=range(10))
    slider = param.Number(2, bounds=(0, 10))
    text = param.String()

    do_stuff = param.Action(lambda self: self._do_calc())
    result = param.Number(0)

    def __init__(self, **params):
        super(ASyncExample, self).__init__(**params)
        self.async_pbar = ASyncProgressBar()
        self.col = pn.Column(
            pn.pane.Markdown("## Starts async background process"),
            pn.Param(
                self,
                parameters=["do_stuff", "result"],
                widgets={"result": {"disabled": True}, "do_stuff": {"button_type": "success"}},
                show_name=False,
            ),
            self.async_pbar.view,
            pn.layout.Divider(),
            pn.pane.Markdown("## Works while background process is running"),
            pn.Param(
                self,
                parameters=["select", "slider", "text"],
                widgets={"text": {"disabled": True}},
                show_name=False,
            ),
            max_width=500,
        )

    @param.depends("slider", "select", watch=True)
    def _on_slider_change(self):
        # This function runs other Python code which we want to keep responsive
        if self.select:
            select = self.select
        else:
            select = 0
        self.text = str(self.slider + select)

    async def overall_func(self, num_tasks):
        """
        This function asynchronously sends out num_tasks tasks, whose futures are passed to the progress bar to report on their completion.
        Next, in a second step, the futures are combined in the final calculation and the result is sent to the result field.
        """
        client = await Client(asynchronous=True)
        futures = client.map(task_func, range(num_tasks))
        await self.async_pbar.run(futures)
        print(futures)
        result = await client.submit(sum, futures)
        with unlocked():
            self.result = result
            self.param['do_stuff'].constant=False
            self.async_pbar.reset()
        
    def _do_calc(self):
        self.param['do_stuff'].constant=True
        num_tasks = 10
        self.async_pbar.num_tasks = num_tasks
        loop = IOLoop.current()
        loop.add_callback(self.overall_func, num_tasks)
        
    def panel(self):
        return self.col


pn.config.sizing_mode = "stretch_width"
async_example = ASyncExample()
async_example.panel().servable()
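
The two-step shape of overall_func (count tasks as they complete, then combine all results) can also be tried without a dask cluster. The sketch below swaps dask's as_completed for asyncio.as_completed and makes task_func a coroutine, so it only mirrors the control flow and is not the dask API; the progress list is a hypothetical stand-in for the percentages the progress bar would show.

```python
import asyncio
import random


async def task_func(arg):
    """Stand-in for a slow task; here it sleeps asynchronously instead of blocking."""
    await asyncio.sleep(random.uniform(0.01, 0.05))
    return arg


async def overall_func(num_tasks=10):
    progress = []  # percentages a progress bar would display
    tasks = [asyncio.ensure_future(task_func(i)) for i in range(num_tasks)]

    # Step 1: report on each task as soon as it finishes, in completion order.
    completed = 0
    for finished in asyncio.as_completed(tasks):
        await finished
        completed += 1
        progress.append(int(100 * completed / num_tasks))

    # Step 2: combine all results once every task is done.
    total = sum(task.result() for task in tasks)
    return progress, total


progress, total = asyncio.run(overall_func())
print(progress, total)
```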

@philippjfr Since this example doesn’t use threads but coroutines instead, is the with unlocked() part still needed?

I’m now trying to make the above example work with Bokeh figures, but I’m running into some issues. I modified the example above to include a bokeh figure and I want the outputs of the async tasks to be plotted in the bokeh figure upon completion.

To update the bokeh document safely (see bokeh docs Updating From Threads and Unlocked Callbacks), the callback that updates the bokeh document should be scheduled via add_next_tick_callback.

If I don’t do this, and instead call the callback cb directly, I get the error:

raise RuntimeError("_pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes")
RuntimeError: _pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes

However, when I do use add_next_tick_callback, nothing happens and it seems that the callback isn’t called.
Is this because panel overrides bokeh behaviour related to the event loop? Is there a workaround for this?

If you want to run the code below you need to run a dask LocalCluster and update the ip:port value for cluster.

import panel as pn
import param
import time
import numpy as np
from panel.io import unlocked
from tornado.ioloop import IOLoop
from dask.distributed import Client, as_completed
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource
from functools import partial

class ASyncProgressBar(param.Parameterized):
    completed = param.Integer(default=0)
    num_tasks = param.Integer(default=10, bounds=(1, None))

    async def run(self, futures):
        async for task in as_completed(futures):
            with unlocked():
                self.completed += 1

    @property
    def value(self):
        return int(100 * (self.completed / self.num_tasks))

    def reset(self):
        self.completed = 0

    @param.depends('completed', 'num_tasks')
    def view(self):
        if self.value:
            return pn.widgets.Progress(active=True, value=self.value, align="center", sizing_mode="stretch_width")
        else:
            return None


def task_func(arg):
    """This is a normal blocking function which we want to call multiple times"""
    time.sleep(np.random.randint(1, 2))
    return arg


class ASyncExample(param.Parameterized):
    select = param.Selector(objects=range(10))
    slider = param.Number(2, bounds=(0, 10))
    text = param.String()

    do_stuff = param.Action(lambda self: self._do_calc())
    normal_stuff = param.Action(lambda self: self._do_normal())
    result = param.Number(0)

    def __init__(self, cluster, **params):
        super(ASyncExample, self).__init__(**params)
        self.async_pbar = ASyncProgressBar()
        self.cluster = cluster

        self.doc = curdoc()
        self.source = ColumnDataSource({'x': range(10), 'y': range(10)})
        self.figure = figure()
        self.figure.line(x='x', y='y', source=self.source)
        self.bk_pane = pn.pane.Bokeh(self.figure)

        self.col = pn.Column(
            pn.pane.Markdown("## Starts async background process"),
            pn.Param(
                self,
                parameters=["do_stuff", 'normal_stuff', "result"],
                widgets={"result": {"disabled": True}, "do_stuff": {"button_type": "success"}},
                show_name=False,
            ),
            self.async_pbar.view,
            pn.layout.Divider(),
            pn.pane.Markdown("## Works while background process is running"),
            pn.Param(
                self,
                parameters=["select", "slider", "text"],
                widgets={"text": {"disabled": True}},
                show_name=False,
            ),
            self.bk_pane,
            max_width=500,
        )

    @param.depends("slider", "select", watch=True)
    def _on_slider_change(self):
        # This function runs other Python code which we want to keep responsive
        if self.select:
            select = self.select
        else:
            select = 0
        self.text = str(self.slider + select)

    def _do_normal(self):
        "updates the bokeh graph normally"
        self.update_source(list(np.random.randint(0, 2**31, 10)))

    def update_source(self, data):
        self.source.data.update({'y': data})

    async def overall_func(self, num_tasks):
        """
        This function asynchronously sends out num_tasks tasks, whose futures are passed to the progress bar to report on their completion.
        Next, in a second step, the futures are combined in the final calculation and the result is sent to the result field.
        """
        client = await Client(self.cluster, asynchronous=True)

        futures = client.map(task_func, np.random.randint(0, 2**31, num_tasks))
        await self.async_pbar.run(futures)
        result = await client.submit(np.mean, futures)
        data = await client.gather(futures)
        cb = partial(self.update_source, data)
        self.doc.add_next_tick_callback(cb)

        with unlocked():
            self.result = result
            self.param['do_stuff'].constant = False
            self.async_pbar.reset()

    def _do_calc(self):
        self.doc = curdoc() # save the doc state here in the main thread
        self.param['do_stuff'].constant = True
        num_tasks = 10
        self.async_pbar.num_tasks = num_tasks
        loop = IOLoop.current()
        loop.add_callback(self.overall_func, num_tasks)

    def panel(self):
        return self.col


# Address of the dask LocalCluster, which can be started from a separate python console:
# >>> from dask.distributed import LocalCluster
# >>> cluster = LocalCluster()
# >>> cluster
# This returns the IP:port to use

cluster = '<ip:port for cluster>'

pn.config.sizing_mode = "stretch_width"
async_example = ASyncExample(cluster)
pn.serve(async_example.panel())

Update0: In jupyter notebooks, I can call the callback directly (cb()), and everything works fine.
However, when I run the app with .servable() and panel serve from the console, dask has problems properly pickling the functions and distributing them.

Update1: I’ve updated my example above, which now does work.
The secret was to save a reference to the bokeh document right before starting the async coroutines in another thread, rather than in the __init__.
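
The underlying pattern (grab a handle while on the main thread, then schedule the update through that handle instead of mutating state from wherever the result arrives) can be sketched with asyncio alone. Here loop.call_soon_threadsafe plays the role that doc.add_next_tick_callback plays in the example above; that mapping is an analogy, not Bokeh's API, and the data dict is a hypothetical stand-in for the ColumnDataSource.

```python
import asyncio
import threading


async def main():
    # Captured on the loop thread, like saving curdoc() before starting the work.
    loop = asyncio.get_running_loop()
    loop_thread = threading.get_ident()
    data = {"y": [0, 1, 2]}
    seen = {}
    applied = asyncio.Event()

    def update_source(new_y):
        # Executed by the event loop on one of its next iterations.
        data["y"] = new_y
        seen["on_loop_thread"] = threading.get_ident() == loop_thread
        applied.set()

    def worker():
        # Background thread: schedule the update instead of touching data directly.
        loop.call_soon_threadsafe(update_source, [10, 20, 30])

    thread = threading.Thread(target=worker)
    thread.start()
    await applied.wait()
    thread.join()
    return data, seen


data, seen = asyncio.run(main())
print(data, seen)
```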

Update2: With respect to serving an app using panel serve from the console: in this case Bokeh creates new module objects every time a user session is initiated, so pickle is unable to find the module where task_func is located. If you want to use panel serve with this example, you need to move task_func to a separate module.
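
The reason the module matters is that pickle stores functions by reference (module name plus qualified name) rather than by value, so the receiving side must be able to import that module under the same name. A small stdlib sketch of this behaviour, using json.dumps as an arbitrary importable function and a hypothetical make_local_task helper for the failing case:

```python
import json
import pickle

# A function that lives in an importable module is pickled as a name lookup.
payload = pickle.dumps(json.dumps)
stored_by_name = b"json" in payload and b"dumps" in payload
restored = pickle.loads(payload)


def make_local_task():
    def task_func(arg):
        return arg
    return task_func


# A function that cannot be found by name in a module cannot be pickled.
try:
    pickle.dumps(make_local_task())
    local_ok = True
except (pickle.PicklingError, AttributeError):
    local_ok = False

print(stored_by_name, restored is json.dumps, local_ok)
```

The payload contains only the names, and unpickling returns the very same function object by importing it again. A function that cannot be reached through a stable module path fails, which is consistent with the panel serve behaviour described above.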
See also here:

@Jhsmit and others looking at this thread. There is a different approach (and more powerful I believe) here by @Material-Scientist that can also be used as inspiration.


@Marc, @Jhsmit I am curious to know if the executor can be replaced with a ProcessPoolExecutor for CPU intensive tasks? So far I have not been successful in using multiple processes instead of threads to run the above code.

I would not know. I have not really entered into this area yet.

But I would really like to learn and use it.

Hi @prithivi, this is indeed possible, but processes are a bit more complicated than threads because they do not share memory, and therefore your code and data need to be serialized (by pickle) in order for it to work. The example above can be modified to make this work, but you need to

  1. define your intensive computation function (blocking_function) outside of the TestClass, otherwise it cannot be pickled. I’m not sure why this happens exactly, but I think it’s because TestClass is a subclass of param.Parameterized.
  2. add the if __name__ == '__main__': check (at least on Windows). This ensures the panel is only launched once, not again in each worker process.
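
A likely reason for the first point, sketched with the standard library: pickling a bound method also pickles the instance it belongs to, so anything unpicklable stored on that instance makes the whole call fail. Holder is a hypothetical stand-in for TestClass, and its lambda attribute stands in for the local closures that Panel objects hold. That this is exactly what happens inside param and panel is an assumption, not something verified here.

```python
import json
import pickle


class Holder:
    """Stand-in for a class whose instances hold unpicklable state."""

    def __init__(self):
        # The lambda plays the role of a local closure stored on the instance.
        self.callback = lambda: None

    def task(self):
        return 5


def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


plain_function_ok = can_pickle(json.dumps)   # importable module-level function
bound_method_ok = can_pickle(Holder().task)  # bound method drags the instance along
print(plain_function_ok, bound_method_ok)
```

A plain module-level function carries no instance with it, which is why moving the computation out of the class sidesteps the problem.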

This example below should do what you want:

import param
import panel as pn
from concurrent.futures import ProcessPoolExecutor
from tornado.ioloop import IOLoop
import time
import numpy as np

executor = ProcessPoolExecutor(max_workers=2)


def blocking_function():
    time.sleep(np.random.randint(1, 5))
    return 5


class TestClass(param.Parameterized):
    do_stuff = param.Action(lambda self: self._do_calc())
    select = param.Selector(objects=range(10))
    result = param.Number(0)
    slider = param.Number(2, bounds=(0, 10))
    text = param.String()

    def __init__(self, **params):
        super(TestClass, self).__init__(**params)
        self.progress_widget = pn.widgets.Progress(active=True, value=0, align="center", sizing_mode="stretch_width")
        self.completed_tasks = 0
        self.num_tasks = 10

        self.widgets = pn.Param(self.param)
        self.col = pn.Column(*self.widgets)
        self.col.append(self.progress_widget)

    @param.depends('slider', watch=True)
    def _on_slider_change(self):
        # This function runs other Python code which we want to keep responsive
        self.text = str(self.slider)

    def _do_calc(self):
        loop = IOLoop.current()
        for i in range(self.num_tasks):
            future = executor.submit(blocking_function)
            loop.add_future(future, self.update)

    def update(self, future):
        number = future.result()
        with pn.io.unlocked():
            self.result += number
            self.completed_tasks += 1
            self.progress_widget.value = int(100 * self.completed_tasks / self.num_tasks)

    def blocking_task(self):
        # This function is not pickleable: Can't pickle local object 'Param.widget.<locals>.link'
        time.sleep(np.random.randint(1, 5))
        return 5

    def panel(self):
        return self.col


if __name__ == '__main__':
    tc = TestClass()
    pn.serve(tc.panel())

@Jhsmit Excellent. Moving the blocking_function outside the class really helped.

Hi all,

I’m new to Panel and asynchronous tasks. I’m trying to load records from a database using the Django ORM within a param.Parameterized class. So far I have failed, because these queries are not allowed in an async context and all my attempts to solve it were unsuccessful. I’ve tried to modify panel’s official example for Django apps; more details are in this Stack Overflow question.

Is this conversation here directly related to my problem? You mention several solution approaches here and I tried one with ThreadPoolExecutor, but without success.
Do you have an idea what I should try next?

You probably do indeed need a ThreadPoolExecutor which runs your coroutines. If you use async def, this creates a coroutine which you cannot run directly. I’m not too familiar with those sync-to-async wrappers, but I would try something like in my first post:

loop = IOLoop.current()
for i in range(num_tasks):
    future = executor.submit(self.blocking_task)
    loop.add_future(future, self.update)
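
From inside a coroutine, the same idea can be written by awaiting the blocking call through the loop's executor. The sketch below is stdlib-only and untested against Django: load_records is a hypothetical stand-in for the ORM query, and it assumes the ORM only objects to being called on the event loop thread itself. The heartbeat coroutine is there just to show that the loop keeps running while the query blocks a worker thread.

```python
import asyncio
import time


def load_records():
    """Hypothetical synchronous query; a real app would call the ORM here."""
    time.sleep(0.1)
    return [{"id": 1}, {"id": 2}]


async def heartbeat(state):
    # Keeps ticking while the query runs, showing the loop is not blocked.
    while not state["done"]:
        state["ticks"] += 1
        await asyncio.sleep(0.01)


async def main():
    state = {"ticks": 0, "done": False}
    loop = asyncio.get_running_loop()
    beat = asyncio.create_task(heartbeat(state))
    # None selects the loop's default ThreadPoolExecutor.
    records = await loop.run_in_executor(None, load_records)
    state["done"] = True
    await beat
    return records, state["ticks"]


records, ticks = asyncio.run(main())
print(len(records), ticks)
```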

What exactly did you try with the ThreadPoolExecutor?
