I need some support for the design of a forecast archive with Zarr

Hello, fellow Pangeo folks. I have been reading your posts for a long time, and thanks a lot for those great articles. I finally don't feel alone anymore with the pain of handling geospatial datasets.

I am currently struggling with the design of my historical forecast database and I would like to move it to the Zarr format. One reason is that I am not able to access the data by bounding box, because I have chunked the files according to the Google Maps tiles. If you know a way to merge differently shaped netCDF files, feel free to answer my Stack Overflow question.

Now to my question:
How should I organise the Zarr archive (or archives) when I want to access point forecasts, e.g. for a whole year?
I always start from an xarray.Dataset that contains a whole run with all variables and all forecast lead times.

  1. model_YYYYMMDD_HH.zarr/
    In this case, I would store every run as a single Zarr archive and later loop through the multiple archives.
  2. model.zarr/
    The second option is a single Zarr archive for all runs; I would then ask Zarr to select a date range of runs.

It would be great if this design also worked in the AWS cloud.

I hope you have some recommendations for me. Thanks a lot in advance for your support :slight_smile:

Daniel


Hi @meteoDaniel and welcome to the forum!

This is a big and complex topic. I’ll offer a few general guidelines.

Put as much as you can into a single Zarr group / Xarray dataset

This will make it easier to view and query the data. In general, we try to encourage the idea of “thinking in datasets, not files”. If you have one store for each run, you will have to merge them somehow in postprocessing. That may become annoying / slow.

However, one limitation is that Xarray cannot currently hold arrays of different shapes / dimensions in a single Dataset object, which seems related to your SO question. We are working hard on a new, higher-level data structure for Xarray called Datatree which will likely help with that problem. You might want to give it a try! It is a work in progress but definitely usable.
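
To make that concrete, here is a rough sketch of what a Datatree-based layout could look like (this assumes the experimental xarray-datatree package; the group names, variable name, and shapes are just illustrative, not your data):

import numpy as np
import xarray as xr
from datatree import DataTree  # experimental xarray-datatree package

# two runs with different forecast lengths, kept as separate groups in one store
run_00 = xr.Dataset({"t2m": (("lead_time", "y", "x"), np.zeros((48, 10, 10)))})
run_12 = xr.Dataset({"t2m": (("lead_time", "y", "x"), np.zeros((24, 10, 10)))})

tree = DataTree.from_dict({"run_20220401_00": run_00, "run_20220401_12": run_12})
tree.to_zarr("model.zarr")  # one Zarr store, one group per run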

Think about how you want to query the data

You said “I want to access point forecasts, e.g. for a whole year”. In Xarray, that might translate to a query like

ds.sel(lon=x, lat=y, time=slice("2020", "2021"))

In this case, it will be very important to think about the chunk structure of the Zarr data. If you know you will only be querying the data on a pointwise basis, you want to have your data contiguous in time and chunked in space. Rechunker can help you transform the chunks of the data on disk.
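
For example, here is a rough sketch with Rechunker (the variable name, store paths, and chunk sizes are placeholders, not your actual data):

import xarray as xr
from rechunker import rechunk

ds = xr.open_zarr("model.zarr")  # e.g. chunked (time: 1, lat: 256, lon: 256) for maps

# target layout for point queries: contiguous in time, small chunks in space
target_chunks = {
    "t2m": {"time": len(ds.time), "lat": 8, "lon": 8},
    "time": None, "lat": None, "lon": None,  # leave coordinate chunking alone
}

plan = rechunk(ds, target_chunks, max_mem="2GB",
               target_store="model_timeseries.zarr",
               temp_store="rechunk_tmp.zarr")
plan.execute()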


If you can get more specific with your questions, we can get more specific with answers.


@meteoDaniel , another approach might be to leave your netCDF files the way they are (no need to split them apart) and aggregate them virtually using kerchunk / ReferenceFileSystem. Your test data seem to have the same space and time coordinates for all the files, but normally with forecast data you would first generate an fsspec ReferenceFileSystem JSON for each NetCDF file and then create a single JSON that represents a virtual dataset aggregated along the time dimension.

This allows the collection of NetCDF files to be accessed efficiently on disk and on cloud as a single Zarr dataset with the Zarr library.
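
A minimal sketch of that workflow (assuming a recent kerchunk version; the concat dims and the glob pattern will depend on your files):

import glob
import json
import xarray as xr
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr

# 1. one reference dict per NetCDF file
singles = []
for path in sorted(glob.glob("/app/data/msg_europe/202204/*.nc4")):
    with open(path, "rb") as f:
        singles.append(SingleHdf5ToZarr(f, path).translate())

# 2. combine them into one virtual dataset along the time dimension
combined = MultiZarrToZarr(singles, concat_dims=["time"]).translate()
with open("combined.json", "w") as f:
    json.dump(combined, f)

# 3. open the whole collection as a single Zarr dataset
ds = xr.open_dataset(
    "reference://", engine="zarr",
    backend_kwargs={"consolidated": False,
                    "storage_options": {"fo": "combined.json"}},
)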

It appears that kerchunk works fine with your test data: Jupyter Notebook Viewer

But Rich, that wouldn’t be particularly optimized for timeseries retrieval, since the raw netCDF data are chunked in time.

Hey @rabernat and @rsignell ,
thanks a lot for your introductions and approaches. I am currently preparing a short benchmark on a specific part of my dataset to compare the different approaches. My idea was to wait for the results before giving you more detailed feedback.

@rsignell , I have tried this, and it did let me access the data, but not really with a benefit: deriving the JSONs took very long (one month of MSG HRV satellite images took 45 minutes), and for operational use I would have to build a way to keep all this metadata up to date. I think using Zarr instead of netCDF4 might work even better.

So stay tuned for a tiny benchmark.

Best regards and thanks a lot.

So here we are. This tiny benchmark is not about the forecast data we archive but about the MSG HRV channel satellite images. I use this case to build up knowledge about Zarr and to write the first functions.

I have stored the data for one day (96 files) in three ways:

  • netcdf4, one file per scan (current solution)
  • zarr, one archive per scan
  • zarr, one single archive for the whole day

Storage size is nearly the same for all three variants, which is very important for us.

import glob
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Tuple

import numpy as np
import pandas as pd
import xarray as xr
import zarr
from alitiq_core.src.time.time_helper import round_time

COLUMN_DT_CALC = 'dt_calc'
COLUMN_DT_FORE = 'dt_fore'
COLUMN_POSITIONAL_INDEX = 'positional_index'


def _extract_dataframe_from_netcdf(
        data: Tuple[Path, List[int], List[int]]
) -> pd.DataFrame:
    """
    opens netcdf file and extracts the required data from satellite image file

    Args:
        data:
         Tuple of required data:
            file_path
            lat_indices
            lon_indices

    Returns:
        DataFrame with MultiIndex dt_calc, positional_index

    """
    file_path = data[0]
    lat_indices = data[1]
    lon_indices = data[2]

    _data = xr.open_dataset(file_path, engine="h5netcdf")

    selected_data = _data.isel(lat=lat_indices, lon=lon_indices).to_dataframe()
    selected_data = selected_data[
        selected_data.index.get_level_values("lat")
        == selected_data.index.get_level_values("lon")
        ].droplevel("lat")

    selected_data[COLUMN_DT_CALC] = _data.time.values.astype("datetime64[m]").astype(
        datetime
    )

    selected_data.reset_index(inplace=True)
    selected_data.set_index([COLUMN_DT_CALC, "lon"], inplace=True)
    selected_data.index = selected_data.index.rename(
        [COLUMN_DT_CALC, COLUMN_POSITIONAL_INDEX]
    )
    selected_data.drop("time", axis=1, inplace=True)
    return selected_data


def get_data_from_single_zarr_archives(file_list: List[Path]) -> pd.DataFrame:
    """read one point from many single-scan Zarr archives"""
    data = []
    for file in file_list:
        _zarr_store = zarr.open(str(file), mode='r', )
        data.append((_zarr_store.hrv[:, 1000, 1000][0], _zarr_store.time.attrs['units']))
    data = pd.DataFrame(data, columns=['hrv', 'cf_timestamp'])
    data.dropna(inplace=True)
    data.index = data.cf_timestamp.apply(
        lambda cf_time_unit: datetime.strptime(f"{cf_time_unit.split(' ')[2]} {cf_time_unit.split(' ')[3]}",
                                               '%Y-%m-%d %H:%M:%S.%f'))
    return data


def get_data_from_multi_zarr_archives_zarr(zarr_path: str = '/app/data/msg_europe/HRV.zarr') -> pd.DataFrame:
    """read one point from the combined Zarr archive using the zarr library"""
    _zarr_store = zarr.open(zarr_path, mode='r')
    cf_time_unit = _zarr_store.time.attrs['units'].split(' ')
    cf_timestamp = datetime.strptime(f"{cf_time_unit[2]} {cf_time_unit[3]}", '%Y-%m-%d %H:%M:%S.%f')
    data = pd.DataFrame(np.c_[_zarr_store.hrv[:, 1000, 1000], _zarr_store.time[:]], columns=['hrv', 'cf_timestamp'])
    data.dropna(inplace=True)
    data.index = data.cf_timestamp.apply(
        # guard against missing values (the original ~isinstance(x, float) was always truthy)
        lambda x: round_time(cf_timestamp + timedelta(days=x)) if not pd.isna(x) else cf_timestamp)
    data.drop(['cf_timestamp'], axis=1, inplace=True)
    data.sort_index(inplace=True)
    return data


def get_data_from_multi_zarr_archives_xarray(zarr_path: str = '/app/data/msg_europe/HRV.zarr') -> pd.DataFrame:
    """read one point from the combined Zarr archive using xarray"""
    dataset = xr.open_dataset(zarr_path, engine='zarr')
    selected_data = dataset.isel(lat=[1000], lon=[1000]).to_dataframe()
    selected_data = selected_data[
        selected_data.index.get_level_values("lat")
        == selected_data.index.get_level_values("lon")
        ].droplevel("lat")
    selected_data[COLUMN_DT_CALC] = dataset.time.values.astype("datetime64[m]").astype(
        datetime
    )

    selected_data.reset_index(inplace=True)
    selected_data.set_index([COLUMN_DT_CALC, "lon"], inplace=True)
    selected_data.index = selected_data.index.rename(
        [COLUMN_DT_CALC, COLUMN_POSITIONAL_INDEX]
    )
    selected_data.drop("time", axis=1, inplace=True)
    return selected_data


nc_files = glob.glob('/app/data/msg_europe/202204/*.nc4')
nc_files.sort()
netcdf_file_list = [Path(file) for file in nc_files]

zarr_files = glob.glob('/app/data/msg_europe/202204/*.zarr')
zarr_files.sort()
zarr_file_list = [Path(file) for file in zarr_files]

zarr_full_archive = Path('data/msg_europe/HRV.zarr')

from IPython import embed
embed()  # drop into an IPython shell to run the %timeit benchmarks below

# existing solution
args = ((nc_file, [1000], [1000]) for nc_file in netcdf_file_list)
start = datetime.utcnow()
with ProcessPoolExecutor(max_workers=1) as executor:
    requested_data = executor.map(_extract_dataframe_from_netcdf, args)
requested_data = pd.concat(requested_data)
print(datetime.utcnow() - start)
# 4 workers 0:00:00.452047
# 1 workers 0:00:01.552317

%timeit get_data_from_multi_zarr_archives_xarray()
# 54.5 ms ± 2.07 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit get_data_from_multi_zarr_archives_zarr()
# 41 ms ± 447 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit get_data_from_single_zarr_archives(zarr_file_list)
# 70.8 ms ± 1.56 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

Since you recommend storing all the data in one archive, and it also seems to be the fastest solution, I will go on with that, so all following questions will be about that approach.
As a next step I have to figure out how to deal with multiple variables and with dt_calc and dt_fore (multiple time dimensions) to store forecast data. I will give you an update.

PS:
Finally, I have some general questions about xarray and Zarr. One reason why I chunked the netCDF4 data was the RAM required during the .isel step: it was possible to open several hundred nc4 files with open_mfdataset, but during isel() the RAM overflowed. I tried using dask but was not able to get a working solution.
If I understand Zarr correctly, this issue is solved by its underlying chunking mechanism?
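
For reference, this is roughly the pattern I tried (the chunk sizes are placeholders); is this the right direction?

import xarray as xr

# open lazily with dask chunks instead of loading everything into RAM
ds = xr.open_mfdataset(
    "/app/data/msg_europe/202204/*.nc4",
    engine="h5netcdf",
    combine="by_coords",
    chunks={"time": 1, "lat": 500, "lon": 500},  # placeholder chunk sizes
    parallel=True,
)

# the selection stays lazy; only the needed chunks are loaded at .compute()
point = ds.isel(lat=[1000], lon=[1000]).compute()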

So here is a small benchmark and code for the forecast data I want to store. It covers 5 forecast runs.

First, here is a snippet showing how I store the forecast data into one Zarr archive:

    data_type_mapping = DTYPE_MAPPING[DataTypeDTypeMapping.PYTHON.value]  # internal dtype lookup
    compressor = zarr.Blosc(cname="zstd", clevel=5, shuffle=2)

    if Path(f"/app/data/{grid_data.model_name.value}.zarr").exists():
        # the store already exists: append the new run along the dt_calc dimension
        data.to_zarr(f"/app/data/{grid_data.model_name.value}.zarr", mode='a', append_dim='dt_calc')
    else:
        # first run: create the store with explicit dtypes and compression per variable
        encoding = {key: {"dtype": data_type_mapping[key], "compressor": compressor}
                    for key in list(data.keys()) if key not in ['dt_calc', 'dt_fore']}
        encoding.update({'dt_calc': {"dtype": "float64"}, 'dt_fore': {"dtype": "float64"}})
        data.to_zarr(f"/app/data/{grid_data.model_name.value}.zarr", encoding=encoding)

And here is how I access the data, once using xarray and once using the zarr library together with pandas.

import zarr
import xarray as xr
from datetime import timedelta
import pandas as pd
from typing import List

COLUMN_DT_CALC = 'dt_calc'
COLUMN_DT_FORE = 'dt_fore'
COLUMN_POSITIONAL_INDEX = 'positional_index'


def get_data_from_multi_zarr_archives_xarray(
        zarr_path: str = '/app/data/harmonie_knmi.zarr',
        lat_indices: List[int] = [100],
        lon_indices: List[int] = [100]
) -> pd.DataFrame:
    dataset = xr.open_dataset(zarr_path, engine='zarr')
    selected_data = dataset.isel(x=lon_indices, y=lat_indices).to_dataframe()
    selected_data = selected_data[
        selected_data.index.get_level_values("y")
        == selected_data.index.get_level_values("x")
        ].droplevel("y")
    # rebuild the index as (dt_calc, dt_fore, positional_index), where dt_fore
    # becomes the absolute valid time (dt_calc + lead time)
    dt_calc_values = pd.to_datetime(
        selected_data.index.get_level_values('dt_calc').astype(int))
    dt_fore_values = dt_calc_values + selected_data.index.get_level_values('dt_fore')
    selected_data.index = pd.MultiIndex.from_tuples(
        [(dt_calc, dt_fore, pos_index) for dt_calc, dt_fore, pos_index in
         zip(dt_calc_values, dt_fore_values, selected_data.index.get_level_values(2))],
        names=[COLUMN_DT_CALC, COLUMN_DT_FORE, COLUMN_POSITIONAL_INDEX])
    selected_data.drop(['lat', 'lon'], axis=1, inplace=True)
    return selected_data


def get_data_from_multi_zarr_archives_zarr(
        zarr_path: str = '/app/data/harmonie_knmi.zarr',
        lat_indices: int = 100,
        lon_indices: int = 100
) -> pd.DataFrame:
    _zarr_store = zarr.open(zarr_path, mode='r', )
    selected_data = {}
    for name, values in _zarr_store.arrays():
        if name == 'dt_calc':
            dt_calc = values
        elif name == 'dt_fore':
            dt_fore = values
        elif len(values.shape) > 3:
            # 4-D data variable (dt_calc, dt_fore, y, x): extract the single point
            selected_data[name] = values[:, :, lon_indices, lat_indices].ravel()

    # one index entry per (run, lead time); the positional index is fixed to 0
    index = pd.MultiIndex.from_tuples(
        [(dtc, dtc + timedelta(hours=dtf), 0)
         for dtc in pd.to_datetime(dt_calc[:].astype(int)) for dtf in dt_fore[:]],
        names=[COLUMN_DT_CALC, COLUMN_DT_FORE, COLUMN_POSITIONAL_INDEX])
    selected_data = pd.DataFrame(selected_data, index=index)
    return selected_data


data = get_data_from_multi_zarr_archives_zarr()
data_2 = get_data_from_multi_zarr_archives_xarray()


pd.testing.assert_frame_equal(data, data_2)

%timeit get_data_from_multi_zarr_archives_zarr()
# 248 ms ± 13.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit get_data_from_multi_zarr_archives_xarray()
# 324 ms ± 8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

During the process I came across one issue that I was able to work around.
I have two time axes (dt_calc and dt_fore), but dt_fore was overwritten every time I appended a run. So I defined dt_fore as a timedelta; in combination with dt_calc I can reconstruct the correct valid time. Unfortunately this only works for forecasts with the same forecast horizon, and I think the design of Zarr forces the arrays to have equal shapes, so it would not be possible to store forecasts of different lengths. Or do you think there is a way to store differently shaped arrays in the same Zarr archive? In the end this would just result in a slightly different database design: the archive would be named <model_name>_<run>.zarr instead of <model_name>.zarr, because the forecast length differs between the runs of the day.
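
To make the workaround concrete, here is a minimal sketch of the layout I ended up with (the variable name and sizes are just illustrative):

import numpy as np
import pandas as pd
import xarray as xr

# one run: dt_calc is the run time, dt_fore is a timedelta lead-time axis
run = xr.Dataset(
    {"t_2m": (("dt_calc", "dt_fore", "y", "x"), np.zeros((1, 49, 300, 300)))},
    coords={
        "dt_calc": [pd.Timestamp("2022-04-01 00:00")],
        "dt_fore": pd.to_timedelta(np.arange(49), unit="h"),
    },
)

# the absolute valid time can always be reconstructed from the two axes
valid_time = run.dt_calc + run.dt_fore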

I am really looking forward to your thoughts on this.
At the moment I am asking myself how this will work with 1000 forecasts instead of 5.
How can I use dask to speed up the processing?

Thanks a lot !

True true. It wasn’t clear to me that time series retrieval was a prime use case.


@meteoDaniel , although it may take quite a while initially to create the JSONs for each file in the forecast archive, you only have to do this once. Then when new forecast files arrive, you create the JSONs for just those few files and recreate the combined JSON from all the individual JSONs, which is quick. Does that make sense?
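
In code, the incremental update is roughly this (a sketch reusing the pattern from above; the paths here are placeholders):

import glob
import json
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr

# 1. create reference JSONs only for the newly arrived files (placeholder path)
for path in sorted(glob.glob("/app/data/forecast/incoming/*.nc4")):
    with open(path, "rb") as f, open(path + ".json", "w") as out:
        json.dump(SingleHdf5ToZarr(f, path).translate(), out)

# 2. recombining all the small per-file JSONs is quick compared to step 1
all_jsons = sorted(glob.glob("/app/data/forecast/**/*.json", recursive=True))
combined = MultiZarrToZarr(all_jsons, concat_dims=["time"]).translate()
with open("combined.json", "w") as out:
    json.dump(combined, out)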

@rsignell indeed, I thought about that, but there was another object that took a long time to create; unfortunately I do not know whether it was m = fs.get_mapper("") or mzz.translate('./combined.json').

But nevertheless I would like to build that archive on more robust code. I built that pyramid chunking on my own and I am very proud of it, but in the end I was under time pressure when I started, and now I think it is better to trust the community around Zarr. And I will do my best to share my experience with Zarr to empower the community.

Actually I have not yet understood how this works with the JSON + netCDF4 combination and the zarr engine.

Work in progress: I am on my way to a full Zarr archive. At the moment I am very happy with the progress and the experience I have gained.

  1. I now have a Zarr archive with over 23000 arrays
  2. Using dask + xarray speeds up querying the data, as in the code below:

from dask.distributed import Client, progress  # needed for the xarray variant below


def get_data_from_multi_zarr_archives_zarr(
    zarr_path: str = "/app/data/msg_europe/HRV.zarr",
) -> pd.DataFrame:
    """function to retrieve data"""
    _zarr_store = zarr.open(
        zarr_path,
        mode="r",
    )
    cf_time_unit = _zarr_store.time.attrs["units"].split(" ")
    cf_timestamp = datetime.strptime(
        f"{cf_time_unit[2]} {cf_time_unit[3]}", "%Y-%m-%d %H:%M:%S"
    )
    data = pd.DataFrame(
        np.c_[_zarr_store.hrv[:, 1000, 1000], _zarr_store.time[:]],
        columns=["hrv", "cf_timestamp"],
    )
    data.dropna(inplace=True)
    data.index = data.cf_timestamp.apply(
        # guard against missing values (the original ~isinstance(x, float) was always truthy)
        lambda x: round_time(cf_timestamp + timedelta(days=x))
        if not pd.isna(x)
        else cf_timestamp
    )
    data.drop(["cf_timestamp"], axis=1, inplace=True)
    data.sort_index(inplace=True)
    return data


def get_data_from_multi_zarr_archives_xarray(
    zarr_path: str = "/app/data/msg_europe/HRV.zarr",
) -> pd.DataFrame:
    """testing multi_zarr access"""
    _ = Client()
    dataset = xr.open_zarr(zarr_path)
    query = dataset.isel(lat=[1000], lon=[1000]).persist()
    progress(query)
    selected_data = query.to_dataframe()
    selected_data = selected_data[
        selected_data.index.get_level_values("lat")
        == selected_data.index.get_level_values("lon")
    ].droplevel("lat")
    selected_data[COLUMN_DT_CALC] = dataset.time.values.astype("datetime64[m]").astype(
        datetime
    )

    selected_data.reset_index(inplace=True)
    selected_data.set_index([COLUMN_DT_CALC, "lon"], inplace=True)
    selected_data.index = selected_data.index.rename(
        [COLUMN_DT_CALC, COLUMN_POSITIONAL_INDEX]
    )
    selected_data.drop("time", axis=1, inplace=True)
    return selected_data

The first method, get_data_from_multi_zarr_archives_zarr, took 4:51 minutes and the second method, get_data_from_multi_zarr_archives_xarray, took 2:05 minutes.

But here is what I was wondering about: when I ran the first one a second time, it took just 16 seconds. Does Zarr cache the data? If yes, for how long? Where can I find details about this caching behaviour?

Another thing I have noticed is that the time needed to append data to the archive keeps increasing. Do you have ideas how to solve this?

Best regards
Daniel