Best practices to go from 1000s of netcdf files to analyses on a HPC cluster?

At the risk of being controversial, I would recommend against dask+xarray for
this kind of ETL pipeline, although they do work better on HPC than in the
cloud. When faced with a similar netCDF-to-zarr task, we ultimately decided
that dask+xarray was not the right tool for the job, and that "dumbing" our
code down to lower-level libraries like netCDF and zarr was necessary.

Our data consists of highly structured data with the following pattern:

gs://bucket/data/{time in YYYYMMDD.HHMMSS}/data.tile{tile}.nc

Each netCDF file contains O(10) variables and has identical dims and coords.
The tile dimension is a spatial dimension for a subdomain of a cubed-sphere
atmospheric model. There are 6 tiles, 3800 time steps, and each file is 270
MB in size (so 6 TB in total).

Ultimately, we could not use xarray+dask to process this data into zarr. We
tried several strategies that seemed promising, but all of them deadlocked on
rechunking or ran out of memory. We fixed this by removing the dask layer,
which is rightfully focused on a much larger set of problems, and is less
optimized for these simple data transformation tasks, which usually have a
very structured form.

The basic idea is to

  1. Open a single file and use it as a template to initialize a zarr group
     and zarr arrays with the correct size to fit all the files. This only
     uses zarr.open_group and zarr.create.
  2. Loop over all the files and insert each one into the pre-initialized
     arrays. The loop can be done in parallel (we used Google Dataflow, but
     dask distributed would work too), but the chunking per worker has to be
     correct if you want to avoid race conditions when inserting data. The
     easiest way to accomplish this is to use a chunk size of 1 along the
     time and tile dimensions; the data can be rechunked later using
     xarray+dask. For HPC writes to a shared filesystem you could probably
     use zarr's locking primitives to avoid race conditions even if workers
     need to write to the same chunk. A sketch of both steps follows below.
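Very roughly, a minimal sketch of both steps might look like this (the paths, dimension sizes, and the `insert_one_file` helper are placeholders, not our actual pipeline code; I use `group.create_dataset`, the group-level equivalent of `zarr.create`):

```python
import xarray as xr
import zarr

# Placeholder sizes and paths based on the layout described above.
N_TIMES, N_TILES = 3800, 6
OUTPUT = "output.zarr"

# Step 1: use one file as a template to pre-create full-size arrays.
template = xr.open_dataset("data/20160801.000000/data.tile1.nc")
group = zarr.open_group(OUTPUT, mode="w")
for name, var in template.data_vars.items():
    group.create_dataset(
        name,
        shape=(N_TIMES, N_TILES) + var.shape,  # prepend time and tile dims
        chunks=(1, 1) + var.shape,             # one chunk per (time, tile) write
        dtype=var.dtype,
    )

# Step 2: each worker writes one (time, tile) slab. Because the chunk size is 1
# along the insertion dimensions, no two workers ever touch the same chunk.
def insert_one_file(path, itime, itile):
    # In a parallel setting, each worker opens its own handle to the store.
    group = zarr.open_group(OUTPUT, mode="r+")
    ds = xr.open_dataset(path)
    for name, var in ds.data_vars.items():
        group[name][itime, itile] = var.values
```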

This more manual approach is scalable and robust. You could do this for 10 or
a million files, since there is no global dask graph that needs to fit in
memory. You can also easily add retry logic for I/O failures, making the
pipeline fault tolerant.
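For example, a worker-level retry wrapper is only a few lines (a generic sketch; the attempt count and backoff policy are arbitrary choices):

```python
import time

def with_retries(fn, max_attempts=5, backoff_seconds=10):
    """Retry a flaky I/O operation a few times before giving up."""
    def wrapped(*args, **kwargs):
        for attempt in range(1, max_attempts + 1):
            try:
                return fn(*args, **kwargs)
            except OSError:
                if attempt == max_attempts:
                    raise
                time.sleep(backoff_seconds * attempt)
    return wrapped

# e.g. safe_insert = with_retries(insert_one_file)
```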

A disadvantage is that it requires reverse engineering how xarray interprets
zarr stores, via attributes like _ARRAY_DIMENSIONS, fill_value, and time
encodings.
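Concretely, for xarray to read the store back, each array needs the hidden _ARRAY_DIMENSIONS attribute, and coordinates like time need the CF-style encoding xarray would normally write for you. The variable and dimension names and attribute values below are purely illustrative:

```python
import zarr

group = zarr.open_group("output.zarr", mode="r+")

# xarray finds dimension names via the hidden _ARRAY_DIMENSIONS attribute.
group["air_temperature"].attrs["_ARRAY_DIMENSIONS"] = ["time", "tile", "z", "y", "x"]

# Time coordinates need an explicit CF-style encoding, since xarray normally
# handles the datetime-to-number conversion when writing zarr itself.
group["time"].attrs["_ARRAY_DIMENSIONS"] = ["time"]
group["time"].attrs["units"] = "seconds since 2016-08-01 00:00:00"
group["time"].attrs["calendar"] = "proleptic_gregorian"

# Missing values are controlled by each zarr array's fill_value, which xarray
# maps onto NaN masking when reading the store.
```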

tl;dr: xarray+dask is a heavy layer for these highly structured I/O
operations. Use a parallel for-loop instead.
