Extremely slow rechunking of Zarr store with xarray

Hello,

Just hoping if somebody has some guidance or recommendation for the Zarr store chunking problems we are experiencing.

We are constructing multi-mission time series of velocity data. We create initial Zarr store with (time, y, x) coordinates and then append a number of “layers” in time dimension as we process thousands of new data layers. We end up with a Zarr store where chunking is determined automatically by Zarr. For our data analysis we need to process one (x, y) spacial point at a time. We ran some tests and found out that rechunking our Zarr store with {x: 10, y: 10} gives us much faster access (100x) to the time series of each spacial (x, y) point. We tried to re-chunk with {x: 1, y: 1}, but that was taking extremely long time with xarray (4.5hours to write 5.8Mb out of 5.4Gb). I have my observation filed with xarray user group, but have not received any feedback.

Python’s rechunker package did a much better job by rechunking 5.4Gb Zarr in a “reasonable” 1.5 hours. To our surprise, access time is actually ~2x longer than for the Zarr store chunked at x and y of 10 (6s vs 3.4s). Is there an explanation for it?
I also tried to provide chunks values as encoding parameters at the time of Zarr creation with xarray.to_zarr(), but that didn’t seem to work as read back in Zarr store seems to be using chunk sizes as determined by the Zarr and not the ones I provided.

Rechunking of the final Zarr store could be an option if using “rechunker”, but that would seem to add significant amount of runtime to our processing.

Any suggestions or advice on how to handle chunking for the Zarr store which is ever “growing” in one of the dimensions is much appreciated! Thank you in advance.

2 Likes

Thanks for the question. We’ll do our best to help. I’d like to first offer some feedback on how to make your post more easily “answerable.”

  1. Include code / data examples that will allow us to reproduce your problem.
  2. Your post contains several related but distinct questions. You might get a better response by just asking one question at a time.

Let me try to enumerate the distinct questions

2x longer than what? How are you measuring “access time”? I don’t understand what you are asking here., so I’m not going to try to answer this part unless you can provide some clarification.

This sounds like it could be a bug with Xarray. Xarray should respect the chunks attribute the variable encoding when writing to Zarr. To check this, I wrote a short test:

import xarray as xr
import numpy as np
import zarr

ds = xr.DataArray(
    np.arange(100),
    name='foo',
    dims='x'
).to_dataset()
ds['foo'].encoding = {'chunks': 10}
ds.to_zarr('test.zarr', mode='w')

zgroup = zarr.open('test.zarr')
assert zgroup['foo'].chunks == (10,)

This verifies that Xarray is using the chunks specified in the encoding when writing the Zarr array. Without seeing your code, it’s hard to know what is going wrong in your case.

It sounds like you want to create a Zarr array that is contiguous in time but chunked in the other dimenions (x,y ); and then you want to append to your array in time. This will never work well in Zarr. The reason is fundamental: whenever you make a write that touches a Zarr chunk, the entire chunk has to be rewritten. Zarr does not support “partial chunk writes.” (Note that other storage formats like TileDB might work better here.) So every time you append to your array that is chunked this way, you basically have to rewrite the entire file. If your ingestion process requires you to append to a Zarr array along the time dimension, you are definitely better off chunking the array in time from the beginning.

However, it also sounds like you want to do timeseries analysis at each point. This requires an access pattern that is orthogonal to your write pattern. :man_facepalming: This was the same issue discussed in this epic thread.

The best solution we can offer is the rechunker package, which you are already using.

This should definitely not take so long. On a fast machine, you should be able to rechunk 5.4 GB of data in just a few seconds. If you provide more details of how you are invoking rechunker, maybe we can help you improve this.

This is probably not a good idea. You should think hard about your access patterns and choose chunks that are optimized for your use case. There is no universal “good” chunking schemes. Everything depends on how you will access the data. Just remember the main rule: there are no partial reads / writes to Zarr chunks. If your operation touches a single item in a chunk, the whole chunk needs to be read / written.

