Why does concatenating zarr.ZipStores with xarray and dask use so much unmanaged memory?

Reposting this here from xarray Discussions in the hope that someone might be able to help me out.

I often want to concatenate sets of zarr collections (stored on a file system) to write back out as single dataset. Often these zarr collections are zipped to keep my HPC storage admins happy (less files = good).

When concatenating many or large zarr.ZipStores, I find that the unmanaged memory (as shown by the dask dashboard) climbs rapidly and often brings down my cluster. This does not happen when the data are not zipped (i.e. zarr.DirectoryStores).

I’ve included an example below that demonstrates the problem, although note that the problem is more exacerbated when working with real/larger datasets. Note too that this example creates many zarr files in the current directory, so please be careful running it:

import os
import glob
import zipfile
import numpy as np
import xarray as xr
from distributed import LocalCluster, Client

cluster = LocalCluster()
client = Client(cluster)

def write_zarr_and_zip(ds, filename):
    """ Write a dataset to zarr and zip the zarr file"""
    def _zip(file):
        filename = file + os.path.extsep + "zip"
        with zipfile.ZipFile(
            filename, "w", compression=zipfile.ZIP_STORED, allowZip64=True) as fh:
            for root, _, filenames in os.walk(file):
                for each_filename in filenames:
                    each_filename = os.path.join(root, each_filename)
                    fh.write(each_filename, os.path.relpath(each_filename, file))
    ds.to_zarr(filename, mode='w')

# Generate some example data
data = np.random.rand(1000,1000)
ds = xr.Dataset(
    {'var1': (['x','y'], data)}, 
    {'x': range(1000), 'y': range(1000)})

# Write the same data over and over for concatenation below
for n in range(500):
    write_zarr_and_zip(ds.expand_dims({'z': [n]}), f'ds_{n:04d}.zarr')

# Concatenate the unzipped files and write: unmanaged memory stays ~300 MB
ds_concat = xr.concat(
    [xr.open_zarr(f) for f in sorted(glob.glob('ds_????.zarr'))],
ds_concat.to_zarr(f'ds_concat.zarr', mode='w')

# Concatenate the zipped files and write: unmanaged memory continuously grows to ~3.3 GB
ds_concat = xr.concat(
    [xr.open_zarr(f) for f in sorted(glob.glob(f'ds_????.zarr.zip'))],
ds_concat.to_zarr('ds_concat.zarr', mode='w')

Note that in the above example, the total size of all the files being concatenated is 3.3 GB.

My question: Is this expected behaviour? Am I just seeing the additional overhead associated with having to open the zip file? Or is there potentially a close or copy or something else needed somewhere? Any insight/advice would be greatly appreciated!

Note, I came across this open PR about closing zarr.ZipStores: https://github.com/pydata/xarray/pull/4395. But explicitly closing the ZipStores as follows doesn’t make any difference:

import zarr

# Concatenating the zipped files: unmanaged memory continuously grows to about 3.3 GB
to_concat = []
for f in sorted(glob.glob(f'ds_????.zarr.zip')):
    zs = zarr.ZipStore(f)
ds_concat = xr.concat(to_concat, dim='z')
ds_concat.to_zarr('ds_concat.zarr', mode='w')
1 Like