`Canvas.raster` failing when reading with dask

I posted a similar question to xarray but, after further experimenting, I think this relates more to the behaviour of Canvas.raster (which I probably don’t fully understand).

I’m trying to process the NLCD, which is a roughly 15 GB file, on a machine with 16 GB of RAM and 4 cores/8 threads. At its simplest, I try to open the file and aggregate it into a coarser raster that fits in memory, which I can then plot or manipulate further:

%matplotlib inline

import xarray
import rioxarray
import matplotlib.pyplot as plt
import datashader as ds
from dask.distributed import Client

# Start a local distributed cluster
client = Client(ip="")

# Open the GeoTIFF lazily as a chunked, dask-backed DataArray
da = rioxarray.open_rasterio("NLCD_2016_Land_Cover_L48_20190424.tif",
                             chunks="auto")

# Aggregate band 1 into a coarser raster that fits in memory
w = 600
h = int(w * da.shape[2] / da.shape[1])
rtr = ds.Canvas(plot_width=w,
                plot_height=h,
               ).raster(da.sel(band=1))

When I load the raster:

rtrl = rtr.compute()

It runs for several minutes and, judging by the Dask dashboard, the computation is making progress. But near the end, when only the last step remains (which I assume is pulling everything together), it starts emitting the following warnings:

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker

And eventually fails with:

---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
<timed exec> in <module>

/opt/conda/lib/python3.7/site-packages/xarray/core/dataarray.py in load(self, **kwargs)
    800         dask.array.compute
    801         """
--> 802         ds = self._to_temp_dataset().load(**kwargs)
    803         new = self._from_temp_dataset(ds)
    804         self._variable = new._variable

/opt/conda/lib/python3.7/site-packages/xarray/core/dataset.py in load(self, **kwargs)
    652 
    653             # evaluate all the dask arrays simultaneously
--> 654             evaluated_data = da.compute(*lazy_data.values(), **kwargs)
    655 
    656             for k, data in zip(lazy_data, evaluated_data):

/opt/conda/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    435     keys = [x.__dask_keys__() for x in collections]
    436     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 437     results = schedule(dsk, keys, **kwargs)
    438     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    439 

/opt/conda/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2593                     should_rejoin = False
   2594             try:
-> 2595                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2596             finally:
   2597                 for f in futures.values():

/opt/conda/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1891                 direct=direct,
   1892                 local_worker=local_worker,
-> 1893                 asynchronous=asynchronous,
   1894             )
   1895 

/opt/conda/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    778         else:
    779             return sync(
--> 780                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    781             )
    782 

/opt/conda/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    346     if error[0]:
    347         typ, exc, tb = error[0]
--> 348         raise exc.with_traceback(tb)
    349     else:
    350         return result[0]

/opt/conda/lib/python3.7/site-packages/distributed/utils.py in f()
    330             if callback_timeout is not None:
    331                 future = asyncio.wait_for(future, callback_timeout)
--> 332             result[0] = yield future
    333         except Exception as exc:
    334             error[0] = sys.exc_info()

/opt/conda/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/opt/conda/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1750                             exc = CancelledError(key)
   1751                         else:
-> 1752                             raise exception.with_traceback(traceback)
   1753                         raise exc
   1754                     if errors == "skip":

KilledWorker: ("('concatenate-finalize-from-value-resample_2d-getitem-609402cf05a3a0ada83ecd1d7eda556c', 0, 0)", <Worker 'tcp://172.17.0.2:42001', name: 2, memory: 0, processing: 20>)

I imagine I’m doing something wrong, or at least not quite right, but I can’t put my finger on it. Simpler computations on da (like taking the minimum/maximum values) work as expected and complete without errors; a sketch of the kind of check I mean is below.
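For reference, this is roughly what those simpler computations look like (a minimal sketch using the same da as above):

# Simple lazy reductions over the full raster complete without killed workers
vmin = da.sel(band=1).min().compute()
vmax = da.sel(band=1).max().compute()
print(vmin.item(), vmax.item())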

Do you have any suggestions as to what I’m doing wrong or how I could get this example to work?

Thank you very much for your consideration and also for an awesome library!

Quick update: I’ve made some progress, in case it helps for future reference. The critical part seems to be setting up the Client(). If I don’t create a client and instead monitor progress with a ProgressBar(), the computation completes in good time.
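In case it’s useful, here is a minimal sketch of the working variant (no distributed Client, just the default scheduler plus dask.diagnostics.ProgressBar, with the same file and Canvas setup as above):

import rioxarray
import datashader as ds
from dask.diagnostics import ProgressBar

# Open lazily with dask chunks, but do NOT create a distributed Client
da = rioxarray.open_rasterio("NLCD_2016_Land_Cover_L48_20190424.tif",
                             chunks="auto")

w = 600
h = int(w * da.shape[2] / da.shape[1])
rtr = ds.Canvas(plot_width=w, plot_height=h).raster(da.sel(band=1))

# The default scheduler completes the aggregation; ProgressBar reports progress
with ProgressBar():
    rtrl = rtr.compute()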

Interesting, thanks for reporting back. I think this probably deserves an actual issue on the datashader repo since what you’re doing looks fine to me.