Netcdf to Zarr best practices

Hi all,

I recently went through the process of converting some netCDF files in HDF5 format to Zarr. I now feel your pain!
I wanted to share a notebook that demonstrates my workflow in an attempt to

  1. Learn what mistakes I might have made along the way.
  2. Discover which components of the workflow can be generalized and placed in a library (pangeo-forge?)
    for others to use.
  3. Learn about existing tools that already exists for this workflow.

And one hopefully easy, technical question:

  1. Should coordinate variables be chunked? Does it matter whether they’re
    dimension coordinates or non-dimension coordinates?

The full notebook is at https://nbviewer.jupyter.org/gist/TomAugspurger/b4f95f5432fced544d1f23e18ea8368f I’ll give a summary here.

The Dataset

We’re working with the Daymet dataset. It’s a public dataset on Azure Blob Storage. I’m also writing the data converted to Azure Blob Storage.

There are a few aspects of this dataset that matter for the conversion to Zarr.

  • There are three completely independent regions (Hawaii, North America, Puerto Rico). I’ve only done Hawaii so far.
  • There are several variables of interest that share the same coordinates (tmin, tmax, prcp, etc.). These are indexed by (time, x, y).
  • A new version of this dataset is generated each year. We want to make it easy to expand this in time.
  • The source files are netCDF datasets (stored as directories of HDF5 files) named like {variable}_{year}_{region}.nc4.

So the raw data is like

daymetv3_dayl_1980_hawaii.nc4
daymetv3_tmax_1980_hawaii.nc4
...
daymetv3_tmax_1980_na.nc4
...
daymetv3_tmax_2010_hawaii.nc4

And here’s the repr for the dataset that we’ll generate for Hawaii.

<xarray.Dataset>
Dimensions:                  (nv: 2, time: 14600, x: 284, y: 584)
Coordinates:
    lat                      (y, x) float32 dask.array<chunksize=(584, 284), meta=np.ndarray>
    lon                      (y, x) float32 dask.array<chunksize=(584, 284), meta=np.ndarray>
  * time                     (time) datetime64[ns] 1980-01-01T12:00:00 ... 20...
  * x                        (x) float32 -5.802e+06 -5.801e+06 ... -5.519e+06
  * y                        (y) float32 -3.9e+04 -4e+04 ... -6.21e+05 -6.22e+05
