Hi all,
Problem:
I am attempting to merge a bunch of individual DataArrays that represent various rectangles spatially around the globe. The catch is that many domains overlap or are nested within one another (although any given grid cell will only have one value across all rectangles). I’ve tried many approaches, and keep running into memory issues. I’m really running out of ideas here and hoping someone from the Pangeo community has some keen insight here!
Data Description:
- I have 25 large regions, each about 30GB in size, stored as Zarrs with a chunk scheme of `{'time': 50, 'lat': -1, 'lon': -1}`. The total dimensionality is 42,340 time steps, with variable lat/lon sizes of O(100). This leads to chunk sizes of about 30-50MB.
- I have another 55 individual small rectangles, also chunked under `{'time': 50, 'lat': -1, 'lon': -1}`. These are calculated as a byproduct of the previous step, and span the same 42,340 time steps, with variable lat/lon sizes of O(10). They are anywhere from 100MB to 5GB in total size, and under this chunking scheme the chunks are really small: anywhere from KB scale to maybe 15MB at most.
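To make those chunk-size claims concrete, here is the back-of-the-envelope arithmetic (the specific lat/lon extents below are illustrative, since the real ones vary region to region):

```python
def chunk_mb(time, lat, lon, itemsize=4):
    """Size of one chunk in MB for float32 data (4 bytes per value)."""
    return time * lat * lon * itemsize / 1e6

# A large region with lat/lon of O(100), e.g. an illustrative 300 x 500:
print(chunk_mb(50, 300, 500))  # 30.0 -> the 30-50MB range quoted above

# A small rectangle with lat/lon of O(10), e.g. an illustrative 30 x 40:
print(chunk_mb(50, 30, 40))    # 0.24 -> KB-scale chunks
```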
Workflow:
- This workflow is run on a memory-optimized EC2 instance with 96 CPUs and 768GB of RAM, with a dask `LocalCluster()` running in batch.
- I do a series of calculations to derive the 55 small rectangles (forgive the vagueness here, as this is proprietary code). I can get through this calculation, and can even calculate all of them into memory (~200GB) and then rechunk them. I've done this as a test to simplify the upstream dask graph before the merge, wondering if the graph was causing issues. We can of course keep this series of computations out-of-memory, as it is mostly simple algebra.
- In the final step, I need to merge all of the small rectangles and regions (80 in total) into a global dataset. At any grid cell, only one of the 80 DataArrays holds a value; the rest are NaN. However, plenty of these DataArrays overlap in space, with some nested within others, so in any overlapping area every rectangle but one contributes NaNs.
- Since there is nesting and overlapping, `xr.combine_by_coords` doesn't work; it fails with a monotonic-coordinate error.
- The resulting merged array has shape `(42340, 1471, 3600)` and is about 800GB at float32. This is why I chunked everything upstream with full-space chunks and 50-time-step chunks: it ensures consistent chunk sizes across all rectangles, so the merge doesn't require any interactive rechunking or chunk unifying. Full-space chunks make sense since we are merging in space. The merge then natively produces a global dataset with chunks of `{'time': 50, 'lat': 1471, 'lon': 3600}`, about 1GB per chunk at float32. A little on the large side, but with any smaller time chunks upstream, our small rectangles would have really small KB-scale chunks across the board.
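For reference, the arithmetic behind the merged-array figures above (float32, 4 bytes per value):

```python
import math

shape = (42340, 1471, 3600)  # (time, lat, lon) of the merged global array
itemsize = 4                 # float32

# Total size: ~897 GB decimal, i.e. ~835 GiB -- the "~800GB" figure.
total_bytes = math.prod(shape) * itemsize

# One {'time': 50, 'lat': 1471, 'lon': 3600} chunk: ~1.06 GB (~0.99 GiB).
chunk_bytes = 50 * 1471 * 3600 * itemsize
```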
What I’ve tried:
Across all cases, I can load in all of the core datasets out-of-memory and derive all of the small rectangles. Upon the final merge, I get memory crashes on this huge machine. I’m keeping this merge out-of-memory, then pointing to an S3 location to save out a Zarr.
- Compute the smaller rectangles into memory to simplify their task graph (lots of operations upstream on small chunks), then rechunk into the `{'time': 50, 'lat': -1, 'lon': -1}` scheme. This stores about 200GB in memory. When I attempt the merge in the next step, the cluster crashes with memory errors, despite entering the merge with an essentially clean task graph.
- Use `xr.merge(datasets, compat='no_conflicts')`, since no grid cell should have more than one non-null value. Everything going into this is still dask arrays. It takes a very long time just to execute this command out-of-memory; then the cluster hangs for 20-30 minutes with minimal computation and crashes. This happens whether or not I do step (1) first.
- Try `xr.concat(datasets, dim='dummy').sum('dummy')`. The thinking here is that doing concat and sum entirely out-of-memory might be simpler than whatever the merge algorithm is doing. However, this creates an intermediate ~60TB dataset of 1GB chunks. It runs for a while, then crashes the cluster.
- Do the merge in smaller time batches: e.g. take about 1,000 time steps at a time, compute that whole batch into memory across all 80 rectangles, merge in memory, then chunk and save as Zarr. At the end, we'd read back all of the locally saved Zarrs and combine them into a remote save of the full dataset. This was working, but looked like it would take 20-40 hours to execute. It seems like a super brute-force, naive approach for the machinery we have here.
- A variety of different chunking strategies upstream. I’ve done full-time chunks with variable space, full-space chunks with variable time. I’ve settled on the current scheme for reasonable upstream chunks with no rechunking required, and somewhat reasonable downstream chunks (although 1GB seems a little big).
- I’m now pointing this at a serverless Dask cluster with the ability to scale up to a multi-TB cluster. Early testing of the previous combinations has led to that cluster crashing too.
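In case it helps anyone reason about approach (3), here is its logic in miniature with plain NumPy, using two toy 4x4 "rectangles" in place of the real 80 dask-backed DataArrays:

```python
import numpy as np

# Toy stand-in for the rectangle stack: two overlapping layers where every
# grid cell is non-NaN in exactly one layer, mirroring the real data.
a = np.full((4, 4), np.nan)
a[:2, :] = 1.0
b = np.full((4, 4), np.nan)
b[2:, :] = 2.0

# concat along a dummy axis, then a NaN-skipping sum: each output cell keeps
# its single non-null value (xarray's .sum() skips NaNs by default for floats).
merged = np.nansum(np.stack([a, b]), axis=0)
print(merged[0, 0], merged[3, 3])  # 1.0 2.0
```

The catch, as described above, is that at full scale the stacked intermediate is ~80x the size of the output.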
Any guidance here would be immensely helpful! I’m wondering if I need to create the original objects with chunking more like `{'time': 10, 'lat': -1, 'lon': -1}`, which would lead to globally merged chunks of about 200MB. Or maybe I’m missing a more efficient merging strategy.
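The arithmetic behind that candidate scheme (the 30x40 small-rectangle extent is illustrative):

```python
# Merged global chunk under {'time': 10, 'lat': -1, 'lon': -1}, float32:
chunk_mb = 10 * 1471 * 3600 * 4 / 1e6  # ~212 MB -- the ~200MB figure above

# ...but a small rectangle with O(10) lat/lon (e.g. 30 x 40) would shrink
# to tens-of-KB chunks, which is the trade-off I keep running into:
small_kb = 10 * 30 * 40 * 4 / 1e3      # 48.0 KB
```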