So here is a small benchmark and code for the forecast data I want to store. It covers 5 forecast runs.
At first, here is a snippet of how I store the forecast data into one zarr archive:
data_type_mapping = DTYPE_MAPPING[DataTypeDTypeMapping.PYTHON.value]
compressor = zarr.Blosc(cname="zstd", clevel=5, shuffle=2)
if Path(f"/app/data/{grid_data.model_name.value}.zarr").exists():
data.to_zarr(f"/app/data/{grid_data.model_name.value}.zarr", mode='a', append_dim='dt_calc')
else:
encoding = {key: {"dtype": data_type_mapping[key], "compressor": compressor}
for key in list(data.keys()) if key not in ['dt_calc', 'dt_fore']}
encoding.update({'dt_calc': {"dtype": "float64"}, 'dt_fore': {"dtype": "float64"}})
data.to_zarr(f"/app/data/{grid_data.model_name.value}.zarr", encoding=encoding)
And here is how I access them one using xarray
and one using zarr
library and pandas
.
import zarr
import xarray as xr
from datetime import timedelta
import pandas as pd
from typing import List
COLUMN_DT_CALC = 'dt_calc'
COLUMN_DT_FORE = 'dt_fore'
COLUMN_POSITIONAL_INDEX = 'positional_index'
def get_data_from_multi_zarr_archives_xarray(
zarr_path: str = '/app/data/harmonie_knmi.zarr',
lat_indices: List[int] = [100],
lon_indices: List[int] = [100]
) -> pd.DataFrame:
dataset = xr.open_dataset(zarr_path, engine='zarr')
selected_data = dataset.isel(x=lon_indices, y=lat_indices).to_dataframe()
selected_data = selected_data[
selected_data.index.get_level_values("y")
== selected_data.index.get_level_values("x")
].droplevel("y")
selected_data.index = pd.MultiIndex.from_tuples([(dt_calc, dt_fore, pos_index) for dt_calc, dt_fore, pos_index in
zip(pd.to_datetime(
selected_data.index.get_level_values('dt_calc').astype(int)),
pd.to_datetime(
selected_data.index.get_level_values('dt_calc').astype(
int)) + selected_data.index.get_level_values(
'dt_fore'),
selected_data.index.get_level_values(2))],
names=[COLUMN_DT_CALC, COLUMN_DT_FORE, COLUMN_POSITIONAL_INDEX])
selected_data.drop(['lat', 'lon'], axis=1, inplace=True)
return selected_data
def get_data_from_multi_zarr_archives_zarr(
zarr_path: str = '/app/data/harmonie_knmi.zarr',
lat_indices: int = 100,
lon_indices: int = 100
) -> pd.DataFrame:
_zarr_store = zarr.open(zarr_path, mode='r', )
selected_data = {}
for name, values in _zarr_store.arrays():
if name == 'dt_calc':
dt_calc = values
elif name == 'dt_fore':
dt_fore = values
elif len(values.shape) > 3:
selected_data[name] = values[:, :, lon_indices, lat_indices].ravel()
selected_data = pd.DataFrame(selected_data, index=pd.MultiIndex.from_tuples(
[(dtc, dtc + timedelta(hours=dtf), 0) for idx, dtc in enumerate(pd.to_datetime(dt_calc[:].astype(int))) for dtf
in dt_fore[:]], names=[COLUMN_DT_CALC, COLUMN_DT_FORE, COLUMN_POSITIONAL_INDEX]))
return selected_data
data = get_data_from_multi_zarr_archives_zarr()
data_2 = get_data_from_multi_zarr_archives_xarray()
pd.testing.assert_frame_equal(data, data_2)
% timeit
get_data_from_multi_zarr_archives_zarr()
# 248 ms ± 13.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
% timeit
get_data_from_multi_zarr_archives_xarray()
# 324 ms ± 8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
During the process I came across one issue that I was able to work around.
I have 2 time axis (dt_calc
and dt_fore
) but dt_fore
was overwirtten everytime. So I defined dt_fore
as timedelta
, in combination with dt_calc
I was able to reproduce the correct dt_fore
. Unfortunately this will only work for forecasts with the same forecast horizon, but I think the design of zarr forces to have equal shaped arrays. So I think it would not be possible to store forecast with different lengths. Or do you think there is a way to store different shaped arrays in the same zarr archive? At the end this would just result in a tiny different design of the database. The name of the Archive would be <model_name>_<run>.zarr
instead of <model_name>.zarr
because the length of the forecast just differs between the different runs during the day.
I am really looking forward to your thoughts about it.
At the moment I am just asking me how this will work in case of 1000 forecasts and not 5?
How can I use dask to increase process speed?
Thanks a lot !