Stopping callbacks for user session

Hi all,

I have this sample app which I plan on deploying using the panel CLI. While testing it, I have run into issues with callbacks that continue running even after a user session has been closed. The app below can be opened in multiple browser windows and streams a lot of data in near real-time - it initializes with a snapshot and updates with incremental data using a callback. Callbacks should ideally be linked to a user session and should stop when the browser window closes. Is there any way this can be controlled? I’m sure there’s something wrong in the way I’m deploying this app - any pointers will be super helpful.

    # Sketch of the app: wraps a DataRetriever and keeps the handle of the
    # periodic callback in self.cb (assigned in get_component_1).
    class App:
        def __init__(self):
            self.dr = DataRetriever()
            # Period handed to pn.state.add_periodic_callback in get_component_1.
            self.refresh_ms = 1000
    
        # Thin wrapper: returns whatever the DataRetriever reports as the snapshot.
        def get_snapshot(self):
            return self.dr.get_snapshot()
    
        # Builds and returns the FastGridTemplate for the page.
        def updates_app(self):
            # Ask Panel to call stop_callback when the session is destroyed.
            # NOTE(review): self.cb only exists after get_component_1() below has
            # run, so stop_callback relies on that call having succeeded.
            pn.state.on_session_destroyed(self.stop_callback)
            template = pn.template.FastGridTemplate(
                site="Mysite", title="Title",
                theme_toggle=True,
                main_layout='card',
                theme='dark'
            )
            component1 = self.get_component_1()
            # Place the component in the grid slice [:2, :6] of the main area.
            template.main[:2, :6] = component1
            return template
    
        # Builds the component layout and schedules its periodic refresh.
        def get_component_1(self):
            snapshot = self.get_snapshot()
            component = # some component to be rendered
            component_col = pn.Column(
                pn.Row(component)
            )
    
            # Invoked by the periodic callback registered below.
            def update_component():
                update = self.dr.get_update()
                ## update component with callback
            # Keep the returned handle so stop_callback can stop it later.
            self.cb = pn.state.add_periodic_callback(update_component, period=self.refresh_ms)
            return component_col 
    
        # Session-destroyed hook: stops the periodic callback stored in self.cb.
        def stop_callback(self, session_context):
            self.cb.stop()
    # Build the template and mark it servable.
    App().updates_app().servable()

Best
JJ

1 Like

Hi @jyojena

Could you update the example to a minimal, reproducible example? The issue might come from the code that is not included.

1 Like

Hi @Marc,

Thanks for your response. I’ve coded up this simple example for you:

server.py

import asyncio
import struct
import sys

from zmq.asyncio import Context
import zmq
import numpy as np
from datetime import datetime


def fmtd_time():
    """Return the current time formatted as ``YYYY-MM-DD HH:MM:SS.ffffff``."""
    now = datetime.now()
    # A non-empty format spec on a datetime is handed to strftime.
    return f'{now:%Y-%m-%d %H:%M:%S.%f}'


class DataFetch:
    """Blocking ZeroMQ client that asks the price server for prices.

    A DEALER socket is connected to ``endpoint``. Each request sends a
    single text command and blocks until one reply frame arrives; the reply
    is decoded as a network-byte-order 32-bit float.
    """

    def __init__(self, endpoint='tcp://localhost:8050'):
        """Connect a DEALER socket to ``endpoint``.

        Args:
            endpoint: ZeroMQ address of the price server. Defaults to the
                address that was previously hard-coded.
        """
        # Share the process-wide context instead of creating a new one (and
        # its I/O thread) for every DataFetch instance.
        ctx = zmq.Context.instance()
        self.req = ctx.socket(zmq.DEALER)
        self.req.connect(endpoint)

    def _request(self, command):
        """Send ``command`` and return the reply as a float rounded to 4 places."""
        self.req.send_string(command)
        # NOTE: blocks until the server answers; there is no receive timeout.
        msg = self.req.recv()
        return round(struct.unpack('!f', msg)[0], 4)

    def get_snapshot(self):
        """Return the server's current price without advancing it."""
        return self._request('snapshot')

    def get_update(self):
        """Ask the server for an update and return the price it replies with."""
        return self._request('update')

    def close(self):
        """Close the socket, discarding any unsent messages."""
        self.req.close(linger=0)


