Xarray unable to allocate memory: how to "size up" a problem

Not sure if this is the right forum for this question but I’m giving it a try:

My biggest problem with xarray/dask, as a scientist (not a computer scientist), is usually that the time saved by having minimal amounts of code to solve a science problem is often lost trying to figure out how to scale it so it can actually run. Here is an example:

I want to generate a map of the maximum daily air temperature anomaly at each location from daily ERA5 data. The wonderfully simple code for this:

import xarray as xr

nyears = 73
path = "/data/axel0/era5/ncar/daily/inst/*/*.nc"
ds = xr.open_mfdataset(path, chunks={"time": 366 * nyears, "latitude": 200, "longitude": 200}, parallel=True)
anom = ds.groupby("time.dayofyear") - ds.groupby("time.dayofyear").mean(dim="time")
maxv = anom.VAR_2T.max(dim="time")

There are 73 files (one for each year) with a single variable VAR_2T (+ coordinates) in the path.
I get an "unable to allocate 102 GB" error for an array of shape (26267, 721, 1440), with each year amounting to about 1.5 GB of uncompressed data (365 * 721 * 1440 * 4 bytes).
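A quick back-of-the-envelope check (assuming 4-byte float32 values) shows where both of those numbers come from:

nbytes_full = 26267 * 721 * 1440 * 4         # the full (time, latitude, longitude) array
print(nbytes_full / 2**30)                   # ~101.6 GiB, matching the "unable to allocate 102 GB" error
print(365 * 721 * 1440 * 4 / 2**30)          # ~1.4 GiB (~1.5 GB) per year of daily data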

Ok. So this apparently won't fit into my machine with 96 GB and 16 cores (though I don't really understand why, because I thought Dask would handle this by smartly sizing the problem into workable chunks?). The next step, reducing the problem to only use files from 2010-2022, sort of works but is very slow (runs overnight).

So I thought I'd try to speed this up a bit by setting up a local cluster and running this in parallel:

from dask.distributed import Client

client = Client(processes=False, memory_limit='10GB', n_workers=8)
(Note: I also tried a distributed cluster (on the local machine), but that fails completely.)

Nope. I get a lot of chatter and I can watch the entertaining (but inscrutable to me) Dask status monitor, but eventually some of the workers die (something about losing connection), I suppose because the machine gets overloaded. (Note that I've played around with chunk settings, and the resulting chunks seem to be different from what I am setting, but I don't seem to be able to get much improvement.)

While a big problem, it certainly doesn't seem to be too big for my system, and I could break it down into smaller pieces (with or without using xarray), but the question is: should I really have to? Isn't this what xarray was (also) made for? It would be helpful if there were some guidance on "sizing" a problem, what to "expect", and maybe a set of recipes of things to try to make it fit. Maybe this exists, or maybe I'm asking too much?

(This is using Python 3.10, dask 2022.12.1, and xarray 2022.12.0.)
Best
Axel


Hi Axel! Thanks for sharing! Your frustration is certainly not unique. I think our community is guilty of making it seem like these tools are a bit magic and can just work at any scale with little user intervention. That’s clearly not the case.

It's worth noting that the problem you have picked (calculating the daily climatology from a high-resolution dataset) is notoriously challenging. Just to benchmark your system, I'd start first by doing something simpler like

ds.mean(dim="time")

just to see if it works. If that doesn’t work, the climatology never will.

Then you might want to read this thread, which at the end gets to some practical solutions to your problem.

Finally, I would note that code like this

ds=xr.open_mfdataset(path,chunks={"time":366*nyears,"latitude":200,"longitude":200})

is potentially wishful thinking. There are underlying physical chunks on disk in the netCDF4 files, and if you tell Dask to use chunks that are not aligned with those, performance could be worse, not better. So you should first seek to understand what the chunks on disk are.
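One way to check (a minimal sketch; the file path below is just a placeholder for one of your yearly files) is to ask the netCDF4 library directly:

import netCDF4

# Minimal sketch: report the on-disk chunking of one variable in one file
# (replace the path with one of the actual yearly files).
with netCDF4.Dataset("/data/axel0/era5/ncar/daily/inst/1950/example.nc") as nc:
    print(nc.variables["VAR_2T"].chunking())  # e.g. [1, 721, 1440], or the string "contiguous"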


Another thought… sometimes problems can arise when you use open_mfdataset on a set of files that do not have aligned dimensions / coordinates. I would also double-check that, just by looking carefully at ds and the individual files that go into it.

Thanks Ryan. This is very helpful! Though, I think I have come away with my own rule of thumb: if Ryan needs to work through a number of iterations to make this work and finds it challenging, then who am I? (Better to resort to coding this in manageable chunks if I need to get something done, rather than experimenting.) There seem to be just way too many permutations of cluster configuration, chunking, etc. to warrant the effort, unless it's for learning purposes. Since I've invested that time and I'm curious, I might as well try to get a little further:

My data on disk appears to be chunked by "month" length in time and has no chunking on the lat/lon dimensions. Since I wrote the daily averages, I guess I could change that.
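For reference, a hedged sketch of how that rewrite could look via xarray's netCDF encoding (the file names and chunk sizes below are purely illustrative, not recommendations):

import xarray as xr

# Hedged sketch: rewrite one yearly file with explicit on-disk chunks via the
# netCDF4 "chunksizes" encoding. File names and chunk sizes are illustrative only.
ds_year = xr.open_dataset("one_year.nc")
ds_year.to_netcdf("one_year_rechunked.nc",
                  encoding={"VAR_2T": {"chunksizes": (31, 200, 200)}})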

I did find one problem that makes a huge difference. A subset of the files had an additional variable “utc_date”.

ds = xr.open_mfdataset(path, parallel=True, engine='h5netcdf', drop_variables='utc_date')

now loads the entire 73-year data set fairly quickly. Your "benchmark" command

mean = ds.mean(dim="time")
mean.compute()

finishes in about 2 minutes on 8 workers with 10 GB each. No rechunking, just letting xarray/dask do whatever it thinks best. That's pretty cool.

daily_avg = ds.groupby('time.dayofyear').mean(dim='time').to_netcdf('daily_avg.nc')

Runs for about 25 minutes and looks almost finished, but then locks up the system. OK, on to flox.

import flox.xarray

method = "cohorts"
tpm = flox.xarray.xarray_reduce(ds.VAR_2T, ds.time.dt.dayofyear, func="mean", method=method)
tpm.to_netcdf("daily_avg_flox.nc")

Finishes in 8 minutes. The flox documentation https://flox.readthedocs.io/en/latest/user-stories/climatology.html has some stuff about optimizing this further, but this is good enough for me.
Now I am stuck on how to calculate my anomalies, though. When I do

ds = xr.open_mfdataset(path, parallel=True, engine='h5netcdf', drop_variables='utc_date')
tpm = xr.open_dataset('daily_avg_flox.nc')
anom = ds.groupby("time.dayofyear") - tpm

Executing this cell in my notebook just shows the kernel as busy, but the Dask monitor doesn't show any tasks or any progress. Load on the system goes way up and the total memory usage goes up to the maximum available on the system (96 GB). It appears to be computing this eagerly, without Dask delaying anything? Totally confused now.


On recent Xarray (see discussion), we do this by taking tpm, "expanding" it to match the size of ds along the time dimension, and then running the subtraction. In this case, this approach is catastrophic because tpm is eagerly loaded into memory and then "expanded".
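Conceptually (a rough sketch, not the exact internal code), the expansion amounts to indexing the climatology by the day-of-year of every timestamp, so it spans the full time axis before the subtraction happens:

# Rough conceptual sketch of the "expand then subtract" path (not the exact internals);
# assumes ds and tpm from the code above, with tpm having a "dayofyear" dimension.
expanded = tpm.sel(dayofyear=ds.time.dt.dayofyear)  # now spans the full "time" axis
anom = ds - expanded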

I guess the old for-loop over groups would do much better memory-wise but be slow.
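Something like this hedged sketch (assuming ds from above and tpm being the Dataset read back from daily_avg_flox.nc), which processes one day-of-year group at a time:

import xarray as xr

# Hedged sketch of the loop-over-groups alternative: subtract each day-of-year's
# climatology one group at a time, then stitch the pieces back together.
pieces = []
for doy, group in ds.VAR_2T.groupby("time.dayofyear"):
    pieces.append(group - tpm["VAR_2T"].sel(dayofyear=doy))
anom = xr.concat(pieces, dim="time").sortby("time")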

Can you try chunking tpm using xr.open_dataset('daily_avg_flox.nc', chunks={"day": 1}) or something like that? I chose 1 because ds is daily averaged and you're subtracting the daily climatology.

A screenshot of ds with the repr for VAR_2T expanded would help.

Thanks Deepak

I guess looping over each file to compute the sum/mean and then doing a second loop to subtract the mean from each file would have been a lot less effort in hindsight, but that's what motivated my initial post: how does one predict how much effort it will be to make this work with simple code?

I tried your suggestion

tpm = xr.open_dataset('daily_avg_flox.nc', chunks={"day": 1})
anom = ds.groupby("time.dayofyear") - tpm

The Dask task list doesn't seem to be progressing.

Screenshot of the dataset

Axel

Update. I think I got it. Thanks to everybody who helped. Here is what I do:

import dask
import logging
import flox
import flox.xarray
import xarray as xr

from dask.distributed import Client
client = Client(memory_limit='30GB', n_workers=8)  # even though I only have 96 GB total, giving each worker enough seems to work, and the total use never goes above 60 GB

path = "/data/axel0/era5/ncar/daily/inst/*/2T/*"  # 73 daily averaged global files (100+ GB)
ds = xr.open_mfdataset(path, engine='h5netcdf', drop_variables='utc_date', parallel=True)
ds = ds.convert_calendar("noleap")  # gets rid of Feb 29 days; seems to be the simplest way. Not sure if it matters for performance
# use flox to compute daily averages using xarray syntax
method = "cohorts"
ds_daily_avg = ds.groupby('time.dayofyear').mean(dim='time', engine='flox', method=method)
# smooth the daily averages with a centered 31-day rolling mean, wrapping around the year ends
ds_daily_avg = ds_daily_avg.pad(dayofyear=15, mode='wrap').rolling(center=True, dayofyear=31).mean()
ds_daily_avg = ds_daily_avg.isel(dayofyear=slice(15, 15 + 365))  # drop the padded days again
# compute daily anomalies and their maximum over time at each location
anom = ds.groupby("time.dayofyear") - ds_daily_avg
maxv = anom.max(dim="time")
%time maxv.to_netcdf("maxv.nc")

Runs in 6 minutes on my system (96 GB, 16 cores), which is perfectly fine (not accounting for the time I spent to get there).
Now I just need to figure out how to get which dates the maxv at each location corresponds to, preferably in the same pass through the anomalies as calculating the max…
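For what it's worth, a hedged sketch of one way to get those dates (a separate reduction rather than the same pass, and untested on this dataset): DataArray.idxmax returns the coordinate label, i.e. the timestamp, of the maximum along a dimension.

# Hedged sketch (untested here): idxmax gives the time label of the maximum
# anomaly at each grid cell, reusing the anom object from the code above.
max_date = anom.VAR_2T.idxmax(dim="time")
max_date.to_netcdf("maxv_date.nc")  # hypothetical output file name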

Making sure the input files are consistent (no extra variables), giving the workers enough memory (with the total more than the system has!), and flox seem to have been the solution. Things learned, though my confidence that what I learned translates to the next case remains limited.

Thanks again
Axel


I’m glad we were able to help you make progress!

I also really take your point that the user experience here is far from easy. An ideal tool would simply "just work" for any data size, any complexity, utilizing the available compute resources to their maximum potential. This is unfortunately not where Xarray + Dask are today. There is still a lot of work to be done!

I guess the key question is, how easy do we expect it to be? What other tools out there can take 100 GB of data spread over 73 netCDF files (with inconsistent coordinates) and compute the quantity of interest (the maximum daily air temperature anomaly at each location)? The options I see are:

  • Roll your own lower level data processing code, e.g. manually load each file in a for loop, code up the aggregations yourself, etc. This seems to be the default fallback you had in mind if Xarray / Dask did not work. If you spend 5 days tweaking an Xarray / Dask calculation to actually work, this option no longer seems so bad.
  • NCO - did you try it?
  • CDO - did you try it?
  • Xarray Beam (ERA5 climatology example)

I think this particular problem involving climatologies is especially nasty and has no easy solutions. In the meantime, it’s an excellent use case for benchmarking.

My takeaway from many similar interactions over the past few years is that people love the Xarray API (the code you write is really simple and intuitive) but people hate debugging crashing Dask clusters, picking chunks, tweaking worker memory, etc. There have been some big recent improvements to the Dask scheduler, and I’m curious whether you’re running the latest Dask version. But these are not a silver bullet. Exploring alternative execution frameworks is a main focus of the New Working Group for Distributed Array Computing. Hopefully some insights and new approaches will come out of that.


Thanks Ryan. Interesting article. First, some specific answers:
I didn't try NCO/CDO. While I use those tools at times for specific tasks, figuring out how to do something more complicated tends to devolve into some pretty cryptic syntax, and unless I find the solution somewhere on the web, I tend not to venture too far. I wasn't aware of Xarray-Beam. For this particular, relatively simple problem, the choice was really to either just loop through the individual files and aggregate the info, or let xarray/dask do the work.

I did use dask version 2022.12, which should have the fix you mention.

I think your rule of thumb of having at least as much memory as the problem size would have been a good guide here to take the "break down the problem" route rather than trying to make Dask work (though I did learn a few things).

I can't speak to technological solutions to this problem. In my career (and I'm old enough to have punched cards to enter programs), I've been on "bleeding" edges a number of times. At times, reverting to existing technologies/approaches was the better way; other times, suffering through was better. I remember trying to get data out of NASA's pre-EOSDIS system (the ISCCP data center), which supposedly allowed you to subset data and write it to tape to be shipped, but it was so slow that I wound up getting somebody to just copy the entire data set onto 50 or so 9-track tapes and ship them that way. By comparison, things are so much easier now, and what you are all doing with Pangeo is great.

Somebody has to be at that "bleeding" edge to move things forward, and having a community that builds the tools and provides some support, plus being able to search for solutions, makes that infinitely easier than it used to be.

As the article describes, with these larger problems, figuring out how to provide community support is harder, because a "minimally failing example" is difficult to provide without being able to share the full data. I suppose if we all moved into interconnected cloud spaces, that problem would be easier? While various funding agencies seem to be moving, in part, in the direction of putting data centers and users into the cloud, I suppose that transition will take another decade+. In the meantime, providing stable shared platforms for "adventurous" users to work on, which also allow "supporters" access to the same data, will be good. I understand you have been doing this with Pangeo, and it's probably my own fault that I didn't have the problem ready on a Pangeo server (I did have an account once, but I think I got booted off when the grant supporting it ended).

Best
Axel


In this case, this approach is catastrophic because tpm is eagerly loaded into memory and then “expanded”.

This particular foot-gun is now fixed on main by