So here is a small benchmark and the code for the forecast data I want to store. It covers 5 forecast runs.
First, here is a snippet showing how I store the forecast data into one zarr archive:
import zarr
from pathlib import Path

data_type_mapping = DTYPE_MAPPING[DataTypeDTypeMapping.PYTHON.value]
compressor = zarr.Blosc(cname="zstd", clevel=5, shuffle=2)

if Path(f"/app/data/{grid_data.model_name.value}.zarr").exists():
    # Archive already exists: append the new run along the dt_calc dimension.
    data.to_zarr(f"/app/data/{grid_data.model_name.value}.zarr", mode='a', append_dim='dt_calc')
else:
    # First write: set dtype and compressor per variable.
    encoding = {key: {"dtype": data_type_mapping[key], "compressor": compressor}
                for key in list(data.keys()) if key not in ['dt_calc', 'dt_fore']}
    encoding.update({'dt_calc': {"dtype": "float64"}, 'dt_fore': {"dtype": "float64"}})
    data.to_zarr(f"/app/data/{grid_data.model_name.value}.zarr", encoding=encoding)
And here is how I access the data, once using xarray and once using the zarr library together with pandas.
import zarr
import xarray as xr
from datetime import timedelta
import pandas as pd
from typing import List
COLUMN_DT_CALC = 'dt_calc'
COLUMN_DT_FORE = 'dt_fore'
COLUMN_POSITIONAL_INDEX = 'positional_index'
def get_data_from_multi_zarr_archives_xarray(
        zarr_path: str = '/app/data/harmonie_knmi.zarr',
        lat_indices: List[int] = [100],
        lon_indices: List[int] = [100]
) -> pd.DataFrame:
    dataset = xr.open_dataset(zarr_path, engine='zarr')
    selected_data = dataset.isel(x=lon_indices, y=lat_indices).to_dataframe()
    # Keep only the rows where the x and y selections line up, then drop the y level.
    selected_data = selected_data[
        selected_data.index.get_level_values("y")
        == selected_data.index.get_level_values("x")
    ].droplevel("y")
    # Rebuild the index: dt_fore is stored as an offset relative to dt_calc,
    # so the absolute forecast time is dt_calc + dt_fore.
    dt_calc = pd.to_datetime(selected_data.index.get_level_values('dt_calc').astype(int))
    dt_fore = dt_calc + selected_data.index.get_level_values('dt_fore')
    positional_index = selected_data.index.get_level_values(2)
    selected_data.index = pd.MultiIndex.from_arrays(
        [dt_calc, dt_fore, positional_index],
        names=[COLUMN_DT_CALC, COLUMN_DT_FORE, COLUMN_POSITIONAL_INDEX])
    selected_data.drop(['lat', 'lon'], axis=1, inplace=True)
    return selected_data
def get_data_from_multi_zarr_archives_zarr(
        zarr_path: str = '/app/data/harmonie_knmi.zarr',
        lat_indices: int = 100,
        lon_indices: int = 100
) -> pd.DataFrame:
    _zarr_store = zarr.open(zarr_path, mode='r')
    selected_data = {}
    for name, values in _zarr_store.arrays():
        if name == 'dt_calc':
            dt_calc = values
        elif name == 'dt_fore':
            dt_fore = values
        elif len(values.shape) > 3:
            # 4-dimensional data variable: extract the single grid point and flatten it.
            selected_data[name] = values[:, :, lon_indices, lat_indices].ravel()
    # Rebuild the index: dt_fore holds the lead time in hours relative to dt_calc.
    index = pd.MultiIndex.from_tuples(
        [(dtc, dtc + timedelta(hours=dtf), 0)
         for dtc in pd.to_datetime(dt_calc[:].astype(int))
         for dtf in dt_fore[:]],
        names=[COLUMN_DT_CALC, COLUMN_DT_FORE, COLUMN_POSITIONAL_INDEX])
    return pd.DataFrame(selected_data, index=index)
data = get_data_from_multi_zarr_archives_zarr()
data_2 = get_data_from_multi_zarr_archives_xarray()
pd.testing.assert_frame_equal(data, data_2)
%timeit get_data_from_multi_zarr_archives_zarr()
# 248 ms ± 13.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit get_data_from_multi_zarr_archives_xarray()
# 324 ms ± 8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
During the process I came across one issue that I was able to work around.
I have two time axes (dt_calc and dt_fore), but dt_fore was overwritten on every append. So I defined dt_fore as a timedelta; in combination with dt_calc I can reconstruct the correct absolute dt_fore. Unfortunately this only works for forecasts with the same forecast horizon, and I think the design of zarr forces the appended arrays to have equal shapes, so it would not be possible to store forecasts of different lengths. Or do you think there is a way to store differently shaped arrays in the same zarr archive? In the end this would just lead to a slightly different database design: the archive would be named <model_name>_<run>.zarr instead of <model_name>.zarr, because the forecast length differs between the runs during the day.
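The only workaround I could think of so far is the sketch below (not tested against my real data; the helper name pad_run_to_max_horizon and max_horizon_hours are made up, and it assumes dt_fore is a timedelta coordinate with hourly steps): pad shorter runs with NaN along dt_fore up to the longest horizon before appending, so every run keeps the same shape and a single <model_name>.zarr archive still works.

import numpy as np
import xarray as xr

def pad_run_to_max_horizon(run: xr.Dataset, max_horizon_hours: int) -> xr.Dataset:
    # Target dt_fore axis covering the longest horizon in hourly steps.
    full_horizon = np.array([np.timedelta64(h, 'h') for h in range(max_horizon_hours + 1)])
    # reindex fills the missing lead times with NaN, so the array shapes stay
    # identical across runs and to_zarr(mode='a', append_dim='dt_calc') keeps working.
    return run.reindex(dt_fore=full_horizon)

But maybe the per-run archives are the cleaner option after all.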
I am really looking forward to your thoughts on this.
At the moment I am also asking myself how this will work with 1000 forecast runs instead of 5.
How can I use dask to speed up the processing?
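To make the dask question more concrete, this is roughly the direction I was thinking of, just a sketch and not something I have benchmarked (get_point_lazy is a made-up name, and chunks={} simply reuses the on-disk chunking):

import pandas as pd
import xarray as xr

def get_point_lazy(zarr_path: str = '/app/data/harmonie_knmi.zarr',
                   lat_index: int = 100,
                   lon_index: int = 100) -> pd.DataFrame:
    # chunks={} wraps the zarr arrays in dask arrays without loading anything;
    # only the chunks containing the selected point are read when the
    # dataframe is actually built.
    dataset = xr.open_zarr(zarr_path, chunks={})
    point = dataset.isel(x=lon_index, y=lat_index)
    return point.to_dataframe()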
Thanks a lot!