Extremly slow write to S3 bucket with xarray.Dataset.to_zarr

Hello,

I am running into extremely slow runtime when writing xarray.Dataset to the S3 bucket in Zarr format. I am able to reproduce the problem with this code snippet which creates xr.Dataset with 20 xr.DataArray’s. Am I doing something wrong?

zarr documentation mentions that selected compression can significantly affect the runtime. I tried to use Blosc’s lz4 compression but it ran even longer.

Do I need to convert my arrays to Dask arrays before I write to Zarr?

Is there a recommendation on selecting the blocksize for the compression? I don’t provide one, so it’s selected automatically. But I wonder if I should provide one and if it should be in some agreement with the chunking size that I specify in encoding settings for the Zarr?

import xarray as xr
import numpy as np
import s3fs
import zarr

# Create dataset with a bunch of arrays
ds = xr.Dataset()

compressor = zarr.Blosc(cname="zlib", clevel=2, shuffle=1)
encoding_settings = {}

for index in range(20):
    da_name = f'foo_{index}'
    da = xr.DataArray(
        np.arange(250*800*800).reshape((250, 800, 800)),
        name=da_name,
        dims=('t', 'x', 'y')
    )
    ds[da_name] = da
    encoding_settings[da_name] = {'_FillValue': -32767.0, 'compressor': compressor, 'dtype': 'short', 'chunks': (25000, 10, 10)}

# Write to Zarr
s3_out = s3fs.S3FileSystem(anon=False)
store_out = s3fs.S3Map(root='s3://its-live-data/test_datacubes/ds_s3.zarr', s3=s3_out, check=False)

ds.to_zarr(store_out, mode='w', encoding=encoding_settings, consolidated=True)

This takes about an hour to run on EC2 instance within the same us-west-2 region as the S3 bucket resides in.

Thank you for any input or suggestions,
Masha

1 Like

Thanks for sharing Masha! Writing a dataset of this size should take seconds, not hours.

@martindurant - do you have any idea what might be happening here? We have had similar reports of very slow writes with xarray → zarr → s3fs → s3, e.g.

@mlukis has provided a nice minimal reproducible example we could use for debugging this.

Firstly, I notice that with chunksize (…, 10, 10), you will have 6400 small chunks for each index. I would imagine that you are hitting an AWS rate limit hard with this number.

It is puzzling that you found a difference with different compression - I can only think that maybe it is not the compression that is important, but that this was run second and you were being throttled already from the first write because of the previous dataset.

The above is supposition. It would be good to check whether indeed a lot of CPU time is being used for compression, maybe, and turning on s3fs logging to see whether any LIST operations are happening (which would be bad). Since the latter will make things even slower, I would recommend doing it with a smaller subset of the data.

I just tried this on Pangeo Cloud with a little bit smaller example as follows. It seemed to run ok.

import xarray as xr
import numpy as np
import zarr

# Create dataset with a bunch of arrays
ds = xr.Dataset()

compressor = zarr.Blosc(cname="zlib", clevel=2, shuffle=1)
encoding_settings = {}

for index in range(4):
    da_name = f'foo_{index}'
    da = xr.DataArray(
        np.arange(250*800*800).reshape((250, 800, 800)),
        name=da_name,
        dims=('t', 'x', 'y')
    )
    ds[da_name] = da
    encoding_settings[da_name] = {'_FillValue': -32767.0, 'compressor': compressor, 'dtype': 'short', 'chunks': (250, 10, 10)}

import os
store = zarr.storage.FSStore(os.environ['PANGEO_SCRATCH'] + "zarr_write_test_2")
%time ds.to_zarr(store, mode='w', encoding=encoding_settings, consolidated=True)

It took 48 s.

The one change I made was to change the chunks from (25000, 10, 10) to (250, 10, 10). Since Zarr does not support partial chunk writes, it has to create the full chunk first and then write it. You are using a chunk shape (25000) much large than the actual axis length (250). This is a very inefficient pattern. You should choose chunk shapes that are better aligned with your writes.

@rabernat Thank you for taking a look and running the example.

The reason we are using such a big chunk size in time dimension is because we construct our data set by processing only 250 time points at a time (memory limit on the instance) and keep appending processed batches of data to the target data set. By specifying large chunk size t: 25000 for our target store we avoid rechunking our final data set as we access it by a spacial point, thus reducing access time significantly.