class Server:
    """Asynchronous ZeroMQ price server bound to TCP port 8050."""

    async def listen(self):
        """Serve ``snapshot`` and ``update`` requests until interrupted.

        A ROUTER socket receives ``[client identity, command]`` messages.
        ``snapshot`` replies with the current price; ``update`` first applies
        a random log-normal step to the price and then replies with it. The
        reply payload is the price packed as a network-byte-order float32.
        Messages with fewer than two frames, or with an unknown command, are
        ignored and get no reply.
        """
        ctx = Context()
        sock = ctx.socket(zmq.ROUTER)
        sock.bind('tcp://*:8050')
        price = 50.
        try:
            while True:
                try:
                    msg = await sock.recv_multipart()
                    if len(msg) < 2:
                        # Malformed request: no command frame to act on.
                        continue
                    client_id = msg[0]
                    # Undecodable bytes must not take the server down; they
                    # simply will not match any known command.
                    instr = msg[1].decode(errors='replace')
                    if instr == 'snapshot':
                        print(f'{fmtd_time()} Received snapshot request from {client_id}')
                    elif instr == 'update':
                        print(f'{fmtd_time()} Received update request from {client_id}')
                        price = np.exp(np.random.normal(loc=0, scale=np.sqrt(0.3 / 252))) * price
                    else:
                        # Unknown command: nothing to send back.
                        continue
                    # Send the routing frame and payload as one multipart
                    # message rather than two separately awaited sends.
                    await sock.send_multipart([client_id, struct.pack('!f', price)])
                except (KeyboardInterrupt, zmq.ZMQError):
                    print('Interrupted')
                    break
        finally:
            # Release the bound port even if the loop exits via an exception.
            sock.close(linger=0)


if __name__ == '__main__':
    server = Server()
    if sys.platform == 'win32':
        # Use the selector-based loop on Windows (presumably because
        # zmq.asyncio needs it -- confirm against the pyzmq docs).
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    try:
        # asyncio.run creates, runs and closes the loop; it replaces the
        # deprecated get_event_loop().run_until_complete(gather(...)) pattern.
        asyncio.run(server.listen())
    except KeyboardInterrupt:
        # Exit on Ctrl-C without a traceback.
        print('Interrupted')

panelUi.py

from streamz import Stream

from server import DataFetch
import panel as pn


def get_markdown(px):
    """Return the HTML snippet that displays ``px`` as the price.

    ``px`` is formatted with four decimals and a leading space in place of a
    plus sign.
    """
    price_text = format(px, ' .4f')
    # The leading empty entry and trailing four-space entry reproduce the
    # newline after the opening quotes and the indent before the closing ones.
    rows = (
        '',
        '    <div class="bk" style="font-size: 25px; line-height: 1.3; height: 50%; width: 100%;">',
        '        <div class="bk" style="font-size: 0.8em; word-wrap: break-word;">Price</div>',
        f'        <div class="bk" style="font-size: 1.2em;">{price_text}</div>',
        '    </div>',
        '    ',
    )
    return '\n'.join(rows)


class App:
    """Panel app that shows a price and refreshes it with a periodic callback."""

    def __init__(self):
        self.df = DataFetch()
        # Handles returned by pn.state.add_periodic_callback, kept so they
        # can be stopped when the session is destroyed.
        self.cb = []

    def stop_callbacks(self, session_context):
        """Stop every registered periodic callback.

        Registered with ``pn.state.on_session_destroyed`` in ``app``.

        Args:
            session_context: Passed in by Panel; not used here.
        """
        print('Stopping callbacks')
        for callback in self.cb:
            callback.stop()
        # Drop the stopped handles so they are not stopped again or kept alive.
        self.cb.clear()

    def app(self):
        """Build and return the FastGridTemplate containing the price component."""
        pn.state.on_session_destroyed(self.stop_callbacks)
        template = pn.template.FastGridTemplate(
            site="", title="MyApp",
            theme_toggle=True,
            main_layout='card',
            theme='dark'
        )
        component = self.get_component()
        template.main[:1, :1] = component
        return template

    def get_snapshot(self):
        """Return the current price reported by the data fetcher."""
        return self.df.get_snapshot()

    def get_component(self):
        """Build the price display and schedule its refresh.

        The pane is fed from a streamz ``Stream``: the snapshot is emitted
        once up front, then ``stream_price`` emits a fresh update every
        1000 ms via a periodic callback whose handle is stored in ``self.cb``.
        """
        snapshot = self.get_snapshot()
        source = Stream()
        px_stream = pn.pane.Streamz(source.map(get_markdown), always_watch=True)
        source.emit(snapshot)
        component = pn.Column(
            pn.Row(px_stream)
        )

        def stream_price():
            print('Streaming updates')
            source.emit(self.df.get_update())

        cb = pn.state.add_periodic_callback(stream_price, period=1000)
        self.cb.append(cb)
        return component


# Build the app's template and mark it servable (e.g. for the panel CLI).
App().app().servable()

I tried running this just now and saw that the callbacks were stopping, so perhaps the issue is at my end. But I’ll keep this code here just for reference, as I have not seen much documentation for the on_session_destroyed method. If you could spare a moment to let me know if there are issues with the way I’m rendering components, it’ll be much appreciated, as I’ve just started using this framework and really like it thus far.

Thanks
JJ

1 Like