UK research example if not working for me

Hello, I am looking to run data shader bundle operation, and it is failing for me. I thought of trying out the example codes. Facebook data graph has worked fine, however, the UK researcher example is not working for me. I can get to this part of the code

%%time
r_bundled = hv.Curve(hammer_bundle(r_nodes.data, r_edges.data),label=“Bundled”)

The error: CancelledError: resample_edges-b6272cbe-89dd-4ffe-a28f-dcbcaa141bb1

Am not sure whether I am providing all the relevant info. Pls let me know, and I’ll share more logs. I did research and increased ‘timeout’ as well.

https://examples.holoviz.org/gallery/uk_researchers/uk_researchers.html

I ran this example in the Jupyter notebook hosted in uk-researchers-notebook.pyviz.demo.anaconda.com. What I got was only a few Warnings. It took 4:30 minutes to execute:

%%time

%%time
r_bundled = hv.Curve(hammer_bundle(r_nodes.data, r_edges.data),label="Bundled")
/opt/continuum/anaconda/envs/default/lib/python3.9/site-packages/distributed/client.py:3125: UserWarning: Sending large graph of size 16.49 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  warnings.warn(
/opt/continuum/anaconda/envs/default/lib/python3.9/site-packages/distributed/worker.py:2995: UserWarning: Large object of size 1.13 MiB detected in task graph: 
  ([array([[0.54141443, 0.58125185],
       [0.42114 ... .008, 0.016, 2)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  warnings.warn(
CPU times: user 1min 5s, sys: 5.42 s, total: 1min 10s
Wall time: 4min 37s

The notebook is unfortunately read-only so I cannot easily check which versions they’re running. From the anaconda-project.yml I would guess every package used is the latest one (since when?).

Could you provide the full traceback of the error? What are the versions of dask, datashader, holoviews, params, etc. that you use?

1 Like

Thank you. The notebook hosted in Anaconda did work for me as well.

Versions at my machine:
HoloViews version: 1.18.1
Datashader version: 0.15.2
Param version: 1.13.0
Dask version: 2023.6.0

Here is my error log:

2024-01-22 12:25:24,358 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/distributed/protocol/core.py", line 158, in loads
    return msgpack.loads(
           ^^^^^^^^^^^^^^
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/msgpack/fallback.py", line 128, in unpackb
    0xDC: (2, ">H", TYPE_ARRAY),
              ^^^^^^^^^^^^^^^^^^
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/msgpack/fallback.py", line 565, in _unpack
    def __iter__(self):
                        
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/msgpack/fallback.py", line 592, in _unpack
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/msgpack/fallback.py", line 565, in _unpack
    def __iter__(self):
                        
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/msgpack/fallback.py", line 565, in _unpack
    def __iter__(self):
                        
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/msgpack/fallback.py", line 602, in _unpack
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe0 in position 2: invalid continuation byte
2024-01-22 12:25:24,361 - distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/distributed/core.py", line 924, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/distributed/scheduler.py", line 5449, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/distributed/core.py", line 977, in handle_stream
    msgs = await comm.read()
           ^^^^^^^^^^^^^^^^^
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/distributed/comm/tcp.py", line 254, in read
    msg = await from_frames(
          ^^^^^^^^^^^^^^^^^^
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/distributed/comm/utils.py", line 100, in from_frames
    res = _from_frames()
          ^^^^^^^^^^^^^^
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/distributed/comm/utils.py", line 83, in _from_frames
    return protocol.loads(
           ^^^^^^^^^^^^^^^
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/distributed/protocol/core.py", line 158, in loads
    return msgpack.loads(
           ^^^^^^^^^^^^^^
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/msgpack/fallback.py", line 128, in unpackb
    0xDC: (2, ">H", TYPE_ARRAY),
              ^^^^^^^^^^^^^^^^^^
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/msgpack/fallback.py", line 565, in _unpack
    def __iter__(self):
                        
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/msgpack/fallback.py", line 592, in _unpack
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/msgpack/fallback.py", line 565, in _unpack
    def __iter__(self):
                        
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/msgpack/fallback.py", line 565, in _unpack
    def __iter__(self):
                        
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/msgpack/fallback.py", line 602, in _unpack
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe0 in position 2: invalid continuation byte
Task exception was never retrieved
future: <Task finished name='Task-28575112' coro=<Server._handle_comm() done, defined at /Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/distributed/core.py:830> exception=UnicodeDecodeError('utf-8', b'-U\xe0s\xe4?\xc4\x10\x895xv7>\xc8?\xa3\x1e\xe5', 2, 3, 'invalid continuation byte')>
Traceback (most recent call last):
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/distributed/core.py", line 924, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/distributed/scheduler.py", line 5449, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/distributed/core.py", line 977, in handle_stream
    msgs = await comm.read()
           ^^^^^^^^^^^^^^^^^
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/distributed/comm/tcp.py", line 254, in read
    msg = await from_frames(
          ^^^^^^^^^^^^^^^^^^
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/distributed/comm/utils.py", line 100, in from_frames
    res = _from_frames()
          ^^^^^^^^^^^^^^
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/distributed/comm/utils.py", line 83, in _from_frames
    return protocol.loads(
           ^^^^^^^^^^^^^^^
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/distributed/protocol/core.py", line 158, in loads
    return msgpack.loads(
           ^^^^^^^^^^^^^^
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/msgpack/fallback.py", line 128, in unpackb
    0xDC: (2, ">H", TYPE_ARRAY),
              ^^^^^^^^^^^^^^^^^^
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/msgpack/fallback.py", line 565, in _unpack
    def __iter__(self):
                        
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/msgpack/fallback.py", line 592, in _unpack
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/msgpack/fallback.py", line 565, in _unpack
    def __iter__(self):
                        
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/msgpack/fallback.py", line 565, in _unpack
    def __iter__(self):
                        
  File "/Users/saurabhkumar/anaconda3/lib/python3.11/site-packages/msgpack/fallback.py", line 602, in _unpack
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe0 in position 2: invalid continuation byte
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
File <timed exec>:1

File ~/anaconda3/lib/python3.11/site-packages/param/parameterized.py:3658, in ParameterizedFunction.__new__(class_, *args, **params)
   3656 inst = class_.instance()
   3657 inst.param._set_name(class_.__name__)
-> 3658 return inst.__call__(*args,**params)

File ~/anaconda3/lib/python3.11/site-packages/datashader/bundling.py:496, in hammer_bundle.__call__(self, nodes, edges, **params)
    493 edge_segments = [resample_edges(segment, p.min_segment_length, p.max_segment_length, segment_class.ndims) for segment in edge_segments]
    495 # Finally things can be sent for computation
--> 496 edge_segments = compute(*edge_segments)
    498 # Smooth out the graph
    499 for i in range(10):

File ~/anaconda3/lib/python3.11/site-packages/dask/base.py:595, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    592     keys.append(x.__dask_keys__())
    593     postcomputes.append(x.__dask_postcompute__())
--> 595 results = schedule(dsk, keys, **kwargs)
    596 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/anaconda3/lib/python3.11/site-packages/distributed/client.py:3243, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3241         should_rejoin = False
   3242 try:
-> 3243     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3244 finally:
   3245     for f in futures.values():

File ~/anaconda3/lib/python3.11/site-packages/distributed/client.py:2368, in Client.gather(self, futures, errors, direct, asynchronous)
   2366 except ValueError:
   2367     local_worker = None
-> 2368 return self.sync(
   2369     self._gather,
   2370     futures,
   2371     errors=errors,
   2372     direct=direct,
   2373     local_worker=local_worker,
   2374     asynchronous=asynchronous,
   2375 )

File ~/anaconda3/lib/python3.11/site-packages/distributed/utils.py:351, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    349     return future
    350 else:
--> 351     return sync(
    352         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    353     )

File ~/anaconda3/lib/python3.11/site-packages/distributed/utils.py:418, in sync(loop, func, callback_timeout, *args, **kwargs)
    416 if error:
    417     typ, exc, tb = error
--> 418     raise exc.with_traceback(tb)
    419 else:
    420     return result

File ~/anaconda3/lib/python3.11/site-packages/distributed/utils.py:391, in sync.<locals>.f()
    389         future = wait_for(future, callback_timeout)
    390     future = asyncio.ensure_future(future)
--> 391     result = yield future
    392 except Exception:
    393     error = sys.exc_info()

File ~/anaconda3/lib/python3.11/site-packages/tornado/gen.py:767, in Runner.run(self)
    765 try:
    766     try:
--> 767         value = future.result()
    768     except Exception as e:
    769         # Save the exception for later. It's important that
    770         # gen.throw() not be called inside this try/except block
    771         # because that makes sys.exc_info behave unexpectedly.
    772         exc: Optional[Exception] = e

File ~/anaconda3/lib/python3.11/site-packages/distributed/client.py:2232, in Client._gather(self, futures, errors, direct, local_worker)
   2230     else:
   2231         raise exception.with_traceback(traceback)
-> 2232     raise exc
   2233 if errors == "skip":
   2234     bad_keys.add(key)

CancelledError: resample_edges-b6272cbe-89dd-4ffe-a28f-dcbcaa141bb1

@sorukumar thank you for posting the traceback. If you could format it by using three backticks (```) before and after it, it would be formatted so that it would be easier to read.

How did you get the parquet files used in the example? The download link on UK Researchers — Examples 0.1.0 documentation seems to be broken.

@fohrloop, Thanks for your willingness to help.

with a quick search, I cannot figure out from where I downloaded the file. I downloaded it sometime back at the end of November. Not sure if it was UK Researchers — Examples 0.1.0 documentation. Based on my browser history in November, I have visited this site many times. and, of course, right now it is not working. if you are okay I will upload the file. I’ll be worried about downloading files from strangers, so am okay either way.
by the way, I’ll keep looking if I can locate the link where I downloaded from.

@sorukumar I was experimenting with dask + panel myself on different app, and saw this

  File "/home/niko/myproj/venv/lib/python3.10/site-packages/distributed/client.py", line 336, in _result
    raise exception
concurrent.futures._base.CancelledError: get_mae-977797ab2a9a158afb938e1fe4264ec2

While it is not exactly the same message or from the same line, it looks very similar. This occurs if the app is in the middle of calculating some dask task, and I kill the dask scheduler/worker process. I wonder if also in yout case the dask scheduler / worker process dies in the middle of the calculation? (for example, due to using up all available memory or something)

1 Like