Dimensions without coordinates: nv
Data variables:
    dayl                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    lambert_conformal_conic  float32 ...
    prcp                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    srad                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    swe                      (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    time_bnds                (time, nv) datetime64[ns] dask.array<chunksize=(14600, 2), meta=np.ndarray>
    tmax                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    tmin                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    vp                       (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    yearday                  (time) int16 dask.array<chunksize=(14600,), meta=np.ndarray>

Our overall strategy is to

  1. Treat each region completely independently. They’ll end up in different Zarr stores.
  2. Construct a dictionary with all of the attrs we whish to preserve (global attributes and per-variable attributes)
  3. “Allocate” an empty Zarr Group containing the dataset. “Allocate” is in quotes, since we aren’t writing any actual data here. We just want to get the layout of the Zarr metadata right. We’ll copy the actual data later.
  4. Copy the data for “static” variables. These are coordinates / variables like lat, lon, or lambert_conformal_conic that not indexed by time. We only need to copy them once.
  5. Process each year’s worth of data.

This is a deliberately low-level strategy that’s attempting to do the conversion as efficiently as possible.
In particular, we avoid any expensive inference operations in xarray, and we’ve set up the problem to be as parallel as possible.

The Conversion

The notebook has the details of the conversion but I want to highlight two aspects in particular

  1. Preallocating the Zarr groups
  2. Computing offsets: the start position in the output Zarr array for each input file

Together, these allow us to completely parallelize the most expensive part of the pipeline: copying the actual data form HDF5 to Zarr.

# Preallocate groups

for variable in variables:  # tmax, tmin, pcrp, etc.
    h5file = h5py.File(fs.open(path/to/variable.nc4))
    dataset = h5file[variable]
    chunks = dask.array.empty_like(dataset).chunksize
    # We explicitly use a chunksize of 365 on the time axis.
    # This makes it easier to append in the future, since the chunks will always be
    # uniform. We won't have to rechunk a dangling chunk of data from the previous year.
    chunks = (365,) + chunks[1:]
    v = group.empty(
        dataset.name, shape=full_shape, dtype=dataset.dtype, chunks=chunks, overwrite=True
    )
    v.attrs.update(attrs[dataset.name.lstrip("/")])
    
offsets = 365 * np.arange(len(years))  # the starting point in the output for each input

...

# Writes are completely parallel for each variable, year
map(process_file, all_variables_all_years, offsets, ....)

In the notebook, I parallelized this with Dask on a cluster. You could use Dask on a single
machine (though bandwidth would probably become a bottleneck), or anything implementing the
concurrent.futures interface.

Reading

Now we have an analysis-ready, cloud optimized dataset, when accessed from the same region.

import fsspec
import xarray as xr

store = fsspec.get_mapper(
    "az://daymet-zarr/daymet_v3_hawaii.zarr",
    account_name="daymeteuwest",
)
%time ds = xr.open_zarr(store, consolidated=True)
CPU times: user 187 ms, sys: 31.3 ms, total: 219 ms
Wall time: 555 ms

With the original files, that would have taken ~30 minutes. Apologies for the long post, and thanks for any feedback.

3 Likes

Apparently they shouldn’t. See Puzzling S3 xarray.open_zarr latency

Thanks for the description of your steps! Looking forward to going over your notebook.

1 Like

This is great Tom. It will be very useful for the Pangeo Forge effort to have you diving into these weeds. :frog:

I definitely understand the motivation to drop to the lower level I/O libraries (h5py, zarr) to improve performance, bypassing xarray.

The downside, which you might not be fully aware of, is that you are also bypassing xarray’s very useful encoding system. Variables in netCDF world can be encoded following CF conventions. These specify attributes such as scale_factor and add_offset. Those are straightforward, but datetime encoding is where the real pain is. Datetime encoding in CF requires specification of both calendar and units (e.g. days since 1980-01-01.

When xarray opens a dataset, it automatically looks for these attributes, applies the relevant decoding functions, and pops the attributes into a property called .encoding. Xarray tries to verify the compatibility of encodings when writing to a dataset. Unless you are 100% sure that the source files have no encoding surprises, it’s risky to bypass this.

As a concrete example, imagine having one netCDF file per day. There is a time variable in which the values are all 1, but the attribute changes, i.e. days since 1980-01-01, days since 1980-01-02, etc. If you just naively open these files with h5py and dump the raw values into a zarr, ignoring this attribute, you will obviously get your time coordinate wrong. Similar problems can happen with data variables (e.g. imagine scale_factor is different on every dataset.) While this might sound obscure, problems like this happen all the time with real datasets. :upside_down_face:

An intermediate approach that I am very excited about is to use xarray’s new region option in to_zarr.

This is what I use in my latest Pangeo Forge refactor: first “allocate” the dataset using Xarray (your steps 2&3), but don’t actually write any data variables. Then call to_zarr(target, region=some_slice) from within a parallel loop. I’d be curious how this works for your application.

1 Like

Thanks. The attrs section attempts to deal with that, but I did give up on the calendar / units encoding.

The region argument does look useful if you already have an xarray dataset. I’m curious about the fastest way to construct that (potentially dummy) xarray dataset from a collection of netcdf files, where all you need is the “schema” for the dataset. My initial attempts to use xr.open_dataset or xr.open_mfdataset weren’t promising. I’ll need to look more closely at the snakeviz profile.

1 Like

This notebook demonstrates creating the “template” / “schema” xarray.Dataset. It’s much fewer lines of code, probably easier to generalize, and probably more correct (e.g. properly handling attrs) than my first example.

For the same Hawaii dataset, it takes ~10 seconds to do all the operations necessary to create the dataset. As expected, almost all of that time is in xarray.open_dataset.

Things take longer on the dataset for all of North America. That’s something like 5 minutes to do xr.open_dataset on all the “template” files from 1980, so probably about 35 minutes to load all the time coordinates. We can cheat in this case, since we know what the time coordinates are ahead of time.

Next steps would be to

  1. profile xarray.open_dataset to see where time is spent. At a glance, most of it seems to be loading data (probably the coordinates) which may be unavoidable.
  2. Try out writing the data, using xarray’s region trick.
1 Like

@TomAugspurger 90% of your gist is implemented already, it’s just a pain to tell open_mfdataset what you want :frowning:

Try data_vars="minimal", coords="minimal", compat="override"; to loop over files in parallel with delayed use parallel=True. This should only concatenate variables with the time dimension. Variables without the time dimension are picked from the first file (compat="override").

See the “Note” here: https://xarray.pydata.org/en/stable/io.html#reading-multi-file-datasets.

That said, it’s likely xr.concat can be optimized, curious to see what you profiles will show :slight_smile:

Thanks. This takes ~2.5 minutes

ds = xr.open_mfdataset([fs.open(x) for x in files], concat_dim="time",
                       data_vars="minimal", coords="minimal", compat="override")

I think that compares to ~5-10s for my low-level method. I’ll dig through the snakeviz profile when I have time.


One other optimization I noticed for this dataset. Dimensionless coordinates like lat and lon can be completely ignored for all of the appends.


Last thing, the Zarr writes with ds.to_zarr(store, mode="a", region=region) still seem to be causing a lot of HTTP gets for “unrelated” variables (i.e. not the data variable of interest in the region, and not any of its dimensions). We’ll need to look into this, probably by implementing a Zarr store class that logs the number of times each key is accessed.

@TomAugspurger, @rabernat, @dcherian
One approach that might be relevant to this discussion is the practice of pickling xarray Datasets. It occurred to me the other day that we (at CEDA in the UK) are frequently running xr.open_mfdataset(...) on the same set of files. Each time this is run, it might take n minutes to complete. However, if we wrapped the call in some simple code then we could reduce this read time to milliseconds, e.g.:

def wrap_open_dataset(datapath):
      pickle_file = map_path(datapath)

      if os.path.isfile(pickle_file):
            return read_from_pickle(pickle_file)

     ds = xr.open_mfdataset(datapath, ...args...)
     pickle_dataset(ds)

     return ds

Has anybody tried this approach in their pipelines? Are there any known drawbacks with doing it? I think the only extra thing I would include is a check that the data files have not been updated since the pickle file was created. Such a check would trigger a regeneration of the pickle file.

1 Like

Thanks @agstephens. That seems like a fine idea for an analysis workload.

For my use case, I’m doing a one-time conversion from HDF5 files to Zarr. I’m trying to minimize the number of times I call open_mfdataset / open the original netCDF files in the first place, so I hoped to avoid that, but it seems unavoidable. See below:


Here’s a short writeup of my approach. I’ll hopefully post some code later.

  1. Construct a “template” xr.Dataset. This uses xr.open_dataset to read in the actual coordinates, and dask.array.empty to construct dummy data with the correct dtype.
  2. Write the template Zarr dataset template.to_zarr(store, compute=False, mode="w") (I’m writing to Azure blob storage).
  3. Fill the chunks of the empty Zarr store.

Steps 1 and 2 can be done on a single machine. Step 3 can be done in parallel (distributed). In theory, the maximum level of parallelism is determined by the chunking of the output arrays. Each chunk of each variable can be written in parallel.

However, it turns out that reading chunks of an HDF5 file from cloud storage can be slow! So slow that trying to read a single input chunk (~100MB) on a worker was almost as downloading the entire array (~12GB). So I changed the batch writing code from something like

dataset = h5py.File(f)[variable]
zarr_group[variable][target_slice] = dataset[slice]  # single chunk, slow

to

local_file = cache_blob(...)  # download full dataset to local disk
for slice in slices:
    ...
    zarr_group[variable][target_slice] = dataset[slice]  # single chunk, slow

Using a caching idea like @agstephens suggested, but caching as an HDF5 file rather than pickle. So now my level of parallelism is the number of input files, not chunks.

The only wrinkle now is that we’re writing something like 10-20GB of data to disk, so we need to be careful to not fill the disk of the VM. Using Dask, we need to either limit the number of workers per VM, or use something like Dask’s per-worker resource constraints.

So the tl/dr is: We can use normal xarray tools to generate a template dataset with real coords and empty data. And then parallelize writing at the input file level, rather than the chunk level.

Hi @TomAugspurger,

Thanks for your reply. It sounds like your method makes good sense. It seems that each of us has to do a fair amount of prototyping and testing in order to make our workflows efficient with xarray/dask/zarr. It’s a shame we cannot factor that stuff into the background but resource limits such as memory-per-node and disk space per VM are very familiar.

What Tom describes is extremely similar to the approach that I am taking in Pangeo Forge:

However, I believe that we can completely avoid calling open_mfdataset on all the files. Instead, we can just open the first file in the sequence and then manually expand the dims from the zarr level:

Yeah, I’ve been somewhat careful to keep “library” components like creating a template xarray.Dataset or copying a single file separate from “infrastructure” components (starting a Dask cluster / Azure Batch job, etc.) I think pangeo-forge will make a fine home for the library components, once we figure out an API that’s sufficiently flexible.

Hi @TomAugspurger. I am implementing a similar solution to what you posted in this thread. For my use-case, I have a large xarray Dataset that has an underlying DaskArray. Like you recommend, I write a template Zarr dataset: zarr_group = ds.to_zarr(store=store, compute=False, mode=‘w’) and then try to fill the zarr chunks with data.

However, since the Dataset is backed by a DaskArray, zarr_group is a Delayed object which is immutable so I cannot fill the zarr with data. Is the object returned by ds.to_zarr() the zarr_group you’re referring to in your post? Any ideas for how to fill the empty zarr store if the Dataset data is stored in a DaskArray? Thanks!

1 Like

I think I was able to figure this out with the following snippet on the page: Reading and writing files — xarray 0.16.3.dev0+gcc53a77f.d20201130 documentation, explaining how to fill the empty zarr.

ds.isel(x=slice(0, 10)).to_zarr(path, region={“x”: slice(0, 10)})

2 Likes