Ryan’s post does a good job describing Dask’s issues with this. I’ll summarize them, and then provide a potential solution.
The basic idea is to help out Dask by doing the rechunking in two passes:
- A split-only pass, which rechunks the dimensions you aren't merging (1 and 2 in this case). This output is written to disk.
- A merge pass, which reads the output from step 1 and merges along the merge dimension (0, or time, in this case).
With this, we should be able to efficiently repartition larger than memory datasets on a cluster (see some caveats at the end).
Let’s generate a random dataset that’s chunked along the first dimension (think time) and write that to disk (using Zarr, but everything I’m talking about here applies equally well to HDF5). This represents the raw data as handed to us; we don’t have any control over it.
import dask.array as da
a = da.random.random((4, 4000, 4000), chunks=(1, 4000, 4000))
da.to_zarr(a, "/tmp/data/original.zarr", overwrite=True)
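If you want to double-check what landed on disk, zarr itself can open the store and report the chunking (a quick sanity check, not strictly needed for the rest of the workflow):

import zarr

# Open the store we just wrote and inspect its on-disk layout.
z = zarr.open("/tmp/data/original.zarr", mode="r")
print(z.shape)   # (4, 4000, 4000)
print(z.chunks)  # (1, 4000, 4000): one chunk per time step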
Read it into a Dask Array:
data = da.from_zarr("/tmp/data/original.zarr")
data

For analysis, we’d love to merge the first dimension (time). To keep the blocks a reasonable size, we’ll also split the other two dimensions. We’ll rechunk so that each block ends up the same size as the original blocks.
desired = data.rechunk({0: 4, 1: 2000, 2: 2000})
desired
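A quick back-of-the-envelope check that the block sizes really do match (float64 means 8 bytes per element):

# 1 x 4000 x 4000 float64 values per original block,
# 4 x 2000 x 2000 float64 values per desired block: 128 MB either way.
original_block_bytes = 1 * 4000 * 4000 * 8
desired_block_bytes = 4 * 2000 * 2000 * 8
assert original_block_bytes == desired_block_bytes == 128_000_000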

Inspecting this task graph shows the problem from Dask’s point of view: a global communication pattern where each output chunk depends on each input chunk.
desired.visualize()
As written, our workers would need a lot of memory to finish this job, at least without a lot of spilling to disk. And Dask tends to struggle with workloads that push the memory limits of the cluster, leading to crashing workers and sad users.
A two-pass solution
We can work around these issues in Dask by processing the data in two passes:
- A split-only pass, which rechunks the dimensions you aren't merging (1 and 2 in this case). This output is written to disk.
- A merge pass, which reads the output from step 1 and merges along the merge dimension (0, or time, in this case).
# split only
data2 = data.rechunk({1: 2000, 2: 2000})
# write
data2.to_zarr("/tmp/data/temp.zarr", overwrite=True)
# read output of step 1. Identical to `data2`
data3 = da.from_zarr("/tmp/data/temp.zarr")
data3

# merge pass
data4 = data3.rechunk({0: 4})
data4

data4 has a communication pattern that’s much friendlier to Dask.
data4.visualize()
Each of the 4 rechunk-merge output chunks depends on just 4 input chunks: the “split” versions of the original chunks. So our peak memory use for this stage is just the size of one rechunk-merge output chunk, which we know fits in memory since it’s the same size as the original chunks handed to us.
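If you want to verify that from the graph itself, here's a rough sketch. It pokes at Dask internals (the "rechunk-merge" task-name prefix), so treat it as illustrative rather than a stable API:

from dask.core import get_dependencies

# Flatten the graph and count the direct dependencies of each
# rechunk-merge output task.
dsk = dict(data4.__dask_graph__())
for key in dsk:
    if isinstance(key, tuple) and str(key[0]).startswith("rechunk-merge"):
        print(key[1:], "->", len(get_dependencies(dsk, key)), "dependencies")
# Each of the 4 output chunks should report 4 dependencies, one per
# (1, 2000, 2000) chunk read from the temporary store.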
That can be written to disk, ready for analysis. At least, that’s the dream. With the single-machine threaded scheduler we actually achieve it: on my laptop I was able to scale this workload up to a 16 GB dataset without issue (it finished in about a minute, and memory usage stayed around 1.5 GB for most of the run).
But with the distributed scheduler we run into some issues. Follow https://github.com/dask/dask/issues/5105 for updates; in short, we potentially end up scheduling (from-zarr, 3, 0, 0) on a different machine than (from-zarr, 1, 0, 0). That means we need to communicate between machines / processes, which blows up our memory budget. I’m hoping to have a fix for this (by fusing those 4 from-zarr tasks into one, so that the distributed scheduler can’t make the “wrong” choice). If that fails, I’ll share a trick to “manually” schedule these, but I hope it doesn’t come to that.
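In the meantime, one possible workaround (a sketch, and not necessarily the fix or the manual-scheduling trick mentioned above) is to skip the merge-pass rechunk entirely and ask from_zarr for the final chunking, so each task reads the 4 on-disk chunks it needs by itself and there's nothing left for the scheduler to place badly:

# Read the temporary store with the target chunking directly. Each task
# loads the four (1, 2000, 2000) Zarr chunks under its block, so no
# inter-worker communication is needed for the merge.
merged = da.from_zarr("/tmp/data/temp.zarr", chunks=(4, 2000, 2000))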
Anyway, at this point we should be able to call data4.to_zarr() to write the data to disk in its analysis-ready format.
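Something like:

# Write the analysis-ready, time-merged array to its own store
# (the output path here is arbitrary).
data4.to_zarr("/tmp/data/analysis-ready.zarr", overwrite=True)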
Notebook at https://gist.github.com/6b548371b60d9b397f24c9464933fad3

