Dask RESAMPLING (can't handle large files...)

Hello All! First post (thanks for the referral @rsignell)

My colleague and I are working on cooking up a version of SSEBop (an energy-balance evapotranspiration model) that takes advantage of Dask parallel processing, inspired by the Pangeo tools… We’ve been getting stuck when we try to resample rasters and keep running into memory errors…

Here is the workflow:

  1. Read in the rasters with rioxarray.open_rasterio(filepath, chunks=True)
  2. Resample several rasters to 25km and 100km with Nearest Neighbor
  3. Do some raster logic/math
  4. Output an ET fraction image.

We’re prototyping this on EROS VDI and our personal laptops. The input files are global rasters at 1 km resolution, so each is ~2 GB. We run into trouble when reprojecting even a single raster. Originally we tried resampling as recommended by rioxarray, but that doesn’t work at all because we immediately hit a memory error:

https://corteva.github.io/rioxarray/stable/examples/resampling.html
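For reference, our call looks roughly like this (the filename and the 25× factor are placeholders; as far as I can tell, rio.reproject pulls the whole array into memory before resampling, which is where we crash):

    import rioxarray
    from rasterio.enums import Resampling

    # Open the global 1 km raster lazily with dask-backed chunks
    da_1km = rioxarray.open_rasterio("global_1km.tif", chunks=True)  # placeholder path

    # Downscale 25x (1 km -> 25 km), following the rioxarray resampling example
    new_shape = (int(da_1km.rio.height / 25), int(da_1km.rio.width / 25))
    da_25km = da_1km.rio.reproject(
        da_1km.rio.crs,                 # keep the source CRS, just change the resolution
        shape=new_shape,
        resampling=Resampling.nearest,
    )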

I figured that’s because the reproject needs to load everything into memory at once, so I was looking for a way around rasterio’s reprojection-based resampling. Then I found this pyresample example (based on a question by @rabernat):

https://github.com/pytroll/pyresample/issues/206

I tried using the pyresample method to do my resampling in parallel (since it is dask-enabled), but I watched my dask-distributed workers struggle with the workload and ultimately fail to resample this large 1 km raster to 25 km.

I also tried the default threaded scheduler, with no luck there either…

I thought Dask was supposed to automate this behind the scenes, queueing up work in bite-size chunks so you don’t hit memory errors? I’m certainly not having that experience. We’ll soon have more compute resources at our disposal and move into AWS (where I don’t have a ton of experience), but I thought this would be possible on a laptop. It would be nice to be able to model at least ONE scene without messing with the cloud stuff all the time. What am I missing? How do people do resampling workflows quickly?

We could resample by other means and preprocess the rasters we need, but the vision was to do it on the fly with fast parallel tools, and this just isn’t cutting it, haha. ArcPy can resample this stuff on a laptop, so there must be a way to get it going with open source, right?

Much love,
Gabe

Hi @Gabe-Parrish and welcome to the forum! We would love to help you with this issue.

It’s concerning that you can’t even process a single scene without crashing / running out of memory. My strong recommendation would be to not move to Dask until you have a single standalone task actually working. Dask is not magic. It is good for parallelizing workflows over multiple CPUs or machines in a cluster. But it cannot change the basic memory requirements of your calculation. For Dask to work, you need to split your problem into many small pieces that can be handled in memory. It sounds to me like you have not yet found the right chunking strategy to split up your problem into small pieces.
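As a rough illustration (the path and chunk sizes below are just placeholders), the first thing I would check is how big a single chunk actually is once you open the file:

    import numpy as np
    import rioxarray

    # Explicit chunks instead of chunks=True, so you control the size of each piece
    da = rioxarray.open_rasterio("global_1km.tif", chunks={"x": 2048, "y": 2048})

    # Each chunk needs to fit comfortably in memory (a small fraction of your RAM)
    chunk_mb = np.prod(da.data.chunksize) * da.dtype.itemsize / 1e6
    print(f"{chunk_mb:.1f} MB per chunk, {da.data.npartitions} chunks total")

If a single chunk is already a large fraction of your laptop’s RAM, no scheduler is going to save you.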

It sounds like you are trying to resample a global 1km raster image. Resample to what? What is your source / target grid / CRS? Are you trying to do a global → global reprojection of the data? If so, that can be a hugely expensive operation. It may be that the only solution is to run the operation on a huge computer with tons of memory. Regridding is complicated to parallelize because every grid point potentially maps to every other grid point. Dask can’t just magically figure out how to do this. You would need a custom algorithm.

If you share more specific details, we can probably be more helpful.

@Gabe-Parrish , as @rabernat said, thanks for posting here!

I notice you don’t mention reprojection in your bullet list, only resampling a 1 km grid to 25 km. You mention using nearest neighbor, but would taking the mean be preferable? It should be straightforward to open the data with something like chunks={'x': 2500, 'y': 2500} and then create a list of delayed tasks, where each task computes 25×25-pixel block means on its chunk, probably using the reshape approach detailed in this SO answer.

The list of delayed tasks could then be executed using Dask bag.
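Here is a rough, untested sketch of what I mean (the path, chunk size, and block_mean helper are all made up for illustration, and it assumes a single band with chunk dimensions that are a multiple of 25):

    import dask
    import rioxarray

    FACTOR = 25  # 1 km -> 25 km

    def block_mean(arr, factor=FACTOR):
        # Mean over non-overlapping factor x factor pixel blocks (the reshape trick)
        ny, nx = arr.shape
        return arr.reshape(ny // factor, factor, nx // factor, factor).mean(axis=(1, 3))

    # Chunk sizes chosen as a multiple of 25 so blocks never straddle a chunk boundary
    da = rioxarray.open_rasterio("global_1km.tif", chunks={"x": 2500, "y": 2500})
    arr = da.squeeze("band", drop=True).data     # 2-D dask array

    # One delayed task per chunk of the underlying dask array
    tasks = [dask.delayed(block_mean)(chunk) for chunk in arr.to_delayed().ravel()]

    # Execute the list; dask.compute works, or wrap the tasks in a dask bag as above
    coarse_blocks = dask.compute(*tasks)
    # coarse_blocks is a tuple of numpy arrays, one per chunk, still to be stitched
    # back together into a georeferenced 25 km grid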

Or maybe map_blocks would be more appropriate?
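Something along these lines, perhaps (again just a sketch with made-up names, assuming a single band and dimensions that divide evenly into the chunk size):

    import numpy as np
    import rioxarray

    FACTOR = 25  # 1 km -> 25 km

    def block_mean(arr, factor=FACTOR):
        # Mean over non-overlapping factor x factor pixel blocks
        ny, nx = arr.shape
        return arr.reshape(ny // factor, factor, nx // factor, factor).mean(axis=(1, 3))

    da = rioxarray.open_rasterio("global_1km.tif", chunks={"x": 2500, "y": 2500})
    arr = da.squeeze("band", drop=True).data     # 2-D dask array

    # Infer the output dtype from a tiny sample block (means of ints come back as floats)
    out_dtype = block_mean(np.zeros((FACTOR, FACTOR), dtype=arr.dtype)).dtype

    # map_blocks builds one task per chunk; chunks= declares the (smaller) output shape
    coarse = arr.map_blocks(
        block_mean,
        chunks=(arr.chunksize[0] // FACTOR, arr.chunksize[1] // FACTOR),
        dtype=out_dtype,
    )
    result = coarse.compute()   # or keep it lazy and feed it into the raster math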

Adding to what @rsignell mentioned, the average can be computed in linear time, while nearest-neighbour interpolation tends to be more expensive, with runtimes ranging from logarithmic to quadratic (and sometimes a similar impact on memory usage).

How you organize your data can affect these runtimes too. A brief discussion of time and memory complexity and design choices can be found here.

For block-mean resampling (e.g. from 1 km to 25 km), @ocefpaf reminded me of the xarray coarsen method.

So actually this problem is as easy as:

da_25km = da_1km.coarsen(x=25, y=25, boundary='pad').mean()
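A minimal end-to-end sketch (the path and output name are placeholders; boundary='pad' handles dimensions that aren't an exact multiple of 25):

    import rioxarray

    # Open lazily, average 25x25 pixel blocks (1 km -> 25 km), and write the result
    da_1km = rioxarray.open_rasterio("global_1km.tif", chunks={"x": 2500, "y": 2500})
    da_25km = da_1km.coarsen(x=25, y=25, boundary="pad").mean()
    da_25km.rio.to_raster("global_25km.tif")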

Here’s a complete example notebook!