Panel - Starting a Stream of Data

Motivated by a few posts by @fishing

I would like to provide some examples on how to start streams in Panel.

  1. Simple “Streaming” using meta_refresh
  2. Streaming using pn.state.add_periodic_callback
  3. Streaming from a separate thread.

Automatic Reload using meta_refresh

This is not really streaming but a quick and dirty way to have the browser reload the page periodically.

You can achieve this using the Fast Templates meta_refresh argument. See FastListTemplate.

For example, if you set the meta_refresh argument to '30', the app will refresh automatically every 30 seconds.

This approach works best when updates are infrequent, the app is not very interactive, there are few users and each “refresh” is cheap to compute.

You can serve the code below via panel serve name_of_script.py.

import datetime

import panel as pn

pn.extension(sizing_mode="stretch_both")

ACCENT_BASE_COLOR = "#DAA520"

text = f"""# Last Update: {datetime.datetime.utcnow().strftime('%H:%M:%S')}"""
streaming_component = pn.pane.Markdown(text)

panel = pn.Row(streaming_component, height=90)

template = pn.template.FastListTemplate(
    site="Awesome Panel",
    title="Streaming using meta_refresh",
    logo="https://panel.holoviz.org/_static/logo_stacked.png",
    header_background=ACCENT_BASE_COLOR,
    accent_base_color=ACCENT_BASE_COLOR,
    meta_refresh="3",
    main=[panel],
).servable()

Streaming using a PeriodicCallback

If you need better performance or a better user experience than meta_refresh can provide, you can add a PeriodicCallback using pn.state.add_periodic_callback.

There is also a nice gallery example here which is running live here.

The code for the updated example is

import datetime

import panel as pn

pn.extension(sizing_mode="stretch_both")

ACCENT_BASE_COLOR = "#DAA520"

streaming_component = pn.pane.Markdown()

panel = pn.Row(streaming_component, height=90)

def update():
    text = f"""# Last Update: {datetime.datetime.utcnow().strftime('%H:%M:%S.%f')}"""
    print(text)
    streaming_component.object = text

pn.state.add_periodic_callback(callback=update, period=500)

template = pn.template.FastListTemplate(
    site="Awesome Panel",
    title="Streaming using add_periodic_callback",
    logo="https://panel.holoviz.org/_static/logo_stacked.png",
    header_background=ACCENT_BASE_COLOR,
    accent_base_color=ACCENT_BASE_COLOR,
    main=[panel],
).servable()

Sharing the PeriodicCallback across Users and Sessions

UPDATE: I later learned that a callback added using pn.state.add_periodic_callback is not global and will stop when the browser tab is closed. A PR (#2661) is on the way to add this functionality. In the meantime you can use threading instead, as shown in the posts below. Alternatively, you can use Tornado’s add_timeout, call_at or call_later.
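
As a rough illustration of the call_later idea: a one-shot timer can be turned into a periodic one by re-arming it at the end of each run. The sketch below uses the standard library's asyncio loop.call_later as a stand-in for Tornado's IOLoop.call_later (as far as I know, recent Tornado versions run on top of the asyncio loop). The names make_repeating and demo, and the max_runs cut-off, are made up for this example; a real server would keep re-arming indefinitely, and wiring this into a Panel app is not shown.

```python
import asyncio


def make_repeating(loop, period, callback, max_runs):
    """Run callback every `period` seconds by re-arming call_later after each run.

    Returns a future that resolves once `max_runs` calls have happened
    (the cut-off exists only so that this demo terminates).
    """
    done = loop.create_future()
    runs = 0

    def tick():
        nonlocal runs
        callback()
        runs += 1
        if runs < max_runs:
            loop.call_later(period, tick)  # schedule the next run
        else:
            done.set_result(runs)

    loop.call_later(period, tick)  # schedule the first run
    return done


async def demo(period=0.01, max_runs=3):
    updates = []
    loop = asyncio.get_running_loop()
    await make_repeating(loop, period, lambda: updates.append(len(updates)), max_runs)
    return updates
```

Because the timer lives on the event loop rather than in a session, it would not depend on a browser tab staying open, which is the property the update above is after.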


If you have many users it might not be feasible to run it individually for all your users. Instead you should run it once for your entire application.

You can use pn.state.cache (a globally shared dictionary) to share the PeriodicCallback and the streaming_component across users.

You can serve the code below via panel serve name_of_script.py (don’t use the --autoreload flag).

import datetime

import panel as pn

pn.extension(sizing_mode="stretch_both")

ACCENT_BASE_COLOR = "#DAA520"

if "streaming_component" not in pn.state.cache:
    streaming_component = pn.pane.Markdown("hello")
    def update():
        text = f"""# Last Update: {datetime.datetime.utcnow().strftime('%H:%M:%S.%f')}"""
        print(text)
        streaming_component.object = text
    pn.state.cache["streaming_component"]=streaming_component
    periodic_callback = pn.state.add_periodic_callback(callback=update, period=500)

streaming_component=pn.state.cache["streaming_component"]


panel = pn.Row(streaming_component, height=90)

template = pn.template.FastListTemplate(
    site="Awesome Panel",
    title="add_periodic_callback",
    logo="https://panel.holoviz.org/_static/logo_stacked.png",
    header_background=ACCENT_BASE_COLOR,
    accent_base_color=ACCENT_BASE_COLOR,
    main=[panel],
).servable()

Streaming from a Separate Thread

If your periodic callback is long-running, it might freeze your application. In that case it’s better to look into Async and Concurrency.

Here we will provide a small example using threading.

Note how the application stays responsive even though a Python function is continuously running in a separate thread.

You can serve the app via panel serve name_of_script.py.

import datetime
import threading
import panel as pn
import time

pn.extension(sizing_mode="stretch_both")

ACCENT_BASE_COLOR = "#DAA520"



if "streaming_component" not in pn.state.cache:
    streaming_component = pn.pane.Markdown("hello")
    pn.state.cache["streaming_component"]=streaming_component

    def update():
        while True:
            text = f"""# Last Update: {datetime.datetime.utcnow().strftime('%H:%M:%S.%f')}"""
            print(text)
            streaming_component.object = text
            time.sleep(0.5)

    thread=threading.Thread(target=update)
    thread.daemon = True
    thread.start()

streaming_component=pn.state.cache["streaming_component"]
slider = pn.widgets.IntSlider(value=0, start=0, end=10, margin=25)
panel = pn.Column(streaming_component, slider, slider.param.value, height=225)

template = pn.template.FastListTemplate(
    site="Awesome Panel",
    title="Streaming from a Separate Thread",
    logo="https://panel.holoviz.org/_static/logo_stacked.png",
    header_background=ACCENT_BASE_COLOR,
    accent_base_color=ACCENT_BASE_COLOR,
    main=[panel],
).servable()
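
The thread in the example above loops forever and relies on being a daemon thread to die with the process. If you want to be able to stop it cleanly, one option is to wait on a threading.Event instead of calling time.sleep. The following is a minimal, Panel-free sketch of that idea; the Streamer class and its publish argument are hypothetical names, not part of Panel.

```python
import threading
import time


class Streamer:
    """Call `publish(timestamp)` every `period` seconds until stop() is called."""

    def __init__(self, publish, period=0.5):
        self._publish = publish
        self._period = period
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def _run(self):
        # Event.wait returns False on timeout and True once stop() has set the event,
        # so it doubles as an interruptible sleep.
        while not self._stop.wait(self._period):
            self._publish(time.time())

    def stop(self):
        self._stop.set()
        self._thread.join()
```

In the app above, publish would be a function that formats the timestamp and assigns it to streaming_component.object.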

Personalization

If you want to personalize what is shown to the user, for example based on inputs from sliders etc, that is also possible.

To enable personalization you can update some shared parameter value from another thread. Then each user session can bind to or depend on the updates to the shared parameter value.

import datetime
import threading
import panel as pn
import time
import param

pn.extension(sizing_mode="stretch_both")

ACCENT_BASE_COLOR = "#DAA520"

class App(param.Parameterized):
    utcnow = param.Date()

    def update_utcnow(self):
        self.utcnow = datetime.datetime.utcnow()


if "app" not in pn.state.cache:
    app = App()
    pn.state.cache["app"]=app
    def update():
        while True:
            app.utcnow = datetime.datetime.utcnow()
            print("update app.utcnow: ", app.utcnow)
            time.sleep(0.5)

    thread=threading.Thread(target=update)
    thread.daemon = True
    thread.start()

app = pn.state.cache["app"]
streaming_component = pn.pane.Markdown("hello")
slider = pn.widgets.IntSlider(value=0, start=0, end=10, margin=25)

@param.depends(utcnow=app.param.utcnow, slider_value=slider.param.value, watch=True)
def update_streaming_component(utcnow, slider_value):
    print("update streaming_component", streaming_component.name)
    utcnow += datetime.timedelta(seconds=slider_value*60*60)
    streaming_component.object = f"""# Last Update: {utcnow.strftime('%H:%M:%S.%f')}"""

panel = pn.Column(streaming_component, slider, slider.param.value, height=225)

template = pn.template.FastListTemplate(
    site="Awesome Panel",
    title="Streaming from Another Thread",
    logo="https://panel.holoviz.org/_static/logo_stacked.png",
    header_background=ACCENT_BASE_COLOR,
    accent_base_color=ACCENT_BASE_COLOR,
    main=[panel],
).servable()
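
To make the pattern in the example above easier to see, here is a stripped-down, library-free sketch: one shared value is updated in one place, and every session reacts to it with its own settings. SharedValue and Session are hypothetical stand-ins for the shared param.Parameterized object and a user session; param's real implementation differs.

```python
import threading


class SharedValue:
    """One value shared by all sessions; subscribers are called on each update."""

    def __init__(self, value=None):
        self._value = value
        self._subscribers = []
        self._lock = threading.Lock()

    def subscribe(self, callback):
        with self._lock:
            self._subscribers.append(callback)

    def set(self, value):
        with self._lock:
            self._value = value
            subscribers = list(self._subscribers)
        # Notify outside the lock so a slow subscriber cannot block subscribe()
        for callback in subscribers:
            callback(value)


class Session:
    """Stand-in for one user session with its own slider value."""

    def __init__(self, shared, offset_hours):
        self.offset_hours = offset_hours
        self.text = ""
        shared.subscribe(self.on_update)

    def on_update(self, hour):
        # Personalize the shared value with this session's own offset
        self.text = f"Hour: {(hour + self.offset_hours) % 24}"
```

Each Session sees the same update but renders it differently, which is what the slider does in the Panel version.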

Panel and the HoloViz ecosystem provide a lot of features and tools for streaming because there are so many different use cases and requirements. For more check out.

This post inspired me to use a shared data source. But I also wanted to allow personalization (individual widget states, and hence different data).

So, I tried the following:

The way it works is that the parameterized app only performs initial loads of data that is not already in the cache.

The cache then periodically updates that data (using threading, as periodic callbacks die after the session is closed like you pointed out) and puts it in a ColumnDataSource (CDS) that is still inside the cache.

The app then removes & re-creates an on_change callback to that CDS every time the CDS changes and/or the app’s widgets state changes (new pair or window selected). It then takes the data from the cache’s CDS and copies it to its internal CDS (I first tried keeping all CDSs in cache and just swapping the plot’s CDS on every widget state change, but the plot didn’t reflect the changes).

The result is that it can bootstrap a data state from disk, and continuously update it in realtime from within the app. Ideally, the CDS would trigger a callback on all users who have subscribed to it. And while the CDS does actually trigger callbacks on each of the connected sessions, only the first session’s chart actually updates:

I’d ideally like to do it this way, but if it can’t be done, I could also create a periodic callback within the app to query the cache for new data.

Unfortunately, I can’t share the exact code as I plan to use this for a product, but the above graphic should provide a high-level overview of how it works.


Looks amazing. Are the “1D”, “1W”, “1M” etc. buttons something you have created or something that Bokeh provides?


Thanks - that’s just a RadioButtonGroup widget:

window = pn.widgets.RadioButtonGroup(name = 'Window', options = ["1D", "1W", "1M","6M", "1Y", "3Y"], value = '1W', height = 30)

Based on which window is selected, it’ll dynamically query & downsample the data on the fly (1TB+):
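
The actual query and downsampling code was not shared in this thread. Purely as an illustration of the idea, a selected window could map to a look-back span and a bucket width, with the points inside the span averaged per bucket. Everything below (the WINDOWS mapping, the bucket widths and the downsample function) is a hypothetical sketch, not the poster's implementation.

```python
from datetime import datetime, timedelta

# Hypothetical mapping: window label -> (look-back span, bucket width)
WINDOWS = {
    "1D": (timedelta(days=1), timedelta(minutes=5)),
    "1W": (timedelta(weeks=1), timedelta(minutes=30)),
    "1M": (timedelta(days=30), timedelta(hours=2)),
    "6M": (timedelta(days=182), timedelta(hours=12)),
    "1Y": (timedelta(days=365), timedelta(days=1)),
    "3Y": (timedelta(days=3 * 365), timedelta(days=3)),
}


def downsample(points, window, now):
    """Keep the points inside the window and average them per bucket.

    points: iterable of (timestamp, value) tuples, in any order.
    Returns a list of (bucket_start, mean_value) tuples sorted by time.
    """
    span, width = WINDOWS[window]
    start = now - span
    buckets = {}
    for ts, value in points:
        if start <= ts <= now:
            index = (ts - start) // width  # timedelta // timedelta gives an int
            buckets.setdefault(index, []).append(value)
    return [(start + i * width, sum(v) / len(v)) for i, v in sorted(buckets.items())]
```

With a RadioButtonGroup like the one above, window.value would be passed as the window argument whenever the selection changes.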


Ah, nevermind! I figured it out :smiley:

As you can see, both sessions are updated at the exact same time since they are subscribed to the same CDS that resides in the panel cache.

The trick was to do a next_tick_callback on the session’s own document, rather than the one stored in the cache that is attached to the CDS (for whatever reason):

self.doc.add_next_tick_callback(partial(self.patch,to_be_updated=to_be_updated))


For the record, I got this solution working with a parameterized dict. I think it could also be used to bring in data from a database.


import panel as pn, param, numpy as np
from bokeh.plotting import ColumnDataSource, figure
from threading import Thread 
from time import sleep
from functools import partial 

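class globalDict(param.Parameterized):
    gd = param.Dict({})

def thread_function(global_dict):
    # Replace the shared dict every 0.25 s; watchers in each session react to it
    while True:
        sleep(0.25)
        global_dict.gd = dict(a=np.arange(100), b=np.random.random((100)))

# Create the shared dict and start the background thread only once per server
# process, so that new sessions do not each start another thread
if "dict" not in pn.state.cache:
    global_dict = globalDict(gd=dict(a=np.arange(100), b=np.random.random((100))))
    pn.state.cache["dict"] = global_dict

    th = Thread(target=thread_function, args=(global_dict,))
    th.daemon = True
    th.start()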

def app():
    dict_inner = pn.state.cache['dict']
    cds = ColumnDataSource(dict_inner.gd)
    doc = pn.state.curdoc

    p = figure()
    p.circle(x='a', y='b', source=cds)
    
    def update(cds, gd):
        cds.data = gd


    @pn.depends(dict_inner.param.gd, watch=True)
    def scope_function(gd1):
        # here the dict gd1 can be filtered according to the particular user  
        doc.add_next_tick_callback(partial(update,cds=cds,gd=gd1))
 
    return pn.Row(p)


pn.Row(app).servable()

Ah, that’s a nice way too! :ok_hand:

I think so, but sometimes strange errors appear, like np is not defined or partial is not defined.

I will test it a little more to understand what is happening.

Edit: it only happens on my notebook, so it is probably related to its corrupted conda installation. In the office it works smoothly.

Do you know how to dynamically change what to watch?

I.e. switching the obj to watch based on some widget state

@param.depends(f"pn.state.cache['data']['{obj_to_watch}'].{property_to_watch}", watch=True)

@Marc

Edit:
It works if you do something like this inside your app class:

CDS = pn.state.cache['CDS'][f'{ticker.value.upper()}@{window.value}']

@param.depends('CDS.OF_norm_CDS', watch=True)
def scope_fn(self):
    if f'{self.ticker.value.upper()}@{self.window.value}' in pn.state.cache['CDS'].keys():
        self.doc.add_next_tick_callback(partial(self.update,to_be_updated=['OF_norm']))

Thanks, @nghenzi2019 - yours is the more elegant & performant solution! :slight_smile:
