jyojena
September 4, 2022, 4:03pm
1
Hi all,
I have this sample app here which I plan on deploying using the panel CLI. While testing it, I have run into issues with callbacks that continue running even after a user session has been closed. The app below can be opened in multiple browser windows and streams a lot of data in near real-time: it initializes with a snapshot and then applies incremental updates using a callback. Callbacks should ideally be linked to a user session and should stop when the browser window closes. Is there any way this can be controlled? I’m sure there’s something wrong in the way I’m deploying this app - any pointers will be super helpful.
class App:
    """Sketch of a per-session Panel app that streams incremental updates.

    The app renders an initial snapshot and then refreshes it from a
    periodic callback. The callback is stopped via ``stop_callback`` once
    the session-destroyed hook fires.
    """

    def __init__(self):
        self.dr = DataRetriever()
        self.refresh_ms = 1000
        # Assigned in get_component_1(); stays None until the periodic
        # callback exists so stop_callback() is safe to call at any time.
        self.cb = None

    def get_snapshot(self):
        """Return the initial data snapshot from the data retriever."""
        return self.dr.get_snapshot()

    def updates_app(self):
        """Build and return the template holding the streaming component."""
        # Registered before the component is built, so the hook may fire
        # even if building the component failed and self.cb was never set.
        pn.state.on_session_destroyed(self.stop_callback)
        template = pn.template.FastGridTemplate(
            site="Mysite", title="Title",
            theme_toggle=True,
            main_layout='card',
            theme='dark'
        )
        component1 = self.get_component_1()
        template.main[:2, :6] = component1
        return template

    def get_component_1(self):
        """Create the component and schedule its periodic refresh."""
        snapshot = self.get_snapshot()
        # Placeholder carried over from the original post: replace `...`
        # with the real component built from `snapshot`.
        component = ...  # some component to be rendered
        component_col = pn.Column(
            pn.Row(component)
        )

        def update_component():
            update = self.dr.get_update()
            ## update component with callback

        self.cb = pn.state.add_periodic_callback(update_component, period=self.refresh_ms)
        return component_col

    def stop_callback(self, session_context):
        """Stop the periodic callback, if one was ever scheduled.

        ``session_context`` is supplied by the session-destroyed hook and
        is not used here.
        """
        if self.cb is not None:
            self.cb.stop()
# Build a fresh App, construct its template, and mark it servable
# (the post deploys this script through the panel CLI).
App().updates_app().servable()
Best
JJ
1 Like
Marc
September 4, 2022, 5:02pm
2
Hi @jyojena
Could you update the example to a minimum, reproducible example? The issue might come from code that is not included.
1 Like
jyojena
September 4, 2022, 6:10pm
3
Hi @Marc ,
Thanks for your response. I’ve coded up this simple example for you:
server.py
import asyncio
import struct
import sys
from zmq.asyncio import Context
import zmq
import numpy as np
from datetime import datetime
def fmtd_time():
    """Return the current time formatted as ``YYYY-MM-DD HH:MM:SS.ffffff``."""
    timestamp_format = '%Y-%m-%d %H:%M:%S.%f'
    current = datetime.now()
    return current.strftime(timestamp_format)
class DataFetch:
    """Blocking ZeroMQ client that requests prices from ``tcp://localhost:8050``."""

    def __init__(self):
        context = zmq.Context()
        dealer = context.socket(zmq.DEALER)
        dealer.connect('tcp://localhost:8050')
        self.req = dealer

    def _request(self, command):
        """Send ``command`` and return the reply as a float rounded to 4 places.

        The reply is read with a blocking ``recv`` and decoded as a single
        network-order 32-bit float.
        """
        self.req.send_string(command)
        reply = self.req.recv()
        (value,) = struct.unpack('!f', reply)
        return round(value, 4)

    def get_snapshot(self):
        """Request the current price with the ``'snapshot'`` command."""
        return self._request('snapshot')

    def get_update(self):
        """Request the next price with the ``'update'`` command."""
        return self._request('update')