As a final question, let me ask this: why Zarr? Did you try a more mature format like HDF5 before deciding that you needed Zarr? How did that go?

Was curious so took a quick look at the data - 5GB is really not that big and as @rabernat mentioned, rechunking should be much faster.

Here are some observations and suggestions.

  • The dataset seems to have a mix of 1-D data, 3-D data, and 0-D data, as well as meta-data (string URLs, software versions etc.) stored as data variable (see below). I don’t know exactly how this effects Zarr operations but would imagine this wouldn’t be very efficient. The ‘mid_date’ coordinate also seems not to have been sorted by time.
  • I noticed that chunk file sizes in the source dataset are very small - few kilobytes to at most ~500kB (eg. 3-D data chunked as {mid_date: 19, x: 209, y: 209} in the source) that I can see. I understand each object has a fixed overhead so the large number of objects would be slowing things down.
  • There is really no need to chunk down to {x:1, y:1} for dataset of this size - one reason why this was 2x slower than {x:10, y:10} might be that instead of reading one object for each parameter, it’s reading multiple objects to interpolate to a specific point.
Dimensions:                    (mid_date: 11117, y: 833, x: 833)
Coordinates:
  * mid_date                   (mid_date) datetime64[ns] 2014-03-24T14:42:24....
  * x                          (x) float64 -1.999e+05 -1.997e+05 ... -1e+05
  * y                          (y) float64 -2.2e+06 -2.2e+06 ... -2.3e+06
Data variables: (12/72)
    acquisition_date_img1      (mid_date) datetime64[ns] 2014-03-08T14:42:38 ...
    acquisition_date_img2      (mid_date) datetime64[ns] 2014-04-09T14:42:10 ...
    autoRIFT_software_version  (mid_date) <U5 '1.3.1' '1.3.1' ... '1.3.1'
    chip_size_height           (mid_date, y, x) float32 ...
    chip_size_width            (mid_date, y, x) float32 ...
    date_center                (mid_date) datetime64[ns] 2014-03-24 ... 2016-...
    ...                         ...
    vyp_error_mask             (mid_date) float64 nan nan nan ... nan nan nan
    vyp_error_modeled          (mid_date) float64 nan nan nan ... nan nan nan
    vyp_error_slow             (mid_date) float64 nan nan nan ... nan nan nan
    vyp_stable_shift           (mid_date) float64 nan nan nan ... nan nan nan
    vyp_stable_shift_mask      (mid_date) float64 nan nan nan ... nan nan nan
    vyp_stable_shift_slow      (mid_date) float64 nan nan nan ... nan nan nan

As for the growing time-slices of data, few time-slices of data per day is small enough that it should be fairly straightforward to handle once the data is cleaned up a little. Suggest reviewing the workflow to generate the source Zarr store from the NetCDF files.

1 Like

Thank you @rabernat for your response. Sorry for asking too many questions in one post, just wanted to get it all out in hope of getting some clarification and guidance. There is a number of questions we accumulated as we trying to get Zarr store working on our end.

Sorry if I was not clear in original post, it takes 2x longer to access the whole time series for one spacial point (for example, x=0, y=0) with Zarr re-chunked at x,y=1 than Zarr re-chunked at x,y=10.
@josephyang might have an explanation for it, thank you @josephyang! Any clarification is much appreciated.

Thank you for the example. My bad, I should have provided an example of exact problem I am observing. Our xr.Dataset is rather complex and has 1D and 3D data variables. We are trying to rechunk 3D variables. It appears that when one provides chunking for only some of the data variables, xarray ignores (?) it - could be the xarray problem:


import xarray as xr
import numpy as np
import zarr

data = np.arange(15)

dims = (len(data), 10, 6)
num_values = dims[0] * dims[1] * dims[2]

data_values = np.arange(num_values).reshape(dims)

ds = xr.Dataset(
    {
        "foo_a":  (('t', 'x', 'y'), data_values),
        "dt":   (('t'), data)
    },
    coords={
        't': (data),
        'x': (np.arange(0, 10)),
        'y': (np.arange(0, 6)),
    }
)

