Understanding Async

I don’t understand the results I’m getting reading a Zarr dataset with s3fs=0.4.2 (non-async) vs s3fs=0.5.1 (async).

TL;DR: The dataset opens much faster with async, but when actually reading the data (with 20 workers) there is no significant speedup.

Here are the two notebooks:
s3fs=0.4.2 : https://nbviewer.jupyter.org/gist/rsignell-usgs/edec88157437523155cc27ca68f421c3
s3fs=0.5.1 : https://nbviewer.jupyter.org/gist/rsignell-usgs/cd7abe76904099fdda66f0553ba5e8fc

To open this National Water Model Zarr dataset with consolidated metadata in Xarray, it takes 1min 7s with the old s3fs, and only 6.7 s with the new s3fs async (10x faster!). I was thinking this must be because it loads the coordinate chunks faster, but the variables containing the coordinate data don’t have multiple chunks. So that’s the first thing I don’t understand.

When we get to actually reading the data with a cluster of 20 workers (older 0.6.1 version of Dask Gateway cluster on qhub), there is virtually no difference between the read times. For the big computation of the mean river transport over a year (13,000 tasks or so), the times are not significantly different (90 s vs 92 s). I would have expected some speedup, so that’s the second thing I don’t understand.

@martindurant, I imagine you have some ideas here…

2 Likes

async will give a speed-up if multiple chunks are being read at once (i.e., the dask partition is larger than the zarr chunksize by some factor), and that the latency/overhead of each request is a significant fraction of the overall time. Once you are in the regime that you were bandwidth limited anyway, as opposed to waiting, then async doesn’t help.
I am not sure exactly what happens during zarr open from xarray.

1 Like

@martindurant, so just to make sure I understand: async should typically help when we encounter a dataset in object storage with lots of little chunks (like COG with 512x512 chunks) but not so much when we read datasets with 100mb chunks like we recommend, right?

1 Like

Correct - which is exactly the origin or the recommendation for >=100MB chunks in the first place. async lessens the overhead for smaller chunks (so long as they are loaded concurrently, meaning a dask partition containing many chunks), so you could now choose to chunk to smaller sizes than before, which can be very convenient in some cases, or merely accidental in others.

3 Likes

One consequence of the async capability is that there is less of a performance penalty for using smaller Zarr chunks. You still want to use Dask chunks of ~100MB for large compute jobs. But the underlying Zarr chunks can be smaller.

Consider this dataset

from intake import open_catalog
cat = open_catalog("https://raw.githubusercontent.com/pangeo-data/pangeo-datastore/master/intake-catalogs/ocean/channel.yaml")
ds  = cat["channel_ridge_05km_float_run"].to_dask()
ds

These chunks are considered “too small” by our old standards. If we use the automatic chunks, we would get too many tiny dask tasks. Instead, we can do this

ds2 = cat["channel_ridge_05km_float_run"](chunks=None).to_dask()
ds2 = ds2.chunk({'time': 160})
ds2

There is no real penalty for this, because gcsfs will now internally load the small chunks asynchronously inside each task.

%time _ = ds2.ETAN.data.compute() 
CPU times: user 2.07 s, sys: 1.11 s, total: 3.18 s
Wall time: 2.54 s

However, if we just want to read one small, piece, we may be better of with the origin chunking of ds, or no chunks at all.

2 Likes

I should also note that there is still a penalty for using many small chunks. This requires more API calls (which costs money), and is harder to manage / move / delete. It’s just not a performance penalty.

I should also note that there is still a penalty for using many small chunks. This requires more API calls (which costs money), and is harder to manage / move / delete. It’s just not a performance penalty.

This is one area where it seems like COG (single object but capable of HTTP GET Range Requests) is fundamentally different to ZARR (Many objects each obtained with a single GET, perhaps asynchronously).

But couldn’t Zarr objects also support range requests (maybe they already do), where requests for adjacent bytes are merged? For example, you can ask for 1MB of a 100MB object and that is 1 GET request, or ask for all 100MB and it is still 1 API call because you just ask for the entire byte range. For COGs this behavior can be tuned with GDAL HTTP environment variables https://trac.osgeo.org/gdal/wiki/ConfigOptions#GDAL_HTTP_VERSION

1 Like