There is a trick to solving the easy part of your problem.
```python
import xarray as xr

# open the dataset lazily, with no dask chunking
ds_all = xr.open_zarr(zarr_filename_in_google_bucket, consolidated=True, chunks=None)

# select the day you want; still lazy, but no dask involved
ds_day = ds_all.isel(time=0)

# now do what you want, including chunking, with your small piece of data
```
This is a poorly documented but very useful way to work with data. It’s how my llcbot works.
If you have your own system for parallelization, you could use it here to map the per-day work over many independent tasks.
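For example, here is a minimal sketch of that pattern using Python's standard concurrent.futures. The store path, the process_day helper, the sst variable, and the spatial-mean "analysis" are all placeholders for illustration, not details from your setup:

```python
from concurrent.futures import ProcessPoolExecutor

import xarray as xr

# hypothetical path to the Zarr store in the Google bucket
ZARR_PATH = "gs://my-bucket/my-dataset.zarr"


def process_day(itime):
    """Open the store lazily (no dask), pull out one time step, and reduce it."""
    ds_all = xr.open_zarr(ZARR_PATH, consolidated=True, chunks=None)
    ds_day = ds_all.isel(time=itime)
    # placeholder analysis: a simple spatial mean of one variable
    return float(ds_day["sst"].mean())


if __name__ == "__main__":
    # map the per-day task over many time steps with ordinary worker processes
    with ProcessPoolExecutor(max_workers=8) as pool:
        results = list(pool.map(process_day, range(365)))
    print(results[:5])
```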
As for the harder part: each variable is ~1.3 TB of data, stored as ~10,000 chunks of ~100 MB each along the time axis, and there are 17 variables.
The Dask graph that comes out of `groupby("time.dayofyear").mean().compute()` creates communication patterns that are extremely memory intensive, since it needs to combine data from every single chunk at every single point in space. There has been a lot of recent work in Dask on memory management and task scheduling that has gradually improved this use case, which you can read about here:
However, the bottom line is that it is just a very hard computational problem. There are two approaches you can take:
Throw a ton of memory at it
Your cluster probably needs roughly 10x more memory than the data you are trying to process. So if you have 1 TB of data, you would need about 10 TB of aggregate memory, e.g. a cluster of 100 nodes with 100 GB of RAM each. That might work.
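For instance, here is a sketch of what that brute-force route could look like on a Dask Gateway deployment (e.g. a Pangeo-style hub). The Gateway setup, the worker count, and the store path are assumptions for illustration; per-worker memory is actually set through your deployment's cluster options:

```python
import xarray as xr
from dask_gateway import Gateway  # assumes a Dask Gateway deployment is available

# hypothetical path to the Zarr store
ZARR_PATH = "gs://my-bucket/my-dataset.zarr"

# ask for a big cluster; if each worker has ~100 GB of RAM (set via the
# deployment's cluster options), ~100 workers gives ~10 TB of aggregate memory
gateway = Gateway()
cluster = gateway.new_cluster()
cluster.scale(100)
client = cluster.get_client()

# open with dask this time, keeping the on-disk chunking
ds = xr.open_zarr(ZARR_PATH, consolidated=True)

# the memory-hungry reduction discussed above
climatology = ds.groupby("time.dayofyear").mean()
result = climatology.compute()
```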
Rechunk your data
A similar issue came up in what became one of the most active threads on this forum:
What we ended up doing was creating a new package called rechunker, whose whole job is to scalably alter the chunk structure of big Zarr arrays.
If you rechunk your data to have a contiguous time dimension (no chunks in time) and instead chunk along the spatial dimensions, your problem becomes embarrassingly parallel: each spatial chunk can compute its climatology independently, with no communication between chunks. Then things should move very quickly.
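Here is a minimal sketch of what that could look like with rechunker. The store paths, chunk sizes, memory budget, and the sst variable name are illustrative assumptions, not values from your dataset:

```python
import xarray as xr
from rechunker import rechunk

# hypothetical store paths; in practice these would live in your cloud bucket
SOURCE = "gs://my-bucket/my-dataset.zarr"
TARGET = "gs://my-bucket/my-dataset-spacechunked.zarr"
TEMP = "gs://my-bucket/rechunker-tmp.zarr"

ds = xr.open_zarr(SOURCE, consolidated=True)

# contiguous in time, chunked in space (chunk sizes are illustrative)
target_chunks = {
    "sst": {"time": len(ds.time), "lat": 90, "lon": 90},
    # ...one entry per data variable (and coordinate) you want rechunked
}

plan = rechunk(
    ds,
    target_chunks=target_chunks,
    max_mem="4GB",          # memory budget per task
    target_store=TARGET,
    temp_store=TEMP,
)
plan.execute()              # runs as a (much friendlier) dask computation

# now the climatology is embarrassingly parallel across spatial chunks
ds_rc = xr.open_zarr(TARGET)
climatology = ds_rc.groupby("time.dayofyear").mean().compute()
```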
Hope that’s helpful. Please report back because this sort of problem is very interesting to us.