What is best practice for streaming using DynamicMap, Buffer and Panel widgets?

  1. The best way is probably to stick to the HoloViews documentation: use Pipe when each update replaces all of the data, and Buffer for append-only updates. However, as you’ve already noted, we’d ideally be able to patch data as well. What’s maybe not so well known is that DynamicMap also supports param objects as streams. Panel has made some effort to create custom Param classes with stream and patch events.
  2. As mentioned in 1., you’d ideally want a combination of patch and stream events. Neither Pipe nor Buffer supports both. However, there are custom Panel classes, such as the DataFrame and Tabulator widgets, that support both streaming and patching. Last time I checked, though, their jslink was broken. If that were fixed, you could efficiently update plots based on only the changed data.
  3. Buffer holds either a limited or an unlimited number of data points, but it neither sorts them nor replaces existing keys. So it wouldn’t work with a timeseries where an update overwrites an existing datetime key.
  4. Streamz is very convenient, but it doesn’t currently support HoloViews’ nice layout features. So I wrote my own plot class:
import pandas as pd
import param
import panel as pn
import holoviews as hv
from holoviews.streams import Pipe
from IPython.display import display
from streamz import Stream
from streamz.sinks import Sink
from tornado import gen

@Stream.register_api()
class DMap(Sink,param.Parameterized):
    # ToDo: Once pn.widgets.DataFrame jslink is fixed, use it here instead of pipe
    plot_fn = param.Callable(instantiate=True)

    def __init__(self,upstream,name,plot_fn,value_cols=None,**kwargs):
        Stream.__init__(self, upstream, **kwargs)
        param.Parameterized.__init__(self, **kwargs)
        self.param.update(plot_fn=plot_fn)
        self.value_cols = list(value_cols) if value_cols is not None else [] # avoid a shared mutable default
        self.hvplot = pn.pane.HoloViews() # one pane per instance, not one shared by the whole class

    @gen.coroutine
    def update(self,x,who=None,metadata=None):
        x = x[self.value_cols]
        if not hasattr(self,'pipe'):
            # First message: create the data container, the Pipe and the DynamicMap
            self.param.add_parameter('data_container',param.DataFrame(x))
            self.pipe = Pipe(x)
            self.dmap = hv.DynamicMap(self.plot_fn,kdims=[],streams=[self.pipe])
            self.hvplot.param.update(object=self.dmap)
        else:
            if len(self.data_container):
                self.data_container.update(x) # update values of existing keys
                x = x[x.index > self.data_container.index[-1]] # keep only rows with new keys
            x = pd.concat([self.data_container,x]) # ToDo: cutoff length/date
            self.param.update(data_container=x)
            self.pipe.send(x) # Pipe replaces the plotted data with the merged frame

    def _ipython_display_(self):
        # Display plot pane as repr in notebook
        return display(self.hvplot)

Example use:

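from functools import partial
import hvplot.pandas # registers the .hvplot accessor on DataFrames

# bk_default_kwargs, range_following and x_range_custom_init are defined elsewhere in my code
def plot_fn_OHLC(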
    data:pd.DataFrame, # dataframe with trades data
) -> hv.Curve: # returns a Curve hvplot
    """
    Returns a basic hvplot with index as x, and the rest as y. Has range-following hook for syncing multiple charts when stacked.
    """
    return (
        data
        .hvplot.ohlc(**bk_default_kwargs)
        .opts(**{
            'min_height':320,
            'yaxis':'right',
            'hooks':[partial(range_following,x_range_custom=x_range_custom_init)]
        })
    )

CVD_takers_base_resampled_price_plot_OHLC = (
    CVD_takers_base_resampled_OHLC
    .DMap(
        plot_fn=plot_fn_OHLC,
        value_cols=['open','high','low','close'],
        name='CVD_takers_base_resampled_price_plot_OHLC'
    )
    # .HvPlot(plot_fn=plot_fn,value_cols=['price'])
)

You can then easily make composite plots like this:

price_plot = CVD_takers_base_resampled_price_plot.dmap.opts(xaxis='top',clone=True,hooks=[crosshair,watermark,y_formatter,partial(range_following,x_range_custom=x_range_custom)])
price_plot_OHLC = CVD_takers_base_resampled_price_plot_OHLC.dmap.opts(xaxis='top',clone=True,hooks=[crosshair,watermark,y_formatter,partial(range_following,x_range_custom=x_range_custom)])

composite_plot = (price_plot + price_plot_OHLC).cols(1)

  5. Unfortunately, no. But you can use alternatives like the above.
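
As a side note on point 3: the key-replacing behaviour that Buffer lacks can be sketched in plain pandas, with the merged frame then sent through a Pipe. This is only a minimal sketch, not part of HoloViews or Panel. `upsert_rows` and `max_rows` are hypothetical names I made up for illustration, and it assumes the index (e.g. a datetime) is the key.

```python
import pandas as pd


def upsert_rows(current, incoming, max_rows=None):
    """Merge `incoming` into `current` by index (hypothetical helper).

    Rows whose key already exists are overwritten, rows with a new key
    are appended, and the result is sorted by index.
    """
    merged = pd.concat([current, incoming])
    # For duplicated keys keep the last occurrence, i.e. the row from `incoming`.
    merged = merged[~merged.index.duplicated(keep="last")].sort_index()
    if max_rows is not None:
        # Optional cutoff so the container does not grow without bound.
        merged = merged.tail(max_rows)
    return merged


# Existing data: two one-minute bars.
current = pd.DataFrame(
    {"close": [100.0, 101.0]},
    index=pd.to_datetime(["2024-01-01 00:00", "2024-01-01 00:01"]),
)
# Incoming data: a revised value for 00:01 plus a new bar at 00:02.
incoming = pd.DataFrame(
    {"close": [101.5, 102.0]},
    index=pd.to_datetime(["2024-01-01 00:01", "2024-01-01 00:02"]),
)

merged = upsert_rows(current, incoming)
print(merged)
```

Here the 00:01 row ends up with the revised value and 00:02 is appended. Unlike `DataFrame.update`, this also lets a new NaN overwrite an old value, which may or may not be what you want.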