I’m trying to create a streaming application using HoloViews and the Buffer.
The resource that got me started is the "Working with Streaming Data" page of the HoloViews v1.18.0 docs, where the Buffer is described.
After some time I figured out that I can use a function as input to the DynamicMap
and not just a HoloViews plot element. I want to display rolling values etc.
Then I want to combine this with widgets. Again, after some time I found the "Responding to Events" page of the HoloViews v1.18.0 docs, and especially its section on Panel.
After some attempts I figured out that the Buffer is a Parameterized class with a data parameter holding the buffered value.
Now I can see many ways of doing this. For example the two below.
Questions
- Which is the best practice way of doing this?
- What optimizations does a DynamicMap do, and when? (It's sometimes indicated that it can send only the changes to the client and thus update Bokeh much more efficiently. But it is totally unclear to me whether this happens and how I can confirm that it happens.)
- What optimizations does a Buffer do? Could I just as well use a custom TimeseriesBuffer with pn.bind as implemented below?
- What advantages (for example optimizations) does the Streamz library provide? When should I use it? How would the example below look using the Streamz library?
- We are almost always streaming irregularly spaced timeseries data. Can the Buffer accept a Timedelta instead of an integer window/length?
HoloViews and streams dictionary
import pandas as pd
import panel as pn
import hvplot.pandas
import holoviews as hv
from holoviews.streams import Buffer
from asyncio import sleep
import numpy as np

pn.extension()

def new_row():
    return pd.DataFrame({"y": np.random.randn()}, index=[pd.Timestamp.now()])

dfstream = Buffer(new_row(), length=100, index=False)

def plot(data, window_seconds, alpha):
    data = data.rolling(f"{window_seconds}s").mean()
    return data.hvplot(y="y", ylim=(-1, 1), alpha=alpha, color="blue", line_width=5)

window_seconds = pn.widgets.IntSlider(value=5, start=1, end=10, name="Window (secs)")
alpha = pn.widgets.FloatSlider(value=1, start=0, end=1, name="Alpha")

iplot = hv.DynamicMap(
    plot,
    streams={
        "data": dfstream.param.data,
        "window_seconds": window_seconds.param.value,
        "alpha": alpha.param.value,
    },
)

pn.Column(iplot, window_seconds, alpha).servable()

async def run():
    while True:
        await sleep(0.1)
        row = new_row()
        dfstream.send(row)

pn.state.onload(run)
Panel and pn.bind
import pandas as pd
import panel as pn
import hvplot.pandas
import holoviews as hv
from holoviews.streams import Buffer
from asyncio import sleep
import numpy as np

pn.extension()

def new_row():
    return pd.DataFrame({"y": np.random.randn()}, index=[pd.Timestamp.now()])

dfstream = Buffer(new_row(), length=100, index=False)

def plot(data, window_seconds, alpha):
    data = data.rolling(f"{window_seconds}s").mean()
    return data.hvplot(y="y", ylim=(-1, 1), alpha=alpha, color="blue", line_width=5)

window_seconds = pn.widgets.IntSlider(value=5, start=1, end=10, name="Window (secs)")
alpha = pn.widgets.FloatSlider(value=1, start=0, end=1, name="Alpha")

iplot = hv.DynamicMap(
    pn.bind(plot, data=dfstream.param.data, window_seconds=window_seconds, alpha=alpha)
)

pn.Column(iplot, window_seconds, alpha).servable()

async def run():
    while True:
        await sleep(0.1)
        row = new_row()
        dfstream.send(row)

pn.state.onload(run)
Custom TimeseriesBuffer
import pandas as pd
import param
from pandas import DataFrame, Timedelta, Timestamp

class TimeseriesBuffer(param.Parameterized):
    data: DataFrame = param.DataFrame(allow_None=False)
    window: Timedelta = param.ClassSelector(class_=Timedelta, allow_None=False)

    def send(self, data: DataFrame):
        # Append the new rows, then keep only rows inside the time window.
        data = pd.concat([self.data, data])
        since = Timestamp.now() - self.window
        self.data = data[data.index >= since]
or an optimized version (?)
class TimeseriesBuffer(param.Parameterized):
    data: DataFrame = param.DataFrame(allow_None=False)
    window: Timedelta = param.ClassSelector(class_=Timedelta, allow_None=False)

    def send(self, new_rows: DataFrame):
        data = pd.concat([self.data, new_rows], sort=False)
        since = Timestamp.now() - self.window
        # Drop rows older than the window in place instead of building a filtered copy.
        data.drop(data.index[data.index < since], inplace=True)
        self.data = data
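A possible variation for irregularly spaced data, sketched below under my own assumptions: trim relative to the newest timestamp in the buffer instead of Timestamp.now(), so the window does not depend on when the rows happen to arrive (useful when replaying recorded data). The helper name trim_to_window is hypothetical and not part of HoloViews, Panel or pandas.

```python
import pandas as pd


def trim_to_window(data: pd.DataFrame, new_rows: pd.DataFrame, window: pd.Timedelta) -> pd.DataFrame:
    """Append new_rows and keep only rows within `window` of the newest timestamp."""
    combined = pd.concat([data, new_rows])
    # Cutoff is measured from the latest timestamp present, not the wall clock.
    cutoff = combined.index.max() - window
    return combined[combined.index >= cutoff]


# Irregularly spaced timestamps: 0 s, 1 s and 5 s, then a new row at 10 s.
start = pd.Timestamp("2024-01-01 00:00:00")
existing = pd.DataFrame(
    {"y": [0.1, 0.2, 0.3]},
    index=[start, start + pd.Timedelta(seconds=1), start + pd.Timedelta(seconds=5)],
)
incoming = pd.DataFrame({"y": [0.4]}, index=[start + pd.Timedelta(seconds=10)])

# With a 6 s window the cutoff is at 4 s, so only the rows at 5 s and 10 s remain.
trimmed = trim_to_window(existing, incoming, pd.Timedelta(seconds=6))
print(trimmed)
```

Inside send, the same function could replace the Timestamp.now() based filter; whether that is preferable depends on whether the window should follow the data or the wall clock.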