Optimising Access For Zarr on S3 Data by LAT/LONG (Dask)

Hi there, thanks very much for all of your efforts. I'm hoping you can help me with an issue I'm encountering in this space. I'm not sure whether the problem lies with Zarr or Dask exactly, so apologies if I'm misunderstanding something.

I have a Zarr dataset (WRF model output converted from NetCDF). The data is around ~15 TB in total, built from daily NetCDF files that each contain hourly data, with around 150 variables overall, hence it's a large dataset. Allowing access through this method would be hugely beneficial for making the data more widely available to individuals.

The issue I'm having, however, is formatting the data in such a way that individual lat/long points are accessible, e.g. "for this specific lat/long, return this variable as a time series for the 20-year period".

What I've tried so far:

  • Converting the entire dataset into Zarr (appending 1 year at a time) with some basic chunking chosen with chunk size in mind.
  • Using rechunker to rechunk, e.g., one variable across the entire time series, with the south_north and west_east chunk sizes set to produce a reasonable chunk size.

In each case, when I access the data and try to filter to the relevant point by doing something like the following (note I'm using a Dask cluster with ~10 workers):

catalogue = intake.open_catalog('./catalogue.yaml')
wrf_data = catalogue.wrf_2001_2019.to_dask()
wrf_t2_point = wrf_data.T2.where(((wrf_data.XLONG == wrflon) & (wrf_data.XLAT == wrflat)), drop=True)

This takes a very long time. Specifically, I can see that the Dask workers initially do all of the getitem operations, but then the local machine works for ~half an hour at 100% CPU and memory, presumably evaluating all of the chunks before producing the filtered result.

I have a feeling I'm really misunderstanding how to do these things. What I want in the end is simply a dataframe-type object I can work with and visualise locally, which should be a tiny amount of raw data.

Any tips or points in the right direction would be really appreciated.


Thanks for this interesting question. We can help you.

Can you please post the details of the arrays for each dataset? Chunk size, total shape, etc.

Even better, can you make the data public and give us the URL?

Hi @rabernat, thanks very much for the speedy response. See the catalogue.yaml here:

    sources:
      wrf_2001_2019:
        description: WRF 3.7.1 2001-2019
        driver: zarr
        args:
          urlpath: 's3://wrf-3-7-1-2001-2017-4-1-5-2018-2019-d03'
          consolidated: True
          storage_options:
            use_ssl: True
            anon: True
            client_kwargs:
              endpoint_url: 'https://ceh-datalab-U.s3-ext.jc.rl.ac.uk'
      wrf_2001_2019_t2_rechunk:
        description: WRF 3.7.1 2001-2019 One Variable - T2 Rechunked
        driver: zarr
        args:
          urlpath: 's3://wrf-rechunk-t2-v2'
          consolidated: True
          storage_options:
            use_ssl: True
            anon: True
            client_kwargs:
              endpoint_url: 'https://ceh-datalab-U.s3-ext.jc.rl.ac.uk'

I'm still playing around with this functionality, but these are at least representative of the general data.


Ok I think I understand the problem. The way you have built these datasets has created coordinates that are impossible to use for indexing.

Here is the dataset produced by

ds = cat['wrf_2001_2019'].to_dask()

You can see that your coordinate variables have the same dimensionality as your data variables. So when you do wrf_data.T2.where(((wrf_data.XLONG == wrflon) & (wrf_data.XLAT == wrflat)), drop=True), you are triggering a very expensive masking operation that uses the 3D timeseries of XLONG and XLAT.

A very common mistake in Xarray usage is to think that .where is an indexing operator. where is not an indexing operator! You want to use .sel or .isel to index your data. To do this, you need to select on dimension coordinates (south_north and west_east) or just use numpy-style indexing.

Here is an example:

ds_rechunk = cat['wrf_2001_2019_t2_rechunk'].to_dask()
ds_rechunk.T2[:, 0, 0].compute()
# equivalent to ds_rechunk.T2.isel(south_north=0, west_east=0)

This still has to pull 6574 individual chunks, but it should be much more efficient than what you were doing before. (It completed for me in 5min 27s, pulling the data over the internet to NYC.) If you really want to optimize for timeseries at a single point, I would rechunk much more aggressively, such that you can get the whole timeseries into a single chunk.
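To see why the chunk layout matters so much here: for a single-point timeseries, the number of object-store requests is roughly the number of chunks along the time dimension. A quick back-of-envelope calculation (toy numbers, assuming hourly data over the 19-year period):

```python
import math

hours = 19 * 365 * 24          # approximate length of the time dimension
chunk_time_daily = 24          # original-style chunking: one day per chunk
chunk_time_full = hours        # aggressive: whole timeseries in one chunk

# Reads needed to extract one point's full timeseries under each layout.
reads_daily = math.ceil(hours / chunk_time_daily)
reads_full = math.ceil(hours / chunk_time_full)
print(reads_daily, reads_full)  # 6935 1
```

Each read is a separate HTTP request to the object store, so collapsing thousands of requests into one (or a handful) dominates the speedup, independent of bandwidth.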

I examined these non-dimension coordinate variables (XLONG and XLAT) and concluded that it looks like you just have a regular lat lon grid.
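One way to check that numerically: if the grid really is regular, XLAT is constant along west_east and XLONG is constant along south_north, so the 2D arrays collapse to 1D without loss of information. A toy sketch with made-up coordinate values:

```python
import numpy as np

# Toy 2D coordinate arrays built from a regular grid, mimicking XLAT/XLONG.
lat_1d = np.linspace(49.0, 59.0, 6)
lon_1d = np.linspace(-8.0, 2.0, 5)
XLAT, XLONG = np.meshgrid(lat_1d, lon_1d, indexing="ij")

# Every column of XLAT and every row of XLONG should be identical.
regular = (np.allclose(XLAT, XLAT[:, :1]) and np.allclose(XLONG, XLONG[:1, :]))
print(regular)  # True
```

Running the same check on the real ds.XLAT[0] / ds.XLONG[0] arrays would confirm whether 1D coordinates are safe to use.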

Therefore, there is absolutely no need to store this data as 3D arrays. If you can manage to make them 1D coordinate variables, you will be able to do something like this:

ds_rechunked.T2.sel(lon=wrflon, lat=wrflat)

The right place to do this would have been before you made the Zarr. However, you can retroactively do it like this:

# lon / lat here are the 1D arrays extracted from the 2D coordinates,
# e.g. lon = ds.XLONG[0, 0, :].values and lat = ds.XLAT[0, :, 0].values
ds_fixed = (
    ds.assign_coords({'lon': ('west_east', lon), 'lat': ('south_north', lat)})
      .swap_dims({'west_east': 'lon', 'south_north': 'lat'})
)

Notice how the dimension coordinates (lon and lat) appear in bold in the dataset repr. That means they can be used for indexing.
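With 1D dimension coordinates in place, nearest-neighbour point selection becomes a cheap indexed lookup, and the result converts directly to the small dataframe the original question asked for. A self-contained sketch on a synthetic dataset (toy shapes and coordinate values, not the real WRF data):

```python
import numpy as np
import pandas as pd
import xarray as xr

# Synthetic dataset with 1D dimension coordinates, mimicking the fixed layout.
times = pd.date_range("2001-01-01", periods=48, freq="h")
lat = np.linspace(49.0, 59.0, 6)
lon = np.linspace(-8.0, 2.0, 5)
ds_demo = xr.Dataset(
    {"T2": (("Time", "lat", "lon"), 280 + np.random.rand(48, 6, 5))},
    coords={"Time": times, "lat": lat, "lon": lon},
)

# method="nearest" avoids needing an exact coordinate match.
point = ds_demo.T2.sel(lat=50.1, lon=-7.8, method="nearest")
df = point.to_dataframe()  # small pandas DataFrame for local plotting
print(len(df))  # 48
```

On the real dataset, only the chunks covering that single grid point are fetched, which is why the chunking along time is the remaining bottleneck.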

Here it looks like the root problems were:

  1. a misunderstanding of how Xarray indexes data (sel vs. where)
  2. a dataset that unnecessarily explodes the dimensionality of the coordinates.

The process of making analysis-ready Zarr data from NetCDF or GRIB files is not trivial and requires quite a bit of nuance. We are trying to make this easier with the Pangeo Forge project. I invite you to give that tool a try; it might have helped avoid problem 2.


@rabernat thank you so much for the comprehensive response, really appreciated.

I'll go through this now and get back to you; it's clear to me that I need to spend more time understanding the fundamentals.


Thanks for taking the time to post your question publicly. I think a lot of users encounter similar issues, so it’s super helpful for the community to work through examples like this.

Please do update us once things are working to your liking (or come back to ask more questions). A good baseline for "how fast should this be?" for I/O-bound operations is simply your network bandwidth. If you have a 1 Gbps network connection between your object store and the computer where your analysis is running, you should be able to load data at 1 Gbps (or even faster if the data are compressed).
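That baseline also makes it easy to sanity-check expectations with a bit of arithmetic. The numbers below are assumptions for illustration (1 Gbps link, the ~15 TB dataset from the question, and a guessed single-point payload size):

```python
# Back-of-envelope: time to move data at network line rate.
gbits_per_sec = 1.0                 # assumed network bandwidth
data_gb = 15_000                    # ~15 TB full dataset, in GB
point_mb = 5                        # guessed size of one point's timeseries

full_scan_hours = data_gb * 8 / gbits_per_sec / 3600
point_seconds = point_mb * 8 / 1000 / gbits_per_sec
print(round(full_scan_hours, 1), round(point_seconds, 2))  # 33.3 0.04
```

In other words, anything that forces a scan of the whole store can never be fast, while a well-chunked point extraction should complete in well under a second of pure transfer time.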

Thanks very much. I've gone through this now and it's making much more sense. I'm now going to re-run the Zarr creation to index based on lat/long by default, as well as implement the more aggressive chunking you mentioned. I'm sure I'll have more questions, but this has already been incredibly useful.

Just to let you know @rabernat, I've rechunked the data more aggressively for a number of variables across the entire timeseries and am now achieving much better results: 2 seconds for the full time series of one variable at a specific long/lat. Very happy! :smiley:


That’s what we love to hear! :clap: :clap:

So what does your dataset look like now?

I want to suggest a smart solution that avoids using Xarray indexers.

What you can do is take all of the coordinates and build a k-d tree from scipy.spatial, or use scikit-learn's nearest neighbours (which uses a ball tree by default and supports the haversine metric). You have to ravel your coordinates:

tree = cKDTree(np.c_[np.radians(ds.XLAT[0].values.ravel()), np.radians(ds.XLONG[0].values.ravel())])

Now you can query the tree e.g. with:

tree.query(np.c_[np.radians(np.array([48.9])), np.radians(np.array([10.3]))])

And you will get the index of the closest grid point of the 1D representation from your coordinates. With the shape of your dataset and np.unravel_index you will get the 2D coordinates you need to access specific points in your zarr archive.

For operational use, you can pickle the tree object. It then takes only a few milliseconds to get the required indices out of the tree.
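Putting those steps together end to end, here is a runnable sketch on toy coordinate arrays standing in for ds.XLAT[0] / ds.XLONG[0] (note this uses Euclidean distance in radian space, which is a good approximation for nearby points; for true great-circle distance use a BallTree with the haversine metric):

```python
import pickle

import numpy as np
from scipy.spatial import cKDTree

# Toy 2D coordinate arrays mimicking the WRF XLAT / XLONG fields.
lat2d, lon2d = np.meshgrid(np.linspace(49.0, 59.0, 6),
                           np.linspace(-8.0, 2.0, 5), indexing="ij")

# Build the tree on the ravelled (1D) coordinates, in radians.
tree = cKDTree(np.c_[np.radians(lat2d.ravel()), np.radians(lon2d.ravel())])

# Query the nearest grid point to (50.4 N, -6.0 E), then recover 2D indices.
_, flat_idx = tree.query(np.radians(np.array([[50.4, -6.0]])))
iy, ix = np.unravel_index(flat_idx[0], lat2d.shape)
print(iy, ix)  # 1 1  -> use as isel(south_north=iy, west_east=ix)

# The tree can be pickled once and reused for fast operational lookups.
blob = pickle.dumps(tree)
tree2 = pickle.loads(blob)
```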

FYI, there's the xoak library, which does exactly what @meteoDaniel describes via an Xarray-compatible interface (Dataset / DataArray accessors). For lat/lon data it also reuses the S2 library (via pys2index) for even more efficient index build and query times.