Optimizing climatology calculation with Xarray and Dask

Sorry, my fault! I just love the BIG DATA! :upside_down_face:

Update on the rechunked version

Rechunking Configuration

import zarr
import os
from rechunker import rechunk
import gcsfs

url = f"{os.environ['SCRATCH_BUCKET']}/ERA5_HiRes_Hourly.zarr"
zg = zarr.open_group(url)

fs = gcsfs.GCSFileSystem(skip_instance_cache=True, use_listings_cache=False)

temp_path = f"{os.environ['SCRATCH_BUCKET']}/ERA5_HiRes_Hourly_rechunk/temp.zarr"
target_path = f"{os.environ['SCRATCH_BUCKET']}/ERA5_HiRes_Hourly_rechunk/target.zarr"

temp_store = zarr.storage.FSStore(temp_path)
target_store = zarr.storage.FSStore(target_path)

target_chunks = {
    # long, contiguous chunks along time; small tiles in space
    'tp': (7305, 103, 10),
    'time': None,        # leave the coordinate variables as they are
    'longitude': None,
    'latitude': None,
}

max_mem = '8GB'

r = rechunk(zg, target_chunks, max_mem, target_store, temp_store=temp_store)

# done with a 20 worker dask cluster w/ 40 GB memory each
r.execute()

This takes a long time (like an hour), but it is pretty stable and reliable.
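As a quick sanity check, you can confirm the target store picked up the intended chunking (a small hypothetical snippet reusing the target_path defined above):

zt = zarr.open_group(target_path)
print(zt['tp'].chunks)  # expect (7305, 103, 10)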

Now do flox groupby with map-reduce

I open the data and specify even longer chunks in time such that the time axis is totally contiguous.

import xarray as xr

target_path = f"{os.environ['SCRATCH_BUCKET']}/ERA5_HiRes_Hourly_rechunk/target.zarr"
dsr = xr.open_dataset(
    target_path, engine="zarr", consolidated=False,
    chunks={'time': -1, 'latitude': 103, 'longitude': 10}
)

The groupby now executes flawlessly with map-reduce.

import flox.xarray

method = "map-reduce"
tpmr = flox.xarray.xarray_reduce(
    dsr.tp, dsr.time.dt.hour,
    func="mean",
    method=method,
)

I used the same configuration with 40 GB of memory per worker, but memory usage never rose above about 10 GB per worker, meaning I could have gotten by much cheaper.

This is what my dask task stream looked like. It took about 4 minutes to do 678 GB of data. :sunglasses:

So I think this illustrates two key takeaways: rechunker makes it practical to transform the data layout, and flox makes the groupby itself fast and memory-stable.

Neither of these packages existed two years ago. I am tempted to say we have nailed it. :rocket: However, I think there is more work to do to make rechunker faster and more memory efficient.

@fmaussion - you should try rechunking the data on your HPC system and see how it goes.

1 Like

Well with that chunking you can use method="blockwise" to save some time and tasks :slight_smile:
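For illustration, a minimal sketch of that variant, assuming the same dsr opened above: because the whole time axis sits in a single chunk, every hour-of-day group is contained entirely within one chunk, so the reduction needs no inter-chunk combine step.

import flox.xarray

tpbw = flox.xarray.xarray_reduce(
    dsr.tp, dsr.time.dt.hour,
    func="mean",
    method="blockwise",
)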

what happens with

xr.open_dataset(
    target_path, engine="zarr", consolidated=False,
    chunks={}
)

and method="map-reduce". I’d hope it’s just as performant. If not, it’s down to distributed’s scheduling.

1 Like

I am currently crunching through 20 years of random hourly data, chunked somewhat like the initial dataset, on my laptop (16 GB RAM) with "map-reduce", and it works!!! It's slow, but memory is stable. What's interesting is that groupby_nanmean-chunk (the first row under Progress) isn't fused with random_sample. But that initial blockwise reduction is still executed frequently enough for memory to stay stable.



So I still think there’s something about the I/O that’s making things worse in the cloud. Maybe setting inline_array=True will help?
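For example, a hedged sketch of that experiment (assuming an xarray version that accepts inline_array in open_dataset, and the same target_path as above):

dsr = xr.open_dataset(
    target_path, engine="zarr", consolidated=False,
    chunks={'time': -1, 'latitude': 103, 'longitude': 10},
    inline_array=True,  # inline the on-disk array into each task rather than keeping it as a separate graph key
)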

These are my software versions:

pandas: 1.3.5
xarray: main
numpy : 1.21.5
dask  : 2022.3.0

This is how I set up distributed:

from dask.distributed import Client

# Set up a local cluster.
# By default this sets up 1 worker per core
client = Client(memory_limit="3 GiB", threads_per_worker=1, n_workers=4)
client.cluster

This mimics what I would do with netCDF in an HPC setting.


EDIT: This did end up finishing, though my laptop went to sleep in the middle.

2 Likes

Nice Deepak! I tried again with map-reduce using my original 20-year data stored in chunks of {'time': 24}. I opened it with larger chunks {'time': 480}, which seemed to make a big difference.
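Roughly, the setup looked like this (the path to the original hourly store is hypothetical here):

ds24 = xr.open_dataset(
    original_path, engine="zarr",  # original store, written with {'time': 24} chunks
    chunks={'time': 480},          # 20 on-disk chunks per dask chunk along time
)
tpmr = flox.xarray.xarray_reduce(
    ds24.tp, ds24.time.dt.hour,
    func="mean",
    method="map-reduce",
)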

This ran in 2:30 on my 20-node dask cluster with 40 GB of memory per worker! That seems amazingly fast! As you can see, the dask cluster is pretty happy.

However, you can also see that there are plenty of memory spikes reaching 16 GB or more. I was very happy to have the headroom of 40 GB per worker. In my experience, once the workers feel memory pressure and start spilling to disk, it's over.

:clap: :clap: :clap: on flox. This was my first time really doing a deep dive. It’s remarkable.

1 Like

Hi everyone, I'm following this thread and just wanted to say something about this part: it's something that has been seen several times. Memory management on a LocalCluster with a few worker processes is often better than with a large cluster on the cloud or on an HPC system. Others here have probably also noticed this.

See for example:

Maybe if we could understand where this comes from and what the key differences are, Dask scheduling could be improved:

  • Does this come from the fact that object-store and HPC file-system bandwidth is so high that too many blocks are read before the scheduler can start dependent tasks?
  • Does it come from the fact that there are too many workers for the scheduler to handle, so dependent tasks are not scheduled fast enough?
1 Like

Thank you all for jumping in, this is really awesome!

I can't concur with this. My original notebook attempts to process 12 files (one for each month) of global hourly data. This sums to 18.2 GB on disk and 33.88 GiB in memory (the data is lossily compressed as int16, which is likely to cause I/O issues).

The premise is to see if we can use a reasonably sized laptop (4 CPUs, 16 GB memory) to work this through. My personal summary so far:

  • averaging the full dataset using dask on the laptop works OK, with some memory spilling to swap
  • groupby('time.hour') does not work, even with small chunks or with flox
  • the reason for both issues is very likely an I/O issue with the netCDF data, since Deepak managed to get the computations done without I/O.

The fact that it works that well on Ryan's cluster with the full 20 years of data is truly a feat! When I have the time, I'll convert the data to Zarr to see if it makes things better on the laptop as well.
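A minimal sketch of that conversion (file pattern, chunking, and output path are all hypothetical):

import xarray as xr

ds = xr.open_mfdataset("era5_hourly_2019-*.nc", chunks={'time': 24})
ds.to_zarr("era5_hourly_2019.zarr", mode="w")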

1 Like

@rabernat Yeah! That's what I expected to see. 24 was just a bad choice =) (see the flocking number C/T below, haha)

Basically, for time grouping where groups are periodic with period T, you want chunk size C > T and "map-reduce", or C < T and "cohorts". If C ~ T then it's just bad memory-wise (can we call C/T the flocking number?).
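To make the flocking number concrete for this thread's hour-of-day climatology (my own back-of-the-envelope, with T = 24 time steps per period): the original {'time': 24} chunks give C/T = 1, the bad case; {'time': 480} gives C/T = 20, which suits "map-reduce"; something like {'time': 6} would give C/T = 0.25 and be a candidate for "cohorts". A hedged sketch of the cohorts call, with ds standing in for a dataset chunked that way:

import flox.xarray

# C < T case: chunks that contain the same hours are reduced together as cohorts
tpc = flox.xarray.xarray_reduce(
    ds.tp, ds.time.dt.hour,
    func="mean",
    method="cohorts",
)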

I bet we could improve the temporary memory spikes but it’s a little hard. I think the best way to proceed would be to build out numbagg’s grouped reductions, so we benefit from numba avoiding memory copies.

@fmaussion your experience is very confusing! It really should work really well.

1 Like

@geynard your experience is really interesting. I think this is starting to make sense. I've been using LocalCluster and assuming that its heuristics are the same as those of dask_jobqueue clusters, but perhaps not! It might explain why I see good results with a LocalCluster on my laptop while things are worse on the cloud/HPC. It would be interesting to experiment with inline_array=True when opening the Zarr store.

1 Like

Hi :grinning: @rabernat! Can you tell me how to get the number of tasks? In my JupyterLab, it shows the dask graph. Is there any relationship between tasks and the dask graph? Or is tasks = chunks * graph layers?
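In case it helps, a rough sketch of how to inspect those numbers on a dask-backed DataArray (dsr.tp here is assumed to be the array opened earlier in the thread):

arr = dsr.tp.data                  # the underlying dask array
print(arr.npartitions)             # number of chunks
print(len(arr.dask.layers))        # number of graph layers
print(len(arr.__dask_graph__()))   # total number of tasks in the graph

In general tasks is not simply chunks * layers, because layers such as a tree reduction produce fewer keys than they consume.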

Dear all,

The reactivation of this old thread made me discover it. I find the topic very interesting. I wonder whether this "exercise" has become a sort of benchmark.

When I read the walltimes, I was shocked. The best performance reported by Ryan was 15 minutes to compute the climatology over 7274 days, from datetime(2000, 1, 1) to datetime(2019, 12, 1). I found that slow. The required memory, O(800 GB), is indeed ridiculous given the size of the output array (721, 1440).

I decided to do the same computation with a completely different approach: not relying on Pangeo + dask, just plain basic Python. Okay, it's crude, but it's interesting because it sets a bar for what could/should be achieved by the Pangeo + dask suite.

I used 28 threads, corresponding to the number of cores of one node of our local cluster. The ERA5 database is archived locally, next to the cluster. Verdict: 6 minutes and 35 seconds to compute the climatology. The memory footprint was 28 times the 2D array size (I used float64), namely 28 * 721 * 1440 * 8 / 1e9 ≈ 0.2 GB. Memory should not be an issue for this kind of computation, and very likely not for most basic diagnostics.

PS: with 56 threads (2 threads per core): 4 minutes 13 seconds.
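For context, here is a rough sketch of the kind of plain-Python approach described above. Everything in it is an assumption on my part: the local file list (all_files), the variable name tp, and reading with netCDF4; each thread keeps a single float64 (721, 1440) accumulator, which matches the quoted memory footprint, and the hour-of-day bookkeeping is elided for brevity.

import numpy as np
import netCDF4
from concurrent.futures import ThreadPoolExecutor

NY, NX = 721, 1440
N_THREADS = 28

def partial_sum(paths):
    # One 2D running sum (plus a sample count) per thread.
    acc = np.zeros((NY, NX), dtype="float64")
    count = 0
    for path in paths:
        with netCDF4.Dataset(path) as nc:
            tp = nc.variables["tp"]          # shape (time, lat, lon)
            for t in range(tp.shape[0]):     # read one time step at a time
                acc += tp[t]
                count += 1
    return acc, count

# all_files is a hypothetical list of the locally archived hourly ERA5 files
batches = [all_files[i::N_THREADS] for i in range(N_THREADS)]
with ThreadPoolExecutor(max_workers=N_THREADS) as pool:
    partials = list(pool.map(partial_sum, batches))

total = sum(acc for acc, _ in partials)
n = sum(c for _, c in partials)
mean_field = total / n                       # a single (721, 1440) mean field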

I don't understand; the best time reported was 2m30s, here: Optimizing climatology calculation with Xarray and Dask - #25 by rabernat

Things have also changed dramatically in Xarray, flox, and dask in the meantime.

Seconding what @dcherian said… the work we have done in this forum to define common workloads and identify areas for performance improvement has led to massive upstream improvements in Xarray, flox, Dask, Zarr, etc.

If you've been frustrated with these computations in the past, I strongly encourage you to check out the latest versions of these packages and see for yourself how things have improved.

Thanks, yes, indeed, lots of progress has been made. Congratulations.