How do I create an efficient streaming application to serve multiple users with low latency

Hi

I would like to create a streaming application that

  • extracts a chunk of a log file from an FTP server
  • transforms the log chunk to a time series chunk
  • streams the time series chunk to a plot

An illustration of the problem looks like this:

import panel as pn
from time import sleep
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta
from bokeh.plotting import figure

from bokeh.models import ColumnDataSource

pn.extension()

CHUNK_SIZE = 1.1  # seconds of log data covered by one chunk

UPDATE_PERIOD = 1000  # milliseconds between plot updates

def extract(chunk_size=CHUNK_SIZE):
    """Extract the most recent chunk of a text log file from an FTP server."""
    sleep(0.2 + chunk_size/10) # extraction might be done async
    # We don't know exactly which period the chunk will cover
    milliseconds = random.randint(-10, 10)
    return chunk_size + milliseconds/1000

def transform(data):
    """Extract a timeseries from the file content."""
    sleep(0.1 + CHUNK_SIZE/10) # will probably not be helped by async
    end_time = datetime.now()
    start_time = end_time - timedelta(seconds=data)
    
    # 'ms' replaces the 'L' alias, which newer pandas versions deprecate
    periods = pd.period_range(start=start_time, end=end_time, freq='25ms')

    num_periods = len(periods)
    values = np.random.normal(loc=0.1, scale=0.01, size=num_periods)
    # The timeseries have some spikes
    spike_mask = np.random.rand(num_periods) < 1/num_periods/2
    values[spike_mask] += 0.5
    
    values = np.clip(values, 0, 1)

    return pd.DataFrame({'time': periods.to_timestamp(),
                    'value': values})

def read_chunk():    
    raw = extract()
    gold = transform(raw)
    return gold

data = read_chunk()

cds=ColumnDataSource(data=data)

plot = figure(height=400, sizing_mode="stretch_width")
plot.line('time', 'value', source=cds, line_width=3, line_alpha=0.6)

def update_plot():
    data = read_chunk()
    cds.stream(data)

pn.state.add_periodic_callback(update_plot, period=UPDATE_PERIOD)

pn.pane.Bokeh(plot).servable()

I need to make this efficiently scale to 5 concurrent users.

I see some issues in the current implementation that I’m hoping to get advice on solving:

  1. The same data is currently extracted from the FTP server by each user. This is not efficient.
  2. I would like to be able to run the server in multiple processes using --num-procs 3. To me it seems impossible to share the chunks across the processes without adding an external message queue, database or similar. How should I solve this issue? I’m looking for a short-term solution, so it’s not possible for me to get access to new cloud services like a message queue. I do have an MS SQL Server though.
  3. It would be really nice to use HoloViews because I know it and would like to add more functionality to the plot over time. But how do I stream just as efficiently as Bokeh does with the ColumnDataSource in hvPlot/HoloViews?

I have some ideas for solving the issue but I don’t have the experience to determine if they will work.

  • Maybe I need a separate process/job to run read_chunk and load the result into a database or message queue. Then each Panel process can read from there on a periodic schedule? Maybe that database could be a local SQLite file. I don’t need much persistence of the data, but I need a latency of 1 second or lower on what my users see in the app.
  • Maybe it’s better to use only one process and then optimize it as much as possible. In the future the app might be used by up to 20 users…

I can find many pointers but do not have the experience to figure out which is the best to use. Advice much appreciated. Thanks.

@Material-Scientist

Hi @Marc,

You mean like this?

I’ve done something similar in this thread, using Bokeh’s lifecycle hooks:
[BUG] background thread stops when session is destroyed, leading to weird errors - Panel - HoloViz Discourse

Bear with me, as I haven’t looked at the code in months, but on a high level it should be something like this:

There are 2 scripts:
main.py (the Panel app) and app_hooks.py (the background script).

The background script runs a while loop on a separate thread, where it periodically submits tasks to dask for loading new data. The results are saved in pn.state.cache, and the Panel app is notified via an event trigger that is also stored in pn.state.cache. When the event fires, the Panel app updates its internal ColumnDataSource (CDS) object with the new data. This causes all plots to refresh simultaneously, as you can see in the video.
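To make the flow above concrete, here is a minimal sketch of the pattern using only the standard library. It is not the code from the linked thread: the plain dict `cache` stands in for pn.state.cache, `load_chunk` is a hypothetical loader in place of the dask task, and a `threading.Condition` plays the role of the event trigger.

```python
import threading

# Stand-in for pn.state.cache: one dict shared by every session in the process.
cache = {"latest": None, "version": 0}
cache_lock = threading.Lock()
new_data = threading.Condition(cache_lock)


def load_chunk(n):
    """Hypothetical loader; the described setup submits this work to dask."""
    return {"chunk": n}


def background_loop(stop, period=0.05, max_chunks=None):
    """Periodically load a chunk, store it in the cache and notify waiters."""
    n = 0
    while not stop.is_set() and (max_chunks is None or n < max_chunks):
        chunk = load_chunk(n)
        with new_data:
            cache["latest"] = chunk
            cache["version"] += 1
            new_data.notify_all()
        n += 1
        stop.wait(period)  # sleeps, but wakes early if stop is set


def wait_for_update(last_seen, timeout=1.0):
    """Block until the cache holds a version newer than last_seen."""
    with new_data:
        new_data.wait_for(lambda: cache["version"] > last_seen, timeout=timeout)
        return cache["version"], cache["latest"]
```

The loop would be started once per process, for example with `threading.Thread(target=background_loop, args=(threading.Event(),), daemon=True).start()`. In a real Panel app a session should not block like `wait_for_update` does; it would instead react to the event and push the new chunk into its own CDS.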

The reason for using lifecycle hooks was that Panel would unload everything from memory upon session destruction. Not sure if this has been fixed in the meantime.

The neat part about using dask for this (apart from offloading computations) is observability.

Ideally, I’d like to use dask or redis as a distributed memory cache to allow more than one Panel process to run.

Note: IIRC, Bokeh has discontinued the lifecycle hooks in version 3 though. So, you may need to use an older version.

Thanks so much @Material-Scientist

It complements what I have been thinking. I’ve just been offloading to a separate producer script that polls the FTP server every second and stores the results in a SQLite database. In the Panel app I also have a loop running in a separate thread to consume the data in the SQLite database and notify all the sessions via an event from a shared, global Parameterized class.

This is a very easy to use architecture as I don’t have to manage external services or even depend on others to get access to cloud services like message queues etc.
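A minimal sketch of the producer/consumer split through SQLite could look like the following. The table name `samples`, its columns and the helper names are assumptions made for illustration, not the actual schema used in my app.

```python
import sqlite3


def connect(path):
    """Open the database and create the (hypothetical) samples table."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS samples ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "ts REAL NOT NULL, value REAL NOT NULL)"
    )
    conn.commit()
    return conn


def produce(conn, rows):
    """Producer side: append one chunk of (timestamp, value) rows."""
    with conn:  # commits on success, rolls back on error
        conn.executemany("INSERT INTO samples (ts, value) VALUES (?, ?)", rows)


def consume(conn, last_id):
    """Consumer side: fetch only the rows added after last_id."""
    cur = conn.execute(
        "SELECT id, ts, value FROM samples WHERE id > ? ORDER BY id",
        (last_id,),
    )
    rows = cur.fetchall()
    new_last_id = rows[-1][0] if rows else last_id
    return new_last_id, rows
```

The producer script would call `produce` once per poll, while each Panel process keeps its own `last_id` and calls `consume` from its background thread, so every chunk is read from the FTP server only once. With a file-based database shared between processes, enabling write-ahead logging (`PRAGMA journal_mode=WAL`) is worth considering so that readers do not block the writer.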

One thing that my architecture enables is other applications using the data produced as it is available in the database. But you could store the data produced by dask as well if needed.