output_dir = 'test_ds.zarr'

# 1. One way to provide chunks for encoding
# encoding_settings = {
#     'foo_a': {'chunks': {'x': 2, 'y': 2}}
# }
# ds.to_zarr(output_dir, encoding=encoding_settings, consolidated=True, mode='w')

# 2. Another way to provide chunks for encoding
ds['foo_a'].encoding = {'chunks': {'x': 2, 'y': 2}}
ds.to_zarr(output_dir, consolidated=True, mode='w')

# Read Zarr back in
zgroup = zarr.open(output_dir)
_, x_chunk, y_chunk = zgroup['foo_a'].chunks
assert x_chunk == 2 and y_chunk == 2, f"Unexpected chunking: x={x_chunk}, y={y_chunk}"

asserts with: AssertionError: Unexpected chunking: x=10, y=6

I probably didn’t say it right. We are creating Zarr store and keep appending new data in time dimension - we are not trying to create in memory or on disk contiguous data set.
If our disk store is Zarr, does Zarr still re-write previously written to disk “chunks” when we append new data to Zarr store in time dimension? I thought not, but may be I am mistaken.

We are processing ~150K NetCDF files (will be more in the future) where data from each file becomes one layer in our time series dataset. We can’t hold all 150K files in memory at the same time, so we are processing only, let’s say, 500 files at a time to create our time series composite. Once we processed 500 “layers”, we store it to the Zarr, then continue to the next 500 “layers”. Once second set of 500 “layers” was processed, we append to the same Zarr store in time dimension, and so on. Just to note that not all 500 files end up as layers in time series. I don’t think we have a way out of appending to our dataset in other than time dimension.
As you mention, it 's probably not a good idea to allow Zarr to pick the chunking size for the time dimension. I should probably specify chunking value in time dimension while constructing our datasets, and see if it has any effect on data access.

As @josephyang noted, our data set consists of 72 data variables with a mix of 0d, 1d, 3d ones. My intent is to rechunk only 3d data variables since only they have x and y dimensions.
Here is a code example of re-chunking our dataset with rechunker:

from rechunker import rechunk
import xarray as xr
import zarr
from dask.diagnostics import ProgressBar

# Use "mid_date" size as rechunk size - is it a good idea?
x_y_chunk_size = 10
dask_chunks_3d = {'mid_date': 11117, 'x': x_y_chunk_size, 'y': x_y_chunk_size}

CHUNK_3D = [
    'interp_mask',
    'chip_size_height',
    'chip_size_width',
    'v_error',
    'v',
    'vx',
    'vx_error',
    'va',
    'vy',
    'vr',
    'vp',
    'vp_error',
    'vxp',
    'vyp',
]


# Create chunking dictionary
chunking_dict = {}

for each in CHUNK_3D:
    chunking_dict[each] = dask_chunks_3d

max_mem = '1Gb'

# This it original location of the Zarr in S3 bucket, using local copy of the store for the run
# output_dir = "s3://its-live-data.jpl.nasa.gov/datacubes/v1/N60W040/ITS_LIVE_vel_EPSG3413_G0120_X-150000_Y-2250000.zarr"
cube_dir = 'ITS_LIVE_vel_EPSG3413_G0120_X-150000_Y-2250000.zarr'
fixed_store = f"ITS_LIVE_vel_EPSG3413_G0120_X-150000_Y-2250000_rechunked_xy_{x_y_chunk_size}_test.zarr"
temp_store = f"ITS_LIVE_vel_EPSG3413_G0120_X-150000_Y-2250000_rechunked_xy_{x_y_chunk_size}_test_temp.zarr"

ds = xr.open_zarr(cube_dir, decode_timedelta=False, consolidated=True)
rechunk_cube = rechunk(ds, chunking_dict, max_mem, fixed_store, temp_store=temp_store)
# I also use encoding options to provide _FillValue, dtype and compressor settings, but omit for the example:
# target_options=zarr_to_netcdf.ENCODING_ZARR

