I’m trying to create a streaming application using HoloViews and the Buffer.
The resource that got me started is Working with Streaming Data — HoloViews v1.18.0, where the Buffer is described.
After some time I figured out that I can use a function as input to the DynamicMap and not just a HoloViews plot element. I want to display rolling values etc.
Then I wanted to combine this with widgets. Again, after some time, I found Responding to Events — HoloViews v1.18.0, and especially the section on Panel.
After some attempts I figured out that the Buffer is a Parameterized class with a data parameter holding the buffered value.
Now I can see many ways of doing this. For example the two below.
Questions
- Which is the best practice way of doing this?
- What optimizations does a DynamicMap do and when? (It's sometimes indicated that it can send only the changes to the client and thus update Bokeh much more efficiently. But it is totally unclear to me whether this happens and how I can confirm that it happens.)
- What optimizations does a Buffer do? Could I just as well use a custom TimeseriesBuffer with pn.bind as implemented below?
- What advantages (for example optimizations) does the Streamz library provide? When should I use it? How would the below example look using the Streamz library?
- We are almost always streaming irregularly spaced timeseries data. Can the Buffer accept a TimeDelta instead of an integer window/length?
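To make the last question concrete, here is a minimal sketch using only pandas and made-up timestamps. It shows why a row-count window and a time window give different results for irregularly spaced data. The data and variable names are illustrative only:

```python
import pandas as pd

# Irregularly spaced samples: three close together, then a gap of almost a minute.
index = pd.to_datetime(
    [
        "2024-01-01 00:00:00",
        "2024-01-01 00:00:01",
        "2024-01-01 00:00:02",
        "2024-01-01 00:01:00",
        "2024-01-01 00:01:01",
    ]
)
df = pd.DataFrame({"y": [1.0, 2.0, 3.0, 4.0, 5.0]}, index=index)

# Count-based window (what an integer length gives): always 3 rows,
# regardless of how much time they span.
last_three_rows = df.tail(3)
span_of_last_three = last_three_rows.index[-1] - last_three_rows.index[0]

# Time-based window (what I would like): everything in the last 5 seconds.
cutoff = df.index[-1] - pd.Timedelta("5s")
last_five_seconds = df[df.index >= cutoff]
```

With an integer length the buffer can end up holding rows that are far older than I care about, while a time-based cutoff keeps a varying number of rows.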
HoloViews and streams dictionary
from asyncio import sleep

import holoviews as hv
import hvplot.pandas  # registers the .hvplot accessor on DataFrames
import numpy as np
import pandas as pd
import panel as pn
from holoviews.streams import Buffer

pn.extension()

def new_row():
    return pd.DataFrame({"y": np.random.randn()}, index=[pd.Timestamp.now()])

dfstream = Buffer(new_row(), length=100, index=False)

def plot(data, window_seconds, alpha):
    data = data.rolling(f"{window_seconds}s").mean()
    return data.hvplot(y="y", ylim=(-1, 1), alpha=alpha, color="blue", line_width=5)

window_seconds = pn.widgets.IntSlider(value=5, start=1, end=10, name="Window (secs)")
alpha = pn.widgets.FloatSlider(value=1, start=0, end=1, name="Alpha")
iplot = hv.DynamicMap(
    plot,
    streams={
        "data": dfstream.param.data,
        "window_seconds": window_seconds.param.value,
        "alpha": alpha.param.value,
    },
)
pn.Column(iplot, window_seconds, alpha).servable()
async def run():
    while True:
        await sleep(0.1)
        row = new_row()
        dfstream.send(row)

pn.state.onload(run)
Panel and pn.bind
from asyncio import sleep

import holoviews as hv
import hvplot.pandas  # registers the .hvplot accessor on DataFrames
import numpy as np
import pandas as pd
import panel as pn
from holoviews.streams import Buffer

pn.extension()

def new_row():
    return pd.DataFrame({"y": np.random.randn()}, index=[pd.Timestamp.now()])

dfstream = Buffer(new_row(), length=100, index=False)

def plot(data, window_seconds, alpha):
    data = data.rolling(f"{window_seconds}s").mean()
    return data.hvplot(y="y", ylim=(-1, 1), alpha=alpha, color="blue", line_width=5)

window_seconds = pn.widgets.IntSlider(value=5, start=1, end=10, name="Window (secs)")
alpha = pn.widgets.FloatSlider(value=1, start=0, end=1, name="Alpha")
iplot = hv.DynamicMap(
    pn.bind(plot, data=dfstream.param.data, window_seconds=window_seconds, alpha=alpha)
)
pn.Column(iplot, window_seconds, alpha).servable()
async def run():
    while True:
        await sleep(0.1)
        row = new_row()
        dfstream.send(row)

pn.state.onload(run)
Custom TimeseriesBuffer
import pandas as pd
import param
from pandas import DataFrame, Timedelta, Timestamp

class TimeseriesBuffer(param.Parameterized):
    data: DataFrame = param.DataFrame(allow_None=False)
    window: Timedelta = param.ClassSelector(class_=Timedelta, allow_None=False)

    def send(self, data: DataFrame):
        # Append the new rows, then keep only those inside the time window.
        data = pd.concat([self.data, data])
        since = Timestamp.now() - self.window
        self.data = data[data.index >= since]
or an optimized version (?)
class TimeseriesBuffer(param.Parameterized):
    data: DataFrame = param.DataFrame(allow_None=False)
    window: Timedelta = param.ClassSelector(class_=Timedelta, allow_None=False)

    def send(self, new_rows: DataFrame):
        data = pd.concat([self.data, new_rows], sort=False)
        since = Timestamp.now() - self.window
        data.drop(data.index[data.index < since], inplace=True)
        self.data = data

