Streaming data with Bokeh and Panel periodic callbacks?

Maybe there is something in this discussion you can use?

1 Like

I have looked at the solution proposed by @Jhsmit and tried to adapt it to have periodic callback (the async feature of the solution was not much of interest to me).

import panel as pn
import param

from tornado.ioloop import IOLoop
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from bokeh.io import curdoc
from functools import partial


class Application(param.Parameterized):
    do_calc = param.Action(lambda self: self._do_calc())

    def __init__(self):
        self.doc = curdoc()
        self.source = ColumnDataSource({"x": range(10), "y": range(10)})
        self.figure = figure()
        self.figure.line(x="x", y="y", source=self.source)
        self.bk_pane = pn.pane.Bokeh(self.figure)
        self.col = pn.Column(
            pn.pane.Markdown("## Title"),
            pn.Param(self, parameters=["do_calc"], show_name=False,),
            self.bk_pane,
        )
        self.col.add_periodic_callback(self._do_calc)  # <- this line does not work properly

    def update_source(self, data):
        self.source.data.update({"y": data})

    def _do_calc(self):
        self.doc = curdoc()
        data = list(np.random.randint(0, 2 ** 31, 10))
        cb = partial(self.update_source, data)
        self.doc.add_next_tick_callback(cb)

    def panel(self):
        return self.col


app = Application()
server = pn.serve(app.panel(), show=False, loop=IOLoop().current(), start=False, port=5006)
# Do stuff
server.io_loop.start()

In this example, I have a figure with a Button “Do calc” that updates the figure. What happens here is that if I click on the button, the figure updates without problems. However, the line self.col.add_periodic_callback(self._do_calc) does not work properly. I have verified that it calls _do_calc periodically, but self.doc.add_next_tick_callback(cb) in _do_calc does not seem to work in this context.

Of course, if I try to call update_source directly from self.col.add_periodic_callback, then I get again the error RuntimeError: _pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes is again present. I even tried to put with unlocked() before updating the source, but the error stays.

Once this PR is merged you’ll be able to do this:

    
class Application(param.Parameterized):
    ...

    def _do_calc(self):
        data = list(np.random.randint(0, 2 ** 31, 10))
        cb = partial(self.update_source, data)
        pn.state.curdoc.add_next_tick_callback(cb)

    ...

def create_app():
    app = Application()
    return app.panel()

server = pn.serve(create_app, show=False, loop=IOLoop().current(), start=False, port=5006)
# Do stuff
server.io_loop.start()

and the periodic callback will correctly discover the current document. Make sure to use pn.state.curdoc yourself.

1 Like

This is because of the way you start your server, in this scenario for the Document instance you store in __init__ is not the same as the actual current document when you start the server:

If I add these print statements in the _do_calc method:

    def _do_calc(self):
        print(self.doc)
        print(curdoc())
        data = list(np.random.randint(0, 2 ** 31, 10))
        cb = partial(self.update_source, data)
        curdoc().add_next_tick_callback(cb)

Gives

<bokeh.document.document.Document object at 0x0000021D38068308>
<bokeh.document.document.Document object at 0x0000021D34E6AAC8>

This means that your periodic callback is added to the wrong Document instance. The _do_calc button press was working because in your example before you add callback you call curdoc() and store it in self.doc so you get the correct document instance.The _do_calc method get executed in the main thread so this all works fine.

If you want to get this working you need to somehow get the callback on the correct Document. The code below does that when you press the ‘add_cb’ button. This is probably not the solution you want, but I tried a few things and I couldn’t figure out a way to do it better. Basically what I think you need to to somehow add the callback after you start the io loop.

Also added is missing numpy import and call to super


import panel as pn
import param
from tornado.ioloop import IOLoop
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from bokeh.io import curdoc
from functools import partial
import numpy as np


class Application(param.Parameterized):
    do_calc = param.Action(lambda self: self._do_calc())
    add_cb = param.Action(lambda  self:self._add_cb())

    def __init__(self, **params):
        super(Application, self).__init__(**params)

        self.doc = None
        self.source = ColumnDataSource({"x": range(10), "y": range(10)})
        self.figure = figure()
        self.figure.line(x="x", y="y", source=self.source)
        self.bk_pane = pn.pane.Bokeh(self.figure)
        self.col = pn.Column(
            pn.pane.Markdown("## Title"),
            pn.Param(self, parameters=["do_calc", "add_cb"], show_name=False,),
            self.bk_pane,
        )

    def update_source(self, data):
        self.source.data.update({"y": data})

    def _add_cb(self):
        self.doc = curdoc()
        self.doc.add_periodic_callback(self._do_calc, 200)  # <- this line does not work properly

    def _do_calc(self):
        data = list(np.random.randint(0, 2 ** 31, 10))
        cb = partial(self.update_source, data)
        curdoc().add_next_tick_callback(cb)

    def panel(self):
        return self.col


app = Application()
loop = IOLoop().current()
server = pn.serve(app.panel(), show=False, loop=loop, start=False, port=5006)

# Do stuff

server.io_loop.start()
1 Like

Actually in my example above you don’t even need next_tick_callback, this will do:

class Application(param.Parameterized):
    ...

    def _do_calc(self):
        data = list(np.random.randint(0, 2 ** 31, 10))
        self.source.data.update({"y": data})
    ...

def create_app():
    app = Application()
    return app.panel()