class Server:
    """Toy price server answering 'snapshot' and 'update' requests over ZeroMQ."""

    async def listen(self):
        """Serve price requests on ``tcp://*:8050`` until interrupted.

        Each request is a multipart message whose first frame is the
        client identity and whose second frame is the instruction:

        * ``'snapshot'`` replies with the current price.
        * ``'update'`` moves the price by a random log-normal step and
          replies with the new price.

        Any other instruction is ignored and receives no reply. Replies
        are a single network-order 32-bit float. The loop ends on
        ``KeyboardInterrupt`` or ``zmq.ZMQError``; the socket and context
        are always released on exit.
        """
        ctx = Context()
        sock = ctx.socket(zmq.ROUTER)
        sock.bind('tcp://*:8050')
        price = 50.
        try:
            while True:
                try:
                    frames = await sock.recv_multipart()
                    client_id = frames[0]
                    instr = frames[1].decode()
                    if instr == 'snapshot':
                        print(f'{fmtd_time()} Received snapshot request from {client_id}')
                    elif instr == 'update':
                        print(f'{fmtd_time()} Received update request from {client_id}')
                        price = np.exp(np.random.normal(loc=0, scale=np.sqrt(0.3 / 252))) * price
                    else:
                        # Unknown instruction: no reply, as before.
                        continue
                    # Send identity and payload as one multipart message so
                    # the two frames can never be separated by another send.
                    await sock.send_multipart([client_id, struct.pack('!f', price)])
                except (KeyboardInterrupt, zmq.ZMQError):
                    print('Interrupted')
                    break
        finally:
            # Release the bound port and the context on every exit path.
            sock.close(linger=0)
            ctx.term()
if __name__ == '__main__':
    server = Server()
    if sys.platform == 'win32':
        # NOTE(review): presumably needed because zmq.asyncio requires a
        # selector-based event loop on Windows - confirm against pyzmq docs.
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    # asyncio.run creates the loop, runs the coroutine to completion, and
    # closes the loop; it replaces the deprecated
    # get_event_loop().run_until_complete(...) entry pattern.
    asyncio.run(server.listen())
panelUi.py
from streamz import Stream
from server import DataFetch
import panel as pn
def get_markdown(px):
    """Return the HTML snippet that displays price ``px`` under a 'Price' label.

    The price is formatted with four decimals and a leading space in place
    of a plus sign.
    """
    price_text = format(px, ' .4f')
    fragments = (
        '',
        '<div class="bk" style="font-size: 25px; line-height: 1.3; height: 50%; width: 100%;">',
        '<div class="bk" style="font-size: 0.8em; word-wrap: break-word;">Price</div>',
        '<div class="bk" style="font-size: 1.2em;">' + price_text + '</div>',
        '</div>',
        '',
    )
    return '\n'.join(fragments)
class App:
    """Per-session Panel app that streams a price from ``DataFetch``.

    Every periodic callback created for the session is recorded in
    ``self.cb`` so that ``stop_callbacks`` can stop them all when the
    session-destroyed hook fires.
    """

    def __init__(self):
        self.df = DataFetch()
        # Periodic callbacks owned by this session.
        self.cb = []

    def stop_callbacks(self, session_context):
        """Stop every periodic callback registered by this app instance.

        ``session_context`` is supplied by the session-destroyed hook and
        is not used here.
        """
        print('Stopping callbacks')
        for callback in self.cb:
            callback.stop()
        # Drop the references so stopped callbacks are not kept alive and
        # a repeated call does not stop them again.
        self.cb.clear()

    def app(self):
        """Build and return the template containing the price component."""
        pn.state.on_session_destroyed(self.stop_callbacks)
        template = pn.template.FastGridTemplate(
            site="", title="MyApp",
            theme_toggle=True,
            main_layout='card',
            theme='dark'
        )
        component = self.get_component()
        template.main[:1, :1] = component
        return template

    def get_snapshot(self):
        """Return the initial price from the data fetcher."""
        return self.df.get_snapshot()

    def get_component(self):
        """Create the price pane and schedule its once-per-second refresh."""
        snapshot = self.get_snapshot()
        source = Stream()
        px_stream = pn.pane.Streamz(source.map(get_markdown), always_watch=True)
        # Emit the snapshot first so the pane has an initial value.
        source.emit(snapshot)
        component = pn.Column(
            pn.Row(px_stream)
        )

        def stream_price():
            print('Streaming updates')
            source.emit(self.df.get_update())

        cb = pn.state.add_periodic_callback(stream_price, period=1000)
        self.cb.append(cb)
        return component
# Build a fresh App, construct its template, and mark it servable.
App().app().servable()
I tried running this just now and saw that the callbacks were stopping, so perhaps it is an issue at my end. But I’ll keep this code here just for reference, as I have not seen much documentation for the on_session_destroyed method. If you could spare a moment to let me know if there are issues in the way I’m rendering components, it’ll be much appreciated, as I’ve just started using this framework and really like it thus far.
Thanks
JJ
1 Like