I investigated this on a cloud cluster.
Step 1: Ingest Data using Pangeo Forge
To start, I ingested your data into Google Cloud Storage in Zarr format using the following Pangeo Forge recipe:
import os
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.recipes import XarrayZarrRecipe
from pangeo_forge_recipes.storage import CacheFSSpecTarget, FSSpecTarget, MetadataTarget, StorageConfig
import gcsfs
import pandas as pd
# after 2020 data become netCDF4 files 🤦♂️ - this causes problems
#dates = pd.date_range('2000-01-01', '2021-12-01', freq='MS')
dates = pd.date_range('2000-01-01', '2019-12-01', freq='MS')
def format_url(time):
    return (
        "https://cluster.klima.uni-bremen.de/~fmaussion/teaching/climate/dask_exps/HiRes_Hourly_Surf/"
        "ERA5_HiRes_Hourly_tp_{time:%Y}_{time:%m}.nc"
    ).format(time=time)

# one file per month, concatenated along the time dimension
pattern = FilePattern(
    format_url,
    ConcatDim("time", dates),
    file_type='netcdf3'
)

# small time chunks (24 hours) in the target; split each monthly input into 4 pieces
recipe = XarrayZarrRecipe(
    pattern,
    target_chunks={'time': 24},
    subset_inputs={'time': 4}
)

fs = gcsfs.GCSFileSystem(skip_instance_cache=True, use_listings_cache=False)
target = FSSpecTarget(fs, f"{os.environ['SCRATCH_BUCKET']}/ERA5_HiRes_Hourly.zarr")
cache = CacheFSSpecTarget(fs, f"{os.environ['SCRATCH_BUCKET']}/ERA5_HiRes_Hourly/cache")
metadata = MetadataTarget(fs, f"{os.environ['SCRATCH_BUCKET']}/ERA5_HiRes_Hourly/metadata")
recipe.storage_config = StorageConfig(target, cache, metadata)

# build the whole recipe as a single dask graph and execute it
delayed = recipe.to_dask()
delayed.compute()
This was pretty slow, since I was only able to get about 5 MB/s from the server. But it worked!
Step 2: Flox with “Cohorts”
I was able to get the computation to run with the following cluster settings:
from dask_gateway import Gateway

g = Gateway()
options = g.cluster_options()
# ask glibc to return freed memory to the OS promptly (helps with unmanaged worker memory)
options.environment = {"MALLOC_TRIM_THRESHOLD_": "0"}
options.worker_memory = 40  # GB per worker
gc = g.new_cluster(cluster_options=options)
gc.scale(20)
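(Not shown above: the computation also needs a client attached to this cluster. A minimal sketch, using dask-gateway's get_client():

# connect a distributed client so subsequent dask computations run on the new cluster
client = gc.get_client()
client
)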
At this point my cluster has 800 GB of memory. This matches a rule of thumb that I have observed in this type of workload: it will only really work if you have more memory in the cluster than the total dataset size! I consider this a serious performance limitation of our stack for this type of computation. It means that we can’t actually do “streaming” style processing, where the cluster memory is much smaller than the actual data.
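A quick way to sanity-check that ratio yourself: the 800 GB figure is just 20 workers × 40 GB from the settings above, and ds.nbytes reports the uncompressed in-memory size of the dataset.

import os
import xarray as xr

ds = xr.open_dataset(f"{os.environ['SCRATCH_BUCKET']}/ERA5_HiRes_Hourly.zarr", engine="zarr", chunks={})
cluster_memory_gb = 20 * 40  # workers x GB per worker
print(f"dataset: {ds.nbytes / 1e9:.0f} GB, cluster: {cluster_memory_gb} GB")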
These settings allowed the following computation to complete:
import xarray as xr
import os
import flox.xarray

url = f"{os.environ['SCRATCH_BUCKET']}/ERA5_HiRes_Hourly.zarr"
ds = xr.open_dataset(url, engine="zarr", chunks={})

# group total precipitation by hour of day and average: the mean diurnal cycle
# method = "map-reduce"  # <- cluster ran out of memory
method = "cohorts"
tpm = flox.xarray.xarray_reduce(ds.tp, ds.time.dt.hour, func="mean", method=method)
tpm.compute()
The workers got totally saturated with memory and started spilling to disk, but they did manage to recover and finish the computation in about 15 minutes.
tpm[0].plot(robust=True, figsize=(14,8))  # map of mean precipitation for the first hour-of-day bin
I am now investigating using rechunker. This operation is basically embarrassingly parallel in space, similar to a timeseries analysis at each point. So I am hoping that, by rechunking the data to be contiguous in time and using flox with method="map-reduce", I will get a much happier (and faster) dask computation.
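For reference, here is roughly what I have in mind, sketched with rechunker's rechunk API. I haven't run this yet, and the spatial dimension names, chunk sizes, memory budget, and store paths are placeholders:

import os

import xarray as xr
from rechunker import rechunk

url = f"{os.environ['SCRATCH_BUCKET']}/ERA5_HiRes_Hourly.zarr"
ds = xr.open_dataset(url, engine="zarr", chunks={})

# contiguous in time, small tiles in space
# (dimension names assumed to be "latitude"/"longitude"; adjust to the actual dataset)
target_chunks = {
    "tp": {"time": ds.sizes["time"], "latitude": 30, "longitude": 30},
    "time": None,       # leave the coordinate arrays alone
    "latitude": None,
    "longitude": None,
}

plan = rechunk(
    ds,
    target_chunks=target_chunks,
    max_mem="2GB",  # per-worker memory budget for the rechunk
    target_store=f"{os.environ['SCRATCH_BUCKET']}/ERA5_HiRes_Hourly_timecont.zarr",
    temp_store=f"{os.environ['SCRATCH_BUCKET']}/ERA5_HiRes_Hourly_rechunk_tmp.zarr",
)
plan.execute()

After that, the same flox call as above, but with method="map-reduce", should be able to reduce each spatial tile's full timeseries independently.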