True streaming - use async generators?

Hello,

I’ve only been using Panel for about a week, and I’m loving it! One of the things I’ve been struggling with is streaming various events and updating the UI as they arrive. I wanted to have more control and do it sort of old school.

I went through many iterations of a testbed that dumps logger output to a Terminal widget, to see if I could get true streaming. I couldn’t find a good example in the docs. But thanks to Coderambling on Discord, I found this in the docs:

(Asynchronous) generator functions can be used as a reference to drive streaming

So, I decided to revisit my logger attempts and make a true async version of it.

Here it is, in case anyone finds it useful:

import panel as pn
import asyncio
import random
from datetime import datetime

pn.extension('terminal')

def generate_colored_log(event_type):
    """Build one simulated log line wrapped in 24-bit ANSI colour codes.

    The line has three space-separated segments: a grey ``[timestamp]``
    (current local time, second resolution), the ``[event_type]`` tag in a
    level-specific colour, and a fixed white message. Event types that are
    not in the palette are rendered in white.
    """
    white = (255, 255, 255)
    grey = (169, 169, 169)
    palette = {
        "INFO": (50, 205, 50),
        "WARNING": (255, 255, 0),
        "ERROR": (255, 69, 0),
        "DEBUG": (70, 130, 180),
        "CRITICAL": (255, 0, 255),
    }

    def paint(colour, text):
        # Wrap text in a truecolor foreground escape, followed by a reset.
        red, green, blue = colour
        return f"\033[38;2;{red};{green};{blue}m{text}\033[0m"

    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    parts = [
        paint(grey, f"[{now}]"),
        paint(palette.get(event_type, white), f"[{event_type}]"),
        paint(white, "Simulated event occurred."),
    ]
    return " ".join(parts)

async def log_stream():
    """Endlessly produce simulated, colour-formatted log lines.

    Every cycle picks a random severity, yields the formatted line, and then
    sleeps for a random interval of up to 0.1 seconds to imitate the gap
    between real events. The generator never finishes on its own; the
    consumer decides when to stop iterating.
    """
    severities = ["INFO", "WARNING", "ERROR", "DEBUG", "CRITICAL"]
    while True:
        yield generate_colored_log(random.choice(severities))
        # Simulated inter-event delay; also hands control back to the loop.
        pause = random.uniform(0.0, 0.1)
        await asyncio.sleep(pause)

class RealTimeLogViewer:
    """Terminal widget fed by ``log_stream`` plus a button that stops the feed.

    Constructing an instance builds the widgets and immediately schedules the
    streaming task on the current asyncio event loop, so messages start
    flowing as soon as that loop runs.
    """

    def __init__(self):
        # Terminal widget that will display the logs.
        self.terminal = pn.widgets.Terminal(sizing_mode="stretch_both")

        # Stop button that cancels the log stream.
        self.stop_button = pn.widgets.Button(name="Stop Logs", button_type="danger")
        self.stop_button.on_click(self.stop_logs)

        # Layout with full stretch: terminal on top, button below.
        self.layout = pn.Column(self.terminal, self.stop_button, sizing_mode="stretch_both")

        # Flag checked by the streaming loop; cleared by stop_logs().
        self._running = True

        # Schedule the asynchronous log update. The viewer may be constructed
        # before any event loop exists; in that situation
        # asyncio.get_event_loop() raises RuntimeError on Python 3.14+ (and in
        # non-main threads on every version), so create and register a loop
        # explicitly. A server started afterwards in this thread picks up the
        # registered loop.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        # Keep a reference so the task is not garbage-collected mid-run and
        # can be cancelled later.
        self.update_task = loop.create_task(self.update_terminal())

    async def update_terminal(self):
        """Continuously await new log messages and write them to the terminal.

        Runs until ``_running`` is cleared or the task is cancelled.
        """
        try:
            async for log in log_stream():
                if not self._running:
                    break
                # Write the new log to the terminal.
                self.terminal.write(log + "\n")
                # Yield control so other callbacks can run between writes.
                await asyncio.sleep(0)
        except asyncio.CancelledError:
            # Cancellation is the normal way this task ends (see stop_logs);
            # there is nothing to clean up, so finish quietly.
            pass

    def stop_logs(self, event=None):
        """Stop the log stream; safe to call more than once.

        Args:
            event: Click event passed by the button; unused, and optional so
                the method can also be called directly.
        """
        if not self._running:
            # Already stopped: don't cancel again or repeat the notice.
            return
        self._running = False
        if not self.update_task.done():
            self.update_task.cancel()
        self.terminal.write("\033[90m\nLog subscription stopped.\033[0m\n")

    def get_layout(self):
        """Return the column layout holding the terminal and the stop button."""
        return self.layout

# Instantiate the viewer once, at script execution time, and open it in a
# browser via .show(). The streaming task is scheduled inside the constructor,
# so it is tied to this single instance rather than to a browser session.
viewer = RealTimeLogViewer()
viewer.get_layout().show()

Cheers!

Here it is on PY CAFE:

One thing I noticed is that it starts cycling in the generator regardless of whether anyone visited the page or not… So I made another, session-aware version. It purposefully uses show=False and prints each message to the console next to the yield, so you can see it for yourself.

At first I thought it wasn’t working since closing the page kept printing, but it timed out eventually and stopped.