server = pn.serve(create_app, show=False, loop=IOLoop().current(), start=False, port=5006)
# Do stuff
server.io_loop.start()

This will still require my PR with the fix though.

But does that also then work with a periodic callback?

Yes, sorry for abbreviating the example, both my samples were assuming the __init__ still looks like this:

    def __init__(self):
        self.doc = curdoc()
        self.source = ColumnDataSource({"x": range(10), "y": range(10)})
        self.figure = figure()
        self.figure.line(x="x", y="y", source=self.source)
        self.bk_pane = pn.pane.Bokeh(self.figure)
        self.col = pn.Column(
            pn.pane.Markdown("## Title"),
            pn.Param(self, parameters=["do_calc"], show_name=False,),
            self.bk_pane,
        )
        self.col.add_periodic_callback(self._do_calc)  # <- this line does not work properly

Ah I see, that makes sense because adding to next_tick_callback is only needed when you do things from another thread, and this periodic_callback is on the same thread as the main loop?

I also think I should move the add_periodic_callback method onto the pn.state object in future. It’s kind of weird to have it on each object and makes more sense there.

Ah I see, that makes sense because adding to next_tick_callback is only needed when you do things from another thread, and this periodic_callback is on the same thread as the main loop?

Yes, exactly.

1 Like

Could we get a full working example for reference marked as a solution please such that we are 100% sure what to do?

Thanks

Full working example, which requires current master or Panel >=0.10.0a18 (to be released tomorrow), to work:

import panel as pn
import param
from tornado.ioloop import IOLoop
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
import numpy as np


class Application(param.Parameterized):
    do_calc = param.Action(lambda self: self._do_calc())

    def __init__(self, **params):
        super(Application, self).__init__(**params)
        self.source = ColumnDataSource({"x": range(10), "y": range(10)})
        self.figure = figure()
        self.figure.line(x="x", y="y", source=self.source)
        self.col = pn.Column(
            pn.pane.Markdown("## Title"),
            self.param.do_calc,
            self.figure,
        )
        pn.state.add_periodic_callback(self._do_calc, 200)  # <- this line does not work properly

    def _do_calc(self):
        data = list(np.random.randint(0, 2 ** 31, 10))
        self.source.data.update({"y": data})

    def panel(self):
        return self.col


def create_app():
    app = Application()
    return app.panel()

loop = IOLoop().current()
server = pn.serve(create_app, show=False, loop=loop, start=False, port=5006)

# Do stuff

server.io_loop.start()
3 Likes

Thank you for all these replies and the quick fix @philippjfr. It has been most helpfull.

1 Like

Hey!
Any way to make the same streaming example to work inside a Jupyter(Lab) app too?

@ItamarShDev Can you be more specific? It should already work, what went wrong when you tried?

when i run your code from your comment inside a local JupyterLab
i get:

This event loop is already running

Oh you should not try to start a new event loop or start a server in Jupyter, just display the app as normal:

app = Application()
app.panel()

AttributeError: ‘_state’ object has no attribute ‘add_periodic_callback’

Note the version requirement I stated above:

2 Likes

Using the code still not working for me on 0a18.

tried moving to Stream and still nothing. no error either

import panel as pn
import param
from tornado.ioloop import IOLoop
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
import numpy as np
pn.extension()


class Application(param.Parameterized):
    do_calc = param.Action(lambda self: self._do_calc())

    def __init__(self, **params):
        super(Application, self).__init__(**params)
        self.source = ColumnDataSource({"x": [1,2,3], "y": [2,3,4]})
        self.figure = figure()
        self.figure.line(x="x", y="y", source=self.source)
        self.col = pn.Column(
            pn.pane.Markdown("## Title"),
            self.param.do_calc,
            self.figure,
        )
        pn.state.add_periodic_callback(self._do_calc, 200)  # <- this line does not work properly


    def _do_calc(self):
        data = list(np.random.randint(0, 2 ** 31, 10))
        self.source.stream({"y": data, "x": data}, 300)

    def panel(self):
        return self.col


app = Application()
app.panel()
1 Like

The example is not working for me either.

Neither the param button, nor the periodic callback work.

However, when using bokeh’s button, the chart does update.

import panel as pn
import param
from tornado.ioloop import IOLoop
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource, Button
import pandas as pd
import numpy as np
pn.extension()


class Application(param.Parameterized):
    do_calc = param.Action(lambda self: self._do_calc())

    def __init__(self, **params):
        super(Application, self).__init__(**params)
        self.source = ColumnDataSource(
            pd.DataFrame({
                'x':np.random.randint(0, 2 ** 31, 10),
                 'y':np.random.randint(0, 2 ** 31, 10)
            }).cumsum().set_index('x')
        )
        self.figure = figure()
        self.figure.line(x="x", y="y", source=self.source)
        self.button = Button(label="Update Plot")
        self.button.on_click(self._do_calc)
        self.col = pn.Column(
            pn.pane.Markdown("## Title"),
#             self.param.do_calc, # <- this line does not work properly
            self.button, # bokeh's button works
            self.figure,
        )

#         pn.state.add_periodic_callback(self._do_calc, 200)  # <- this line does not work properly


    def _do_calc(self,event='test'):
        data = pd.DataFrame({'x':np.random.randint(0, 2 ** 31, 10),
                             'y':np.random.randint(0, 2 ** 31, 10)}).cumsum()
        self.source.data.update(data)

    def panel(self):
        return self.col


app = Application()
app.panel()