Why is Panel 10x slower with Dask than with Pandas?

@ahuang11, @jerry.vinokurov

My problem (for this message) is essentially solved, but I’ll leave this here for future googlers (see the Solution section).

I am trying to use a local Dask cluster and to set up the dask client dashboard.

Just Dask, no Panel

For some reason the dask code works without Panel:

# full MWE below
client = get_client()
future = client.submit(get_data_for_graph, sys.argv[1], (0, 1440), [21, 79])
mae_a, mae_b = future.result()
mae_a = mae_a.compute()
mae_b = mae_b.compute()

print(mae_a, mae_b)
printout
minute_of_day
780     0.055595
790     0.042109
310     0.046115
1080    0.031772
1170    0.030551
          ...   
210     0.052976
1160    0.039351
420     0.049656
350     0.033365
530     0.046255
Name: err_A, Length: 144, dtype: float64 minute_of_day
780     0.067251
790     0.060881
310     0.060365
1080    0.049023
1170    0.033088
          ...   
210     0.071234
1160    0.049264
420     0.079492
350     0.032884
530     0.065835
Name: err_B, Length: 144, dtype: float64
Full MWE code

This one uses the same test dataset testdata.parquet given above.

from __future__ import annotations

import sys

import dask.dataframe as dd
from dask.distributed import Client
import hvplot.dask  # noqa
import panel as pn


def get_data(file) -> dd.DataFrame:
    df = dd.read_parquet(file)
    for i in range(15):
        df[f"extra_col{i}"] = df["val_A"] + i
    return df


@pn.cache  # share the client across all users and sessions
def get_client():
    return Client("tcp://127.0.0.1:8786")


def get_data_for_graph(
    file: str,
    minute_of_day: tuple[float, float],
    identifiers: list[int],
):

    df = get_data(file)

    mae_a = calculate_mae(
        df=df,
        label="A",
        minute_of_day=minute_of_day,
        identifiers=identifiers,
    )
    mae_b = calculate_mae(
        df=df,
        label="B",
        minute_of_day=minute_of_day,
        identifiers=identifiers,
    )
    return mae_a, mae_b


def calculate_mae(
    df: dd.DataFrame,
    minute_of_day: tuple[int, int],
    identifiers: list[int],
    label: str,
) -> dd.Series:
    df_filtered = _filter_based_on_ui_selections(
        df, minute_of_day=minute_of_day, identifiers=identifiers
    )
    mae = do_calculate_mae(df_filtered, label=label)
    return mae


def _filter_based_on_ui_selections(
    df: dd.DataFrame,
    minute_of_day: tuple[int, int],
    identifiers: list[int],
) -> dd.DataFrame:

    df_filtered = df[df["minute_of_day"].between(*minute_of_day)]
    return df_filtered[df_filtered["identifier"].isin(identifiers)]


def do_calculate_mae(
    df: dd.DataFrame,
    label: str,
) -> dd.Series:

    col_err = f"err_{label}"
    df[col_err] = abs(df[f"val_{label}"] - df["val_C"])
    mae = df.groupby("minute_of_day")[col_err].agg("mean")

    return mae


client = get_client()
future = client.submit(get_data_for_graph, sys.argv[1], (0, 1440), [21, 79])
mae_a, mae_b = future.result()
mae_a = mae_a.compute()
mae_b = mae_b.compute()

print(mae_a, mae_b)

Dask within Panel App

…but if I use Dask within a Panel App, like this:

imain_graph = pn.bind(
    create_graph, file=sys.argv[1], minute_of_day=minute_of_day, identifiers=identifiers
)
main_graph = pn.panel(imain_graph)

where create_graph is

@pn.io.profile("profile", engine="snakeviz")
def create_graph(
    file: str,
    minute_of_day: tuple[float, float],
    identifiers: list[int],
):

    client = get_client()
    minute_of_day = tuple(map(round, minute_of_day))

    future = client.submit(get_data_for_graph, file, minute_of_day, identifiers)
    mae_a, mae_b = future.result()
    mae_a = mae_a.compute()
    mae_b = mae_b.compute()

    common_opts = dict(legend="top")
    graph_scatter_pipeline = mae_b.hvplot.scatter(
        label="B", **common_opts
    ) * mae_a.hvplot.scatter(label="A", **common_opts)

    return graph_scatter_pipeline

it will throw an exception:

ModuleNotFoundError: No module named 'bokeh_app_30c15ce7b3b248ec93ee1ae6185c16f7'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):

... [omitted]

RuntimeError: Error during deserialization of the task graph. This frequently
occurs if the Scheduler and Client have different environments.
For more information, see
https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments
Full traceback
2024-01-24 16:52:28,756 Error running application handler <bokeh.application.handlers.script.ScriptHandler object at 0x7f6e6048b070>: Error during deserialization of the task graph. This frequently
occurs if the Scheduler and Client have different environments.
For more information, see
https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments

File 'client.py', line 330, in _result:
raise exc.with_traceback(tb) Traceback (most recent call last):
  File "/home/niko/code/myproj/.venvs/venv/lib/python3.10/site-packages/distributed/scheduler.py", line 4671, in update_graph
    graph = deserialize(graph_header, graph_frames).data
  File "/home/niko/code/myproj/.venvs/venv/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 439, in deserialize
    return loads(header, frames)
  File "/home/niko/code/myproj/.venvs/venv/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 101, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/home/niko/code/myproj/.venvs/venv/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 96, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'bokeh_app_30c15ce7b3b248ec93ee1ae6185c16f7'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/niko/code/myproj/.venvs/venv/lib/python3.10/site-packages/bokeh/application/handlers/code_runner.py", line 229, in run
    exec(self._code, module.__dict__)
  File "/home/niko/code/myproj/app.py", line 126, in <module>
    main_graph = pn.panel(imain_graph)
  File "/home/niko/code/myproj/.venvs/venv/lib/python3.10/site-packages/panel/pane/base.py", line 87, in panel
    pane = PaneBase.get_pane_type(obj, **kwargs)(obj, **kwargs)
  File "/home/niko/code/myproj/.venvs/venv/lib/python3.10/site-packages/panel/param.py", line 799, in __init__
    self._replace_pane()
  File "/home/niko/code/myproj/.venvs/venv/lib/python3.10/site-packages/panel/param.py", line 865, in _replace_pane
    new_object = self.eval(self.object)
  File "/home/niko/code/myproj/.venvs/venv/lib/python3.10/site-packages/panel/param.py", line 824, in eval
    return eval_function_with_deps(function)
  File "/home/niko/code/myproj/.venvs/venv/lib/python3.10/site-packages/param/parameterized.py", line 162, in eval_function_with_deps
    return function(*args, **kwargs)
  File "/home/niko/code/myproj/.venvs/venv/lib/python3.10/site-packages/param/depends.py", line 41, in _depends
    return func(*args, **kw)
  File "/home/niko/code/myproj/.venvs/venv/lib/python3.10/site-packages/param/reactive.py", line 431, in wrapped
    return eval_fn()(*combined_args, **combined_kwargs)
  File "/home/niko/code/myproj/.venvs/venv/lib/python3.10/site-packages/panel/io/profile.py", line 251, in wrapped
    return func(*args, **kwargs)
  File "/home/niko/code/myproj/app.py", line 63, in create_graph
    mae_a, mae_b = future.result()
  File "/home/niko/code/myproj/.venvs/venv/lib/python3.10/site-packages/distributed/client.py", line 322, in result
    return self.client.sync(self._result, callback_timeout=timeout)
  File "/home/niko/code/myproj/.venvs/venv/lib/python3.10/site-packages/distributed/client.py", line 330, in _result
    raise exc.with_traceback(tb)
RuntimeError: Error during deserialization of the task graph. This frequently
occurs if the Scheduler and Client have different environments.
For more information, see
https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments
Full MWE code
# Run with:
# python -m panel serve app.py --autoreload --show --args [data_file]
from __future__ import annotations

import sys


import dask.dataframe as dd
from dask.distributed import Client
import pandas as pd
import hvplot.pandas  # noqa
import hvplot.dask  # noqa
import panel as pn


def get_data(file) -> dd.DataFrame:
    df = dd.read_parquet(file)
    for i in range(15):
        df[f"extra_col{i}"] = df["val_A"] + i
    return df


@pn.cache  # share the client across all users and sessions
def get_client():
    return Client("tcp://127.0.0.1:8786")


def get_data_for_graph(
    file: str,
    minute_of_day: tuple[float, float],
    identifiers: list[int],
):

    df = get_data(file)

    mae_a = calculate_mae(
        df=df,
        label="A",
        minute_of_day=minute_of_day,
        identifiers=identifiers,
    )
    mae_b = calculate_mae(
        df=df,
        label="B",
        minute_of_day=minute_of_day,
        identifiers=identifiers,
    )
    return mae_a, mae_b


@pn.io.profile("profile", engine="snakeviz")
def create_graph(
    file: str,
    minute_of_day: tuple[float, float],
    identifiers: list[int],
):

    client = get_client()
    minute_of_day = tuple(map(round, minute_of_day))

    future = client.submit(get_data_for_graph, file, minute_of_day, identifiers)
    mae_a, mae_b = future.result()
    mae_a = mae_a.compute()
    mae_b = mae_b.compute()

    common_opts = dict(legend="top")
    graph_scatter_pipeline = mae_b.hvplot.scatter(
        label="B", **common_opts
    ) * mae_a.hvplot.scatter(label="A", **common_opts)

    return graph_scatter_pipeline


def calculate_mae(
    df: dd.DataFrame,
    minute_of_day: tuple[int, int],
    identifiers: list[int],
    label: str,
) -> dd.Series:
    df_filtered = _filter_based_on_ui_selections(
        df, minute_of_day=minute_of_day, identifiers=identifiers
    )
    mae = do_calculate_mae(df_filtered, label=label)
    return mae


def _filter_based_on_ui_selections(
    df: dd.DataFrame,
    minute_of_day: tuple[int, int],
    identifiers: list[int],
) -> dd.DataFrame:

    df_filtered = df[df["minute_of_day"].between(*minute_of_day)]
    return df_filtered[df_filtered["identifier"].isin(identifiers)]


def do_calculate_mae(
    df: dd.DataFrame,
    label: str,
) -> dd.Series:

    col_err = f"err_{label}"
    df[col_err] = abs(df[f"val_{label}"] - df["val_C"])
    mae = df.groupby("minute_of_day")[col_err].agg("mean")

    return mae


pn.param.ParamMethod.loading_indicator = True
pn.extension(sizing_mode="stretch_width", throttled=True)


dfraw = get_data(sys.argv[1])
unique_ids = list(dfraw["identifier"].unique())

minute_of_day = pn.widgets.RangeSlider(
    name="Minute of day", start=0, end=1440, value=(0, 1440), step=10
)
identifiers = pn.widgets.MultiChoice(
    name="Identifiers", options=unique_ids, value=unique_ids
)
imain_graph = pn.bind(
    create_graph, file=sys.argv[1], minute_of_day=minute_of_day, identifiers=identifiers
)
main_graph = pn.panel(imain_graph)

minute_of_day.servable(area="sidebar")
identifiers.servable(area="sidebar")
main_graph.servable(title="Data graph")

What is happening here? Why does Dask work without Panel, but not with it? And why is it trying to unpickle bokeh_app_30c15ce7b3b248ec93ee1ae6185c16f7?

Edit: Solution

I found this comment from the bokeh discourse:

Bokeh dynamically creates new module objects every time a user session is initiated, and the app code is run inside that new module for isolation. If you are saving/reading between different sessions.
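The effect described in that quote can be reproduced with the standard library alone. The following is only a minimal sketch, not Bokeh's or Dask's actual code: it assumes that a per-session module can be imitated with `types.ModuleType`, and the module name `bokeh_app_demo_0123456789abcdef` is made up for illustration. It shows that a pickled function carries only its module and function name, so unpickling fails wherever that module cannot be imported.

```python
import pickle
import sys
import types

# Assumption: imitate a per-session Bokeh app module with a made-up name.
module_name = "bokeh_app_demo_0123456789abcdef"
app_module = types.ModuleType(module_name)

# Run "app code" inside the new module, so the function belongs to it.
exec("def get_data_for_graph():\n    return 42\n", app_module.__dict__)
sys.modules[module_name] = app_module

func = app_module.get_data_for_graph
print(func.__module__)  # the dynamically generated module name

# Standard pickle stores a reference (module name + function name),
# not the function's code.
payload = pickle.dumps(func)

# Imitate another process (e.g. a scheduler or worker) that has never
# seen this module: it cannot be imported there.
del sys.modules[module_name]

try:
    pickle.loads(payload)
except ModuleNotFoundError as exc:
    error_message = str(exc)

print(error_message)
```

The resulting error has the same shape as the `ModuleNotFoundError` in the traceback above, only with the made-up module name.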

I also figured out the reason: Panel uses Bokeh, and Bokeh dynamically creates a new module for each browser session. When calling client.submit or, more generally, when sending tasks to Dask like this:

future = client.submit(somefunction, *args)

the function (somefunction here) is also sent to the cluster in pickled form. The pickling process needs to know the module and the function name, so the receiving side has to be able to import that module. In short, all functions that are processed by Dask must be defined outside the Bokeh app module (because of the random names those modules get). I know that the Scaling with Dask page of the Panel v1.3.7 documentation says

the Client and Cluster must [be] able to import the same versions of all tasks and python package dependencies

but I think it would help Dask + Panel beginners if it also said explicitly that Dask cannot process any functions that are defined in your app.py (the main Bokeh / Panel module).
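For future readers, here is a minimal sketch of the workaround, using only the standard library so it runs without a cluster. The module name `dask_tasks_demo` and the toy function body are hypothetical; in a real project this would be an ordinary file (for example a `dask_tasks.py` next to app.py) that the scheduler and workers can import as well. The sketch shows that a function defined in a regular, importable module survives a pickle round trip even when the receiving side has not loaded the module yet.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path

# Hypothetical module name and contents, for illustration only.
MODULE_NAME = "dask_tasks_demo"
SOURCE = '''
def get_data_for_graph(x):
    return x * 2
'''

with tempfile.TemporaryDirectory() as tmp:
    # Write the task function into a real module file on the import path.
    Path(tmp, f"{MODULE_NAME}.py").write_text(SOURCE)
    sys.path.insert(0, tmp)
    try:
        tasks = importlib.import_module(MODULE_NAME)
        payload = pickle.dumps(tasks.get_data_for_graph)

        # Forget the module, like a fresh process that has not imported it.
        del sys.modules[MODULE_NAME]
        importlib.invalidate_caches()

        # Unpickling re-imports the module by name and finds the function.
        restored = pickle.loads(payload)
        result = restored(21)
    finally:
        # Clean up the import path and module cache.
        sys.path.remove(tmp)
        sys.modules.pop(MODULE_NAME, None)

print(result)
```

In the app itself this would mean moving get_data_for_graph and its helpers out of app.py into such a module, importing them from there, and passing the imported function to client.submit. The module has to be importable in the environment of the scheduler and the workers too.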
