ZCollection: a library for partitioning Zarr datasets

To store SWOT data, we use the Zarr format. It gives us excellent performance when we parallelize our processing with Dask, whether on a laptop or on a cluster.

This storage format allows resizing the stored tensors, so new data can be concatenated to a tensor without rewriting the data already on disk. On the other hand, an update that reorganizes the data (for example, inserting new data in the middle) requires copying the existing data after resizing the tensor.

This problem also exists when the Parquet format is used to store tabular data. The PyArrow library solves it with partitioned datasets, which split a dataset across multiple files.

The ZCollection library does the same using Zarr as the storage format.

First, let’s look at how our library works, using an example. In this example, we will manipulate SWOT data.

Let’s start by loading our dataset.

>>> import xarray as xr
>>> dataset = xr.open_mfdataset("~/swot/SWOT_L2_LR_SSH_Expert_*",
...                             concat_dim="num_lines",
...                             combine="nested",
...                             parallel=True)
>>> dataset
Dimensions:    (num_lines: 5761870, num_pixels: 71)
Coordinates:
    latitude   (num_lines, num_pixels) float64 dask.array<chunksize=(9866, 71), meta=np.ndarray>
    longitude  (num_lines, num_pixels) float64 dask.array<chunksize=(9866, 71), meta=np.ndarray>
Dimensions without coordinates: num_lines, num_pixels
Data variables:
    time       (num_lines) datetime64[ns] dask.array<chunksize=(9866,), meta=np.ndarray>
    ssh_karin  (num_lines, num_pixels) float64 dask.array<chunksize=(9866, 71), meta=np.ndarray>

Before storing the data, we must choose a partitioning mode. In this example, we will split the data by day, based on the values of our dataset’s time variable.

>>> import zcollection
>>> partition_handler = zcollection.partitioning.Date(("time", ), "D")

The library allows you to create partitioning using sequences of values
representing an index. For example, we would write the following code to
partition our SWOT data by half-orbit.

>>> zcollection.partitioning.Sequence(("cycle_number", "pass_number"))

We will start by creating the dataset that we will manipulate with this library.

>>> ds = zcollection.Dataset.from_xarray(dataset)
>>> ds
Dimensions: "('num_lines: 5761870', 'num_pixels: 71')"
Data variables
    time      (num_lines  datetime64[ns]: dask.array<chunksize=(9866,)>                                                 
    latitude  (num_lines, num_pixels  float64: dask.array<chunksize=(9866, 71)>                                         
    longitude (num_lines, num_pixels  float64: dask.array<chunksize=(9866, 71)>                                         
    ssh_karin (num_lines, num_pixels  float64: dask.array<chunksize=(9866, 71)>

Our library uses the fsspec library to write the data to the file system. It allows, for example, storing the collections on an S3 storage server. In this example, we will use the local file system.

>>> import fsspec
>>> filesystem = fsspec.filesystem("file")

We will now create our collection.

>>> collection = zcollection.create_collection(
...     axis="time",
...     ds=ds,
...     partition_handler=partition_handler,
...     partition_base_dir="/tmp/swot_collection",
...     merge_strategy="update_time_series",
...     filesystem=filesystem)

The merge_strategy parameter defines the merge strategy applied when the
collection is updated. For the moment, there are two strategies: None, which
overwrites the old data with the new, and update_time_series, which updates
the partition with the new data. The latter mode allows updating the existing
data with the latest observations.
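zcollection implements this internally, but the idea behind an update-time-series merge can be sketched with plain NumPy (`merge_time_series` is a hypothetical illustration, not zcollection’s actual code):

```python
import numpy as np

def merge_time_series(old_t, old_v, new_t, new_v):
    """Keep the stored samples strictly before the new data and replace
    everything from the first new timestamp onwards."""
    keep = old_t < new_t[0]
    return (np.concatenate([old_t[keep], new_t]),
            np.concatenate([old_v[keep], new_v]))

old_t = np.array([0, 1, 2, 3])
old_v = np.array([10.0, 11.0, 12.0, 13.0])
new_t = np.array([2, 3, 4])          # overlaps the stored tail
new_v = np.array([22.0, 23.0, 24.0])
t, v = merge_time_series(old_t, old_v, new_t, new_v)
print(t.tolist())  # [0, 1, 2, 3, 4]
print(v.tolist())  # [10.0, 11.0, 22.0, 23.0, 24.0]
```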

The previous operation creates a directory containing only the configuration file of our collection.

>>> !tree -a /tmp/swot_collection
└── .zcollection

We can now insert our dataset into the collection.

>>> collection.insert(ds)

If we examine the data written to the file system, we can see the different partitions created: year=2014/month=04/day=12, etc.

>>> !tree -a /tmp/swot_collection
├── year=2014
│   ├── month=04
│   │   ├── day=12
│   │   │   ├── latitude
│   │   │   │   ├── 0.0
│   │   │   │   ├── .zarray
│   │   │   │   └── .zattrs
│   │   │   ├── longitude
│   │   │   │   ├── 0.0
│   │   │   │   ├── .zarray
│   │   │   │   └── .zattrs
│   │   │   ├── ssh_karin
│   │   │   │   ├── 0.0
│   │   │   │   ├── .zarray
│   │   │   │   └── .zattrs
│   │   │   ├── time
│   │   │   │   ├── 0
│   │   │   │   ├── .zarray
│   │   │   │   └── .zattrs
│   │   │   ├── .zattrs
│   │   │   ├── .zgroup
│   │   │   └── .zmetadata
│   │   ├── day=13
│   │   │   ├── latitude
│           ├── .zgroup
│           └── .zmetadata
└── .zcollection

The written data can be loaded.

>>> collection.load()
Dimensions: "('num_lines: 5761870', 'num_pixels: 71')"
Data variables
    latitude  (num_lines, num_pixels  float64: dask.array<chunksize=(138078, 71)>                                       
    longitude (num_lines, num_pixels  float64: dask.array<chunksize=(138078, 71)>                                       
    ssh_karin (num_lines, num_pixels  float64: dask.array<chunksize=(138078, 71)>                                       
    time      (num_lines  datetime64[ns]: dask.array<chunksize=(138078,)>

The partitions can be filtered, just as can be done with PyArrow.

>>> collection.load("year==2014 and month==4 and day in range(13, 15)")
Dimensions: "('num_lines: 552314', 'num_pixels: 71')"
Data variables
    latitude  (num_lines, num_pixels  float64: dask.array<chunksize=(226918, 71)>                                       
    longitude (num_lines, num_pixels  float64: dask.array<chunksize=(226918, 71)>                                       
    ssh_karin (num_lines, num_pixels  float64: dask.array<chunksize=(226918, 71)>                                       
    time      (num_lines  datetime64[ns]: dask.array<chunksize=(276157,)>

We can add a new variable:

>>> import numpy
>>> import zcollection.meta
>>> ssh_karin_2 = zcollection.meta.Variable(
...    name="ssh_karin_2",
...    dtype="float64",
...    dimensions=["num_lines", "num_pixels"],
...    attrs=[
...        zcollection.meta.Attribute("long_name", "sea surface height"),
...        zcollection.meta.Attribute("units", "cm"),
...    ],
...    fill_value=numpy.nan)
>>> collection.add_variable(ssh_karin_2)

Update it.

>>> def scale(ds):
...     return ds.variables["ssh_karin"].values * 100
>>> collection.update(scale, "ssh_karin_2")

Delete it.

>>> collection.drop_variable("ssh_karin_2")

These last operations require write access to the collection. If that is not available, it is possible to enrich an existing collection using views.

>>> view = zcollection.create_view("/tmp/swot_view",
...                                zcollection.view.ViewReference(
...                                    "/tmp/swot_collection", filesystem),
...                                filesystem=filesystem)
>>> view.load()
Dimensions: "('num_lines: 5761870', 'num_pixels: 71')"
Data variables
    latitude  (num_lines, num_pixels  float64: dask.array<chunksize=(138078, 71)>                                       
    longitude (num_lines, num_pixels  float64: dask.array<chunksize=(138078, 71)>                                       
    ssh_karin (num_lines, num_pixels  float64: dask.array<chunksize=(138078, 71)>                                       
    time      (num_lines  datetime64[ns]: dask.array<chunksize=(138078,)>
>>> view.add_variable(ssh_karin_2)
>>> view.load()
Dimensions: "('num_lines: 5761870', 'num_pixels: 71')"
Data variables
    latitude    (num_lines, num_pixels  float64: dask.array<chunksize=(138078, 71)>                                     
    longitude   (num_lines, num_pixels  float64: dask.array<chunksize=(138078, 71)>                                     
    ssh_karin   (num_lines, num_pixels  float64: dask.array<chunksize=(138078, 71)>                                     
    time        (num_lines  datetime64[ns]: dask.array<chunksize=(138078,)>                                             
    ssh_karin_2 (num_lines, num_pixels  float64: dask.array<chunksize=(17260, 9)>

Finally, it is possible to use indexers to find stored data easily.

You will find in the online documentation an example of using these indexers, as well as more information about the library. The source code is available on GitHub.

Your comments and feedback are welcome.


Hi Frédéric,

always impressed by your work on these topics :smiley:
I will definitely have a look at the git repository.

2 quick questions meanwhile:

1- the first open_mfdataset, is it loading netCDF files?

2- I also tried some netCDF / Zarr conversion on my side, so your publication is very useful. By the way, do you have any advice on the performance of loading many netCDF files BEFORE building the Zarr dataset? I found open_mfdataset quite under-performing on a large number of files, but I did not find a practical alternative (see a detailed question in the Pangeo forum here)

In my example, it’s indeed the open_mfdataset method of xarray. However, for
the acquisition of SWOT products we did not use this method, but a homemade
one based on h5netcdf to do the conversion.

This method doesn’t abstract the data like open_mfdataset and isn’t generic
at all, but it has the advantage of being much faster.

The idea is to build the Dask array with an optimized loading function, after
having determined the chunks of all the files to be processed. For SWOT
products, this consists of reading, for each product, the value of the
num_lines dimension. For grids, these values are constant, so we can skip this step.

For grids, the pseudo-code is the following.

from typing import Tuple

import dask.array
import dask.highlevelgraph
import netCDF4
import numpy

def load_data(path) -> numpy.ndarray:
    with netCDF4.Dataset(path) as ds:
        # Replace "?" with the name of your variable.
        return ds.variables["?"][...]

def get_shape(path) -> Tuple[int, ...]:
    return load_data(path).shape

def make_dask_array(products) -> dask.array.Array:
    # If the dimension of the variable to be read changes in each file,
    # all files must be read to determine the chunks.
    shape = get_shape(products[0])
    # In this demo, we handle 3-dimensional grids.
    assert len(shape) == 3
    # We get the properties. We assume that all the files are identical,
    # but we don't check it.
    ntimes, nlon, nlat = shape

    name = "sla"
    # Graph construction: one task (i.e. one chunk) per file.
    layer = {(name, ix, 0, 0): (load_data, products[ix])
             for ix in range(len(products))}
    dsk = dask.highlevelgraph.HighLevelGraph.from_collections(name, layer)
    # Setup chunks.
    chunks = ((ntimes, ) * len(products), (nlon, ), (nlat, ))
    # Replace with the data type of your variable.
    dtype = "float64"
    return dask.array.Array(dsk, name, chunks, dtype)

# For grids, the creation is super fast.
array = make_dask_array(products)

If the dimension changes in each file, you have to read the dimension in each
file to tell Dask how much data there is in each chunk. If it’s a static file
collection, you can store this information once; this will save you from
re-reading the files every time.
I haven’t found anything faster in Python.

Ref: Internal Design — Dask documentation

Hi @fbriol and thanks for sharing! This looks extremely interesting and useful to our community.

Given that you are working in this space, you may be interested in Pangeo Forge. We are working very hard on this project at the moment.

Pangeo Forge is an open source framework for data Extraction, Transformation, and Loading (ETL) of scientific data. The goal of Pangeo Forge is to make it easy to extract data from traditional data archives and deposit in cloud object storage in analysis-ready, cloud-optimized (ARCO) format.

I think that ZCollection could be quite useful internally in Pangeo Forge.

Can I invite you to give a short presentation about ZCollection at the next Pangeo Forge meeting: Apr 11, 14:00 US EST?

@bpicard_fluctus - for this you may wish to look into Pangeo Forge, as it is explicitly designed to help with that use case.

Yes I can prepare a small presentation if you want and show you what we have built with them to do the SWOT data calibration and validation studies. How much time do I have?


Hi @rabernat,
thank you for your advice. I already checked the pangeo-forge documentation but I could not find an answer to this precise question (maybe I was too quick). Do you have a particular page or method in mind that could help me?

Thank you Fred, very useful !

I’m not sure what “precise question” you are referring to. Pangeo Forge will let you build a big Zarr out of many NetCDFs. Here is a simple tutorial walking through how to do that:

Would 10 minutes be too short?

No, that’s enough :wink:


I was referring to the topic I created here.

Again, thank you, I will go through this tutorial one more time, more carefully.

Have you raised an issue with zarr to let them know of this effort? I am fully supportive of this kind of extra feature being used in conjunction with zarr.

(aside: on the slowness of open_mfdataset and many netCDFs, please have a look at kerchunk, which could also be used to make virtual partitioned zarr datasets)

No, I hadn’t thought of that. I’ll do it right away.

I didn’t know this library at all. It looks great. Thanks for the tip.

I would also add that Pangeo Forge Recipes supports the kerchunk approach (although we are still calling it “reference”): Recipes — Pangeo Forge documentation

@fbriol - we are meeting now at Video Meetings, Video Conferencing, and Screen Sharing - Whereby (formerly appear.in) and would love to see you! If today doesn’t work we can postpone to a later date, no worries.

Sorry but I couldn’t find the link to the meeting. We can try the presentation again next time. Sorry again.