I changed the number of data arrays within data set to 4 like you did and reduced the chunking size to [250,10,10]. I am still getting 180 seconds to write to the S3 bucket. I wonder why there’s still such big difference in runtime compared to your numbers?
I thought dependency version numbers, built brand new conda environment with latest packages, but it didn’t help.

Writing to the local EBS volume takes about 300 seconds for complete set of our ~20 data arrays with [25000,10,10] chunking. So it seems that still the most efficient option is to create data set locally to EC2 and then copy the whole thing to S3 using AWS CLI which makes use of parallel copies.
The problem is that we still need to update our data set as new data will be coming in. Copying the whole data set locally to the EC2 from S3 for the update, updating it, and then pushing the whole data set back to S3 is inefficient and pricey as our datasets are rather large. So I was hoping to get writing/appending directly to the data set residing in S3 working.

Here are the versions I am using:

INSTALLED VERSIONS
------------------
commit: None
python: 3.9.6 | packaged by conda-forge | (default, Jul 11 2021, 03:39:48) 
[GCC 9.3.0]
python-bits: 64
OS: Linux
OS-release: 5.11.0-1028-aws
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: C.UTF-8
LOCALE: ('en_US', 'UTF-8')
libhdf5: 1.12.1
libnetcdf: 4.8.1

xarray: 0.21.1
pandas: 1.4.1
numpy: 1.21.5
scipy: 1.8.0
netCDF4: 1.5.8
pydap: None
h5netcdf: 0.13.1
h5py: 3.6.0
Nio: None
zarr: 2.11.0
cftime: 1.5.2
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: None
dask: 2022.02.0
distributed: 2022.02.0
matplotlib: None
cartopy: None
seaborn: None
numbagg: None
fsspec: 2022.02.0
cupy: None
pint: None
sparse: None
setuptools: 59.8.0
pip: 22.0.3
conda: None
pytest: None
IPython: None
sphinx: None
>>> import s3fs
>>> s3fs.__version__
'2022.02.0'

So here you make the choice to optimize for reading at the expense of writing. But I’m surprised that chunk sizes of 250 vs 25000 make such a difference for read time. Have you actually benchmarked this? Fsspec’s async capabilities should mitigate any differences.

I agree there is a better path here. Digging deeper will probably require profiling. If you have the patience to stick with it, we can keep iterating.

I would start by taking away complexity. Create a 1GB test file and see how fast it takes to copy to S3 using the aws CLI. This is your baseline. Then do the same thing using the s3fs mapper. Is there a big difference?

Thank you for your help!

Yes, we benchmarked access time to data sets with different chunking size and see a big difference. Hmmm, though we compared access to (250,100,100) vs. (25000,10,10). Taking your concern in, I should compare “apples-to-apples”: (250,10,10) instead of (250,100,100).

Thank you for the offer! Yes, I am sticking around to iterate and hopefully find a solution. I will try it and get back to you.

@rabernat I did confirm that using chunking of (250,10,10) vs. (25000,10,10) for our dataset slows down access time by a factor of ~6 when reading with xarray or zarr.
To access 100 random points when using Julia with (25000,10,10) chunking:

23.827233 seconds (1.81 M allocations: 1.112 GiB, 0.96% gc time, 2.88% compilation time)
--- using 4 threads ---
7.413636 seconds (3.12 M allocations: 1.204 GiB, 3.21% gc time, 19.24% compilation time)

vs. with (250,10,10) chunking:

869.085964 seconds (72.33 M allocations: 2.827 GiB, 0.06% gc time, 0.03% compilation time)
--- using 4 threads ---
283.548002 seconds (12.25 M allocations: 1.608 GiB, 0.13% gc time, 0.49% compilation time)

I also generated a baseline by creating 1Gb data array and wroted it in Zarr to S3 with xarray vs. copied it to the S3 with AWS CLI. The difference is 12 seconds vs. 2 seconds, so using xarray is 6 times slower. This is not using any compression when writing data.

1 Like

The numbers you posted show closer to a 40x difference, not 6x.

I’m also curious whether you see a significant difference between Julia vs. Python (taking care to use the same number of threads). On one hand, Julia is just generally faster. On the other, this should be purely I/O bound, so not language dependent. Do you know if Julia is using async http calls?