with ProgressBar():
    rechunk_cube.execute()

print(f"Consolidating metadata")
zarr.convenience.consolidate_metadata(fixed_store)

Would increasing max_mem = '1Gb' help with runtime?

Well, since we are using xarray to handle our data, we quickly discovered that xarray does not support appending to the NetCDF format. So our only choice was Zarr. It turned out to be a good choice since our target store for all generated data products is AWS S3 bucket, so it is actually a win-win situation :slight_smile:

@josephyang Thank you for your response and looking at our data.

Do you mean efficient for rechunker to handle?

This is intentional. Do you see any implications of not having the coordinate sorted when stored on disk? We are sorting the data once we read it in - it does not seem to be a bottle neck.

Could you, please, clarify what clean up do you have in mind? And how it would help to handle on rechunker side?

Could you, please, elaborate what do you mean by generate the source Zarr store from the NetCDF files?

We are using Zarr since our target store is AWS S3 bucket. Also array does not support appending NetCDF format store. We can’t keep all of our input data in memory (~150K NetCDF files), so we are processing in “chunks” of 250-500 files depending on their size. We have to be able to write the initial store and then to append to it in time dimension.

Every chunk that your write touches has to be rewritten. This is independent of the store backend (disk, memory, cloud storage, etc.) The reason is that, because Zarr chunks are compressed, it is not possible to just add a little bit of data to a chunk. To extend or change a chunk, the entire chunk needs to be decompressed, modified, then re-compressed.

The situation is easier to explain with a diagram. The cartoon below shows an appending operation for two different chunking schemes. The first is chunked in time, the second in space.

2 Likes

This will not work because Zarr encoding does not understand named dimensions. encoding['chunks'] needs to be a tuple of ints.

1 Like

@rabernat Thank you for the explanation. In a case of appending in time dimension to existing Zarr where we don’t provide chunking explicitly and chunking is done in time, x and y dimensions by Zarr implicitly, does it mean that we are getting a combo of these two scenarios: appending in time dimension just adds another chunk, and this new chunk is chunked in x and y?
Thank you for all your help!

1 Like

@rabernat Thank you so much! That what I was doing wrong. It would be helpful to have an exception from the Zarr to point to the expected type of inputs as it silently ignores wrong input :slight_smile:

1 Like

@rabernat Would it be helpful to re-chunk the whole dataset after I open the Zarr store to make more efficient access to the time series for each (x, y) pair of coordinates? Or do I have to write newly rechunked data to the disk before accessing the data to take advantage of the re-chunking?
This is my first encounter with Zarr chunking so I apologize if it’s a trivial question.

A number of other questions that I have:

  • I am not very clear on how to determine an optimal chunking for the purpose of data processing when chunking exists in t, x and y dimensions for original dataset. Just try different ones (that’s what we were doing) and see how it works for the kind of processing we do?
  • Would it help at all to re-chunk in time dimension to the whole time dimension size? In other words, if our dataset has dimensions of t: 11000, x: 800, y: 800, do I need to re-chunk it with t: 11000, x: 10, y: 10? Or what if I re-chunk x and y at their dimension size since these are fixed values for the whole dataset?
  • Also it would seem that increasing chunk size for the x and y would help with the access time to all x’s and y’s that belong to the same chunk.
  • When re-chunking, why do I want to keep the original chunk size of the dataset the same? If previous chunk size is 128Mb, should re-chunked chunk size be also 128Mb? This is in relation to the note you made in the epic post, and I don’t really understand why (to guarantee proximity of new chunks, perhaps)?

Thank you so much for all clarifications and help!

Staring with @rabernat’s main rule on Zarr chunks: there are no partial reads / writes to Zarr chunks, also consider that each read /write to Zarr chunks carries fixed overhead regardless of the chunk size that may (cloud storage to some extent) or may not be (your disk) parallelized.

