Set dask as default executor for param?

I just learned that param supports async execution like in the example below:

import param, asyncio, aiohttp
from tornado.ioloop import IOLoop

param.parameterized.async_executor = IOLoop.current().add_callback

class Downloader(param.Parameterized):
    url = param.String()
    results = param.List()
    
    def __init__(self, **params):
        super().__init__(**params)
        self.param.watch(self.fetch, ['url'])

    async def fetch(self, event):
        async with aiohttp.ClientSession() as session:
            async with session.get(event.new) as response:
                img = await response.read()
                self.results.append((event.new, img))

f = Downloader()
n = 7
for index in range(n):
    f.url = f"https://picsum.photos/800/300?image={index}"

f.results # will update gradually

Is there a way to set dask (in async mode or with futures) as the default executor?

I love this async example.

Came across this again.

Yes, you can:

# param.parameterized.async_executor = IOLoop.current().add_callback
param.parameterized.async_executor = pn.state.cache['distr_dask_client'].loop.add_callback

This uses dask's IOLoop to run callbacks such as button presses asynchronously, provided you also make the callback functions async and register them with a watcher, e.g. self.param.watch(self.toggle_lock, 'locked', onlychanged=True, queued=True), where toggle_lock is:

async def toggle_lock(self,event):
    self.layout.loading = event.new
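
For completeness, here's a minimal sketch of how those pieces fit together (my own sketch, assuming the distributed client has been created and cached under pn.state.cache['distr_dask_client']; the LockingApp name and the layout attribute are just placeholders):

import param
import panel as pn
from dask.distributed import Client

pn.extension()

# create (or reuse) the cached distributed client
if 'distr_dask_client' not in pn.state.cache:
    pn.state.cache['distr_dask_client'] = Client(processes=False)
client = pn.state.cache['distr_dask_client']

# route param's async callbacks onto dask's tornado IOLoop
param.parameterized.async_executor = client.loop.add_callback

class LockingApp(param.Parameterized):
    locked = param.Boolean(False)

    def __init__(self, **params):
        super().__init__(**params)
        self.layout = pn.Column(pn.widgets.Button(name='Run'))
        # queued=True keeps events ordered, onlychanged=True skips redundant toggles
        self.param.watch(self.toggle_lock, 'locked', onlychanged=True, queued=True)

    async def toggle_lock(self, event):
        self.layout.loading = event.new

app = LockingApp()
app.locked = True  # toggles the loading overlay via the async watcher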

However, if the executor does not run in the same thread as the Bokeh document, the UI won't lock (the function still gets executed, though).

I think @philippjfr had a fix for this somewhere, but I can’t find it anymore.
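
In the meantime, one workaround (again just a sketch of mine, adapting the example above, not that fix) is to capture the Bokeh document and schedule the UI write back onto its event loop with add_next_tick_callback, so the change happens on the document's own thread even when the callback runs elsewhere - the same curdoc()/add_next_tick_callback pattern used in the full example further down:

import param
import panel as pn
from bokeh.plotting import curdoc

pn.extension()

class LockingApp(param.Parameterized):
    locked = param.Boolean(False)

    def __init__(self, **params):
        super().__init__(**params)
        self.layout = pn.Column(pn.widgets.Button(name='Run'))
        # capture the document while still on the server thread
        self.doc = curdoc()
        self.param.watch(self.toggle_lock, 'locked', onlychanged=True, queued=True)

    async def toggle_lock(self, event):
        # schedule the property write on the document's event loop instead of
        # mutating it directly from dask's thread
        self.doc.add_next_tick_callback(
            lambda: setattr(self.layout, 'loading', event.new)
        )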

Hi @Material-Scientist

Could you elaborate on the example? I would like to understand whether this actually uses Dask as the execution engine.

Could you maybe share a minimal, reproducible example and explain how it works? Thanks.

So, while it's possible to set dask as the executor for both param & panel like this:

ex = client.get_executor() # dask executor
param.parameterized.async_executor = ex.submit # can't serialize event obj due to link
pn.state._thread_pool = ex # can't serialize event obj due to link

It's not really practical, because panel/bokeh objects need to be (de-)serialized every time an event occurs, and that round trip probably costs more time than you gain in app responsiveness.

However, one thing you can do is wrap the callback functions with dask, so that:

  1. you see the functions being executed in the dask dashboard and have some observability
  2. your callbacks get called from a separate thread via done_callback
  3. you can offload expensive parts of your callback function to dask, but not anything that changes the document state

So, here's an example of button presses being wrapped with a dummy function that gets executed on dask before the document gets modified via a done callback (the callback runs in a separate thread, so it may need add_next_tick_callback to take the document lock):

Code:

from dask.distributed import Client, fire_and_forget, print
from tornado.ioloop import IOLoop
from bokeh.plotting import curdoc
from datetime import datetime
import functools as ft
import param, json
import panel as pn

pn.extension()

# asynchronous dask client; the top-level await works in a notebook cell -
# in a plain script, create the client inside an async function instead
client = await Client(processes=False, asynchronous=True)
client  # display the client repr (notebook)

# ex = client.get_executor() # dask executor
# param.parameterized.async_executor = ex.submit # can't serialize event obj due to link
# pn.state._thread_pool = ex # can't serialize event obj due to link

# helper function, just to make the execution visible in the dask dashboard
def dummy_fn(*args,**kwargs):
    return

def dask_wrapper(func):
    @ft.wraps(func)
    def wrapper(*args,**kwargs):
        # Submit fn to cluster for observability
        f = client.submit(
            kwargs.get('dask_fn'), # call dummy function or computationally intensive part of real function
            str(args), # stringify args
            # json.dumps(kwargs), # app is not JSON serializable
            key=f'{func.__name__}--{datetime.utcnow()}', # key must be descriptive and unique (otherwise dask will cache)
            retries=1, # one more try
            priority=100, # higher priority takes precedence
        )
        # Add done-callback for actual function call from separate thread
        f.add_done_callback(
            ft.partial(func,self=kwargs.get('self'),event=args[1])
        )
        # fire_and_forget(f) # optional - don't let future fall out of scope
    return wrapper

class App(param.Parameterized):
    btn = param.Event()
    date = param.String(f'{datetime.utcnow()}')
    def __init__(self,**kwargs):
        super().__init__(**kwargs)
        # bokeh doc in case we need to make threadsafe writes with self.doc.add_next_tick_callback
        self.doc = curdoc()
        # watcher
        self.watcher = self.param.watch(
            ft.partial(self.fn,dask_fn=dummy_fn,self=self), # partial to pass additional args
            'btn'
        )
    
    # this now gets called from a separate dask callback thread - note that the future is the 1st arg, not self.
    @dask_wrapper
    def fn(future,self,event):
        if future.status=='finished':
            # do something with future
            # result = future.result()
            # Any document modifications should happen here. Possibly need to acquire lock with self.doc.add_next_tick_callback
            self.date = f'{datetime.utcnow()}'
        else:
            # the future could also have errored or been cancelled; the done
            # callback still runs in that case, so surface the remote traceback
            raise Exception(future.traceback())
            
app = App(name='dask observability test')

pn.serve(pn.Param(app))

So, by using this method, your button callback won't be blocking (since nothing gets executed on the main event loop). Instead, you can either just use a dummy function for observability, or offload all of your expensive functions (those that don't modify the document) to dask, and then modify the document from dask's separate callback thread (or call add_next_tick_callback to be sure you hold the document lock).
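
One more sketch (again my own, with expensive/get_client/on_click as placeholder names, and relying on Panel's support for async callbacks plus dask's asynchronous client): since the client above was already created with asynchronous=True, you can also skip the done-callback plumbing and simply await the dask future inside an async callback - the expensive part runs on the cluster while the event loop stays free, and the document update happens right in the callback:

import time
import panel as pn
from dask.distributed import Client

pn.extension()

def expensive(x):
    # stand-in for real work; runs on a dask worker, not in the server event loop
    time.sleep(2)
    return x * 2

button = pn.widgets.Button(name='Compute')
result = pn.widgets.StaticText(value='no result yet')

async def get_client():
    # create the asynchronous client lazily and cache it for reuse
    if 'dask_client' not in pn.state.cache:
        pn.state.cache['dask_client'] = await Client(processes=False, asynchronous=True)
    return pn.state.cache['dask_client']

async def on_click(event):
    client = await get_client()
    result.value = 'working...'
    # awaiting the future keeps the event loop responsive while dask computes
    result.value = str(await client.submit(expensive, 21))

button.on_click(on_click)

pn.serve(pn.Column(button, result))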