import panel as pn
import asyncio
import random
from datetime import datetime

pn.extension('terminal')

def generate_colored_log(event_type):
    """Return a single ANSI-coloured log line for ``event_type``.

    Layout: grey ``[YYYY-MM-DD HH:MM:SS]`` timestamp (local time), the
    ``[event_type]`` tag in its level colour (white when the level is not
    recognised), then a fixed white message. Segments are separated by one
    space and each is followed by an ANSI reset.
    """
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    level_rgb = {
        "INFO": (50, 205, 50),
        "WARNING": (255, 255, 0),
        "ERROR": (255, 69, 0),
        "DEBUG": (70, 130, 180),
        "CRITICAL": (255, 0, 255),
    }.get(event_type, (255, 255, 255))

    # (colour, text) pairs in display order.
    segments = (
        ((169, 169, 169), f"[{stamp}]"),
        (level_rgb, f"[{event_type}]"),
        ((255, 255, 255), "Simulated event occurred."),
    )
    return " ".join(
        f"\033[38;2;{r};{g};{b}m{text}\033[0m" for (r, g, b), text in segments
    )

async def log_stream():
    """
    An asynchronous generator that continuously yields log messages.

    Each message is echoed to the console right before it is yielded, so the
    console shows every message the generator hands out. The sleep delay
    simulates the small delay between events.
    """
    while True:
        event_type = random.choice(["INFO", "WARNING", "ERROR", "DEBUG", "CRITICAL"])
        log_message = generate_colored_log(event_type)
        # Echo before yielding: statements placed after a yield only run when
        # the consumer asks for the next item, so the last message delivered
        # before the consumer stops would otherwise never be printed.
        print(log_message)
        yield log_message
        await asyncio.sleep(random.uniform(0.0, 0.1))

class RealTimeLogViewer:
    """Per-session log viewer: a terminal fed by ``log_stream`` and a stop button.

    Must be constructed while an asyncio event loop is running, because the
    streaming task is created with ``asyncio.create_task`` in the constructor.
    """

    def __init__(self):
        # Terminal widget that will display the logs.
        self.terminal = pn.widgets.Terminal(sizing_mode="stretch_both")

        # Stop button that cancels the log stream.
        self.stop_button = pn.widgets.Button(name="Stop Logs", button_type="danger")
        self.stop_button.on_click(self.stop_logs)

        # Layout with full stretch: terminal on top, button below.
        self.layout = pn.Column(self.terminal, self.stop_button, sizing_mode="stretch_both")

        # Flag checked by the streaming loop; cleared by stop_logs().
        self._running = True

        # Schedule the asynchronous log update on the running loop. Keep a
        # reference so the task is not garbage-collected mid-run and can be
        # cancelled later.
        self.update_task = asyncio.create_task(self.update_terminal())

    async def update_terminal(self):
        """Continuously await new log messages and write them to the terminal.

        Runs until ``_running`` is cleared or the task is cancelled.
        """
        try:
            async for log in log_stream():
                if not self._running:
                    break
                # Write the new log to the terminal.
                self.terminal.write(log + "\n")
                # Yield control so other callbacks can run between writes.
                await asyncio.sleep(0)
        except asyncio.CancelledError:
            # Cancellation is the normal way this task ends (see stop_logs);
            # there is nothing to clean up, so finish quietly.
            pass

    def stop_logs(self, event=None):
        """Stop the log stream; safe to call more than once.

        The method can be triggered by the stop button and again by a
        session-cleanup hook, so a repeated call is a no-op instead of
        cancelling again and writing the notice a second time.

        Args:
            event: Click event passed by the button; unused, and optional so
                the method can also be called directly.
        """
        if not self._running:
            return
        self._running = False
        if not self.update_task.done():
            self.update_task.cancel()
        self.terminal.write("\033[90m\nLog subscription stopped.\033[0m\n")

def view():
    """
    Per-session app factory.

    Builds a fresh RealTimeLogViewer, hooks its stop_logs() into the current
    Bokeh document's session-destroyed event so the stream ends together with
    the session, and returns the viewer's layout for serving.
    """
    log_viewer = RealTimeLogViewer()

    def _stop_on_destroy(session_context):
        # The session context is not needed; only the viewer must be stopped.
        log_viewer.stop_logs()

    # Register the cleanup hook on the Bokeh document of the current session.
    pn.state.curdoc.on_session_destroyed(_stop_on_destroy)
    return log_viewer.layout

# Serve the app by passing the factory function rather than a built layout.
# Each client connection will call the view() function, thus creating a
# separate viewer (and separate streaming task) per open web page.
# show=False: no browser tab is opened automatically; visit the URL manually.
pn.serve(view, show=False)

Nice example!

One thing I noticed is that it starts cycling in the generator regardless of whether anyone visited the page or not

When you’re using pn.serve in a script, the app is executed when you run the script and all components will be shared across user sessions. As you already discovered, if you want to run independent sessions you should define a function that instantiates all the components and returns the output to be served.

At first I thought it wasn’t working since closing the page kept printing, but it timed out eventually and stopped.

Sessions are cleaned up after the user disconnects; the amount of time before a session is cleaned up can be configured by passing unused_session_lifetime_milliseconds to pn.serve.

1 Like