Panel Background Cache Update

Hello,

I’m trying to develop a real-time dashboard in Panel that updates from data retrieved from the network. To run the updating threaded process I’m using --setup and the session lifecycle hooks (via session_context) to start / stop the thread as the session is created / destroyed. The threaded process updates a dict in the Panel cache at the rate at which data appears on the network (1000 Hz).

This approach works and lets me start the app and view real-time data. Once the session is closed the thread stops (after ~30 s) and the CLI exits to the prompt.

For a production environment this method feels like the right approach; for development, however, it rules out quick Panel-esque iteration with --autoreload or --dev, so I feel like I’m missing something.

The following two files are needed for the MRE.

setup.py

import sys
import threading
import time
import panel as pn
import numpy as np

running = True


def worker():
    """
    Worker function to read data from network object and update the data in the Panel state cache.

    Parameters:
    None

    Returns:
    None
    """

    while running:
        time.sleep(0.001)
        pn.state.cache["data"] = {"x": np.random.randn(), "y": np.random.randn()}


def session_created(session_context):
    """
    Function to handle the creation of a new session.

    Parameters:
    - session_context (object): The context of the session being created.

    Returns:
    None
    """
    print(f"Created a session running at the {session_context.request.uri} endpoint")
    thread = threading.Thread(target=worker)
    thread.start()


pn.state.on_session_created(session_created)


def session_destroyed(session_context):
    """
    Stop the worker thread and exit the program when a session is destroyed.

    Parameters:
    - session_context (object): The context of the session that was destroyed.

    Returns:
    None
    """
    print(f"Closed a session running at the {session_context.request.uri} endpoint")
    global running  # without this, "running = False" only rebinds a local variable
    running = False
    sys.exit()


pn.state.on_session_destroyed(session_destroyed)
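As an aside on the shutdown mechanism: a module-level flag like `running` is easy to get wrong (it must be declared `global` in the handler) and offers no way to wake a sleeping worker promptly. A `threading.Event` plus a daemon thread gives a clean, fast stop. Here is a minimal Panel-free sketch of that pattern, using a plain dict as a stand-in for `pn.state.cache` so it runs without a server:

```python
import threading
import time

cache = {}  # hypothetical stand-in for pn.state.cache

# Event is thread-safe and avoids the global-flag pitfall entirely.
stop_event = threading.Event()


def worker():
    """Update the cache until stop_event is set."""
    # wait() sleeps ~1 ms per iteration and returns True as soon as the
    # event is set, so shutdown is immediate rather than ~30 s later.
    while not stop_event.wait(timeout=0.001):
        cache["data"] = {"x": 1.0, "y": 2.0}


thread = threading.Thread(target=worker, daemon=True)  # daemon: never blocks exit
thread.start()

time.sleep(0.05)        # let the worker run briefly
stop_event.set()        # what session_destroyed would do
thread.join(timeout=1)  # returns promptly
```

With this in place, `session_destroyed` only needs to call `stop_event.set()`; no `global` statement and no `sys.exit()` required.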


app.py (this app is based on @Marc’s streaming question on the HoloViews Discourse)

import pandas as pd
import panel as pn
import hvplot.pandas
import holoviews as hv
from holoviews.streams import Buffer
from asyncio import sleep


pn.extension()

data = pn.state.cache["data"]

dfstream = Buffer(pd.DataFrame(data, index=[pd.Timestamp.now()]), length=100, index=False)


def plot(data, window_seconds, alpha):
    data = data.rolling(f"{window_seconds}s").mean()
    return data.hvplot(y="y", ylim=(-1, 1), alpha=alpha, color="blue", line_width=5)


window_seconds = pn.widgets.IntSlider(value=5, start=1, end=10, name="Window (secs)")
alpha = pn.widgets.FloatSlider(value=1, start=0, end=1, name="Alpha")
iplot = hv.DynamicMap(
    plot,
    streams={
        "data": dfstream.param.data,
        "window_seconds": window_seconds.param.value,
        "alpha": alpha.param.value,
    },
)


pn.Column(iplot, window_seconds, alpha).servable()


async def run():
    while True:
        await sleep(0.1)
        data = pn.state.cache["data"]
        dfstream.send(pd.DataFrame(data, index=[pd.Timestamp.now()]))


pn.state.onload(run)

To start this MRE run

python -m panel serve .\app.py --setup .\setup.py

Is there a better way of starting a non-blocking script to update pn.state.cache in the background?
Ideally I would start the process from the command line, so I could leave it running in the background rather than starting it as part of the app startup. Can I share pn.state.cache across processes without having to read / write the cache data to a file?
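On the cross-process question: `pn.state.cache` is an ordinary per-process dict, so it cannot be shared directly, but for a small "latest sample" payload the standard library's `multiprocessing.shared_memory` avoids files entirely. A hedged sketch (the block name `"panel_data"` and the two-double layout are arbitrary choices; here both sides run in one process for illustration, whereas in practice the writer would be the standalone CLI process):

```python
import struct
from multiprocessing import shared_memory

# Writer side (the standalone CLI process): create a named block sized
# for two doubles (x, y).
writer = shared_memory.SharedMemory(name="panel_data", create=True, size=16)
struct.pack_into("dd", writer.buf, 0, 0.5, -0.25)  # publish the latest x, y

# Reader side (the Panel app, e.g. inside a periodic callback): attach to
# the existing block by name and unpack the latest sample.
reader = shared_memory.SharedMemory(name="panel_data")
x, y = struct.unpack_from("dd", reader.buf, 0)

reader.close()
writer.close()
writer.unlink()  # the writer owns the block and removes it on shutdown
```

The reader could copy the unpacked values into `pn.state.cache` on each refresh, so the rest of the app stays unchanged.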

Note: For the real-time display process I don’t need 1000 Hz; I just want the latest data in the cache, so the app can update at its own periodic-callback or async refresh rate.
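That "latest value wins" behaviour can be sketched without Panel at all: a single shared slot that the fast producer overwrites and the slow consumer samples, deliberately dropping intermediate values. A minimal stdlib-only sketch (the class name and the 1000-iteration producer are illustrative):

```python
import threading


class LatestValue:
    """Single-slot holder: a fast producer overwrites, a slow consumer samples.

    Intermediate values are intentionally dropped, which is exactly what a
    1000 Hz producer feeding a ~10 Hz display needs.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._value = None

    def set(self, value):
        with self._lock:
            self._value = value

    def get(self):
        with self._lock:
            return self._value


slot = LatestValue()


def producer():
    for i in range(1000):  # stands in for the 1000 Hz network reader
        slot.set({"x": i, "y": -i})


t = threading.Thread(target=producer)
t.start()
t.join()

latest = slot.get()  # the display samples only the newest value
```

In the app, a periodic callback or the async loop would call `slot.get()` at its own refresh rate instead of reading `pn.state.cache` directly.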


Maybe Defer Long Running Tasks to Improve the User Experience — Panel v1.4.4

@ahuang11 Thanks for the response.

The code already uses pn.state.onload to defer its loading process.

To test your suggestion, I added the following to the async run function, but it does not work as expected.

async def run():
    # worker and threading must be importable here (they live in setup.py)
    thread = threading.Thread(target=worker)
    thread.start()
    await sleep(10)  # must be awaited; a bare sleep(10) does nothing in a coroutine
    while True:
        await sleep(0.1)
        data = pn.state.cache["data"]
        dfstream.send(pd.DataFrame(data, index=[pd.Timestamp.now()]))


pn.state.onload(run)

This produces a KeyError because the cache never fills.


>python -m panel serve .\app.py --setup .\setup.py --show
2024-07-03 11:16:43,099 Starting Bokeh server version 3.4.1 (running on Tornado 6.4.1)
2024-07-03 11:16:43,099 User authentication hooks NOT provided (default user enabled)
2024-07-03 11:16:43,108 Bokeh app running at: http://localhost:5006/app
2024-07-03 11:16:43,108 Starting Bokeh server with process id: 28080
...
    data = pn.state.cache["data"]
           ~~~~~~~~~~~~~~^^^^^^^^
KeyError: 'data'

For my own real-time display I have persisted with --setup and have now implemented what I need for version 0.1.0, which took longer than expected due to this specific problem.

Next I’m thinking of testing Dask and its fire_and_forget function, and I will report back with any success.
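For anyone following along, the Dask idea looks roughly like this. `fire_and_forget` (from dask.distributed) tells the scheduler to keep running a task even when no future referencing it is held. This sketch uses an in-process threaded client purely to stay self-contained; in a real deployment the scheduler and workers would be separate long-lived processes, and the updater function would be the hypothetical network reader:

```python
from dask.distributed import Client, fire_and_forget

# Threads-only client keeps the sketch in one process; in practice you would
# connect to an external scheduler, e.g. Client("tcp://scheduler:8786").
client = Client(processes=False)


def produce(x):
    # hypothetical stand-in for the long-running network reader
    return {"x": x, "y": -x}


future = client.submit(produce, 1.0)
fire_and_forget(future)  # task keeps running even if `future` is dropped

result = future.result()  # only possible here because we kept the reference
client.close()
```

The open question would then be how the Panel app reads the results back, e.g. by publishing them to a store both processes can reach.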