Season’s Greetings!
I’ve recently been processing some fairly large model output from sub-daily to daily resolution. I have 3 different spatial resolutions, and with the highest-resolution data I really run into a processing bottleneck. Below is the display of the zarr store containing the two variables to reprocess to daily resolution which cause the bottleneck.
Main points:
- this data set is not something to hold in memory
- daily averages only change the temporal dimension (8:1) but not the spatial dimension (so this is going to be embarrassingly parallel in space)
To me this just shouts map_blocks with to_zarr(region) on each block/region!
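For concreteness, here is a minimal sketch of the pattern I have in mind (paths and names below are placeholders, not my actual setup):

```python
import xarray as xr

input_store = "sub_daily.zarr"    # placeholder path: the existing sub-daily store
output_store = "daily.zarr"       # placeholder path: the daily store to create

ds = xr.open_zarr(input_store)
daily = ds.resample(time="1D").mean("time")   # 8 sub-daily steps -> 1 daily step; x/y untouched
daily.to_zarr(output_store, compute=False)    # writes metadata only, no data yet
# ...then, in principle, fill the store region by region, in parallel:
# daily.isel(x=x_slice, y=y_slice).to_zarr(output_store, region={"time": ..., "x": x_slice, "y": y_slice})
```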
I’ve followed 3 main approaches to the problem, without any breakthroughs or major successes. There were several variations on these, but here’s a summary of the 3 approaches: a) what I tried, b) what happened, and c) my a priori expectations, when trying to process just a modest (~6000 x ~4000) spatial sub-region of one of the above variables. (Times below are for processing a single variable, generally using ~20 workers; adding workers beyond that didn’t have a huge impact.)
1
I “rolled my own” naively, without map_blocks: ~40 hours to compute. This may not be the worst thing, but when I reviewed it afterwards, the order of my computations was not really optimized. map_blocks helps lay out (and check) all the relevant input and output sizes, and therefore seems to aid in ordering the computations correctly compared to going it alone. When looking at the dask workers in the dashboard, the computations have lots of holes, like swiss cheese, which further suggests there is room for optimization.
2
I put to_zarr() inside the function passed to map_blocks and returned a dummy value of 1 for each output chunk: this was super flaky and unreliable. Sometimes it would work, but sometimes promised keys would go missing and there would be timeouts. A priori, this seemed like the best approach (and maybe I need to revisit it when my cluster is not having a bad day? I’m not sure whether it was or wasn’t). But it was basically random whether my 9-output-chunk test dataset would complete successfully. Any thoughts about what is going on here? I frankly don’t remember much about the activity of the workers over time because it was too erratic.
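For reference, the rough shape of this approach was as follows (a sketch with placeholder names: output_store is the pre-initialized daily store, and region_for_block stands in for the offset bookkeeping that maps a block onto the store; the template/trigger plumbing, which I omit, is exactly the part I suspect was fragile):

```python
import numpy as np
import xarray as xr

output_store = "daily.zarr"    # placeholder path to the pre-initialized output store

def avg_daily_and_write(block):
    daily = block.resample(time="1D").mean("time")
    # region_for_block: hypothetical helper returning integer slices
    # {"time": ..., "x": ..., "y": ...} locating this block in the output store
    region = region_for_block(daily)
    daily.to_zarr(output_store, region=region)
    # hand back cheap dummy data (ones) instead of the real values
    return xr.ones_like(daily, dtype=np.int8)

# invoked roughly as (template construction omitted; ds is the sub-daily dataset):
# result = xr.map_blocks(avg_daily_and_write, ds, template=daily_template)
# result.sum().compute()   # force every block to run without gathering the real data
```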
3
Loop over subsets/regions of the input data, call map_blocks on these subsets, then write the returned data with to_zarr(region=): this completed in about ~36 hours. This is stable, but not much more efficient than #1. More details of the approach: outside the loop, identify a full-space slice of time to process on the input, and then do (without computing)
```python
(
    input
    .isel(time=time_in_slice)
    .resample(time="1D").mean(dim="time")
    .chunk(dict(time=time_out_chunk_size, x=space_chunk_size, y=space_chunk_size))
    .to_zarr(compute=False, append_dim="time")
)
```
Then for each separate spatial chunk of this temporal subset/slice, do
```python
xr.map_blocks(
    avg_daily,
    input.isel(time=time_in_slice, x=x_slice, y=y_slice),
).load().to_zarr(region=region)
```
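Here avg_daily is, in simplified form, just the per-block daily mean, and region is the dict of integer slices locating the block in the output store (offsets below are placeholders):

```python
def avg_daily(block):
    # 8 sub-daily steps collapse to 1 daily step; x and y are untouched
    return block.resample(time="1D").mean("time")

# region = {"time": slice(t0, t1), "x": slice(x0, x1), "y": slice(y0, y1)}
```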
The dask dashboard looks great while reading, computing daily averages, and rechunking: very tight blocks across all workers. But between those dask operations and the to_zarr(region=) part (which is nicely parallel), there is a very large gap without any dask activity.

Some notes: when doing this, I have to call .load(), i.e. xr.map_blocks().load().to_zarr(), or the code fails. My additional observations are very anecdotal, but they suggest that the gap might be in arranging the output along the coordinates: the gap between processing the input in dask and writing to_zarr grows if I try to process more than a few output time chunks, i.e. if the subset I’m taking is long in time then I pay additional overhead. It also appears that the slowdown is worse when what I pass to map_blocks() spans multiple chunks in the spatial dimensions (that is, when x_slice and y_slice cover more than a single chunk each).
I’m happy to go in any direction that makes sense and provide more details; I’m just not sure what the best direction is. Please let me know what you think.
A few additional notes:
I’ll note that I think the input is reasonably well chunked for the task at hand, but I could be wrong about that.
The exercise with to_zarr(append_dim, compute, region) in this thread is extended somewhat here. I’ve found that calling to_zarr(compute=False) on large output datasets is very slow and does not appear to scale well. It is much more efficient to slice the output in time and loop over to_zarr(compute=False, append_dim="time") calls to generate the full uncomputed zarr store. Then you can write to it by region later (and in parallel).
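As a sketch of what I mean (placeholder names; output_time_slices is however you choose to partition the output time axis):

```python
import xarray as xr

input_store = "sub_daily.zarr"    # placeholder paths
output_store = "daily.zarr"

ds = xr.open_zarr(input_store)
daily = ds.resample(time="1D").mean("time").chunk(
    dict(time=time_out_chunk_size, x=space_chunk_size, y=space_chunk_size)
)

for i, time_out_slice in enumerate(output_time_slices):
    piece = daily.isel(time=time_out_slice)
    if i == 0:
        piece.to_zarr(output_store, compute=False)                      # create the store, metadata only
    else:
        piece.to_zarr(output_store, compute=False, append_dim="time")   # extend the time axis
# The store now covers the full output time range and can be filled by region later, in parallel.
```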
My final solution was to run #3 with another level of parallelization at the script level. I define “team members” 0-5 and pass a team-member id to each script invocation, which identifies which temporal chunks that member processes. This works nicely and I get the processing done in ~6 hours, but that’s just brute force!
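Schematically, that brute-force layer looks like this (illustrative only; process_time_slice is a hypothetical wrapper around the approach #3 loop body for one temporal chunk):

```python
import sys

N_TEAM_MEMBERS = 6                      # team members 0-5
member_id = int(sys.argv[1])            # e.g. launched as: python process_daily.py 3

for i, time_out_slice in enumerate(output_time_slices):   # same temporal chunks as above
    if i % N_TEAM_MEMBERS != member_id:
        continue                         # this chunk belongs to another team member
    process_time_slice(time_out_slice)   # hypothetical: runs approach #3 for this chunk
```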