Hi all,
I recently went through the process of converting some netCDF files in HDF5 format to Zarr. I now feel your pain!
I wanted to share a notebook that demonstrates my workflow in an attempt to
- Learn what mistakes I might have made along the way.
- Discover which components of the workflow can be generalized and placed in a library (pangeo-forge?) for others to use.
- Learn about existing tools that already exist for this workflow.
And one hopefully easy, technical question:
- Should coordinate variables be chunked? Does it matter whether they’re
dimension coordinates or non-dimension coordinates?
The full notebook is at https://nbviewer.jupyter.org/gist/TomAugspurger/b4f95f5432fced544d1f23e18ea8368f
I’ll give a summary here.
The Dataset
We’re working with the Daymet dataset. It’s a public dataset on Azure Blob Storage. I’m also writing the converted data to Azure Blob Storage.
There are a few aspects of this dataset that matter for the conversion to Zarr.
- There are three completely independent regions (Hawaii, North America, Puerto Rico). I’ve only done Hawaii so far.
- There are several variables of interest that share the same coordinates (tmin, tmax, prcp, etc.). These are indexed by (time, y, x).
- A new version of this dataset is generated each year. We want to make it easy to expand this in time.
- The source files are netCDF datasets (stored as directories of HDF5 files) named like {variable}_{year}_{region}.nc4.
So the raw data looks like
daymetv3_dayl_1980_hawaii.nc4
daymetv3_tmax_1980_hawaii.nc4
...
daymetv3_tmax_1980_na.nc4
...
daymetv3_tmax_2010_hawaii.nc4
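(For orientation, here’s a minimal sketch of how the full list of source files could be enumerated; the variable list and year range are illustrative, not the notebook’s exact code.)

import itertools

# Illustrative values; the real lists come from the Daymet documentation.
variables = ["dayl", "prcp", "srad", "swe", "tmax", "tmin", "vp"]
years = range(1980, 2020)
region = "hawaii"

# One source file per (variable, year) pair, following the naming scheme above.
files = [
    f"daymetv3_{variable}_{year}_{region}.nc4"
    for variable, year in itertools.product(variables, years)
]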
And here’s the repr for the dataset that we’ll generate for Hawaii.
<xarray.Dataset>
Dimensions: (nv: 2, time: 14600, x: 284, y: 584)
Coordinates:
lat (y, x) float32 dask.array<chunksize=(584, 284), meta=np.ndarray>
lon (y, x) float32 dask.array<chunksize=(584, 284), meta=np.ndarray>
* time (time) datetime64[ns] 1980-01-01T12:00:00 ... 20...
* x (x) float32 -5.802e+06 -5.801e+06 ... -5.519e+06
* y (y) float32 -3.9e+04 -4e+04 ... -6.21e+05 -6.22e+05
Dimensions without coordinates: nv
Data variables:
dayl (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
lambert_conformal_conic float32 ...
prcp (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
srad (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
swe (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
time_bnds (time, nv) datetime64[ns] dask.array<chunksize=(14600, 2), meta=np.ndarray>
tmax (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
tmin (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
vp (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
yearday (time) int16 dask.array<chunksize=(14600,), meta=np.ndarray>
Our overall strategy is to
- Treat each region completely independently. They’ll end up in different Zarr stores.
- Construct a dictionary with all of the attrs we wish to preserve (global attributes and per-variable attributes).
- “Allocate” an empty Zarr group containing the dataset. “Allocate” is in quotes, since we aren’t writing any actual data here. We just want to get the layout of the Zarr metadata right. We’ll copy the actual data later.
- Copy the data for “static” variables. These are coordinates / variables like lat, lon, or lambert_conformal_conic that are not indexed by time. We only need to copy them once (see the sketch below).
- Process each year’s worth of data.
This is a deliberately low-level strategy that’s attempting to do the conversion as efficiently as possible.
In particular, we avoid any expensive inference operations in xarray, and we’ve set up the problem to be as parallel as possible.
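As a concrete example of the static-variable step, here’s a minimal sketch of copying the time-invariant coordinates once. It assumes `h5file` is an open h5py.File for any one source file, and that `group` and the `attrs` dictionary are set up as in the steps above (the scalar lambert_conformal_conic variable is handled similarly).

# Copy the time-invariant (y, x) coordinates once; they're identical in every source file.
for name in ["lat", "lon"]:
    source = h5file[name]
    out = group.array(name, source[...], chunks=source.shape, overwrite=True)
    out.attrs.update(attrs[name])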
The Conversion
The notebook has the details of the conversion, but I want to highlight two aspects in particular:
- Preallocating the Zarr groups
- Computing offsets: the start position in the output Zarr array for each input file
Together, these allow us to completely parallelize the most expensive part of the pipeline: copying the actual data from HDF5 to Zarr.
# Preallocate groups.
# `fs`, `group`, `attrs`, `full_shape`, and `years` are defined earlier in the notebook.
for variable in variables:  # tmax, tmin, prcp, etc.
    h5file = h5py.File(fs.open("path/to/variable.nc4"), mode="r")  # one source file for this variable
    dataset = h5file[variable]
    chunks = dask.array.empty_like(dataset).chunksize
    # We explicitly use a chunksize of 365 on the time axis.
    # This makes it easier to append in the future, since the chunks will always be
    # uniform. We won't have to rechunk a dangling chunk of data from the previous year.
    chunks = (365,) + chunks[1:]
    v = group.empty(
        dataset.name, shape=full_shape, dtype=dataset.dtype, chunks=chunks, overwrite=True
    )
    v.attrs.update(attrs[dataset.name.lstrip("/")])

offsets = 365 * np.arange(len(years))  # the starting point in the output for each input
...

# Writes are completely parallel for each (variable, year) pair
map(process_file, all_variables_all_years, offsets, ...)
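The process_file worker isn’t shown above; roughly, it looks something like the sketch below (the exact signature in the notebook differs, but the idea is the same: read one file’s block and write it into its slot).

def process_file(variable, path, offset):
    # Copy one (variable, year) file into its slot in the output Zarr array.
    # `fs` and `group` are the fsspec filesystem and target Zarr group from above.
    with h5py.File(fs.open(path), mode="r") as h5file:
        data = h5file[variable][...]  # the (365, y, x) block for this year
        group[variable][offset:offset + data.shape[0]] = data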
In the notebook, I parallelized this with Dask on a cluster. You could use Dask on a single machine (though bandwidth would probably become a bottleneck), or anything implementing the concurrent.futures interface.
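With Dask’s distributed scheduler, for example, the fan-out is just (the argument names here follow the sketch above and are illustrative):

from dask.distributed import Client

client = Client()  # or point this at a remote cluster

# One task per (variable, year) file; each writes to a disjoint slice of the output.
futures = client.map(process_file, all_variables, all_paths, all_offsets)
client.gather(futures)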
Reading
Now we have an analysis-ready, cloud-optimized dataset (at least when accessed from the same region).
import fsspec
import xarray as xr
store = fsspec.get_mapper(
"az://daymet-zarr/daymet_v3_hawaii.zarr",
account_name="daymeteuwest",
)
%time ds = xr.open_zarr(store, consolidated=True)
CPU times: user 187 ms, sys: 31.3 ms, total: 219 ms
Wall time: 555 ms
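And subsequent reads only pull the chunks they touch. For example (arbitrary indices):

# Daily maximum temperature at a single grid cell; only the chunks covering it are fetched.
ts = ds["tmax"].isel(y=100, x=100).compute()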
With the original files, just opening the dataset would have taken ~30 minutes.
Apologies for the long post, and thanks for any feedback.