Map_blocks and to_zarr(region=)

Season’s Greetings!

I’ve recently been processing some fairly large model output from sub-daily resolution to daily resolution. I have 3 different spatial resolutions. With the highest-resolution data I really run into a processing bottleneck. This is the display of the zarr store containing the two variables to reprocess to daily that cause the bottleneck:

Main points:

  • this data set is not something to hold in memory
  • daily averages only change the temporal dimension (8:1) but not the spatial dimension (so this is going to be embarrassingly parallel in space)

To me this just shouts map_blocks with to_zarr(region) on each block/region!

I’ve followed 3 main approaches to the problem, without any breakthroughs or major successes. There were several variations on these, but here’s a summary of the 3 approaches: a) what I tried, b) what happened, and c) my a priori expectations, when trying to process just a modest (~6000 x ~4000) spatial subregion of one of the above variables. (The times below are for processing a single variable, generally using ~20 workers; adding workers beyond that didn’t have a huge impact.)


1
I “rolled my own” naively without map_blocks: ~40 hours to compute. This may not be the worst thing, but when I reviewed it afterwards, the order of my computations was not really optimized. map_blocks helps lay out (and check) all the relevant input and output sizes, and it therefore seems to aid in ordering the computations correctly compared to going it alone. Watching the dask workers in the dashboard, the computations have lots of holes, like Swiss cheese, which further suggests room for optimization.
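
Roughly, the naive version amounts to letting dask schedule the whole resample and write as a single graph, something like the sketch below (paths and chunk sizes are made up):

import xarray as xr

ds = xr.open_zarr("input.zarr")                     # sub-daily input (path made up)
daily = ds.resample(time="1D").mean()               # 8:1 reduction in time
daily = daily.chunk(dict(time=30, x=512, y=512))    # illustrative output chunking
daily.to_zarr("daily_output.zarr", mode="w")        # one big graph, computed here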

2
I put to_zarr() inside the function passed to map_blocks and returned a dummy value of 1 for each output chunk: this was super flaky and unreliable. Sometimes it would work, but sometimes promised keys would go missing and there would be timeouts. A priori, this seemed like the best approach (and maybe I need to revisit it when my cluster is not having a bad day? I’m not sure whether it was or wasn’t). But it was basically random whether my 9-output-chunk test dataset would complete successfully. Any thoughts about what is going on here? I frankly don’t remember much about the activity of the workers over time because it was too erratic.
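
For reference, the same “write inside the task, return a dummy token” idea can also be expressed with dask.delayed rather than xr.map_blocks, which sidesteps map_blocks’ template bookkeeping. This is only a rough sketch (paths, the slice bookkeeping, and block_slices are made up), and it assumes the output store was already initialized, e.g. with to_zarr(compute=False):

import dask
import xarray as xr

@dask.delayed
def write_block(in_time_sl, out_time_sl, x_sl, y_sl):
    # open with chunks=None so each task does plain numpy I/O rather than
    # computing dask collections from inside a running task
    block = xr.open_zarr("input.zarr", chunks=None).isel(
        time=in_time_sl, x=x_sl, y=y_sl)
    daily = block.resample(time="1D").mean()
    daily.to_zarr("daily_output.zarr",
                  region=dict(time=out_time_sl, x=x_sl, y=y_sl))
    return 1  # dummy value per block, as above

# block_slices (made up) enumerates matching (input-time, output-time, x, y)
# slices, one tuple per output chunk
tasks = [write_block(*slices) for slices in block_slices]
dask.compute(*tasks)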

3
Loop over subsets/regions of the input data, call map_blocks on these subsets, then write the returned data with to_zarr(region=): this completed in about ~36 hours. It is stable, but not much more efficient than #1. More details of the approach: outside the loop, identify a full-space slice of time to process on the input, and then do (without computing)

(
    input
    .isel(time=time_in_slice)
    .resample(time="1D").mean(dim='time')
    .chunk(dict(
        time=time_out_chunk_size, x=space_chunk_size, y=space_chunk_size))
    .to_zarr(compute=False, append_dim='time')
)

Then for each separate spatial chunk of this temporal subset/slice, do

xr.map_blocks(
    avg_daily,
    input.isel(time=time_in_slice, x=x_slice, y=y_slice)
).load().to_zarr(region=region)
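
(avg_daily isn’t shown, but it’s essentially just the daily resample, roughly as sketched below. Note that because the time length changes 8:1, xr.map_blocks generally wants a template argument, or input chunks that hold whole days.)

def avg_daily(block):
    # reduce one sub-daily block to daily means (8 time steps -> 1)
    return block.resample(time="1D").mean()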

The dask dashboard looks great when reading, computing daily averages, and rechunking: very tight blocks across all workers. But between those dask operations and the to_zarr(region=) part (which is nicely parallel), there is a very large gap without any dask activity. Some notes: when doing this, I have to load, i.e. xr.map_blocks().load().to_zarr(), or the code fails. My additional observations are very anecdotal, but they suggest that the gap might be in arranging the output along the coordinates: the gap between processing the input in dask and writing to_zarr grows if I try to process more than a few output time chunks, i.e. if the subset I’m taking is long in time then I pay additional overhead. It also appears that the slowdown is worse if I pass multiple chunks per dimension to map_blocks() (that is, when x_slice and y_slice span more than single chunks in those dimensions).


I’m happy to go in any direction that makes sense with this and provide more details; I’m just not sure what the best direction is. Please let me know what you think and I’m happy to add more.

A few additional notes:
I think the input is reasonably well chunked for the task at hand, but I could be wrong about that.

The exercise with to_zarr(append_dim, compute, region) in this thread is extended somewhat here. I’ve found that calling to_zarr(compute=False) on large output datasets is very slow and does not appear to scale well. It is much more efficient to slice the output in time and loop over to_zarr(compute=False, append_dim='time') to generate the full uncomputed zarr store. Then you can write to it by region later (and in parallel).
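
Roughly (paths and sizes made up):

import xarray as xr

ds = xr.open_zarr("input.zarr")                  # sub-daily input (path made up)
lazy_daily = (
    ds.resample(time="1D").mean()
    .chunk(dict(time=30, x=512, y=512))          # illustrative output chunking
)

days_per_slice = 30                              # illustrative
for start in range(0, lazy_daily.sizes["time"], days_per_slice):
    piece = lazy_daily.isel(time=slice(start, start + days_per_slice))
    if start == 0:
        piece.to_zarr("daily_output.zarr", mode="w", compute=False)
    else:
        piece.to_zarr("daily_output.zarr", append_dim="time", compute=False)
# the store now has its full metadata/shape but no computed data, and can be
# filled by region later, in parallel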

My final solution was to run #3 with another level of parallelization at the script level. I define “team members” 0-5 and pass a team-member id to each script, which identifies which temporal chunks that member processes. This works nicely and I get the processing done in ~6 hours. But that’s just brute force!
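
Schematically, each script invocation looks something like this (names are made up):

import sys

team_member = int(sys.argv[1])            # e.g. `python process_daily.py 3`
n_team_members = 6

# all_time_slices (made up) lists the temporal chunks to process, as in #3 above
for i, time_in_slice in enumerate(all_time_slices):
    if i % n_team_members != team_member:
        continue                          # another team member handles this slice
    process_time_slice(time_in_slice)     # run approach #3 for this temporal chunk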


Hi @Jamesmcc

I don’t know about the to_zarr bits, but you should be able to avoid map_blocks with flox (please double-check the output though):

import flox.xarray
import xarray as xr

ds = xr.tutorial.open_dataset('air_temperature', chunks={"time": 24*3})
flox.xarray.resample_reduce(ds.resample(time="1D"), func="mean")

This will be equivalent to your map_blocks approach but will avoid the per-chunk overhead that comes with map_blocks (you have so many chunks!). I’m hoping the decrease in the number of tasks is enough to make things work smoothly.


My understanding is that to_zarr(compute=False) still builds the full dask graph for writing the zarr store, which can take a really long time. This is not actually necessary when all you want to do is initialize the empty zarr store. You could hack the code from xarray.backends to write the empty store without computing the dask graph. E.g. something like the following should write the empty store almost instantly even for very large datasets (note: I have not tested the following and have no idea how robust it is):

def write_zarr_metadata(dataset, output_file):
    """ Write zarr metadata without computing the dask graph.
        Stolen and adapted from xarray.backends:
        https://github.com/pydata/xarray/blob/6a29380008dcd790f9adfbc290affcb767c913b2/xarray/backends/api.py#L1309
    """

    from xarray.backends.common import ArrayWriter, _normalize_path
    from xarray.backends.api import _validate_dataset_names, _validate_attrs, dump_to_store
    import xarray.backends as backends
    
    # expand str and Path arguments
    output_file = _normalize_path(output_file)
    
    mapper = output_file
    encoding = {}
    mode = "w-"  # create the store; fail if it already exists

    _validate_dataset_names(dataset)
    _validate_attrs(dataset)
    
    zstore = backends.ZarrStore.open_group(
        store=mapper,
        mode=mode,
        consolidated=False,
        consolidate_on_close=True,
        stacklevel=4,
    )

    # Metadata and in-memory (e.g. coordinate) variables are written here; dask-backed
    # variables are only registered with the writer, which is never synced, so their
    # data is not computed
    writer = ArrayWriter()
    dump_to_store(dataset, zstore, writer, encoding=encoding)
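
Usage would then be something like this (untested; names are made up). Note that for the later region writes, coordinates that lack the region’s dimension need to be dropped:

lazy_daily = ds.resample(time="1D").mean()             # dask-backed output dataset
write_zarr_metadata(lazy_daily, "daily_output.zarr")   # metadata only, near-instant

# later, possibly from many processes in parallel:
(
    lazy_daily.isel(time=some_slice)
    .drop_vars(["x", "y"])                             # coords lacking the "time" dim
    .load()
    .to_zarr("daily_output.zarr", region=dict(time=some_slice))
)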

@dougiesquire Thanks for this suggestion. I had guessed that the wait was for the graph, and it also seems that the time to build the graph grows faster than the size of the data. Honestly, this piece of the problem is easy, though it creates unnecessary code. I guess the question is whether this should be a change to to_zarr or an additional option on it? I might be tempted to try your proposed solution next time I’m dealing with this and see if there’s a path back to xarray.
Cheers, James

Thanks @dcherian! I’ll let you know when I get around to trying this.

FYI, I’ve opened an issue with xarray to see if there is interest in adding an option/method to initialise zarr metadata without computing the dask graph: Initialise zarr metadata without computing dask graph · Issue #6084 · pydata/xarray · GitHub


The above seemed to work once I took out stacklevel; probably an older xarray version in my particular environment.