Zarr ERA5 reading causes huge number of tasks

I’m attempting to calculate some climatologies over an ERA5 dataset stored as a Zarr file in Google Cloud Storage, using xarray/dask. I’m attempting to do this over individual chunks at a time for memory reasons (doing the entire operation at once crashes the machine). Whenever I open the file and look at the task graph, however, dask creates a reading task for every chunk in the file, not just those accessed. Normally this is not a large issue, since when the task gets submitted the graph gets optimized and the extra reads are dropped. This is, however, a large number of tasks (42k in our case) per chunk I process.

However, when I try to iterate over chunks and submit each of the processing steps, this bogs down the scheduler, and the reads also seem to get stuck and capped at one. Additionally, instead of going depth first and finishing up each chunk, it seems to want to read all of the chunks at once before any processing starts, which tends to crash the node. I can generate a task graph if I restrict it to a one-year file, but it has to be plotted small enough that it is fairly hard to use.

This raises the question of the proper way to do what I am doing. Essentially I want to do the following:

import xarray as xr
import gcsfs
import numpy as np
import dask as da
import logging
import warnings
from datetime import datetime

ds_all = xr.open_zarr(zarr_filename_in_google_bucket, consolidated=True)
climatology_mean = ds_all.groupby("time.dayofyear").mean().compute()

But this tends to crash. I’ve also tried splitting it up by chunks, which works but can be incredibly slow, as it essentially only uses a single CPU for everything. An example is at https://gist.github.com/josephhardinee/bb1d5cf91b55caa151e0b9096294bd4c

I also tried this with futures, which I think may be the correct way, but either the scheduler gets overwhelmed with the number of tasks (most of which aren’t needed as dependencies for each path down the tree) or one node somehow ends up grabbing all of the tasks. An example is at https://gist.github.com/josephhardinee/5e1b8da4764239a029c16cf4ceaaca8e

Is there something obvious I am doing wrong with these? The Zarr dataset being loaded is 400000 x 721 x 1440, chunked as (100000, 10, 10), if that is helpful.
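
(For reference, that chunk grid is where the ~42k figure comes from:)

import math

# (400000, 721, 1440) array stored in (100000, 10, 10) chunks
n_chunks = math.ceil(400000 / 100000) * math.ceil(721 / 10) * math.ceil(1440 / 10)
print(n_chunks)  # 4 * 73 * 144 = 42048, i.e. one read task per chunk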

The good news

There is a trick to solving the easy part of your problem.

# open the dataset with no dask chunking
ds_all = xr.open_zarr(zarr_filename_in_google_bucket, consolidated=True, chunks=False)
# select the day you want, lazy, but no dask involved
ds_day = ds_all.isel(time=0)
# now do what you want, including chunking, with your small piece of data

This is a poorly documented but very useful way to work with data. It’s how my llcbot works.
If you have your own system for parallelization, you could use it here to map over many tasks.
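
For example, here is a minimal sketch of mapping over time steps with a plain process pool; the output filenames, worker count, and one-time-step-per-task granularity are stand-ins for whatever your own system uses:

from concurrent.futures import ProcessPoolExecutor
import xarray as xr

def process_one_step(i):
    # open lazily with no dask chunking, as above
    ds = xr.open_zarr(zarr_filename_in_google_bucket, consolidated=True, chunks=False)
    piece = ds.isel(time=i)                # lazy selection, no dask involved
    result = piece.mean()                  # whatever per-piece computation you need
    result.to_netcdf(f"piece_{i:06d}.nc")  # hypothetical output location

with ProcessPoolExecutor(max_workers=8) as pool:
    list(pool.map(process_one_step, range(365)))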

The bad news

I have never seen code like this…

climatology_mean = ds_all.groupby("time.dayofyear").mean().compute()

…work with data that is chunked in time at the scale of ERA5.

I don’t know if your dataset is public, but here is what Pangeo’s ERA5 data looks like:

from intake import open_catalog
cat = open_catalog("https://raw.githubusercontent.com/pangeo-data/pangeo-datastore/master/intake-catalogs/atmosphere.yaml")
ds = cat['era5_hourly_reanalysis_single_levels_sa'].to_dask()
display(ds)

Each variable is 1.3 TB of data, in ~10,000 x 100 MB chunks along the time axis. There are 17 variables.

The Dask graph that comes out of groupby("time.dayofyear").mean().compute() creates communication patterns that are extremely memory intensive, since it needs to combine data from every single chunk at every single point in space. There has been lots of work in Dask recently on improving memory management and task scheduling that has slowly improved this use case.

However, the bottom line is that it is just a very hard computational problem. There are two approaches you can take:

Throw a ton of memory at it

Your cluster probably needs something like 10x more memory than the data you are trying to process. So if you have 1 TB of data, you would need 10 TB of aggregate memory. You could use a cluster of 100 nodes with 100 GB RAM each. That might work.
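
For example, on a Pangeo-style deployment you might request a cluster like that with dask_gateway; this is only a sketch, and the available options and their names depend on how your deployment is configured:

from dask_gateway import Gateway

gateway = Gateway()
options = gateway.cluster_options()
options.worker_memory = 100       # GB per worker; option name varies by deployment
cluster = gateway.new_cluster(options)
cluster.scale(100)                # ~100 workers x 100 GB = ~10 TB aggregate memory
client = cluster.get_client()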

Rechunk your data

A similar issue became the most common thread on this forum.

What we ended up doing is creating a new package called rechunker whose job is just to scalably alter the chunk structure of big Zarr arrays.

If you rechunk your data to have a contiguous time dimension (no chunks in time) but instead use chunks in the spatial dimension, your problem becomes embarrassingly parallel. Then things should move very quickly.
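
As a sketch of what that looks like with rechunker (the variable name "t2m", target chunk sizes, and store paths below are placeholders, not your actual dataset):

import xarray as xr
from rechunker import rechunk

ds = xr.open_zarr(zarr_filename_in_google_bucket, consolidated=True)

# contiguous in time, chunked in space (sizes are illustrative only)
target_chunks = {
    "t2m": {"time": len(ds.time), "latitude": 10, "longitude": 10},
    "time": None,        # leave the coordinate arrays alone
    "latitude": None,
    "longitude": None,
}

plan = rechunk(
    ds,
    target_chunks,
    max_mem="2GB",                                      # per-worker memory budget
    target_store="gs://my-bucket/era5_rechunked.zarr",  # hypothetical paths
    temp_store="gs://my-bucket/era5_rechunk_temp.zarr",
)
plan.execute()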


Hope that’s helpful. Please report back because this sort of problem is very interesting to us.

Thanks for the help @rabernat. I’m adding a few more details and maybe getting a hint of what is going wrong in my specific case. I opened a zarr file with just a single variable (and, conveniently, without time chunked). I open the file, grab a single chunk, then do the groupby/mean on that. A notebook that shows this can be found at https://nbviewer.jupyter.org/urls/dl.dropbox.com/s/lts06m3qchri50y/zarr_speed_tests.ipynb
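
Roughly what the notebook does, as a sketch (the store path and dimension names are placeholders):

import xarray as xr

ds = xr.open_zarr(single_variable_zarr_path, consolidated=True)  # hypothetical path

# grab a single 10 x 10 spatial chunk (all of time)
chunk = ds.isel(latitude=slice(0, 10), longitude=slice(0, 10))

# climatology for just that chunk
chunk_climatology = chunk.groupby("time.dayofyear").mean().compute()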

I think the two task graphs generated in that notebook may be key to what is going on. First is the graph for just opening the file and performing the select, shown in step1.svg (Dropbox), though you may have to download the file to be able to zoom in enough. The general structure is fine, but we do see a lot of spare output nodes that are not actually being used. If we add the groupby and mean on top of this we get step2.svg (Dropbox). This has the right topology (i.e. the groupby is isolated to its individual chunk), but we do have a lot of “vestigial” outputs. When handing a lot of these tasks to a scheduler (i.e. wrapping the operation in a function and submitting it as a delayed object), it seems like all of the reads get done before any of the computation. As an example, if I pass 2 chunks into the operation at once I get step3.svg (Dropbox). So it seems like the structure is getting set up appropriately, just with a lot of tasks that could perhaps be pruned. If I submit many of these to a distributed cluster at once, it seems to want to do all of the read operations before it gets to the computation nodes that would let it offload results and intermediate data.

So I guess a question would be how I can prune this graph to automatically get rid of some of the earlier nodes. I know dask has some documentation on this, but it seems mostly tailored to hand-generated graphs rather than automatically generated ones.
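
To make the question concrete, here is the kind of pruning I mean, sketched with dask.optimize on the underlying dask array (result stands in for the per-chunk groupby/mean DataArray from the notebook):

import dask

# `result` is the per-chunk groupby/mean DataArray (placeholder name)
(optimized,) = dask.optimize(result.data)      # applies culling/fusion to the dask array
print(len(dict(optimized.__dask_graph__())))   # far fewer tasks than the unoptimized graph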

As far as your first solution, that may solve the other problem I’ve been running into but had put on the back burner. Most people here are using time series over a few pixels, so our chunking right now is 10 x 10 in lat/lon and cuts ERA5 up into 4 slices in time (~100k points each, except the last, which has a few less). So if I open it with chunking off, then select the first time globally with isel(time=0), my understanding is that it has to parse all lat/lon and a single time chunk for each, which in our case would be roughly 1/4 of the dataset? This part isn’t a huge problem, as we have yearly zarr files as well, which is what I was planning on using, but if there is a shorter way to make that work I would be interested.
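
(A quick back-of-the-envelope on that, using the chunk sizes quoted above:)

import math

# (400000, 721, 1440) array in (100000, 10, 10) chunks: selecting time=0 touches
# the first of the 4 time chunks in every spatial block
spatial_blocks = math.ceil(721 / 10) * math.ceil(1440 / 10)   # 73 * 144 = 10512
values_read = spatial_blocks * (100000 * 10 * 10)             # one time chunk per block
total_values = 400000 * 721 * 1440
print(values_read / total_values)                             # ~0.25, i.e. about a quarter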

@rabernat @josephhardinee, complexities of climatologies and chunking aside, glancing at the notebook, if I’m not mistaken, I don’t think the dask scheduler actually receives all of the tasks reported in Xarray’s HTML repr; it depends on the calculation.

I typically use .visualize(optimize_graph=True), which I thought is what the scheduler actually receives (maybe @TomAugspurger could confirm?). I’ll try to illustrate below with a simple example and public data:

import os
import rioxarray
os.environ['AWS_NO_SIGN_REQUEST']='YES'
cog = 's3://sentinel-s1-rtc-indigo/tiles/RTC/1/IW/12/S/YJ/2016/S1B_20161121_12SYJ_ASC/Gamma0_VV.tif'
da = rioxarray.open_rasterio(cog, chunks=dict(x=1024,y=1024))
da

(Note: 37 tasks are needed to read the entire array.)

But if you just want values from a single dask chunk:

da.isel(x=0,y=0).data.visualize()

We see all 37 tasks even though only one chunk needs to be accessed.

Instead, use optimize_graph=True for a more accurate representation of this computation:

da.isel(x=0,y=0).data.visualize(optimize_graph=True, rankdir='LR')

Yeah, I think that’s correct. (My only hesitation is around the recent changes to avoid materializing low-level graphs on the client and instead ship high-level graphs, but I think that only applies to dataframes right now.) Either way, visualize(optimize_graph=True) is a better representation of what’s sent to the scheduler.

I haven’t read through the posts closely, but I see references to groupby, so it might be worth trying out @dcherian’s dask_groupby: https://github.com/dcherian/dask_groupby.

(100000, 10, 10)

How many years are in a single chunk? If it’s > 1 (larger is better), I bet dask_groupby.xarray.xarray_reduce will work extremely well.

If it is less than one, it won’t work well. For example, it will fail extremely quickly for the Pangeo-style chunking ;). That said, if the Pangeo hourly data had chunksize=24 along time, then the current xarray strategy should be optimal for dayofyear grouping.
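
A hedged sketch of what such a call might look like (the exact signature may differ between dask_groupby versions):

import xarray as xr
import dask_groupby.xarray

ds = xr.open_zarr(zarr_filename_in_google_bucket, consolidated=True)

# single-pass grouped reduction over the time axis
climatology_mean = dask_groupby.xarray.xarray_reduce(
    ds, ds.time.dt.dayofyear, func="mean"
).compute()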

Thanks for the poke here @TomAugspurger .

I think I’ve found a better strategy for this kind of time grouping, where the groups repeat at a large distance relative to the chunk size (e.g. dayofyear repeats every 365-ish days, but the data may have only 3 days in a chunk). That kind of thing fails with both dask_groupby and xarray’s current strategy.

Help with generalizing this idea is very welcome!

@josephhardinee I know it doesn’t get to the core of your issue, but you could drop in a for loop over years and iterate over those, creating temp files etc. I imagine dealing with a year at a time will be pretty fast, and hopefully you get the climatology while avoiding compute crashes. I haven’t taken a close look at it, but I feel like xarray-beam takes this approach (xarray-beam/era5_climatology.py at main · google/xarray-beam · GitHub), albeit in parallel. That said, this forum is for how to do big data analysis, so thanks for the question.
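
A minimal sketch of that loop (the yearly store names era5_{year}.zarr and the year range are hypothetical; accumulating running sums and counts gives the same result as a dayofyear mean over the whole record without holding everything in memory):

import xarray as xr

total, n = None, None
for year in range(1979, 2021):                       # whatever years you have
    ds = xr.open_zarr(f"era5_{year}.zarr", consolidated=True)
    grouped = ds.groupby("time.dayofyear")
    s = grouped.sum().compute()                      # one year at a time stays small
    c = grouped.count().compute()
    if total is None:
        total, n = s, c
    else:
        # outer-align so day 366 from leap years is kept, then accumulate
        total, s = xr.align(total, s, join="outer", fill_value=0)
        n, c = xr.align(n, c, join="outer", fill_value=0)
        total = total + s
        n = n + c

climatology_mean = total / n                         # sum of values / count of samples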

Xarray-Beam creates a high-level task graph in Apache Beam, rather than a bunch of individual tasks, so I wouldn’t say it uses the looping approach. That said, it was designed to solve exactly this sort of problem, so I guess it would handle this pretty well once you figure out how to get started.