Great, so this is a very useful baseline for assessing the xarray → dask → zarr → s3fs performance. We should next do some profiling to understand where this stack is spending its time. @martindurant - any ideas here?

Ideas, but no evidence :wink:
The fsspec code may be async and waiting on a bunch (up to ~1000) connections at a time; but it’s still python code, with slicing and copying bytes objets and the like, and requiring the GIL. All that adds some overhead and no benefit from threads, particularly important for small tasks. I suspect Julia does this much better. Profiling might tell us how much time is spent manipulating bytes or waiting for the GIL versus waiting on comms.

Note that there is an outstanding issue in fsspec (can’t find it right now) in which sometimes too many coroutines were getting submitted at once and swamping the asyncio event loop. Oh, and it is, of course, well known that python’s builtin event loop is not the most performant.

As far as the AWS CLI, they probably have a lot of tricks, like merging HTTP requests into a smaller number of multi-part calls - I have heard this is possible, but I can’t find a reference. The code in boto3.s3.transfer may be instructional.

1 Like

To be clear, no one has directly compared Python and Julia yet. We have compared xarray → dask → zarr → s3fs write time to the AWS CLI copy operation.

@rabernat To clarify reported 6x slower access time dependent on data chunking as read by Python. Using %time in notebook to access 3 different points in the time series in data with (25730, 10, 10) chunking:

s3://its-live-data/test_datacubes/forAlex/ITS_LIVE_vel_EPSG3413_G0120_X-150000_Y-2250000_rechunked_t_25730_xy_10_test.zarr: 
Cube sizes: (25730, 833, 833)
v chunk sizes: (25730, 10, 10)
CPU times: user 34.4 ms, sys: 6.45 ms, total: 40.8 ms
Wall time: 980 ms
CPU times: user 33.7 ms, sys: 8.69 ms, total: 42.4 ms
Wall time: 883 ms
CPU times: user 26 ms, sys: 3.95 ms, total: 30 ms
Wall time: 985 ms

vs. accessing the same points in data with chunking of (250,10,10):

s3://its-live-data/test_datacubes/forAlex/ITS_LIVE_vel_EPSG3413_G0120_X-150000_Y-2250000_t_25370_xy_10_create_ebs_r5n.zarr: 
Cube sizes: (25730, 833, 833)
v chunk sizes: (250, 10, 10)
CPU times: user 229 ms, sys: 12.9 ms, total: 242 ms
Wall time: 1.99 s
CPU times: user 200 ms, sys: 7.56 ms, total: 208 ms
Wall time: 1.42 s
CPU times: user 238 ms, sys: 10.1 ms, total: 248 ms
Wall time: 1.71 s

So the difference still stands at ~6x slower.
40x difference is for accessing 100 random points with Julia sequentially and in parallel.

1 Like

