Hi
I would like to create a streaming application that
- extracts a chunk of a log file from an FTP server
- transforms the log chunk to a time series chunk
- streams the time series chunk to a plot
An illustration of the problem looks like this:
import panel as pn
from time import sleep
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource

pn.extension()

CHUNK_SIZE = 1.1  # approximate duration of one chunk, in seconds
UPDATE_PERIOD = 1000  # plot update period, in milliseconds

def extract(chunk_size=CHUNK_SIZE):
    """Extract the most recent chunk of a text log file from an FTP server."""
    sleep(0.2 + chunk_size / 10)  # extraction might be done async
    # We don't know exactly which period the chunk will cover
    milliseconds = random.randint(-10, 10)
    return chunk_size + milliseconds / 1000

def transform(data):
    """Extract a time series from the file content."""
    sleep(0.1 + CHUNK_SIZE / 10)  # will probably not be helped by async
    end_time = datetime.now()
    start_time = end_time - timedelta(seconds=data)
    # 'ms' is used instead of 'L', which newer pandas versions deprecate
    periods = pd.period_range(start=start_time, end=end_time, freq='25ms')
    num_periods = len(periods)
    values = np.random.normal(loc=0.1, scale=0.01, size=num_periods)
    # The time series has some spikes
    spike_mask = np.random.rand(num_periods) < 1 / num_periods / 2
    values[spike_mask] += 0.5
    values = np.clip(values, 0, 1)
    return pd.DataFrame({'time': periods.to_timestamp(),
                         'value': values})

def read_chunk():
    raw = extract()
    gold = transform(raw)
    return gold


data = read_chunk()
cds = ColumnDataSource(data=data)
plot = figure(height=400, sizing_mode="stretch_width")
plot.line('time', 'value', source=cds, line_width=3, line_alpha=0.6)


def update_plot():
    data = read_chunk()
    cds.stream(data)


pn.state.add_periodic_callback(update_plot, period=UPDATE_PERIOD)
pn.pane.Bokeh(plot).servable()
I need this to scale efficiently to 5 concurrent users.
I see some issues in the current implementation that I’m hoping to get advice on:
- The same data is currently extracted from the FTP server once per user, which is not efficient.
- I would like to be able to run the server in multiple processes using --num-procs 3. As far as I can tell, it is not possible to share the chunks across processes without adding an external message queue, database or similar. How should I solve this? I’m looking for a short-term solution, so I cannot get access to new cloud services such as a message queue. I do have an MS SQL Server, though.
- It would be really nice to use HoloViews, because I know it and would like to add more functionality to the plot over time. But how do I stream as efficiently in hvPlot/HoloViews as with Bokeh and the ColumnDataSource?
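To make the first issue concrete: within a single process, I imagine the extraction could be shared across sessions with a small time-based cache. This is only a rough sketch, not something I have tested in the app. The names shared_read_chunk, max_age and clock are my own hypothetical helpers, not Panel or Bokeh APIs, and the read_chunk argument stands for the function above.

```python
import threading
import time

# Module-level state is shared by all sessions served by the same process.
_lock = threading.Lock()
_cache = {"seq": 0, "chunk": None, "fetched_at": None}


def shared_read_chunk(read_chunk, max_age=1.0, clock=time.monotonic):
    """Return (seq, chunk), calling read_chunk at most once per max_age seconds.

    seq increases by one for every fresh extraction, so a session can tell
    whether it has already streamed the chunk it just received.
    """
    with _lock:
        fetched_at = _cache["fetched_at"]
        stale = fetched_at is None or clock() - fetched_at >= max_age
        if stale:
            # Only the first caller after expiry pays for the extraction.
            _cache["chunk"] = read_chunk()
            _cache["fetched_at"] = clock()
            _cache["seq"] += 1
        return _cache["seq"], _cache["chunk"]
```

Each session's update_plot would then remember the last seq it streamed and skip the call to cds.stream when seq has not changed. As far as I understand, this only helps inside one process, so it does not solve the --num-procs 3 case.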
I have some ideas for solving the issues, but I don’t have the experience to determine whether they will work.
- Maybe I need a separate process/job to run read_chunk and load the result into a database or message queue. Then each Panel process can read from there on a periodic schedule? Maybe that database could be a local SQLite file. I don’t need much persistence of the data, but I need a latency of 1 second or lower on what my users see in the app.
- Maybe it’s better to use only one process and then optimize it as far as possible. In the future the app might be used by up to 20 users…
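To illustrate the first idea, here is a minimal sketch of what I have in mind, assuming a local SQLite file shared by one writer job and several Panel processes. The table name samples and the helpers connect, write_chunk and read_new are hypothetical names I made up for this sketch; only the standard library sqlite3 module is used.

```python
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS samples (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    time TEXT NOT NULL,
    value REAL NOT NULL
)
"""


def connect(path):
    """Open the shared database file and make sure the table exists."""
    conn = sqlite3.connect(path, timeout=5.0)
    # WAL mode lets readers keep reading while the single writer appends.
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute(SCHEMA)
    conn.commit()
    return conn


def write_chunk(conn, rows):
    """Append (iso_time, value) rows; meant for the separate extraction job."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT INTO samples (time, value) VALUES (?, ?)", rows
        )


def read_new(conn, last_id):
    """Return rows inserted after last_id and the new high-water mark."""
    rows = conn.execute(
        "SELECT id, time, value FROM samples WHERE id > ? ORDER BY id",
        (last_id,),
    ).fetchall()
    new_last_id = rows[-1][0] if rows else last_id
    return [(t, v) for _, t, v in rows], new_last_id
```

Each Panel session would keep its own last_id and call read_new from its periodic callback, streaming only the rows it has not seen yet. Polling a few times per second should, I assume, keep the latency under 1 second, and old rows could be deleted regularly since I don’t need persistence. I guess a table in my MS SQL Server could play the same role.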
I can find many pointers but do not have the experience to figure out which is the best to use. Advice is much appreciated. Thanks.