To store SWOT data, we use the Zarr format. Zarr gives us excellent performance when we parallelize our computations with Dask, whether on a laptop or on a cluster. It also allows the stored tensors to be resized, so new data can be concatenated to a tensor without rewriting everything already on disk. On the other hand, if an update requires reorganizing the data (for example, inserting new data in the middle of a tensor), the existing data must be copied after the tensor's shape has been modified.
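As a minimal illustration of this in-place growth, here is what appending to a Zarr array looks like with the zarr package directly (the path, shapes, and chunking below are purely illustrative):
>>> import numpy as np
>>> import zarr
>>>
>>> # Illustrative only: appending along the first axis writes new
>>> # chunks without rewriting the existing ones.
>>> z = zarr.open("/tmp/zarr_example", mode="w", shape=(0, 71),
...               chunks=(1000, 71), dtype="float64")
>>> z.append(np.zeros((500, 71)))
(500, 71)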
The same problem exists when using the Parquet format to store tabular data. The PyArrow library solves it by partitioning a dataset across multiple files, so new partitions can be added without touching the existing ones.
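To make that concrete, here is a minimal sketch of a partitioned write with PyArrow (the table and paths are purely illustrative):
>>> import pyarrow as pa
>>> import pyarrow.dataset as pads
>>>
>>> # Illustrative only: each "day" value gets its own directory, so
>>> # appending a new day adds files instead of rewriting the dataset.
>>> table = pa.table({"day": [1, 1, 2], "value": [0.1, 0.2, 0.3]})
>>> pads.write_dataset(table, "/tmp/parquet_example", format="parquet",
...                    partitioning=["day"], partitioning_flavor="hive")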
The ZCollection library does the same using Zarr as the storage format.
First, let's look at how the library works through an example manipulating SWOT data.
Let's start by loading our dataset.
>>> import xarray as xr
>>>
>>> dataset = xr.open_mfdataset("~/swot/SWOT_L2_LR_SSH_Expert_*",
...                             concat_dim="num_lines",
...                             combine="nested",
...                             parallel=True)
>>> dataset
<xarray.Dataset>
Dimensions:    (num_lines: 5761870, num_pixels: 71)
Coordinates:
    latitude   (num_lines, num_pixels) float64 dask.array<chunksize=(9866, 71), meta=np.ndarray>
    longitude  (num_lines, num_pixels) float64 dask.array<chunksize=(9866, 71), meta=np.ndarray>
Dimensions without coordinates: num_lines, num_pixels
Data variables:
    time       (num_lines) datetime64[ns] dask.array<chunksize=(9866,), meta=np.ndarray>
    ssh_karin  (num_lines, num_pixels) float64 dask.array<chunksize=(9866, 71), meta=np.ndarray>
Before storing the data, we must choose a partitioning mode. In this example, we will partition the data by day, based on the values stored in our dataset's time variable.
>>> import zcollection
>>>
>>> partition_handler = zcollection.partitioning.Date(("time", ), "D")
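The resolution is configurable; a monthly split would presumably be obtained the same way (the "M" resolution code is an assumption here):
>>> # Hypothetical variant: partition by month instead of by day.
>>> monthly_handler = zcollection.partitioning.Date(("time", ), "M")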
The library also supports partitioning by sequences of values that form an index. For example, to partition our SWOT data by half-orbit, we would write:
>>> zcollection.partitioning.Sequence(("cycle_number", "pass_number"))
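With this handler, each half-orbit would land in its own partition directory, presumably keyed on the index values (e.g. cycle_number=475/pass_number=3; the values here are illustrative).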
We will start by creating the dataset that we will manipulate with this library.
>>> ds = zcollection.Dataset.from_xarray(dataset)
>>> ds
<zcollection.dataset.Dataset>
Dimensions: "('num_lines: 5761870', 'num_pixels: 71')"
Data variables
time (num_lines datetime64[ns]: dask.array<chunksize=(9866,)>
latitude (num_lines, num_pixels float64: dask.array<chunksize=(9866, 71)>
longitude (num_lines, num_pixels float64: dask.array<chunksize=(9866, 71)>
ssh_karin (num_lines, num_pixels float64: dask.array<chunksize=(9866, 71)>
Our library uses the fsspec library to write the data to the file system. This makes it possible, for example, to store collections on an S3 storage server. In this example, we will use the local file system.
>>> import fsspec
>>>
>>> filesystem = fsspec.filesystem("file")
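For comparison, targeting S3 instead only changes this one object (assuming the s3fs package is installed; credentials and bucket configuration are not shown):
>>> # Hypothetical S3 setup; requires the s3fs package.
>>> s3_filesystem = fsspec.filesystem("s3", anon=False)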
We will now create our collection.
>>> collection = zcollection.create_collection(
... axis="time",
... ds=ds,
... partition_handler=partition_handler,
... partition_base_dir="/tmp/swot_collection",
... merge_strategy="update_time_series",
... filesystem=filesystem)
The merge_strategy parameter defines the merge strategy applied when the collection is updated. For the moment, there are two strategies: None, which overwrites the old data with the new, and update_time_series, which updates the partition with the new data. This mode makes it possible to refresh existing data with the latest observations.
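Conceptually, update_time_series keeps the existing samples that precede the new data and then appends the new data, so the latest observations win over any overlap. Here is a minimal sketch of that semantics (an illustration, not the library's implementation):
>>> import numpy as np
>>>
>>> # Sketch of the "update_time_series" semantics, not library code:
>>> # keep the existing samples older than the new ones, then append
>>> # the new data so the latest observations replace the overlap.
>>> def update_time_series_sketch(old_time, old_data, new_time, new_data):
...     keep = old_time < new_time[0]
...     return (np.concatenate([old_time[keep], new_time]),
...             np.concatenate([old_data[keep], new_data]))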
The previous operation created a directory containing only the configuration file of our collection.
>>> !tree -a /tmp/swot_collection
/tmp/swot_collection
└── .zcollection
We can now insert our dataset into the collection.
>>> collection.insert(ds)
If we examine the data written to the file system, we can see the different partitions created: year=2014/month=04/day=12, etc.
>>> !tree -a /tmp/swot_collection
/tmp/swot_collection
├── year=2014
│   ├── month=04
│   │   ├── day=12
│   │   │   ├── latitude
│   │   │   │   ├── 0.0
│   │   │   │   ├── .zarray
│   │   │   │   └── .zattrs
│   │   │   ├── longitude
│   │   │   │   ├── 0.0
│   │   │   │   ├── .zarray
│   │   │   │   └── .zattrs
│   │   │   ├── ssh_karin
│   │   │   │   ├── 0.0
│   │   │   │   ├── .zarray
│   │   │   │   └── .zattrs
│   │   │   ├── time
│   │   │   │   ├── 0
│   │   │   │   ├── .zarray
│   │   │   │   └── .zattrs
│   │   │   ├── .zattrs
│   │   │   ├── .zgroup
│   │   │   └── .zmetadata
│   │   ├── day=13
│   │   │   ├── latitude
│   ├── .zgroup
│   └── .zmetadata
.../...
└── .zcollection
The written data can be loaded.
>>> collection.load()
<zcollection.dataset.Dataset>
  Dimensions: ('num_lines: 5761870', 'num_pixels: 71')
Data variables:
    latitude    (num_lines, num_pixels) float64: dask.array<chunksize=(138078, 71)>
    longitude   (num_lines, num_pixels) float64: dask.array<chunksize=(138078, 71)>
    ssh_karin   (num_lines, num_pixels) float64: dask.array<chunksize=(138078, 71)>
    time        (num_lines) datetime64[ns]: dask.array<chunksize=(138078,)>
The partitions can be filtered, just as with PyArrow.
>>> collection.load("year==2014 and month==4 and day in range(13, 15)")
<zcollection.dataset.Dataset>
  Dimensions: ('num_lines: 552314', 'num_pixels: 71')
Data variables:
    latitude    (num_lines, num_pixels) float64: dask.array<chunksize=(226918, 71)>
    longitude   (num_lines, num_pixels) float64: dask.array<chunksize=(226918, 71)>
    ssh_karin   (num_lines, num_pixels) float64: dask.array<chunksize=(226918, 71)>
    time        (num_lines) datetime64[ns]: dask.array<chunksize=(276157,)>
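The filter expressions are evaluated against the partitioning keys, so other predicates over those keys work the same way, for example (output omitted):
>>> # Another illustrative filter over the same partitioning keys.
>>> collection.load("year == 2014 and month == 4 and day == 13")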
We can add a new variable:
>>> import numpy
>>> import zcollection.meta
>>>
>>> ssh_karin_2 = zcollection.meta.Variable(
...     name="ssh_karin_2",
...     dtype="float64",
...     dimensions=["num_lines", "num_pixels"],
...     attrs=[
...         zcollection.meta.Attribute("long_name", "sea surface height"),
...         zcollection.meta.Attribute("units", "cm"),
...     ],
...     fill_value=numpy.nan)
>>> collection.add_variable(ssh_karin_2)
Update it.
>>> def scale(ds):
...     # Convert ssh_karin (presumably stored in meters) to centimeters.
...     return ds.variables["ssh_karin"].values * 100
>>> collection.update(scale, "ssh_karin_2")
Delete it.
>>> collection.drop_variable("ssh_karin_2")
These last operations require write access to the collection. If that is not possible, an existing collection can be enriched using views.
>>> view = zcollection.create_view("/tmp/swot_view",
... zcollection.view.ViewReference(
... "/tmp/swot_collection", filesystem),
... filesystem=filesystem)
>>> view.load()
<zcollection.dataset.Dataset>
  Dimensions: ('num_lines: 5761870', 'num_pixels: 71')
Data variables:
    latitude    (num_lines, num_pixels) float64: dask.array<chunksize=(138078, 71)>
    longitude   (num_lines, num_pixels) float64: dask.array<chunksize=(138078, 71)>
    ssh_karin   (num_lines, num_pixels) float64: dask.array<chunksize=(138078, 71)>
    time        (num_lines) datetime64[ns]: dask.array<chunksize=(138078,)>
>>> view.add_variable(ssh_karin_2)
>>> view.load()
<zcollection.dataset.Dataset>
  Dimensions: ('num_lines: 5761870', 'num_pixels: 71')
Data variables:
    latitude     (num_lines, num_pixels) float64: dask.array<chunksize=(138078, 71)>
    longitude    (num_lines, num_pixels) float64: dask.array<chunksize=(138078, 71)>
    ssh_karin    (num_lines, num_pixels) float64: dask.array<chunksize=(138078, 71)>
    time         (num_lines) datetime64[ns]: dask.array<chunksize=(138078,)>
    ssh_karin_2  (num_lines, num_pixels) float64: dask.array<chunksize=(17260, 9)>
Finally, it is possible to use indexers to find stored data easily.
The online documentation provides an example of using these indexers, along with further information about the library. The source code is available on GitHub.
Your comments and feedback are welcome.