I’m trying to set a background process that will continuously gather new data and put it in the panel cache. It then uses Dask futures to compute the results in a non-blocking fashion.
That works.
However, once the session is closed (browser tab closed or connection loss), I get strange errors like:
NameError: name 'copy' is not defined
or
NameError: name 'print' is not defined
.
Minimal reproducible example:
from bokeh.plotting import figure, curdoc
from dask.distributed import Client
import time, threading, copy, param
import panel as pn
import holoviews as hv
pn.extension()
hv.extension('bokeh')
def dask_fn(entry,counter):
print('Doing some heavy work with dask')
time.sleep(3)
return entry
def dask_callback(future):
if future.status == 'finished':
entry = future.result()
print('Future finished successfully')
del pn.state.cache['futures'][entry]
print(f'Deleted future for {entry} from cache')
# If future fails, it will automatically retry
else:
print('future failed after 10 retries')
def BG_fn():
counter = 0
while True:
counter += 1
time.sleep(0.5)
tickers = copy.deepcopy(list(pn.state.cache['data'].keys()))
for entry in tickers:
futures = copy.deepcopy(list(pn.state.cache['futures'].keys()))
if not entry in futures:
future = pn.state.cache['dask_client'].submit(dask_fn,entry,counter,retries=10)
pn.state.cache['futures'][entry] = future
future.add_done_callback(dask_callback)
# Data cache
if not 'data' in pn.state.cache:
# Init doc & client in cache
pn.state.cache['doc'] = curdoc()
pn.state.cache['dask_client'] = Client(processes=False)
# Init caches
pn.state.cache['data'] = {}
pn.state.cache['futures'] = {}
# Init sample data
pn.state.cache['data']['test'] = 'lalala'
# Start periodic callback as thread
thread = threading.Thread(target=BG_fn,args={})
thread.daemon = True
thread.start()
pn.state.cache['BG_process'] = thread
class App(param.Parameterized):
def __init__(self, **params):
super().__init__(**params)
# Init important vars
self.client = pn.state.cache['dask_client']
self.doc = curdoc()
self.plot = pn.pane.HoloViews(self.hv_plot)
@property
def hv_plot(self):
return hv.Curve([1,2,3])
app = App(name='')
app.plot.servable()
Console output:
2021-10-19 17:49:35,776 Starting Bokeh server version 2.4.1 (running on Tornado 6.1)
2021-10-19 17:49:35,778 User authentication hooks NOT provided (default user enabled)
2021-10-19 17:49:35,780 Bokeh app running at: http://localhost:5008/bug
2021-10-19 17:49:35,780 Starting Bokeh server with process id: 1638246
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
distributed.client - ERROR - Error in callback <function dask_callback at 0x7f5b1a505a60> of <Future: finished, type: str, key: dask_fn-91da05f4aae28bbb3bb1232c7fef7ff3>:
Traceback (most recent call last):
File “/root/anaconda3/lib/python3.8/site-packages/distributed/client.py”, line 298, in execute_callback
fn(fut)
File “/root/mnt3/02_Data_Serving/Development/bug.ipynb”, line 24, in dask_callback
“def dask_callback(future):\n”,
NameError: name ‘print’ is not defined
Exception in thread Thread-1:
Traceback (most recent call last):
File “/root/anaconda3/lib/python3.8/threading.py”, line 932, in _bootstrap_inner
self.run()
File “/root/anaconda3/lib/python3.8/threading.py”, line 870, in run
self._target(*self._args, **self._kwargs)
File “/root/mnt3/02_Data_Serving/Development/bug.ipynb”, line 38, in BG_fn
" while True:\n",
NameError: name ‘copy’ is not defined
The threading is inspired by @Marc but using Dask futures, rather than async (more powerful, I think).
Any suggestions on how to resolve this?
Best,
Fred