So, while it’s possible to set dask as the executor for both param & panel likes this:
ex = client.get_executor() # dask executor
param.parameterized.async_executor = ex.submit # can't serialize event obj due to link
pn.state._thread_pool = ex # can't serialize event obj due to link
It’s not really practical as panel/bokeh objects require (de-)serialization every time an event occurs, which probably takes more time to (de-)serialize than you get out in terms of app responsiveness.
However, one thing you can do is wrap the callback functions with dask, so that:
- you see the functions being executed in the dask dashboard and have some observability
- your callbacks get called from a separate thread via done_callback
- you can offload expensive parts of your callback function to dask, but not anything that changes the document state
So, here’s an example of button presses being wrapped with a dummy function that gets executed on dask, before the document gets modified via done_callback (spawns separate thread, may require add_next_tick_callback
lock):
Code:
from dask.distributed import Client, fire_and_forget, print
from tornado.ioloop import IOLoop
from bokeh.plotting import curdoc
from datetime import datetime
import functools as ft
import param, json
import panel as pn
pn.extension()
client = await Client(processes=False,asynchronous=True)
client
# ex = client.get_executor() # dask executor
# param.parameterized.async_executor = ex.submit # can't serialize event obj due to link
# pn.state._thread_pool = ex # can't serialize event obj due to link
# just helper function to see execution dask dashboard
def dummy_fn(*args,**kwargs):
return
def dask_wrapper(func):
@ft.wraps(func,)
def wrapper(*args,**kwargs):
# Submit fn to cluster for observability
f = client.submit(
kwargs.get('dask_fn'), # call dummy function or computationally intensive part of real function
str(args), # stringify args
# json.dumps(kwargs), # app is not JSON serializable
key=f'{func.__name__}--{datetime.utcnow()}', # key must be descriptive and unique (otherwise dask will cache)
retries=1, # one more try
priority=100, # higher priority takes precedence
)
# Add done-callback for actual function call from separate thread
f.add_done_callback(
ft.partial(func,self=kwargs.get('self'),event=args[1])
)
# fire_and_forget(f) # optional - don't let future fall out of scope
return wrapper
class App(param.Parameterized):
btn = param.Event()
date = param.String(f'{datetime.utcnow()}')
def __init__(self,**kwargs):
super().__init__(**kwargs)
# bokeh doc in case we need to make threadsafe writes with self.doc.add_next_tick_callback
self.doc = curdoc()
# watcher
self.watcher = self.param.watch(
ft.partial(self.fn,dask_fn=dummy_fn,self=self), # partial to pass additional args
'btn'
)
# this now gets called from separate dask callback thread - note that future is 1st arg, not self.
@dask_wrapper
def fn(future,self,event):
if future.status=='finished':
# do something with future
# result = future.result()
# Any document modifications should happen here. Possibly need to acquire lock with self.doc.add_next_tick_callback
self.date = f'{datetime.utcnow()}'
# future also could've err'd or cancelled, will still execute callback
else:
raise Exception(future.traceback())
app = App(name='dask observability test')
pn.serve(pn.Param(app))
So, by using this method, your button callback won’t be blocking (since nothing gets executed on the main event loop). Instead, you can either just use a dummy function for observability or offload all your expensive functions (that don’t modify the document) to dask, and finally modify your doc from dask’s separate callback thread (or call add_next_tick_callback
to be sure you have document lock).