Assuming a case where you have a bandwidth of 100MB/s (say on HDD disk) and the overhead is 10ms, if you have a single data chunk of 100MB, this would be read in ~1s. If this was broken into 1000 chunks of 100kB, this would take about 10s to read and you can see here how the bottleneck changes depending on the chunking scheme.

I’m not quite sure what the intention would be behind not sorting by the time but this can then mean even reading a small time range (say a few months worth of data) would mean reading from multiple chunks, which sort of defeats the intention of chunking.

As a comparison, my workflow & datasets are similar in a way - every day, a new set of data in time-slice (~60 (parameters) x 24 (hours) x 1440 (lon) x 720 (lat)) is added to the existing data store of about 70 years of hourly data, optimized to be read as a time-series for a given lat/lon point.

Would be curious to learn more about the content & the total size of the 150k NetCDF files and perhaps I can make some suggestions. If each NetCDF file is about 8mb (from what I saw), this would be about 1.2TB in total? As a comparison, I think my primary dataset (ERA5), was about 80TB of NetCDF files to start with.

Feel free to contact me directly at joseph.yang@oikolab.com also.

1 Like

@josephyang Thank you for the nice explanation of an effect of the chunking scheme on the data access speed.

In our particular case new data comes in non-chronological order. Another “feature” of our datasets is that we may need to remove some of the layers if new incoming layer is its re-processed version.
Another consideration for not sorting the dataset in time dimension is that most of the time we will be interested in the whole time series. It is quite possible that somebody will want to look at subset (some years) of the dataset we are creating, but in most cases we will be processing the whole dataset. As I mentioned before, sorting data in time dimension does not seem to be a problem so far.
We would love to learn how you optimized for a given lon/lat point data access. Thank you for your generous offer to contact you directly. I will :slight_smile:

2 Likes

There are several people contributing to this thread and likely many more following along who would also love to learn the answer to this. Why take the conversation private? Is there something confidential or privacy-related that has to be shared?

1 Like

Hi,

Not sure if this comment was for me or @mliukis but thought I’d keep the conversation going since I haven’t heard back. I had initially made the suggestion to contact me directly because it’d be much easier for me to try to understand the issue and see if there is any suggestion that I can make via a direct conversation or a call. My expertise is limited so it’s really just seeing if there is anything that I can help from my own experience.

I can’t speak for the dataset itself but my ETL workflow is essentially a divide & conquer relying quite a lot on CLI tools such as nccopy/cdo/wgrib2 to do some of the heavy lifting because I’m working with limited RAM (16GB) and relatively basic CPUs. I ran into the problem of chunking earlier on and played around with formats (GRIB, NetCDF3/4, Zarr), chunking shapes, and H/W platforms (HDD, SDD, & Cloud Object Storage) to see what worked sufficiently well enough for me to process datasets such as ERA5.

This was all before Rechunker and before I discovered Pangeo community so I think some of the processes could probably use an update. It seems like most people here have access to HPC anyhow so my impression was that my workflow is not generally applicable but thought I could probably offer some pointers to @mliukis given the small size she’s working with.

In case anyone’s interested, a few things that worked well for me that may or may not go against the general practice here:

  • Using CLI tools (cdo, wgrib2 etc.) to merge files (GRIB or NetCDF) first can be a fast & low RAM way to handle a large number of a files before using processing the data in Python.
  • Fetching data from the cloud can be slower than having a local copy if the computing resource fetching the data is not in the same data center.
  • Chunking Data: Choosing Shapes : Unidata Developer's Blog - in case anyone hasn’t seen this yet, this blog was super helpful for me to understand the general consideration when choosing chunking shape.

Hope this helps!

2 Likes

Thanks a lot for sharing Joseph! Very helpful!

FYI, rechunker should work well even in a limited-RAM situation.

1 Like

FYI, rechunker should work well even in a limited-RAM situation.

Agree! Been meaning to try it out for processing ERA5-Land data.