Trick for improving Kerchunk performance for large numbers of chunks/files

Hi all,

Recently I have been spending a lot of time experimenting with kerchunk, and I have been quite impressed by the performance so far. However, most of the datasets I have tried until now were spanned by a relatively small number of files for their size, which in turn means the JSON reference files are also of manageable size. I now find myself in a predicament where I have a use-case that requires processing a massive 100 TB+ dataset (SS1330 CHIRP AIRS L1C radiances) spanned by a very large number of files (1 million+, since we are talking about data archived at 6-minute granularity). I have quickly come to realize that for this particular use-case my reference files are especially large even when turning off inlining and vlen encoding, because simply put there are a ton of URLs and variables that need to be referenced. Others have already noted how this can create problems not only with memory usage but also with longer data access times. I also noticed this in one of the case-study notebooks mentioned in the documentation for the MUR SST dataset, which I would imagine has a larger reference file than most: in the provided benchmark numbers, opening the full dataset was still a big improvement over pure netCDF, but it still lags far behind the native zarr version.

That's when the idea hit me: rather than passing in an entire JSON file, maybe it would make sense to readapt the kerchunk references directly as just another zarr store (laid out as it would look on a real filesystem). I don't know if others have already tried something like this, but I have managed to develop a trick that dramatically improves performance for these use-cases with relatively large numbers of files and variables, which is more often than not the case with satellite datasets available from NASA Earthdata Cloud.

I have provided the details, with some quick benchmarks, in this notebook, and thought I would share them here knowing that many other users of zarr/kerchunk are members of this community. Without going into the full details, the basic gist of the trick is to generate a pseudo-zarr-store-like directory structure in which tiny JSON files containing the byte ranges take the place of the chunked array files, then use an fsspec-supported filesystem mapping (wrapped by a preprocessor to format the data correctly) as the input to the fsspec reference filesystem, in place of the JSON references file. The end result looks something like this:

import fsspec
import xarray as xr

local_fs = fsspec.filesystem('file')  # can also be s3fs, gcsfs, etc.
mapper = ReferencesPreprocessor(local_fs.get_mapper(store_name))  # wrap the pseudo-zarr store
ref_fs = fsspec.filesystem('reference', fo=mapper, **storage_options)
ds = xr.open_zarr(ref_fs.get_mapper(''), consolidated=True)
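(ReferencesPreprocessor here is a small wrapper class from the notebook. Roughly speaking, it is just a read-only mapping that decodes each stored value on access so the reference filesystem sees ordinary reference entries rather than raw bytes. A simplified sketch of the idea, not the exact code:)

import json
from collections.abc import Mapping

class ReferencesPreprocessor(Mapping):
    """Wrap a pseudo-zarr store mapper so that zarr metadata files pass
    through as text while per-chunk JSON files are decoded into the
    [url, offset, length] entries the reference filesystem expects."""

    def __init__(self, mapper):
        self._mapper = mapper  # any fsspec key/value mapper

    def __getitem__(self, key):
        value = self._mapper[key]
        if isinstance(value, bytes):
            value = value.decode()
        if key.rsplit('/', 1)[-1].startswith('.z'):
            return value  # .zgroup/.zarray/.zattrs/.zmetadata stay as text
        return json.loads(value)  # chunk keys hold a reference list

    def __iter__(self):
        return iter(self._mapper)

    def __len__(self):
        return len(self._mapper)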

I have not tested this extensively, so I do not know how robust it is in a production setting, but I have gotten very promising results so far. I would like to hear any thoughts and comments from others. If this proves to be useful, I think it would be great if some of these changes could make their way upstream to fsspec and/or kerchunk, since some aspects of the trick (particularly the preprocessor) feel kind of hacky to implement.


Thanks for trying to improve on the “big JSON” problem!
Since I don’t have a “.netrc”, I can’t compare directly, and I haven’t tried your trick with another dataset yet, but it will be interesting.

In the meantime, I have also been working both on the problem of slow startup and on the in-memory and on-disk size of the reference set. I have been making an experimental alternative storage backend (<- merged yesterday) using dataframes/parquet. Maybe you can try it with your benchmarks?

You should get data that is >>10x smaller than as JSON. Furthermore, with partition=True on write and lazy=True on read, the metadata are separated out as in your code and read eagerly by xarray, but the references for the other variables are only read as required.

Great, I appreciate the response! I went ahead and installed the latest development versions of fsspec and kerchunk and did a quick test of these changes. I have never actually used parquet before so I want to verify that I set this up correctly. First I made my partitioned references:

from kerchunk.df import refs_to_dataframe
refs_to_dataframe(refs, 'refs_lazy.parq', partition=True)

Then I loaded the dataset as follows:

import xarray as xr
from fsspec.implementations.reference import DFReferenceFileSystem
fs = DFReferenceFileSystem(fo='refs_lazy.parq', lazy=True, remote_protocol="s3", remote_options=remote_options)
ds = xr.open_zarr(fs.get_mapper(''), consolidated=False)

This gives me about a 50% improvement in the time to open the dataset over the regular reference filesystem, but it is still 2-3x slower than my consolidated pseudo-zarr store trick (still not sure what it should be called at this point). Running times for full loads/computations seem to be about the same with both methods.

However, there is no question that the parquet-based method has the advantage when it comes to file sizes: the references have indeed been reduced in size by over an order of magnitude. Conversely, with my trick the reference file sizes are more than an order of magnitude greater than the original JSON when storing locally, due to the nature of block-based filesystems, so cloud object storage would be necessary to keep the sizes reasonable (though that is something that should be done anyway). I suspect which approach works best will ultimately be use-case dependent.

If you want to try it with the same dataset I am using, it shouldn't be too difficult to set up a .netrc; all you would need to do is sign up for a NASA Earthdata Cloud account. The main caveats are that the S3 URLs are only accessible from AWS resources in the us-west-2 region and that authentication tokens have to be refreshed hourly. Now that I find myself regularly working with NASA cloud datasets both at JPL and through Earthdata cloud, I could go on a rant for hours about how these unreasonable cybersecurity policies run counter to the principles of open source science and productivity in general, so I think it would be great if we as a community could continue to push back against this sort of thing a bit more. For now I can only assume that the DAACs probably have their hands tied and need to follow some higher-level CISO directive.

Anyway, apologies for going on a bit of a tangent there. I am definitely interested in seeing what these numbers look like once scaled to the entire dataset, which is something we could try exploring next.


I also have a dataset with a lot of small files for which we're trying out kerchunk, so I am very interested to try these alternatives once we get it running. Cmip6 wrf wus by thenaomig · Pull Request #247 · pangeo-forge/staged-recipes · GitHub

OK, so I tried your code on a big ERA5 JSON reference set I had lying around, and it took a very, very long time to run, used up loads of disk space (>>10GB, many times bigger than the original JSON), and produced an enormous number of files (millions, so many that I am having trouble counting them). The parquet version of the same references was 95MB and loaded from local files (lazily) in 26ms.

I don’t think that zarr is the right choice for this particular dataset!

Unfortunately, that sounds like what would happen when attempting to save the store locally. It creates countless tiny JSON files, containing the byte ranges and URLs, in place of where the zarr array chunks would normally be. The issue is that they are very small in apparent size (~0.2 KB), but on a local disk each will take up a minimum of 4 KB due to the block-based filesystem structure, hence my earlier comment that it would make more sense to load the references from cloud object storage like S3 in this case. That is definitely a limitation of this method.

I would expect generating the store to be quite slow (although, as I mentioned in my notebook, this can be improved by avoiding the explicit to_zarr() call to generate the directory structure; doing it manually would be much quicker, along the lines of the sketch at the end of this post). However, I am surprised that just opening the dataset would also be slow with consolidated=True, since in principle that part should work exactly the same as if it were an actual zarr store, which was kind of the point of my method. 26 ms for opening is also substantially quicker than for the dataset I am using with either method. Is the references file you are using for this example small enough that you could share it? If not, I can go ahead and try scaling up my own example and see what happens.
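For concreteness, the manual generation I have in mind is roughly the following (a simplified sketch, assuming a standard version-1 kerchunk references dict; the notebook does it somewhat differently):

import json
import os

def write_pseudo_store(refs, store_dir):
    """Write a kerchunk references dict out as a pseudo-zarr directory:
    zarr metadata entries are written verbatim and every chunk key becomes
    a tiny JSON file holding its [url, offset, length] reference."""
    entries = refs.get('refs', refs)  # accept version-1 or flat (version-0) dicts
    for key, value in entries.items():
        path = os.path.join(store_dir, key)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'w') as f:
            if isinstance(value, str):
                # .zgroup/.zarray/.zattrs (or inlined data) are already strings
                f.write(value)
            else:
                # e.g. ["s3://bucket/granule.nc", 1024, 4096]
                json.dump(value, f)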

The reference set as JSON is 180MB after zstd compression.

@martindurant

Based on your feedback I have done some work to substantially improve on my approach and would appreciate your thoughts.

The parquet-dataframe-based references you are currently working on certainly have a lot of nice properties, particularly the reduced file sizes, but they are still not quite ideal for my use-case because all of the references are still loaded into memory and looped over as an entire dataframe. Even with the variables partitioned into separate dataframes, I eventually notice the additional overhead of loading the references as soon as a computation on a variable I need is required, no matter how small. This is probably not a big deal with the ERA5 dataset you are using, but the total number of references for the dataset I am using from Earthdata cloud is approaching 100 million. I have not tested it explicitly, but I would guess this overhead will become more noticeable when applied to the entire dataset, since the uncompressed size of the references is close to 100 GB. So even though parquet also reduces the memory usage somewhat, it is still O(n). A desirable feature would therefore be more precise control over how many references are loaded at a time, to keep memory usage low and loading overhead small when only a fraction of the data needs to be accessed, regardless of the total size and number of references in the dataset. I tried but could not do this with a large parquet dataframe, as the API only seems to allow loading individual columns from disk rather than rows, which is what is needed here.

I see this as one of the main benefits of my method, since one reference per chunk is processed at a time. However, as you said, having tons of small files ends up actually increasing the size on disk significantly (unless using object storage like S3), and most importantly it takes a very long time to write out all those tiny files. The reason I didn't notice this initially is that I was running my example on a JupyterHub server with EBS SSD storage, so writes were extra fast. When I tested it again on HDD-based storage this part took much longer, perhaps as long as writing the original JSON references if not longer. So in any case, I think those two drawbacks would make my method difficult to scale to the entire dataset.

Anyway, I have finally come up with a solution that should address these problems and the needs of my use-case. I have uploaded the code here:

Loading kerchunk references as split records · GitHub

Basically, I am still splitting the references into multiple files like before, but instead of perfectly mimicking a zarr store with one tiny JSON file per chunk, the chunk references are now grouped into larger record files. By default I place 1000 references per record file and compress them, which reduced the file size by around 20x in my testing with zstd compression (I haven't tried encoding the common URLs as you have with the parquet references, so perhaps this could be improved even further).
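As a rough illustration of what writing one record file amounts to (not the exact code from the gist; I am assuming numcodecs' Blosc codec for the zstd compression here):

import json
from numcodecs import Blosc

def write_record(references, record_path, cname='zstd', clevel=1):
    """Pack a batch of consecutive chunk references (1000 by default)
    into a single compressed JSON record file."""
    codec = Blosc(cname=cname, clevel=clevel)
    with open(record_path, 'wb') as f:
        f.write(codec.encode(json.dumps(references).encode()))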

Since the store no longer looks like an actual zarr store, I had to implement a much more complicated mutable mapping class to map each zarr chunk key to the correct record file, but the results were worth it. The reference store is now generated much more quickly, even on disk storage, and I see very little overhead when loading one small slice of the data. Finally, I should mention that the mapper also keeps an LRU cache of the opened/decompressed record JSON files, so combined with the configurable record size the user now has fairly precise control over memory usage as well.
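The fiddly part is the key-to-record lookup itself. Conceptually it boils down to something like this (a simplified sketch of the logic in the gist, ignoring metadata keys and the caching; the record file naming is just illustrative):

import math

def record_for_key(key, zarray_meta, refs_per_record=1000):
    """Work out which record file holds the reference for a zarr chunk key
    like 'variable/2.0.1', given that variable's .zarray metadata."""
    var, chunk_id = key.rsplit('/', 1)
    idx = [int(i) for i in chunk_id.split('.')]
    # number of chunks along each dimension of the variable
    grid = [math.ceil(s / c) for s, c in
            zip(zarray_meta['shape'], zarray_meta['chunks'])]
    # flatten the N-d chunk index in C order
    flat = 0
    for i, n in zip(idx, grid):
        flat = flat * n + i
    record_id, offset = divmod(flat, refs_per_record)
    # the reference then lives at position `offset` inside something like
    # <store>/<var>/<record_id>.json.zst (hypothetical naming)
    return var, record_id, offset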

Here is a working example:

import fsspec
import xarray as xr

# refs is the original references JSON dict with a consolidated metadata key.
# Note I ran this test using the same dataset available locally 
# instead of from Earthdata cloud S3 access
make_records_store('refs_test', refs, cname=b'zstd', clevel=1)
mapper = ReferenceRecordMapper('refs_test/')
fs = fsspec.filesystem('reference', fo=mapper, remote_protocol='file') 
ds = xr.open_zarr(fs.get_mapper(''))

Please let me know if you have any questions. I would really like to get more feedback.

You can do exactly this, splitting the data into what parquet calls “row-groups” (there can be many within a file).

You are generally right that holding all of the references of a variable in memory might not be optimal, even when that storage is more efficient than dicts. Iterating over row-groups is an option (see the sketch below).
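For example, with pyarrow you can pull just one row-group at a time rather than the whole file (the path and internal layout here are hypothetical; they depend on how the references were written):

import pyarrow.parquet as pq

# hypothetical path to one of the reference parquet files
pf = pq.ParquetFile("refs_lazy.parq/some_variable/refs.parq")
print(pf.metadata.num_row_groups)

# read only the row-group that is needed, instead of the whole file
table = pf.read_row_group(0)
refs = table.to_pylist()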

I like your trick of figuring out from the ints in a key name which chunk it must be in. I guess zarr can already do this if you used index notation (getitem with tuple(int(_) for _ in key.split("."))). Parquet stores max/min values for each column per row-group, so it can do this efficiently too.

I think the main difference between our approaches is that yours targets the fastest random access, while mine targets bulk batch operations. Yours is certainly simpler, and reusing zarr, which is already in the loop, is good.