I posted a similar question to xarray, but after further experimenting I think this relates more to the behaviour of Canvas.raster (which I probably don’t fully understand).
I’m trying to process the NLCD, a roughly 15 GB GeoTIFF, on a machine with 16 GB of RAM and 4 cores/8 threads. At its simplest, I open the file and aggregate it into a coarser raster that fits in memory, which I can then plot or manipulate further in memory:
%matplotlib inline
import xarray
import rioxarray
import matplotlib.pyplot as plt
import datashader as ds
from dask.distributed import Client
client = Client(ip="")
da = rioxarray.open_rasterio(
    "NLCD_2016_Land_Cover_L48_20190424.tif",
    chunks="auto",
)

w = 600
h = int(w * da.shape[1] / da.shape[2])  # plot height scaled from the raster's y/x aspect ratio

rtr = ds.Canvas(plot_width=w, plot_height=h).raster(da.sel(band=1))
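For context, this is roughly how I check what chunks="auto" ends up choosing (just a minimal inspection of the dask-backed DataArray that rioxarray returns, nothing that triggers computation):

# Inspect the lazily opened raster before computing anything
print(da.dims, da.shape, da.dtype)                # ('band', 'y', 'x'), full resolution
print(da.chunks)                                  # chunk sizes chosen by chunks="auto"
print(f"{da.nbytes / 1e9:.1f} GB uncompressed")   # in-memory size at full resolution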
When I then load the aggregated raster into memory:
rtrl = rtr.compute()
It takes several minutes, during which (monitoring the Dask client) I can see the computation progressing; but at the very end, when only the last bit remains (which I assume is pulling everything together), it starts emitting the following warnings:
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
And eventually fails with:
---------------------------------------------------------------------------
KilledWorker Traceback (most recent call last)
<timed exec> in <module>
/opt/conda/lib/python3.7/site-packages/xarray/core/dataarray.py in load(self, **kwargs)
800 dask.array.compute
801 """
--> 802 ds = self._to_temp_dataset().load(**kwargs)
803 new = self._from_temp_dataset(ds)
804 self._variable = new._variable
/opt/conda/lib/python3.7/site-packages/xarray/core/dataset.py in load(self, **kwargs)
652
653 # evaluate all the dask arrays simultaneously
--> 654 evaluated_data = da.compute(*lazy_data.values(), **kwargs)
655
656 for k, data in zip(lazy_data, evaluated_data):
/opt/conda/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
435 keys = [x.__dask_keys__() for x in collections]
436 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 437 results = schedule(dsk, keys, **kwargs)
438 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
439
/opt/conda/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2593 should_rejoin = False
2594 try:
-> 2595 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2596 finally:
2597 for f in futures.values():
/opt/conda/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1891 direct=direct,
1892 local_worker=local_worker,
-> 1893 asynchronous=asynchronous,
1894 )
1895
/opt/conda/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
778 else:
779 return sync(
--> 780 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
781 )
782
/opt/conda/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
346 if error[0]:
347 typ, exc, tb = error[0]
--> 348 raise exc.with_traceback(tb)
349 else:
350 return result[0]
/opt/conda/lib/python3.7/site-packages/distributed/utils.py in f()
330 if callback_timeout is not None:
331 future = asyncio.wait_for(future, callback_timeout)
--> 332 result[0] = yield future
333 except Exception as exc:
334 error[0] = sys.exc_info()
/opt/conda/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
/opt/conda/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1750 exc = CancelledError(key)
1751 else:
-> 1752 raise exception.with_traceback(traceback)
1753 raise exc
1754 if errors == "skip":
KilledWorker: ("('concatenate-finalize-from-value-resample_2d-getitem-609402cf05a3a0ada83ecd1d7eda556c', 0, 0)", <Worker 'tcp://172.17.0.2:42001', name: 2, memory: 0, processing: 20>)
I imagine I’m doing something wrong, or at least not quite right, but I can’t put my finger on it. Simpler computations on da (like getting the minimum/maximum values) work as expected and complete without errors.
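For example, something along these lines (just to illustrate the kind of reduction I mean) finishes fine on the same cluster:

# Simple global reductions over the same band complete without killed workers
vmin = da.sel(band=1).min().compute()
vmax = da.sel(band=1).max().compute()
print(vmin.item(), vmax.item())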
Do you have any suggestions as to what I’m doing wrong or how I could get this example to work?
Thank you very much for your consideration and also for an awesome library!