Ok thanks @mliukis for the clarification! I am clearly having trouble keeping track of all the different dimensions to this comparison! (chunk scheme, # threads, python vs. julia, read vs. write, etc.) :man_facepalming:

1 Like

@martindurant Any specific task you would recommend to profile and understand the problem?

We probably would take Julia out of equation as it was just to demonstrate that it’s not only Python’s problem accessing data with (250,10,10) vs. (25000,10,10) chunking. So this kind of difference in chunking in time dimension has big impact on data access. This is just to address @rabernat 's concern:

But I’m surprised that chunk sizes of 250 vs 25000 make such a difference for read time. Have you actually benchmarked this? Fsspec’s async capabilities should mitigate any differences.

The main concern is why xarray → dask → zarr → s3fs is so much slower than AWS CLI, which does not allow us to write/update data directly in the S3 bucket with our wishful chunking :slight_smile:

1 Like

@rabernat @martindurant Would the approach as described in this PR be an option? Should I try to parallelize writes of data regions to the Zarr in S3?
Or xarray → dask → zarr → s3fs is supposed to parallelize the writes under the hood?

If you are using dask, and the source array is chunked, then you should be getting parallel writes already. You should not have to resort to the explicit parallelization strategy in that PR.

Can you provide more details on the source dataset? Is it chunked with dask? How do the source chunks compare with the target chunks?

I am not chunking array as I am creating it, I do specify its chunking for encoding when writing data to the Zarr store though. I did wonder if I need to chunk the data explicitly before I write it?
I will give it a try to see if it makes a difference.

It was actually one of the questions in my original post :slight_smile:

Do I need to convert my arrays to Dask arrays before I write to Zarr?

1 Like

Sorry for being so dense, but I still don’t see where you are are getting 6x.

chunk size wall time
(25730, 10, 10) 980 +/- 5 ms
(250, 10, 10) 1600 +/- 20 ms

It seems like less than 2x to me.

@rabernat Don’t mind it at all - I appreciate you looking into the problem very much.

Here is the code that accesses 100 “random” time series from the v data variable of each data set:

import numpy as np
import s3fs
import zarr

output_dirs = [
    's3://its-live-data/test_datacubes/forAlex/ITS_LIVE_vel_EPSG3413_G0120_X-150000_Y-2250000_rechunked_t_25730_xy_10_test.zarr',
    's3://its-live-data/test_datacubes/forAlex/ITS_LIVE_vel_EPSG3413_G0120_X-150000_Y-2250000_t_25370_xy_10_create_ebs_r5n.zarr'
]

def access_random(v):
    for each in np.arange(0, 800, 8):
        _ = v[:, each, each]
        
for each in output_dirs:
    print(f"=========================== S3 Zarr")
    print(f"{each}: ")
    s3_in = s3fs.S3FileSystem(anon=True)
    store = s3fs.S3Map(root=each, s3=s3_in, check=False)
    root = zarr.group(store=store)
    print(f"Cube sizes: {root.v.shape}")
    print(f"v chunk sizes: {root.v.chunks}")

    %time access_random(root.v)

generates on my laptop:

=========================== S3 Zarr
s3://its-live-data/test_datacubes/forAlex/ITS_LIVE_vel_EPSG3413_G0120_X-150000_Y-2250000_rechunked_t_25730_xy_10_test.zarr: 
Cube sizes: (25730, 833, 833)
v chunk sizes: (25730, 10, 10)
CPU times: user 2.13 s, sys: 452 ms, total: 2.58 s
Wall time: 20.6 s
=========================== S3 Zarr
s3://its-live-data/test_datacubes/forAlex/ITS_LIVE_vel_EPSG3413_G0120_X-150000_Y-2250000_t_25370_xy_10_create_ebs_r5n.zarr: 
Cube sizes: (25730, 833, 833)
v chunk sizes: (250, 10, 10)
CPU times: user 24.5 s, sys: 1.08 s, total: 25.6 s
Wall time: 2min 28s

I am getting the same runtime differences when using xarray to read the data

1 Like

Hi Masha and Ryan, hi everyone,

I’ve been following Pangeo discussions regularly over the past two years. Several times I wanted to comment and maybe I should have.The first time was on the famous rechunking issue. What a thread! Today, this thread is the one for my coming out.

First of all, I totally agree with Ryan answer #1: writing 12;8 GB of data should take a few seconds not one hour. I saw Masha’s problem as a nice homework. It took me 30’ to set up a script, optimize it a little bit, to finally reach the speed of 10.3s to write the 12.8 GB (5,000 x 800 x 800 float32). Reading 100 time-series (only 5,000 instants each) take 5.16 s. 20,000 snapshots will simply be four times longer, viz ~ 21 s. On the reading I’m achieving your speed. On the writing I’m much faster. Even faster than Ryan who announced 48s, but only for 1,000 snapshots.

Okay, what tools am I using on what architecture ? Well, these are my tools, not Pangeo. They are in pure Python, there are still in a prototype stage, but I believe they may be of interest for some of the community. There are adapted for the Lustre architecture. I know Lustre is not the kind of storage that receives the favours of Pangeo but still, a lot of clusters do have a Lustre filesystems attached. I designed these tools to achieve fast reading access to large datasets. When I mean large, I mean large. Our experiment is … 1.6 PB. Yes, you read well, petabytes. The policy on Lustre is to have few large files rather than lots of small ones. So zarr, if I understand correctly, is not really an option. Well, I should probably continue in another thread.

To summarize: 10.3 s to write 12.8 GB of data, 5.3 s to read 1,000 time series of 5,000 points each. I’d be glad to see someone going faster ! this would worth a beer.

Cheers,

Guillaume