[BUG] background thread stops when session is destroyed, leading to weird errors

I’m trying to set a background process that will continuously gather new data and put it in the panel cache. It then uses Dask futures to compute the results in a non-blocking fashion.

That works.

However, once the session is closed (browser tab closed or connection loss), I get strange errors like:
NameError: name 'copy' is not defined
or
NameError: name 'print' is not defined.

Minimal reproducible example:

from bokeh.plotting import figure, curdoc
from dask.distributed import Client
import time, threading, copy, param
import panel as pn
import holoviews as hv

pn.extension()
hv.extension('bokeh')

def dask_fn(entry,counter):
    print('Doing some heavy work with dask')
    time.sleep(3)
    return entry
    
def dask_callback(future):
    if future.status == 'finished':
        entry = future.result()
        print('Future finished successfully')
        del pn.state.cache['futures'][entry]
        print(f'Deleted future for {entry} from cache')
        
    # If future fails, it will automatically retry
    else:
        print('future failed after 10 retries')   


def BG_fn():
    counter = 0
    while True:
        counter += 1
        time.sleep(0.5)
        tickers = copy.deepcopy(list(pn.state.cache['data'].keys()))
        for entry in tickers:
            futures = copy.deepcopy(list(pn.state.cache['futures'].keys()))
            if not entry in futures:
                future = pn.state.cache['dask_client'].submit(dask_fn,entry,counter,retries=10)
                pn.state.cache['futures'][entry] = future
                future.add_done_callback(dask_callback)

# Data cache
if not 'data' in pn.state.cache:
    # Init doc & client in cache
    pn.state.cache['doc'] = curdoc()
    pn.state.cache['dask_client'] = Client(processes=False)
    
    # Init caches
    pn.state.cache['data'] = {}
    pn.state.cache['futures'] = {}
    
    # Init sample data
    pn.state.cache['data']['test'] = 'lalala'
    
    # Start periodic callback as thread
    thread = threading.Thread(target=BG_fn,args={})
    thread.daemon = True
    thread.start()
    
    pn.state.cache['BG_process'] = thread

class App(param.Parameterized):   
    def __init__(self, **params):
        super().__init__(**params)
        # Init important vars
        self.client = pn.state.cache['dask_client']
        self.doc = curdoc()
        self.plot = pn.pane.HoloViews(self.hv_plot)
    
    @property
    def hv_plot(self): 
        return hv.Curve([1,2,3])
        
app = App(name='')

app.plot.servable()

Console output:

2021-10-19 17:49:35,776 Starting Bokeh server version 2.4.1 (running on Tornado 6.1)
2021-10-19 17:49:35,778 User authentication hooks NOT provided (default user enabled)
2021-10-19 17:49:35,780 Bokeh app running at: http://localhost:5008/bug
2021-10-19 17:49:35,780 Starting Bokeh server with process id: 1638246
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
distributed.client - ERROR - Error in callback <function dask_callback at 0x7f5b1a505a60> of <Future: finished, type: str, key: dask_fn-91da05f4aae28bbb3bb1232c7fef7ff3>:
Traceback (most recent call last):
File “/root/anaconda3/lib/python3.8/site-packages/distributed/client.py”, line 298, in execute_callback
fn(fut)
File “/root/mnt3/02_Data_Serving/Development/bug.ipynb”, line 24, in dask_callback
“def dask_callback(future):\n”,
NameError: name ‘print’ is not defined
Exception in thread Thread-1:
Traceback (most recent call last):
File “/root/anaconda3/lib/python3.8/threading.py”, line 932, in _bootstrap_inner
self.run()
File “/root/anaconda3/lib/python3.8/threading.py”, line 870, in run
self._target(*self._args, **self._kwargs)
File “/root/mnt3/02_Data_Serving/Development/bug.ipynb”, line 38, in BG_fn
" while True:\n",
NameError: name ‘copy’ is not defined

The threading is inspired by @Marc but using Dask futures, rather than async (more powerful, I think).

Any suggestions on how to resolve this?

Best,
Fred

As I commented in another post, I see similar errors in my notebook with anaconda, but in the office with miniconda works ok, always in windows.

1 Like

Hm, but this is on a freshly set up Ubuntu server

I’ve also seen this, the problem is likely that the dynamically created module for the session is cleaned up which probably clears out all globals even if the periodic function somehow persists. Will think about that in the context of this PR which is meant to allow global (i.e. non-session dependent) periodic callbacks.

1 Like

Awesome, thank you for that!

Edit:
Bryan over at the Bokeh discourse pointed me towards using lifecycle hooks.

So, I’ve moved all the initialization code and callback functions to the app_hooks.py file, and caught the exception inside the parameterized app upon callbacks from the param object in the cache.

What used to happen when I’d refresh the page was that all sessions would freeze due to modules being unloaded during callbacks. Now, whenever I refresh the page to destroy a session, all other sessions continue to function normally.

related problem and solution can be found here

1 Like