Reading larger-than-memory HDF data and writing a concatenated xarray (or Zarr) dataset on HPC

Hi everyone,

Thrilled to be a part of the pangeo forum, my first post here!

I am using a numerical model which generates output in HDF format. Since I am running an ensemble, each simulation produces N x M output files (where N is the number of ensemble members and M is the number of time steps at which each ensemble member dumps an output file).

As an example, my latest simulation involves N = 39 and M = 47 (N x M = 1833 files in total). My aim is to generate one xarray Dataset per ensemble member (i.e., 39 netcdf files), with the option to concatenate those 39 files into one large netcdf file containing all the data.

My attempts so far:

  1. I managed to deploy a dask distributed client on my university HPC account and save a netcdf file for each of the 1833 output files, which can then be concatenated to produce one zarr object per ensemble member. While this solves the problem, I believe there must be a better method that AVOIDS the intermediate step of saving a netcdf file for every output file.

  2. The second approach closely follows the xarray documentation provided at this link (h/t Deepak Cherian).

Here’s a minimal example (running my python code from within a jupyter notebook):

tasks = [dask.delayed(arps_read_xarray_cf)(f, **open_kwargs) for f in files]  # testing with files from one ensemble member only (47 files)

datasets = dask.compute(tasks, scheduler=client)  # dask performs the computations (dashboard shows 47 processes in memory) but the ipynb kernel dies

arps_read_xarray_cf() is a custom function I wrote to read HDF data from a single file and return an xarray Dataset with all variables as dask arrays.

Here is a relevant screenshot:

Since I am a new user, I cannot share more than one media file, so I'll write down the rest of the story here: in the dask dashboard, I see that the dask workers processed all the tasks, so ideally my jupyter notebook should have returned the xarray datasets into the 'datasets' variable. Unfortunately, that doesn't happen and the notebook shows a busy status until the kernel dies.

Has anybody faced a similar problem or found a workaround?

P.S. Running `dask.compute(tasks[0])` indeed returns an xarray dataset, but `dask.compute(tasks)` fails miserably, producing the error messages shown above and eventually crashing the ipynb kernel.


Thanks for this interesting question! Welcome @milind!

I’m a bit confused about some aspects of your post, so I’ll ask a few clarification questions. I’ll start with the most important one: what is your end goal? You describe wanting to reformat your HDF files to netCDF files, using Xarray, possibly writing Zarr, etc. But these are all presumably intermediate steps on the way to some broader scientific goal, e.g. analyze variability across the model ensemble. If you can tell us what this goal is, we might be able to provide better advice.

As a general principle, I would suggest that you start by opening your existing files with xarray, without trying to write any data. Some questions about those files:

  1. Can you open your model’s original HDF files using xarray.open_dataset?
  2. If not, have you tried opening them with dask (see this example)?
  3. Why are you convinced you need a distributed scheduler to open these files? 1833 files is not really so many. How long does it take to just loop over all the files and open each one individually, in serial (see the sketch below)?
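
For the third question, something as simple as a plain serial loop would tell us a lot. A sketch, reusing the names files, arps_read_xarray_cf, and open_kwargs from your post:

import time

t0 = time.time()
datasets = []
for f in files:
    datasets.append(arps_read_xarray_cf(f, **open_kwargs))
print(f"opened {len(datasets)} files in {time.time() - t0:.1f} s")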

In general, you should avoid distributed until you absolutely need it. It sounds like you want distributed to analyze the data, but you don't necessarily need it just to open the files. This is an important distinction.

Hi @rabernat, thanks for your comments. Here’s my reply to your questions:

what is your end goal?

The final goal is to perform dynamical analyses (solving Poisson equations for vertical acceleration, using xgcm and a PDE solver) on the ensemble-mean data. However, I also wish to calculate the mean of some fields by averaging the fields of individual ensemble members.

Also, since operating on xarray datasets is much more convenient, I want to store all the model output in netcdf/zarr format for easy loading, followed by further computations and plotting.
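
For instance, once each ensemble member is stored as its own zarr store, the ensemble mean should be a one-liner in xarray. A rough sketch (the store paths and the "member" dimension name are placeholders, not my actual setup):

import xarray as xr

members = [xr.open_zarr(f"member_{i:02d}.zarr") for i in range(39)]
ens = xr.concat(members, dim="member")    # adds a new "member" dimension
ens_mean = ens.mean(dim="member")         # lazy until .compute()/.load()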

  1. Can you open your model’s original HDF files using xarray.open_dataset?

Yes, I can, using xr.open_dataset(file, engine='pynio'), but I get a messed-up dataset with artificially created (and unintended) dimensions and coordinates, because there is no metadata as such in the HDF files.
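
To illustrate, the kind of cleanup I would have to do afterwards looks roughly like this (the file name, the auto-generated dimension names, and the grid spacing below are all placeholders, not what pynio actually produces for my files):

import numpy as np
import xarray as xr

ds = xr.open_dataset("ens01_000000.hdf", engine="pynio")  # example file name

# rename the auto-generated dimensions (placeholder names) and attach coordinates
ds = ds.rename({"DIM_000": "zc", "DIM_001": "yc", "DIM_002": "xc"})
dx = 250.0  # horizontal grid spacing in metres (example value)
ds = ds.assign_coords(xc=("xc", np.arange(ds.sizes["xc"]) * dx),
                      yc=("yc", np.arange(ds.sizes["yc"]) * dx))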

  2. If not, have you tried opening them with dask?

I believe the model output is in HDF4 format, and h5py is not able to read data from it. See the stack_overflow link.
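
(For reference, HDF4 files can be read with pyhdf instead of h5py. A minimal sketch, with the file and variable names made up for illustration:)

from pyhdf.SD import SD, SDC

f = SD("ens01_000000.hdf", SDC.READ)  # example file name
print(f.datasets())                   # list the scientific datasets in the file
u = f.select("u").get()               # read one variable into a numpy array
f.end()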

  3. Why are you convinced you need a distributed scheduler to open these files? 1833 files is not really so many. How long does it take to just loop over all the files and open each one individually, in serial?

My idea was to speed up the process. The custom function I wrote to read an HDF file and return an xarray dataset (with all metadata included, following cf_xarray conventions) can be mapped to all the files in parallel. So, instead of returning the xarray dataset, I can just save each dataset as a netcdf file too.

These 1833 netcdf files can then be read back in (in batches, one batch per ensemble member) using another function, which can also be mapped over those batches.
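
Roughly, the two-stage workflow I described looks like this (a sketch only; the output naming scheme and the zarr store name are assumptions, not my actual code):

import dask
import xarray as xr

# stage 1: convert each raw HDF file to netcdf in parallel (one task per file)
@dask.delayed
def convert_one(f):
    ds = arps_read_xarray_cf(f, **open_kwargs)
    out = f.replace(".hdf", ".nc")        # output naming scheme is assumed
    ds.to_netcdf(out)
    return out

nc_files = dask.compute(*[convert_one(f) for f in files])

# stage 2: per ensemble member, open the batch lazily and write one store
member = xr.open_mfdataset(sorted(nc_files), combine="nested", concat_dim="time")
member.to_zarr("member_01.zarr", mode="w")  # store name is an example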

Short answer is ‘to speed things up’.

Hope this makes sense.

Thanks again!


Ok understood. Can you post, or post a link to, your function arps_read_xarray_cf?

Here you go!

arps_read_xarray_cf

Have you considered using the GRIB format, or tools such as CDO or WGRIB2? I find these tools let you work with the data using reasonable RAM, although they do require generating some intermediate files. As a comparison, concatenating about 800 time steps of hourly GFS data (1440x720) containing ~60 parameters in GRIB format and converting to NetCDF takes about 10-15 minutes on an 8 GB, 4 vCPU machine. When creating Zarr from NetCDF, I find that re-chunking the files before working with them in Xarray can also reduce memory usage substantially.

Short answer: No.

Explanation: GRIB is definitely becoming obsolete (at least in the atmospheric sciences). It has a lot of issues with metadata standards, etc. Going that route would not be a good idea, IMO.

My understanding is that xarray was conceived partly to get away from such uninformative data formats and to make it possible to load datasets with all the desired CF-compliant metadata.

Moreover, my struggle right now is not with memory (I am working on HPC systems). Rather, I want to speed up the writing of zarr objects by cutting out the intermediate step of writing individual netcdf files.

In other words, if I could read the HDF files -> create xarray datasets -> concatenate the datasets across all time steps (producing one xarray dataset per ensemble member) -> save that as a zarr object, all in one pass through a single function, that would solve the issue.
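
A sketch of what I have in mind, assuming a dict files_by_member that maps each ensemble member to its 47 time-ordered HDF files, and that arps_read_xarray_cf returns datasets backed by dask arrays (the store names are examples):

import xarray as xr

for member, member_files in files_by_member.items():
    datasets = [arps_read_xarray_cf(f, **open_kwargs) for f in sorted(member_files)]
    combined = xr.concat(datasets, dim="time")      # still lazy
    combined.to_zarr(f"{member}.zarr", mode="w")    # computation happens here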

Got it. Kernel dying usually happens for me when I run out of memory, so I figured it might have been the same in your case. Out of curiosity, how large is your dataset, and how long does your process run before it crashes?

I looked through your example. There is a lot going on there. In the future, I would love to be able to take some time to help you work through refactoring this in detail. But for the moment, I will point you to this example notebook for reading GOES data on AWS:

The basic pattern here is to write a simple function that reads a numpy array from one of the files, and then wrap this function in a dask delayed call. In the notebook, it looks something like this:

import dask
import dask.array as dsa
import xarray as xr

def load_data(url):
    """Extract just the data from a single file."""
    f = fs.open(url)  # fs is the filesystem object (e.g. s3fs) defined earlier in the notebook
    # xr.open_dataset handles decoding of the data automatically, e.g. scaling, units, etc.
    ds = xr.open_dataset(f)
    return ds['RRQPE'].values

# flat_list, shape and dtype are also defined earlier in the notebook
all_arrays = []
for url in flat_list:
    array = dsa.from_delayed(dask.delayed(load_data)(url),
                             shape, dtype=dtype)
    all_arrays.append(array)
all_data = dsa.stack(all_arrays)

Perhaps you could adapt something like that to work with your data.

See also these docs:

@milind Is arps_read.hdfreadvars3d reading in the actual data? If so, that might be the issue. You’ll want to daskify within that function rather than daskifying the data it returns.
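
A rough sketch of what I mean (the helper read_one_var, the variable names, and the shape arguments are placeholders, not your actual code): defer the low-level HDF read itself, so the Dataset you build is backed by lazy dask arrays from the start.

import dask
import dask.array as dsa
import xarray as xr

def lazy_var(filename, varname, shape, dtype="float32"):
    # defer the raw read; nothing is loaded until the array is computed
    delayed_read = dask.delayed(read_one_var)(filename, varname)  # your low-level HDF reader
    return dsa.from_delayed(delayed_read, shape=shape, dtype=dtype)

def open_arps_lazy(filename, nz, ny, nx):
    data_vars = {
        name: (("zc", "yc", "xc"), lazy_var(filename, name, (nz, ny, nx)))
        for name in ["u", "v", "w", "pt"]   # variable names are placeholders
    }
    return xr.Dataset(data_vars)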

@rabernat Thanks for the suggestion. I tried that method in two ways: returning a full dataset as well as a single dataarray from the dataset (by modifying the arps_read_xarray_cf function). Unfortunately, it also suffers from the same problem of too much overhead time before the compute call throws errors.

I must mention that while trying it out yesterday, it did work for one ensemble member (47 files) on one occasion (I forget what was different that time), but even then, when I tried saving the stacked dask array to zarr format, it threw an error like ValueError: missing object_codec for object array.

As of now, it works fine when I return a single dataarray for a single file (1 time) corresponding to 1 ensemble member, which obviously defeats the purpose.

@dcherian Thanks for the suggestion. However, daskifying arps_read.hdfreadvars3d does not look so straightforward to me right now (probably due to my limited understanding of what daskify actually means). From what I understood, I thought I could convert this function into a gufunc and then return a dask array as output. I did succeed in converting it into gufunc form.

However, I encountered issues with that approach. First, the legacy function returns a dictionary; second, subsequent computations on variables like u, v, w, etc. using gucalc_us (calculate u at scalar points) again need the computed values of the returned dask array. All of this makes it more convoluted than I believe any of us desire.

For now, my usual way of saving intermediate netcdf files before creating concatenated zarr objects works just fine. I am thinking of leaving it like this for now, as I may otherwise end up spending more time on data conversion than on actual science.

In any case, thank you all for your responses.

@rabernat I think the solution at this link does not work here because it is written only for dask arrays. Since I am trying to work with a dataset, the shape and dtype arguments pose a problem while computing the stacked output (even when I manually enter the dataset shape based on its dimensions).

@dcherian Just for fun, I tried testing the limits of dask.compute (screenshot attached). It worked fine when I used only the first 30 files for tasks.compute(). However, I decided to give up when I raised it to 40. Could it be a memory issue after all?

Note: this was for just one ensemble member (so not even all 47 files of that member could be read completely).

Also, the original arps_read_xarray_cf function produced dimensions of the form time: 47, xc: 243, xe: 243, yc: 243, ye: 243, zc: 243, z_left: 243, which I believe was just too much (probably ~50 TB according to the dask chunk visualization). So I cut the dimensions down to the bare minimum: time: 47, xc: 243, yc: 243, zc: 243. The screenshot attached above is with these reduced dimensions.

Dask arrays can go inside xarray datasets!

http://xarray.pydata.org/en/stable/dask.html
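
For example, the stacked dask array from the pattern above can be wrapped in a DataArray/Dataset before writing zarr, so you keep named dimensions. A sketch reusing load_data and files from earlier in the thread (the shape, dtype, dimension names, variable name "w", and store name are all placeholders):

import dask
import dask.array as dsa
import numpy as np
import xarray as xr

shape, dtype = (243, 243, 243), np.float32  # per-file array shape (placeholder)
arrays = [dsa.from_delayed(dask.delayed(load_data)(f), shape, dtype=dtype)
          for f in files]
stacked = dsa.stack(arrays)                 # resulting shape: (time, zc, yc, xc)

da = xr.DataArray(stacked, dims=("time", "zc", "yc", "xc"), name="w")
da.to_dataset().to_zarr("member_01.zarr", mode="w")  # store name is an example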