Xarray to Zarr Parallel Writes with Dask Distributed

Hello, while experimenting with converting NetCDF-4 datasets to Zarr using Xarray, I ran into a point of confusion:

I know from reading the Xarray documentation that lazily representing the data as a Dask array allows for easy configuration of chunk sizes, and that the writes do not take up vast amounts of memory. This leads me to believe that they are done out of core by default, even when Dask Distributed is not set up as a local cluster on a single machine. I am a little confused about the inner workings of this conversion.

Without setting up a local cluster, does Dask automatically parallelize the write process when xarray.Dataset.to_zarr(...) is called? Would the number of workers used for the write correspond to the number of computing resources available on the machine (i.e. a single worker per vCPU in a VM)?

For example, what is the difference in what is happening internally between using Dask Distributed:

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(threads_per_worker=1)
client = Client(cluster)

import xarray as xr
import dask
import intake
da = intake.open_netcdf(<URI>).to_dask().<DATA VARIABLE>
da = da.chunk(chunks = dict(zip(<COORDINATES>, <CHUNKSIZE>)))
ds = da.to_dataset()

ds.to_zarr(store=<NEW URI>, consolidated=True)

and just Xarray & a Dask array:

import xarray as xr
import dask
import intake
da = intake.open_netcdf(<URI>).to_dask().<DATA VARIABLE>
da = da.chunk(chunks = dict(zip(<COORDINATES>, <CHUNKSIZE>)))
ds = da.to_dataset()

ds.to_zarr(store=<NEW URI>, consolidated=True)

Mind sending screenshots using viztracer? I find it’s a good tool for seeing all the function calls in a line/script; see to_zarr to s3 with asynchronous=False · Discussion #5869 · pydata/xarray · GitHub


Hi @jgreen,

your assumption is correct: Dask will also work without explicitly specifying a cluster. If you don’t set up a cluster yourself, Dask will spin one up for you (and just assume that the whole machine is yours), using a heuristic to determine the number of worker processes and the number of threads per worker.

As the heuristic won’t necessarily result in threads_per_worker=1, your two snippets likely end up working with a differently shaped cluster.
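
If you just want to see which shape the defaults give you on a particular machine, a minimal check (assuming dask.distributed is installed) looks like this:

from dask.distributed import Client

# A bare Client() spins up a LocalCluster using Dask's default heuristics,
# which pick a mix of worker processes and threads per worker based on the
# machine's core count.
client = Client()
print(client.nthreads())  # mapping of worker address -> threads per worker
client.close()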


A few remarks on

da = intake.open_netcdf(<URI>).to_dask().<DATA VARIABLE>
da = da.chunk(chunks = dict(zip(<COORDINATES>, <CHUNKSIZE>)))
ds = da.to_dataset()

ds.to_zarr(store=<NEW URI>, consolidated=True)

If I’m not mistaken, intake.open_netcdf() is basically an interface to xarray.open_mfdataset(). This should result in da containing a Dask array right away and in that sense is out of core directly.

However, as there is no chunking specified, intake.open_netcdf(<URI>).to_dask().<DATA VARIABLE> will likely consist of one large chunk spanning all of the array. The subsequent da.chunk() call will add some re-chunking tasks to this and hence the final version of da or ds will have the desired chunking.

But the read operation on the netCDF file will always get the huge full-array chunk which was implicitly requested in the initial opening statement. This can have huge performance implications and is probably not what you had in mind. (You’re not alone in this. The open().chunk().do_something().store() paradigm feels pretty natural to most people, and I’ve seen it hundreds of times.)

To really work out of core on chunks smaller than the full array, you’d need to specify chunks right at the opening step. I’m not too familiar with intake.open_netcdf(), but for the underlying Xarray method, the following should work the way you likely intended this to behave:

da = xarray.open_dataset(
    <URI>,
    chunks=dict(zip(<COORDINATES>, <CHUNKSIZE>)),
).<DATA VARIABLE>

And then there’s the internal serialisation of multi-threaded netCDF reads…

Running

print(
    xr.open_dataset(
        <FILE-W-10-TIMESTEPS>,
        chunks={"time": 1},
    )
    .max()
    .compute()
)

with

multithreaded_only_cluster = dask.distributed.LocalCluster(
    n_workers=1,
    threads_per_worker=10,
)

will create 10 tasks for reading the 10 chunks we split the array into along the time direction, but these tasks will all wait for each other because netCDF / HDF5 will internally serialise them.

Running it with

multiprocs_only_cluster = dask.distributed.LocalCluster(
    n_workers=10,
    threads_per_worker=1,
)

however, will (on most systems) actually run 10 parallel read ops on the file.

As Dask’s memory management is more stable in the multithreaded world, a mixture of threads and processes is typically the way to go (and is what the default heuristics mentioned above implement).
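
A hypothetical mixed setup for, say, an 8-core machine could look like this (the exact numbers are only an example):

from dask.distributed import Client, LocalCluster

# 4 worker processes with 2 threads each: netCDF reads are spread across
# processes, while each worker can still overlap compute and I/O.
mixed_cluster = LocalCluster(n_workers=4, threads_per_worker=2)
client = Client(mixed_cluster)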

Thanks @willirath, I appreciate the information! I am using intake.open_netcdf because I found it very difficult to get the xr.open_dataset(...) engines to work with Google Cloud object storage; the intake module has proven much easier in this regard.

I have the NetCDF4 chunks internally stored at 100 MB apiece, and from a previous post I know that the Dask array chunks must exactly match, or be exact multiples of, the NetCDF internal chunks for an efficient parallel read. So the chunksize I am specifying for the Dask array comes from intake.open_netcdf(<URI>).to_dask().<DATA VARIABLE>.encoding['chunksizes']
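
Roughly, the chunk derivation looks like this (same placeholders as above):

da = intake.open_netcdf(<URI>).to_dask().<DATA VARIABLE>
native = da.encoding['chunksizes']                      # on-disk (HDF5) chunk shape
da = da.chunk(chunks=dict(zip(<COORDINATES>, native)))  # align Dask chunks with the file's chunks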

I will try the write using the mixture of threads/processes and implement the heuristic; I’m eager to see what new information I can gather from this. You really cleared things up for me!

@raybellwaves I think I will start to use this tool more often, it looks very helpful in determining what’s going on under the hood. Thanks for pointing me to it!

Couple of clarifications:

  • when you call to_dask on the intake source, you are effectively specifying chunks={} to xarray, which means it will use the file’s native chunksizes; this is often what you want. The repr of the resulting dataset will tell you the chunks the underlying dask arrays are using.
  • using dask from xarray will not create a LocalCluster for you, but will use an in-process thread pool. That might be fine, but adding client = dask.distributed.Client(...) lets you create a cluster explicitly and control all of the configurable options (see the sketch below).
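
A rough sketch of both points, reusing the <URI> placeholder from above (the worker numbers are only an example):

import xarray as xr
from dask.distributed import Client

# Create the cluster explicitly so its shape is under your control,
# instead of relying on the default in-process thread pool.
client = Client(n_workers=4, threads_per_worker=2)

# chunks={} asks the backend for its preferred (native) chunking, which is
# roughly what to_dask() on the intake source does.
ds = xr.open_dataset(<URI>, chunks={})
print(ds)  # the repr shows the dask chunk layout of each variable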

You may wish to check out pangeo-forge, which provides a mechanism for executing netCDF->zarr workflows from prescriptive recipes.

Also, you might be interested in the kerchunk project, which can give you a zarr-based view on your original netCDF files without a transform&copy, and provide scalable parallel access that way.
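
A minimal sketch of that workflow, assuming the kerchunk and fsspec packages and reusing the <URI> placeholder (exact APIs may differ between versions):

import fsspec
from kerchunk.hdf import SingleHdf5ToZarr

# Scan the netCDF-4/HDF5 file and build a JSON reference set that maps
# zarr keys onto byte ranges of the original file.
with fsspec.open(<URI>, "rb") as f:
    references = SingleHdf5ToZarr(f, <URI>).translate()

# The reference set can then be opened through fsspec's "reference://"
# protocol and read as if it were a Zarr store, without copying any data.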


when you call to_dask on the intake source, you are effectively specifying chunks={} to xarray, which means it will use the file’s native chunksizes; this is often what you want.

I should have clarified, I was mainly using the chunksize option to write to different Zarr chunksizes for some benchmarking I was doing. Though, since I have 3 different NetCDF files with the desired chunking schemes, it seems that I was just adding an unnecessary line of code.

Defining the local cluster sped up my writes by a considerable amount, so I think going forward with that approach may be the best method in this context.