Streaming data with Bokeh and Panel periodic callbacks?

Hi everyone,

I am trying to use the streaming capabilities of Bokeh in combination with the Panel library. More specifically, I am trying to use ColumnDataSource.stream (from Bokeh) in a callback function that should be called periodically. However, I cannot get this to work: if I add the periodic callback to bokeh.io.curdoc, the callback is never called, and if I add the periodic callback to any Panel object, I get a Bokeh error.

You will find below two minimal examples that do not work. Here is some information about my setup:

  • Panel version 0.9.7
  • Bokeh version 2.1.1
  • Python version 3.7.3
  • OS: Linux

Using panel.add_periodic_callback

from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
import numpy as np
import panel as pn

ds = ColumnDataSource({"x": [0], "y": [0]})
i = 1

def update(event=None):
    global i
    ds.stream({"x": [i], "y": [np.random.rand()]})
    i += 1

p = figure()
p.line(x="x", y="y", source=ds)

pane = pn.panel(p)
pane.add_periodic_callback(update, 200)

In this case, the update function is called just fine, but as soon as I try to visualize the plot (i.e. by opening the standalone server in a browser tab or showing the pane object in a Jupyter cell), I get the following error from Bokeh:

ERROR:tornado.application:Exception in callback <bound method PeriodicCallback._periodic_callback of PeriodicCallback(callback=<function update at 0x7f428be6b1e0>, count=None, name='PeriodicCallback00006', period=200, timeout=None)>
Traceback (most recent call last):
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/tornado/ioloop.py", line 907, in _run
    return self.callback()
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/panel/callbacks.py", line 65, in _periodic_callback
    self.callback()
  File "test.py", line 14, in update
    ds.stream({"x": [i], "y": [np.random.rand()]})
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/bokeh/models/sources.py", line 415, in stream
    self._stream(new_data, rollover)
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/bokeh/models/sources.py", line 527, in _stream
    self.data._stream(self.document, self, new_data, rollover, setter)
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/bokeh/core/property/wrappers.py", line 430, in _stream
    hint=ColumnsStreamedEvent(doc, source, new_data, rollover, setter))
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/bokeh/core/property/wrappers.py", line 150, in _notify_owners
    descriptor._notify_mutated(owner, old, hint=hint)
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/bokeh/core/property/descriptors.py", line 869, in _notify_mutated
    self._real_set(obj, old, value, hint=hint)
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/bokeh/core/property/descriptors.py", line 832, in _real_set
    self._trigger(obj, old, value, hint=hint, setter=setter)
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/bokeh/core/property/descriptors.py", line 909, in _trigger
    obj.trigger(self.name, old, value, hint, setter)
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/bokeh/model.py", line 661, in trigger
    super().trigger(attr, old, new, hint=hint, setter=setter)
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/bokeh/util/callback_manager.py", line 157, in trigger
    self._document._notify_change(self, attr, old, new, hint, setter, invoke)
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/bokeh/document/document.py", line 1042, in _notify_change
    self._trigger_on_change(event)
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/bokeh/document/document.py", line 1137, in _trigger_on_change
    self._with_self_as_curdoc(invoke_callbacks)
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/bokeh/document/document.py", line 1150, in _with_self_as_curdoc
    return f()
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/bokeh/document/document.py", line 1136, in invoke_callbacks
    cb(event)
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/bokeh/document/document.py", line 704, in <lambda>
    self._callbacks[receiver] = lambda event: event.dispatch(receiver)
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/bokeh/document/events.py", line 269, in dispatch
    super().dispatch(receiver)
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/bokeh/document/events.py", line 124, in dispatch
    receiver._document_patched(self)
  File "/home/jokteur/anaconda3/lib/python3.7/site-packages/bokeh/server/session.py", line 218, in _document_patched
    raise RuntimeError("_pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes")
RuntimeError: _pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes

From what I understand, Panel adds the periodic callback to the Tornado IOLoop, but Bokeh does not allow ColumnDataSource.stream to be called outside of a normal Bokeh document periodic callback.

Using curdoc().add_periodic_callback

I also tried simply adding the periodic callback to curdoc(), but here the problem is that update never gets called, even when I have an active session open:

from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from bokeh.io import curdoc
import numpy as np
import panel as pn

ds = ColumnDataSource({"x": [0], "y": [0]})
i = 1

def update(event=None):
    global i
    ds.stream({"x": [i], "y": [np.random.rand()]})
    i += 1

p = figure()
p.line(x="x", y="y", source=ds)

pane = pn.panel(p)
curdoc().add_periodic_callback(update, 200)

Conclusion

I am aware that HoloViews has a streaming module, but I would like to stick to Bokeh and Panel only if possible. In the worst case, if I cannot make this periodic callback work, I would probably abandon Panel (it worked for me in a Bokeh-only app), even though there are some nice widgets that I could use.

Thank you in advance for all the help you can give.

Would you mind filing an issue about this with Panel?

I will open an issue on GitHub.

@jokteur

From the Bokeh standpoint, you need to run things as a server because your callbacks are written in Python.

Your second example, using curdoc().add_periodic_callback, works for me with the following modifications to make the panel servable:

pane = pn.panel(p)
pane.servable()

curdoc().add_periodic_callback(update, 200)

Then run the Python file using panel serve, e.g. if your file is named pn_stream.py:

>> panel serve pn_stream.py

Indeed, when using panel serve, it works as intended. However, I don’t want to use panel serve, because this will be integrated into a standalone Python application.

This means to launch my application, I am doing the following:

server = pn.serve(pane, show=False, loop=IOLoop().current(), start=False)

# do some other stuff with the IOLoop()

server.io_loop.start()

In this context, even with pane.servable(), it does not work.

@jokteur

I see. I don’t know enough about Panel’s low-level inner workings, but Bokeh specifically requires things to run in server mode when using Python callbacks and/or the add_periodic_callback() mechanism.

If the panel functionality via pn.serve() somehow addresses this internally, you might try to use add_next_tick_callback() in conjunction with your first method. The error you’re getting there is associated with trying to do something to the document when you don’t have the lock for it, and add_next_tick_callback() is usually a way to have what you want done scheduled so bokeh can execute it with the document lock held. So perhaps your periodic call can invoke a next-tick callback that does the work.
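
To make the locking idea concrete, here is a toy model in plain Python. It is only a sketch under stated assumptions: ToyDocument, stream_value and run_next_tick are hypothetical names, this is not Bokeh's implementation, and only the method name add_next_tick_callback mirrors the real API. It shows a change being rejected when it is made without the lock, and accepted when it is deferred to a next-tick callback that the document runs while holding the lock.

```python
class ToyDocument:
    """Hypothetical stand-in for a server-side document (not Bokeh's code)."""

    def __init__(self):
        self._locked = False   # True only while the document runs callbacks
        self._next_tick = []   # callbacks waiting for the next tick
        self.values = []

    def stream_value(self, value):
        # Any change to the document requires the lock to be held.
        if not self._locked:
            raise RuntimeError("document changed without holding the lock")
        self.values.append(value)

    def add_next_tick_callback(self, callback):
        # Scheduling is always allowed; the callback runs later, under the lock.
        self._next_tick.append(callback)

    def run_next_tick(self):
        # The document takes the lock, drains the queue, then releases the lock.
        pending, self._next_tick = self._next_tick, []
        self._locked = True
        try:
            for callback in pending:
                callback()
        finally:
            self._locked = False


doc = ToyDocument()

# A periodic callback fired by the raw event loop does not hold the lock.
try:
    doc.stream_value(1)
    direct_error = None
except RuntimeError as exc:
    direct_error = str(exc)

# Deferring the same change lets the document apply it with the lock held.
doc.add_next_tick_callback(lambda: doc.stream_value(1))
doc.run_next_tick()
```

In this model the direct call fails and the deferred call succeeds. Note that deferring only helps if the callback is scheduled on the document that is actually being served.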

@_jm

My impression is that somehow when using Panel, the callbacks of curdoc() are never properly called. I tried your suggestion, but again, it does not change anything:

def update(event):
    global i
    ds.stream({"x": [i], "y": [np.random.rand()]})
    i += 1

def update_next_tick(event=None):
    curdoc().add_next_tick_callback(update)

# [...]

pane.add_periodic_callback(update_next_tick, 200)

In the example above, update_next_tick is called, but update is still never called.

I am not sure, but what happens if you use pn.state.curdoc instead of curdoc?

Maybe there is something in this discussion you can use?

I have looked at the solution proposed by @Jhsmit and tried to adapt it to use a periodic callback (the async feature of the solution was not of much interest to me).

import panel as pn
import param

from tornado.ioloop import IOLoop
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from bokeh.io import curdoc
from functools import partial


class Application(param.Parameterized):
    do_calc = param.Action(lambda self: self._do_calc())

    def __init__(self):
        self.doc = curdoc()
        self.source = ColumnDataSource({"x": range(10), "y": range(10)})
        self.figure = figure()
        self.figure.line(x="x", y="y", source=self.source)
        self.bk_pane = pn.pane.Bokeh(self.figure)
        self.col = pn.Column(
            pn.pane.Markdown("## Title"),
            pn.Param(self, parameters=["do_calc"], show_name=False,),
            self.bk_pane,
        )
        self.col.add_periodic_callback(self._do_calc)  # <- this line does not work properly

    def update_source(self, data):
        self.source.data.update({"y": data})

    def _do_calc(self):
        self.doc = curdoc()
        data = list(np.random.randint(0, 2 ** 31, 10))
        cb = partial(self.update_source, data)
        self.doc.add_next_tick_callback(cb)

    def panel(self):
        return self.col


app = Application()
server = pn.serve(app.panel(), show=False, loop=IOLoop().current(), start=False, port=5006)
# Do stuff
server.io_loop.start()

In this example, I have a figure with a Button “Do calc” that updates the figure. What happens here is that if I click on the button, the figure updates without problems. However, the line self.col.add_periodic_callback(self._do_calc) does not work properly. I have verified that it calls _do_calc periodically, but self.doc.add_next_tick_callback(cb) in _do_calc does not seem to work in this context.

Of course, if I try to call update_source directly from self.col.add_periodic_callback, then I again get the error RuntimeError: _pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes. I even tried wrapping the source update in with unlocked(), but the error remains.

Once this PR is merged you’ll be able to do this:

    
class Application(param.Parameterized):
    ...

    def _do_calc(self):
        data = list(np.random.randint(0, 2 ** 31, 10))
        cb = partial(self.update_source, data)
        pn.state.curdoc.add_next_tick_callback(cb)

    ...

def create_app():
    app = Application()
    return app.panel()

server = pn.serve(create_app, show=False, loop=IOLoop().current(), start=False, port=5006)
# Do stuff
server.io_loop.start()

and the periodic callback will correctly discover the current document. Make sure to use pn.state.curdoc yourself.

This is because of the way you start your server: in this scenario, the Document instance you store in __init__ is not the same as the actual current document once the server is running.

If I add these print statements in the _do_calc method:

    def _do_calc(self):
        print(self.doc)
        print(curdoc())
        data = list(np.random.randint(0, 2 ** 31, 10))
        cb = partial(self.update_source, data)
        curdoc().add_next_tick_callback(cb)

I get:

<bokeh.document.document.Document object at 0x0000021D38068308>
<bokeh.document.document.Document object at 0x0000021D34E6AAC8>

This means that your periodic callback is added to the wrong Document instance. The _do_calc button press worked because, in your example, you call curdoc() and store it in self.doc before adding the callback, so you get the correct document instance. The _do_calc method gets executed in the main thread, so this all works fine.
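
The difference between a document captured at construction time and one looked up at call time can be sketched in plain Python. This is a toy model only: the curdoc function below is a hypothetical stand-in built on contextvars, not bokeh.io.curdoc, and run_in_session, App and the document names are made up for illustration.

```python
import contextvars

# Hypothetical stand-in for a "current document" that depends on the context.
_current_doc = contextvars.ContextVar("current_doc", default="import-time-doc")


def curdoc():
    return _current_doc.get()


def run_in_session(doc_name, func):
    """Call func() the way a server would for one browser session (toy model)."""
    token = _current_doc.set(doc_name)
    try:
        return func()
    finally:
        _current_doc.reset(token)


class App:
    def __init__(self):
        # Captured once, when the object is constructed.
        self.doc_at_init = curdoc()

    def doc_at_call(self):
        # Looked up each time, so it follows the active session.
        return curdoc()


# Built before the server starts: it remembers the import-time document.
early_app = App()
stored = run_in_session("session-doc", lambda: early_app.doc_at_init)
looked_up = run_in_session("session-doc", early_app.doc_at_call)

# Built by a factory inside the session: it sees the session document.
late_app = run_in_session("session-doc", App)
```

Here stored is the import-time document while looked_up and late_app.doc_at_init are the session document, which is the same mismatch the two printed Document addresses show.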

If you want to get this working, you need to somehow get the callback onto the correct Document. The code below does that when you press the ‘add_cb’ button. This is probably not the solution you want, but I tried a few things and I couldn’t figure out a better way to do it. Basically, I think you need to somehow add the callback after you start the IO loop.

I also added the missing numpy import and the call to super.


import panel as pn
import param
from tornado.ioloop import IOLoop
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from bokeh.io import curdoc
from functools import partial
import numpy as np


class Application(param.Parameterized):
    do_calc = param.Action(lambda self: self._do_calc())
    add_cb = param.Action(lambda self: self._add_cb())

    def __init__(self, **params):
        super(Application, self).__init__(**params)

        self.doc = None
        self.source = ColumnDataSource({"x": range(10), "y": range(10)})
        self.figure = figure()
        self.figure.line(x="x", y="y", source=self.source)
        self.bk_pane = pn.pane.Bokeh(self.figure)
        self.col = pn.Column(
            pn.pane.Markdown("## Title"),
            pn.Param(self, parameters=["do_calc", "add_cb"], show_name=False,),
            self.bk_pane,
        )

    def update_source(self, data):
        self.source.data.update({"y": data})

    def _add_cb(self):
        self.doc = curdoc()
        self.doc.add_periodic_callback(self._do_calc, 200)  # <- registered on the current session's document

    def _do_calc(self):
        data = list(np.random.randint(0, 2 ** 31, 10))
        cb = partial(self.update_source, data)
        curdoc().add_next_tick_callback(cb)

    def panel(self):
        return self.col


app = Application()
loop = IOLoop().current()
server = pn.serve(app.panel(), show=False, loop=loop, start=False, port=5006)

# Do stuff

server.io_loop.start()

Actually in my example above you don’t even need next_tick_callback, this will do:

class Application(param.Parameterized):
    ...

    def _do_calc(self):
        data = list(np.random.randint(0, 2 ** 31, 10))
        self.source.data.update({"y": data})
    ...

def create_app():
    app = Application()
    return app.panel()

server = pn.serve(create_app, show=False, loop=IOLoop().current(), start=False, port=5006)
# Do stuff
server.io_loop.start()

This will still require my PR with the fix though.

But does that also then work with a periodic callback?

Yes, sorry for abbreviating the example, both my samples were assuming the __init__ still looks like this:

    def __init__(self):
        self.doc = curdoc()
        self.source = ColumnDataSource({"x": range(10), "y": range(10)})
        self.figure = figure()
        self.figure.line(x="x", y="y", source=self.source)
        self.bk_pane = pn.pane.Bokeh(self.figure)
        self.col = pn.Column(
            pn.pane.Markdown("## Title"),
            pn.Param(self, parameters=["do_calc"], show_name=False,),
            self.bk_pane,
        )
        self.col.add_periodic_callback(self._do_calc)  # <- this line does not work properly

Ah I see, that makes sense because adding to next_tick_callback is only needed when you do things from another thread, and this periodic_callback is on the same thread as the main loop?

I also think I should move the add_periodic_callback method onto the pn.state object in future. It’s kind of weird to have it on each object and makes more sense there.

Ah I see, that makes sense because adding to next_tick_callback is only needed when you do things from another thread, and this periodic_callback is on the same thread as the main loop?

Yes, exactly.
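
The same rule appears in other event loops. As an analogy only, using asyncio from the standard library rather than Bokeh or Tornado, the sketch below calls an update function directly when already on the loop thread, and hands it off with call_soon_threadsafe when coming from a worker thread. The names update, worker_body and seen are made up for illustration.

```python
import asyncio
import threading


async def main():
    loop = asyncio.get_running_loop()
    loop_thread = threading.get_ident()
    seen = []                  # (label, ran_on_loop_thread) pairs
    done = asyncio.Event()

    def update(label):
        # Record which thread actually executed the update.
        seen.append((label, threading.get_ident() == loop_thread))
        if len(seen) == 2:
            done.set()

    # Already on the loop thread (like a periodic callback): call directly.
    update("direct")

    def worker_body():
        # On another thread: do not touch loop-owned state, schedule instead.
        loop.call_soon_threadsafe(update, "from-worker")

    worker = threading.Thread(target=worker_body)
    worker.start()
    await asyncio.wait_for(done.wait(), timeout=5)
    worker.join()
    return seen


results = asyncio.run(main())
```

Both updates end up running on the loop thread; only the one that starts on the worker thread needs the scheduling step.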

Could we please get a full working example, marked as the solution, for reference, so that we are 100% sure what to do?

Thanks

Full working example, which requires current master or Panel >=0.10.0a18 (to be released tomorrow), to work:

import panel as pn
import param
from tornado.ioloop import IOLoop
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
import numpy as np


class Application(param.Parameterized):
    do_calc = param.Action(lambda self: self._do_calc())

    def __init__(self, **params):
        super(Application, self).__init__(**params)
        self.source = ColumnDataSource({"x": range(10), "y": range(10)})
        self.figure = figure()
        self.figure.line(x="x", y="y", source=self.source)
        self.col = pn.Column(
            pn.pane.Markdown("## Title"),
            self.param.do_calc,
            self.figure,
        )
        pn.state.add_periodic_callback(self._do_calc, 200)  # <- calls _do_calc every 200 ms

    def _do_calc(self):
        data = list(np.random.randint(0, 2 ** 31, 10))
        self.source.data.update({"y": data})

    def panel(self):
        return self.col


def create_app():
    app = Application()
    return app.panel()

loop = IOLoop().current()
server = pn.serve(create_app, show=False, loop=loop, start=False, port=5006)

# Do stuff

server.io_loop.start()

Thank you for all these replies and the quick fix @philippjfr. It has been most helpful.
