Unclear behavior of NetCDF4 files loaded with intake-xarray

In a previous post about high unmanaged memory warnings when loading a NetCDF3 file, @rabernat suggested converting the data to NetCDF4 with internal chunking and compression.

Converting the 2D ETOPO1_Ice_g_gmt4.nc topography data to NetCDF4 got rid of the warning I was receiving, but I recently began testing a larger dataset, where a new problem appeared. This new dataset is a 3D (lat, lon, time) field of sea level pressure called slp.1948-2009.nc, which is also originally in NetCDF3 format.

After converting this to NetCDF4 with internal chunking and compression using PyFerret, I've observed two different issues using two separate methods:

Method 1

chunks='auto' is used in the intake.open_netcdf(...) line:

ds = intake.open_netcdf('gs://mybucket/netcdf4file.nc', chunks='auto').to_dask()
da = ds.to_array().data
da

[screenshot: Dask array repr of da]

The above code block executes very quickly and uses little memory, but the trouble comes when I try to read the data from cloud storage. Memory usage does not climb past 3 GB during the store, but I receive an error when I run it:

with diag_timer.time(nworkers=total_workers(), nthreads=total_nthreads(),
                     ncores=total_ncores(), **diag_kwargs):
    print('Beginning Store')
    future = dsa.store(da, null_store, lock=False, compute=False)
    dask.compute(future, retries=5)
    print('Store Complete')
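
For context, null_store (defined in my repository) is just a write target that throws the computed blocks away, so the store measures read and decode time only. A minimal stand-in might look roughly like this (an assumption for illustration; the real helper is in the repo linked below):

class NullStore:
    """Discard everything Dask writes; used to benchmark reads only."""

    def __setitem__(self, key, value):
        # dask.array.store hands us a slice (key) and a computed block (value);
        # we simply drop them instead of writing anywhere.
        pass

null_store = NullStore()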

I’ve seen people encounter a similar issue, but there were no explanations or fixes given.

Method 2

intake.open_netcdf(...) is called with no arguments other than the object storage path, and the result is then converted into a Dask array to be stored.

ds = intake.open_netcdf('gs://mybucket/netcdf4file.nc').to_dask()
data = ds.to_array().data
da = dsa.from_array(data)
da

Accessing the raw values with ds.to_array().data takes a very long time relative to Method 1 and uses up ~7 GB of memory. This leads me to believe that the entire file is somehow being read from cloud storage up front. Next, I execute the store:

with diag_timer.time(nworkers=total_workers(), nthreads=total_nthreads(),
                     ncores=total_ncores(), **diag_kwargs):
    print('Beginning Store')
    future = dsa.store(da, null_store, lock=False, compute=False)
    dask.compute(future, retries=5)
    print('Store Complete')

Using Method 2, I receive the same unmanaged memory problem I had previously, despite using the NetCDF4 format.
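
One quick way to see the difference between the two methods is to check what .data actually returns:

arr = ds.to_array().data
print(type(arr))
# Method 1 (chunks='auto'): dask.array.core.Array -> still lazy, read chunk by chunk
# Method 2 (no chunks):     numpy.ndarray         -> the whole array is loaded into memory first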

It is worth noting that slp.1948-2009.nc is not internally chunked into the same sizes that Dask chooses for its chunks, so I think I might be completely misunderstanding how this process works. Do the internal chunks need to be exactly the same size as the chunks Dask determines? And why am I getting two different issues when, to my knowledge, both methods put the data into the same form before I execute the store? Overall, the behavior of intake.open_netcdf(...) is unclear to me.
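
For reference, here is a rough way to compare the file's internal chunking against what Dask picked. I'm assuming here that the variable is named slp and that ds is the dataset opened in Method 1; since NetCDF4 files are HDF5 underneath, h5py can report the on-disk chunk shape:

import gcsfs
import h5py

fs = gcsfs.GCSFileSystem()
with fs.open('gs://mybucket/netcdf4file.nc') as f:
    with h5py.File(f, 'r') as h5:
        print(h5['slp'].chunks)          # on-disk chunk shape, or None if contiguous

print(ds['slp'].data.chunksize)          # chunk shape Dask is using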

Any insight or suggestions would be greatly appreciated, and if you need more background info on the code it can be found in my Git repository.

If you want to run the code for this file use the public URL:

intake.open_netcdf('https://storage.googleapis.com/cloud-data-benchmarks/slp.1948-2009.nc', ...)

Have you tried with

intake.open_netcdf(
    'https://storage.googleapis.com/cloud-data-benchmarks/slp.1948-2009.nc', 
    chunks={}
)

Just tried that, but I still get an unmanaged memory warning with just 2 workers. Doesn't passing chunks={} into intake.open_netcdf(...) end up allocating just a single chunk per array here? From what I understand, storing that dataset means all ~6 GB of my data would be read by each worker.
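
For completeness, this is how I checked the chunk layout that chunks={} produces (again assuming the variable is named slp):

ds = intake.open_netcdf(
    'https://storage.googleapis.com/cloud-data-benchmarks/slp.1948-2009.nc',
    chunks={}
).to_dask()
print(ds['slp'].data.chunks)   # a single chunk spanning every dimension, i.e. one ~6 GB chunk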

Oh sorry, I didn’t realise that there was only a single 6 GB chunk in there. In that case Dask can’t help you: it cannot sub-chunk the data, even when it is uncompressed. For the specific case of uncompressed data, kerchunk could offer sub-chunking of the buffers, but this is not implemented yet.

However, this data IS compressed, so I don’t think there’s any way whatsoever to load less than the full 6 GB in one go, plus temporary buffers and decompression overhead.


Thank you for the explanation!

I’ll look into it more, but does this mean the internal chunks of my NetCDF4 file are not configured properly? If Dask can’t sub-chunk the data and the full 6 GB set is being loaded by each worker, it seems to me that either the file is stored as a single chunk in cloud storage or every worker is reading all of the chunks, rather than the read being split between them.

After much experimentation, I have found a solution to my problem. @martindurant, your explanation of Dask’s effect (or lack thereof) on sub-chunking was extremely helpful in determining what I had done wrong, and I appreciate the guidance.

Basically, when I converted the file from NetCDF3 to NetCDF4, there was a “phantom dimension” that caused PyFerret to interpret the data as pressure(LAT, LON, empty, TIME), giving 4 dimensions total. When saving to a new file, automatic chunking only took into account the first 3 dimensions, thus ignoring time. I had to manually specify chunk sizes in 4 dimensions to get the correct chunking scheme.
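
I fixed this on the PyFerret side, but for illustration, the same kind of thing could be done with xarray by dropping the length-1 dimension and specifying the chunk sizes explicitly when writing NetCDF4. The variable name, dimension order, and chunk sizes below are placeholders, not my actual values:

import xarray as xr

ds = xr.open_dataset('slp.1948-2009.nc')   # original NetCDF3 file
ds = ds.squeeze(drop=True)                 # drop any length-1 "phantom" dimensions
encoding = {
    'slp': {
        'zlib': True,
        'complevel': 4,
        'chunksizes': (120, 36, 72),       # (time, lat, lon) -- placeholder sizes
    }
}
ds.to_netcdf('slp.1948-2009.nc4', format='NETCDF4', encoding=encoding)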

Next, when loading into an xarray Dataset, I also had to manually specify the chunk sizes used for the Dask arrays. I did not realize that the Dask chunks had to be the same size as the on-disk chunks (in this case the data lives in object storage on Google Cloud), but once the chunk sizes matched, the code behaved as expected.
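
Concretely, the read side looks something like this, with Dask chunks lined up with the on-disk chunks (the dimension names and chunk sizes here are illustrative, not my actual values):

ds = intake.open_netcdf(
    'gs://mybucket/netcdf4file.nc',
    chunks={'time': 120, 'lat': 36, 'lon': 72}   # match the on-disk chunks (or a multiple of them)
).to_dask()
da = ds.to_array().data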

Not strictly speaking true, but having exactly the same size, or an exact multiple of it, is far more efficient.
