Best practice reading zarr from s3

Dear zarr community,

I am using the S3 STANDARD storage class to store numerical weather predictions as a Zarr archive.

    import s3fs
    import xarray

    s3_out = s3fs.S3FileSystem(anon=False)
    return xarray.open_zarr(
        store=s3fs.S3Map(
            root=f"s3://{bucket_name}/{dataset_name}.zarr", s3=s3_out, check=False
        )
    )

I use this to open the dataset, and then isel() to index the dataset at given coordinates.

Is there a faster way to access the data?
Maybe using Dask, or Zarr directly?
Using Zarr alone leads to the problem that I have to iterate through my variables; maybe you know another way to access all variables with one index operation?
I am also asking myself whether this approach still uses async access to the filesystem.
I tried activating async for s3fs, but I received a "missing loop in MainThread" error.

It would be great to receive some best practices for handling Zarr on S3.

Best regards and special thanks to all supporters :slight_smile:

Hi @meteoDaniel,

I have a few questions that may help determine the best path for you:

  1. Which part of your data access pattern is slow? Is it the open_zarr call? If so, is the zarr store you are opening consolidated? Is it the indexing/loading of data from multiple variables? If so, how is your data chunked (sharing a Dataset.__repr__() will help us understand potential access pattern issues)?
  2. Have you tried optimizing any of the open_zarr keyword arguments? For example, options like decode_cf or drop_variables can be very handy in certain circumstances.
  3. Are you using Xarray/Zarr in conjunction with Dask? Perhaps you could also share more about your attempts to use s3fs’s async features.

Also, if you can stage a representative dataset on s3 in a public bucket, I’m sure we can help figure out what is going on.

Dear @jhamman ,
thanks for your reply.

This is how my dataset looks.

<bound method Dataset.__repr__ of <xarray.Dataset>
Dimensions:                       (dt_calc: 1, dt_fore: 112, latitude: 1441,
                                   longitude: 2879)
Coordinates:
  * dt_calc                       (dt_calc) datetime64[ns] 2022-07-25
  * dt_fore                       (dt_fore) float64 0.0 1.0 2.0 ... 174.0 177.0
    lat                           (latitude, longitude) float64 dask.array<chunksize=(181, 720), meta=np.ndarray>
  * latitude                      (latitude) float64 -90.0 -89.88 ... 89.88 90.0
    lon                           (latitude, longitude) float64 dask.array<chunksize=(181, 720), meta=np.ndarray>
  * longitude                     (longitude) float64 -180.0 -179.9 ... 179.8
Data variables:
    air_temperature_2m            (dt_calc, dt_fore, latitude, longitude) float16 dask.array<chunksize=(1, 14, 181, 360), meta=np.ndarray>
    dewpoint_2m                   (dt_calc, dt_fore, latitude, longitude) float16 dask.array<chunksize=(1, 14, 181, 360), meta=np.ndarray>
    global_horizontal_irradiance  (dt_calc, dt_fore, latitude, longitude) int16 dask.array<chunksize=(1, 14, 181, 360), meta=np.ndarray>
    max_wind_gust_10m             (dt_calc, dt_fore, latitude, longitude) int16 dask.array<chunksize=(1, 14, 181, 360), meta=np.ndarray>
    relative_humidity_2m          (dt_calc, dt_fore, latitude, longitude) int16 dask.array<chunksize=(1, 14, 181, 360), meta=np.ndarray>
    total_cloud_cover             (dt_calc, dt_fore, latitude, longitude) int16 dask.array<chunksize=(1, 14, 181, 360), meta=np.ndarray>
    total_precipitation           (dt_calc, dt_fore, latitude, longitude) float16 dask.array<chunksize=(1, 14, 181, 360), meta=np.ndarray>
    weather_synop_code            (dt_calc, dt_fore, latitude, longitude) int16 dask.array<chunksize=(1, 14, 181, 360), meta=np.ndarray>
    wind_direction_10             (dt_calc, dt_fore, latitude, longitude) int16 dask.array<chunksize=(1, 14, 181, 360), meta=np.ndarray>
    wind_speed_10                 (dt_calc, dt_fore, latitude, longitude) int16 dask.array<chunksize=(1, 14, 181, 360), meta=np.ndarray>>

The ds.isel() step is the most time-consuming part of the code.

Reading the metadata and attributes takes ~300 ms; the rest takes approx. 2.5 seconds.
I have looked here for async support: https://s3fs.readthedocs.io/en/latest/. But as I said, I receive a "loop is missing in MainThread" error.

This dataset (AWS S3 Explorer) comes very close to the one I am using. Unfortunately, the dataset is empty for the enclosing directory:

bucket_name='noaa-hrrr-bdp-pds'
dataset_name ='sfc/20200801/20200801_00z_anl'

I hope this helps to understand my issue. Thanks a lot

Daniel

Specifically on async: S3 filesystem instances are always async internally. The asynchronous argument only specifies whether you will be calling the instance from async def code, so you do not need it.

When combined with zarr, multiple chunks of a given variable can be requested concurrently, so you do not pay the latency cost many times over; it does not improve your maximum bandwidth, though. However, zarr does not currently support concurrent reads across different variables. It would be nice!

Using xarray, as opposed to zarr directly, gives you indexing by coordinate and eagerly loads the coordinate arrays. You are not using this, so perhaps it is not useful.

You are already using Dask; this is assumed by open_zarr (not the same default as open_dataset(engine="zarr")).

However, it is strange that the isel() call takes so long, and we would be interested to know why: no data loading happens there, but you are constructing a dask graph. Maybe passing chunks=None helps you; if it does, Dask has some work to do.

Dear @martindurant,

isel() is combined with to_dataframe(), which takes most of the time.

In [5]: %timeit ds.isel(latitude=100, longitude=100).to_dataframe()
1.75 s ± 194 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [13]: %timeit ds.isel(latitude=100, longitude=100).to_dataframe()
The slowest run took 6.22 times longer than the fastest. This could mean that an intermediate result is being cached.
3.63 s ± 3.25 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [6]: %timeit ds.isel(latitude=100, longitude=100)
3.61 ms ± 31.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

In [12]: %timeit ds.isel(latitude=100, longitude=100).to_dask_dataframe().compute()
1.21 s ± 95.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

I think using to_dataframe() is not a good idea. So what do you think I should do instead?

Sorry, I don’t know much about the interaction of Xarray and dataframes. Running snakeviz or similar on ds.isel(latitude=100, longitude=100) alone might still be interesting.
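If snakeviz is not available, cProfile from the standard library produces the same profile data it visualizes. A sketch on a synthetic stand-in dataset (all names and sizes here are made up):

```python
import cProfile
import pstats

import numpy as np
import xarray as xr

# Synthetic dask-backed stand-in for the remote dataset.
ds = xr.Dataset(
    {"t2m": (("time", "x", "y"), np.random.rand(10, 200, 200))}
).chunk({"time": 10, "x": 50, "y": 50})

profiler = cProfile.Profile()
profiler.enable()
df = ds.isel(x=100, y=100).to_dataframe()
profiler.disable()

# Print the top 5 cumulative hotspots; snakeviz shows the same stats
# interactively (snakeviz file.prof).
pstats.Stats(profiler).sort_stats("cumulative").print_stats(5)
```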


I think .to_dataframe() might be slow because it is loading the data in order to convert it to a dataframe.

I think one thing you can try is changing the chunk size, perhaps to something like (1, 112, 50, 50) if you tend to read the data as time series. With the current chunk size of (1, 14, 181, 360), a point time series touches 8 chunks along dt_fore (112/14), i.e. 8 * 1 * 14 * 181 * 360 (~7.3M elements) read per variable, whereas with (1, 112, 50, 50) it would be a single chunk of ~0.3M elements per variable, and 8 times fewer objects read.


That depends. What is your end goal?

  1. Is ds.isel().compute().to_dataframe() the same as ds.isel().to_dataframe()?
  2. Am I right that I should chunk my data according to the most common queries I want to run on it? In my case, I just want to retrieve point forecasts, so @josephyang's suggestion might be best for that case?
  3. Will more chunks be a problem when I want to request data spanning multiple chunks?

Thanks a lot for your support! This is really helpful.