I’m trying to use Dask to save a larger-than-memory zarr store. Concretely, I’m trying to:
1. Concatenate several NetCDF files (on the same geographic grid, with the same variables) along time
2. Regrid them to a different grid
3. Save the result as a zarr store, chunked so that there is only ever one chunk along time
(The resultant zarr store would then be used for further analysis, which is done along the time dimension, so I want it unchunked along time to avoid losing a lot of time to rechunking overhead.)
However, I’m having trouble setting up the workflow without causing memory use to balloon in the ds.to_zarr() call.
I’m trying to follow the Dask best practices (especially this one). A simplified version of the workflow is:
import xarray as xr
import numpy as np
import xesmf as xe
import dask
from distributed import Client

# Filenames to load (for example, historical and SSP2-4.5 files for the same GCM model / experiment / run)
fn_list = [fn1, fn2]

# Start dask client
client = Client()
display(client)

@dask.delayed
def load(fn_list):
    ds = xr.open_mfdataset(fn_list)
    return ds

@dask.delayed
def process(ds):
    # Do something to the dataset, e.g. regridding
    ref_grid = xr.Dataset(coords={'lat': np.arange(-89.5, 89.6),
                                  'lon': np.arange(-179.5, 179.6)})
    rgrd = xe.Regridder(ds, ref_grid, 'conservative')
    ds = rgrd(ds)
    return ds

def workflow(fn_list):
    ds = load(fn_list)
    ds = process(ds)
    # Rechunk
    ds = ds.chunk({'time': -1, 'lat': 12, 'lon': 12})
    delayed = dask.delayed(ds.to_zarr)('test.zarr')
    return delayed

out = workflow(fn_list)
dask.compute(out)
From what I’ve gathered researching this problem, something about how the task graph is set up causes the whole array to be loaded and sent to a single worker when dask.compute() reaches the .to_zarr() call (which then, of course, crashes the client).
I guess my primary question is: why does the .to_zarr() call need everything in memory, and is it possible to set it up so that it doesn’t? I’d be grateful for any insight!
> why does the .to_zarr() call need everything in memory / is it possible to set it up so that it doesn’t?
I think because your workflow uses both an operation that requires the full spatial extent of the data (regridding with xESMF) and an operation that requires the full temporal extent of the data (creating one chunk along the time dimension). The combination requires the full dataset to be loaded into memory. AFAIK whether it’s possible to avoid this depends on the chunking of the input datasets, but I expect your best bet is probably caching the regridded output before rechunking. I recommend checking out @norlandrhagen’s blog post on downscaling pipelines, which also includes a regrid and rechunk step.
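To make the caching idea concrete, here is a minimal sketch. fn_list and the 1° target grid are from the thread; the chunk sizes, store paths, and the assumption that the inputs carry the lat_b/lon_b cell bounds conservative regridding needs are all mine, so adapt as required.

import numpy as np
import xarray as xr
import xesmf as xe

# Keep the full spatial extent in each chunk; only chunk along time
ds = xr.open_mfdataset(fn_list, chunks={'time': 120})

# 1-degree target grid with cell bounds (required by 'conservative')
ref_grid = xr.Dataset(coords={'lat': np.arange(-89.5, 90), 'lat_b': np.arange(-90, 91),
                              'lon': np.arange(-179.5, 180), 'lon_b': np.arange(-180, 181)})
rgrd = xe.Regridder(ds, ref_grid, 'conservative')

# Cache the regridded, still time-chunked result to disk
rgrd(ds).to_zarr('regridded_cache.zarr', mode='w')

# Re-open the cache, drop the stale chunk encoding carried over from the
# intermediate store, and only now rechunk to a single chunk along time
ds_cached = xr.open_zarr('regridded_cache.zarr')
ds_final = ds_cached.chunk({'time': -1, 'lat': 12, 'lon': 12})
for var in ds_final.variables:
    ds_final[var].encoding.pop('chunks', None)
ds_final.to_zarr('final.zarr', mode='w')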
I view using both Dask Array and Delayed objects as a bit of an orange flag. I’m not sure why you’d need to do that in this case rather than just using Xarray’s integrated parallelism with Dask.Array objects.
xESMF without pre-generated weights is the slowest and most memory-intensive of the commonly available regridding options (see Summary – Geospatial reprojection in Python (2024)). While conservative regridding is a good reason to put up with it, it’s worth considering separating the weight-generation step from the application of the regridder.
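As a rough illustration of that separation, xESMF lets you compute the weights once, persist them, and reuse them on later runs. This is only a sketch: exact keyword names can differ between xESMF versions, and conservative regridding assumes both grids carry lat_b/lon_b cell bounds.

import xesmf as xe

# Expensive, memory-hungry step: compute the conservative weights once
rgrd = xe.Regridder(ds, ref_grid, 'conservative')
rgrd.to_netcdf('conservative_weights.nc')   # persist the weights to disk

# Later runs: skip weight generation entirely and just apply the weights,
# which stays lazy when ds is backed by dask arrays
rgrd = xe.Regridder(ds, ref_grid, 'conservative',
                    reuse_weights=True, filename='conservative_weights.nc')
ds_regridded = rgrd(ds)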
I think you’re making it too complicated. Your code should look something like this:
# will produce a dask.array backed dataset
ds = xr.open_mfdataset(list_of_all_the_netcdf_files)
# note: whatever package you use for regridding needs to be "lazy", i.e. work with dask arrays
# without triggering computation
ds_regridded = regrid(ds)
ds_regridded.to_zarr("test.zarr")
This should all work in a streaming fashion, without overloading memory.
Thanks for the suggestion @wietzesuijker! However, it’s important to recognize that regridding and reprojection are not necessarily the same thing. (@maxrjones’s talk here has a great overview: https://discourse.pangeo.io/t/pangeo-showcase-geospatial-reprojection-in-python-2024-whats-available-and-whats-next/4531.) Reprojection is defined a bit more narrowly in terms of moving a geospatial raster from one regular rectangular pixel grid and CRS to another, whereas regridding covers the more complex geometries (e.g. curvilinear grid) and resampling strategies (e.g. conservative) used with numerical modeling grids.
I recently had this problem, but the files I wanted to concat were zarr groups with overlapping data and different chunk sizes across the temporal and spatial dimensions. The fastest solution was to rechunk the inputs to a standard chunk size when reading in the data, then rechunk the full dataset after the concat. While it’s not a one-to-one solution, it may be useful to attach the code to this thread given the conceptual overlap.
(For context, the data is streamflow mapped to river reaches, i.e. graph edges, ~100 GB total. Runtime was just over 3 minutes.)
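A minimal sketch of that read-rechunk-concat-rechunk pattern (the paths, the feature_id dimension name, and the chunk sizes are hypothetical, and deduplication of the overlapping time steps is omitted):

import xarray as xr

zarr_paths = ['store_a.zarr', 'store_b.zarr']
common_chunks = {'time': 1000, 'feature_id': 10000}

# Open every group with the same chunk scheme so the concat doesn't inherit
# mismatched chunks from the inputs
datasets = [xr.open_zarr(p).chunk(common_chunks) for p in zarr_paths]

# Concatenate along time, then rechunk the combined dataset once
ds = xr.concat(datasets, dim='time')
ds = ds.chunk({'time': -1, 'feature_id': 5000})
for var in ds.variables:
    ds[var].encoding.pop('chunks', None)   # avoid conflicts with the old zarr chunking
ds.to_zarr('combined.zarr', mode='w')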
This approach has saved me (in addition to other optimizations) a lot of headaches when porting code between Alliance Canada environments with different architectures, limits, and policies.
Wanted to flag that the folks at Carbon Plan (@norlandrhagen, from the link @maxrjones shared above) seem to have this figured out. What ended up working was adapting their process, which basically saves a temporary zarr store after every step of the problem (rechunking to one chunk in space, regridding, rechunking to one chunk in time). Very fast and no memory issues.
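For anyone landing here later, a rough sketch of that staged-write pattern, under the same assumptions as earlier in the thread (fn_list and ref_grid defined as above, with cell bounds for conservative regridding; the store names and chunk sizes here are illustrative):

import xarray as xr
import xesmf as xe

# Step 1: concatenate and rechunk so each chunk holds the full spatial extent
ds = xr.open_mfdataset(fn_list)
ds.chunk({'time': 120, 'lat': -1, 'lon': -1}).to_zarr('step1_space_contiguous.zarr', mode='w')

# Step 2: regrid the spatially contiguous store and write it out
ds_space = xr.open_zarr('step1_space_contiguous.zarr')
rgrd = xe.Regridder(ds_space, ref_grid, 'conservative')
rgrd(ds_space).to_zarr('step2_regridded.zarr', mode='w')

# Step 3: rechunk to a single chunk along time and write the final store
ds_rg = xr.open_zarr('step2_regridded.zarr')
ds_final = ds_rg.chunk({'time': -1, 'lat': 12, 'lon': 12})
for var in ds_final.variables:
    ds_final[var].encoding.pop('chunks', None)   # drop chunk encoding carried over from the temp store
ds_final.to_zarr('final.zarr', mode='w')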
In theory, Dask should be smart enough to stream all of these operations together without an intermediate write to disk. Maybe this would be a good workflow to share in Large GeoSpatial Benchmarks: First Pass.