Trick for improving Kerchunk performance for large numbers of chunks/files

Hi all,

I’ve been impressed by kerchunk for datasets where the JSON references file stays a manageable size.

I recently had a use-case, however, that required processing a massive 100 TB+ dataset (SS1330 CHIRP AIRS L1C radiances) spanned by a very large number of files (1 million+, since the data are archived at 6-minute granularity), and the resulting JSON references file was many GB.

For this particular use-case the reference files are especially large, even with inlining and vlen encoding turned off, simply because there are a huge number of URLs and variables that need to be referenced.

Others have already noted how this can create problems with not only memory usage but also longer data access times.

(I also noticed this in one of the case-study notebooks mentioned in the documentation, for the MUR SST dataset, which I imagine has a larger references file than most: the provided benchmark numbers show that although opening the full dataset is still a big improvement over pure netCDF, it lags far behind the native zarr version.)

That’s when the idea hit me: Rather than passing in entire JSON files, maybe it would make sense to store the kerchunk references themselves as another zarr store.

I don’t know if others have already tried something like this, but I used this approach to dramatically improve performance for use-cases with large numbers of files and variables, which is often the case for satellite datasets available from NASA Earthdata Cloud.

I have provided the details with some quick benchmarks shown in this notebook.

The approach is to generate a pseudo-zarr, store-like directory structure in which small JSON files containing the byte ranges take the place of the chunked array files, then use an fsspec-supported filesystem mapping (wrapped by a preprocessor to format the data correctly) as the input to the fsspec reference filesystem, in place of the JSON references file. The end result looks something like this:

import fsspec
import xarray as xr

local_fs = fsspec.filesystem('file')  # can also be s3fs, gcsfs, etc.
# ReferencesPreprocessor is defined in the linked notebook; store_name and storage_options are set elsewhere
mapper = ReferencesPreprocessor(local_fs.get_mapper(store_name))
ref_fs = fsspec.filesystem('reference', fo=mapper, **storage_options)
ds = xr.open_zarr(ref_fs.get_mapper(''), consolidated=True)
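
To make the layout concrete, here is a minimal, hypothetical sketch of what such a pseudo-zarr store might contain; the variable name, URLs and byte ranges are made up, and the real generation code (including the zarr metadata handling) is in the notebook linked above.

import json
import os

# Hypothetical subset of kerchunk references: the zarr metadata keys
# (.zmetadata, .zarray, .zattrs, ...) are written out as ordinary JSON,
# while each chunk key gets a tiny [url, offset, length] reference file
# sitting where the zarr chunk data would normally be.
chunk_refs = {
    'air_temp/0.0.0': ['s3://bucket/granule_0001.nc', 8000, 48000],
    'air_temp/0.0.1': ['s3://bucket/granule_0001.nc', 56000, 48000],
}

store_name = 'refs_store'
for key, ref in chunk_refs.items():
    path = os.path.join(store_name, key)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, 'w') as f:
        json.dump(ref, f)   # ~0.2 KB of JSON in place of the chunk bytes

In the real store the zarr metadata keys are written alongside these, and the preprocessor wrapper shown above is what reformats the tiny JSON payloads into the form the reference filesystem expects.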

I have not tested it extensively yet, so I do not know how robust it is in a production setting, but the results so far are very promising. I would like to hear thoughts and comments from others. If this proves useful, I think it would be great if some of these changes could make their way upstream to fsspec and/or kerchunk, since some aspects of the trick (particularly the preprocessor) feel a bit hacky to implement.

Thanks for trying to improve on the “big JSON” problem!
Since I don’t have a “.netrc”, I can’t compare directly, and I haven’t tried your trick with another dataset yet, but it will be interesting.

In the meantime, I have also been working on both the problem of slow startup and the in-memory and on-disk size of the reference set. I have been making an experimental alternative storage backend (<- merged yesterday) using dataframes/parquet. Maybe you can try it with your benchmarks?

You should get data that is >>10x smaller than as JSON. Furthermore, with partition=True on write and lazy=True on read, the metadata are separated out (as in your code) and read eagerly by xarray, but the references for the other variables are only read as required.

Great, I appreciate the response! I went ahead and installed the latest development versions of fsspec and kerchunk and did a quick test of these changes. I have never actually used parquet before so I want to verify that I set this up correctly. First I made my partitioned references:

from kerchunk.df import refs_to_dataframe

# refs is the in-memory kerchunk references dict
refs_to_dataframe(refs, 'refs_lazy.parq', partition=True)

Then I loaded the dataset as follows:

import xarray as xr
from fsspec.implementations.reference import DFReferenceFileSystem

# remote_options holds the S3 settings for reading the referenced data files
fs = DFReferenceFileSystem(fo='refs_lazy.parq', lazy=True, remote_protocol="s3",
                           remote_options=remote_options)
ds = xr.open_zarr(fs.get_mapper(''), consolidated=False)

This gives me about a 50% improvement in the time to open the dataset compared with the regular reference filesystem, but it is still 2-3x slower than my consolidated pseudo-zarr store trick (still not sure what it should be called at this point). Running times for full loads / computations seem to be about the same with both methods.

However, there is no question that the parquet-based method wins on file size: the references have indeed been reduced by over an order of magnitude. Conversely, with my trick the reference files take up more than an order of magnitude more space than the original JSON when stored locally, due to the nature of block-based filesystems, so cloud object storage would be necessary to keep the sizes reasonable (though that is something that should be done anyway). Which approach works best may ultimately be use-case dependent.

If you want to try it with the same dataset I am using, it shouldn’t be too difficult to set up a .netrc; all you would need to do is sign up for a NASA Earthdata Cloud account. The main caveats are that the S3 URLs are only accessible from AWS resources in the us-west-2 region and that authentication tokens have to be refreshed hourly. Now that I regularly work with NASA cloud datasets both at JPL and through Earthdata Cloud, I could rant for hours about how these unreasonable cybersecurity policies run counter to the principles of open-source science and productivity in general, so I think it would be great if we as a community could continue to push back against this sort of thing. For now I can only assume that the DAACs have their hands tied and need to follow some higher-level CISO directive.

Anyway, apologies for going on a bit of a tangent there. I am definitely interested in seeing what these numbers look like once scaled to the entire dataset, which is something we could try exploring next.

I also have a dataset with a lot of small files for which we’re trying out kerchunk, so I am very interested in trying these alternatives once we get it running. Cmip6 wrf wus by thenaomig · Pull Request #247 · pangeo-forge/staged-recipes · GitHub

OK, so I tried your code on a big ERA5 JSON reference set I had lying around, and it took a very, very long time to run, used up loads of disk space (>>10 GB, many times bigger than the original JSON) and even more files (millions, so many that I am having trouble counting them). The parquet version of the same references was 95 MB and loaded from local files (lazily) in 26 ms.

I don’t think that zarr is the right choice for this particular dataset!

Unfortunately, that sounds like what happens when attempting to save the store locally. It creates countless tiny JSON files, each holding the byte ranges and URLs, in place of where the zarr array chunks would normally be. The issue is that although their apparent size is very small (~0.2 KB each), on local disk each one takes up a minimum of 4 KB because of the filesystem block size, hence my earlier comment that it makes more sense to load the references from cloud object storage like S3 in this case. That is definitely a limitation of this method.
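
To make the overhead concrete, here is a rough back-of-the-envelope calculation with assumed numbers (the actual ERA5 reference counts will differ):

n_chunk_refs = 5_000_000            # assumed: millions of tiny per-chunk JSON files
apparent = n_chunk_refs * 200       # ~0.2 KB of JSON each
on_disk = n_chunk_refs * 4096       # 4 KB minimum allocation per file on a local FS

print(f"apparent size: {apparent / 1e9:.1f} GB")   # ~1 GB
print(f"size on disk:  {on_disk / 1e9:.1f} GB")    # ~20 GB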

I would expect generating the store to be quite slow (although, as I mentioned in my notebook, this can be improved: avoiding the explicit to_zarr() call and building the directory structure manually would be much quicker). However, I am surprised that just opening the dataset would also be slow with consolidated=True, since in principle that part should work exactly the same way as for an actual zarr store, which was kind of the point of my method. 26 ms to open is also substantially quicker than for the dataset I am using with either method. Is the references file you are using for this example small enough that you could share it? If not, I can try scaling up my own example and see what happens.

The reference set as JSON is 180 MB after zstd compression.

@martindurant

Based on your feedback I have done some work to substantially improve on my approach and would appreciate your thoughts.

The parquet-dataframe-based references you are currently working on certainly have a lot of nice properties, particularly the reduced file sizes, but they are still not quite ideal for my use-case because all of a variable's references are still loaded into memory and iterated over as a whole dataframe. So even with the variables partitioned into separate dataframes, I eventually pay the overhead of loading all of a variable's references as soon as any computation touches that variable, no matter how small the computation is. That is probably not a big deal for the ERA5 dataset you are using, but the total number of references for the dataset I am using from Earthdata Cloud is approaching 100 million. I have not tested it explicitly, but I would guess this overhead will become more noticeable when applied to the entire dataset, since the uncompressed size of the references is close to 100 GB. So even though parquet also reduces the memory usage somewhat, it is still O(n).

A desirable feature would therefore be more precise control over how many references are loaded at a time, to keep memory usage low and loading overhead small when only a fraction of the data needs to be accessed, regardless of the total size and number of references in the dataset. I tried but could not do this with a large parquet dataframe, as the API only seems to allow loading individual columns from disk rather than rows, which is what is needed here.

I see this as one of the main benefits of my method, since only one reference per chunk is processed at a time. However, as you said, having tons of small files actually increases the size on disk significantly (unless object storage like S3 is used), and most importantly it takes a very long time to write out all those tiny files. The reason I didn’t notice this initially is that I was running my example on a JupyterHub server with EBS SSD storage, so writes were extra fast. When I tested it again on HDD-based storage this part took much longer, perhaps as long as writing the original JSON references, if not longer. So in any case, I think those two drawbacks would make my method difficult to scale to the entire dataset.

Anyway, I have finally come up with a solution that should address these problems and the needs of my use-case. I have uploaded the code here:

Loading kerchunk references as split records · GitHub

Basically, I am still splitting the references across multiple files like before, but instead of perfectly mimicking a zarr store with one tiny JSON file per chunk, the references are now grouped into larger record files. By default I now place 1000 references per record file and compress them; with zstd compression this reduced the file size by around 20x in my testing (I haven’t tried encoding the common URLs like you have with the parquet references, so perhaps this could be improved even further).

The result is that, since it no longer looks like an actual zarr store, I had to implement a much more complicated mutable-mapping class to map each zarr chunk key to the correct record file, but the results were worth it. The reference store is now generated much more quickly, even on HDD storage, and I am seeing very little overhead when loading a small slice of the data. Finally, I should mention that the mapper also uses an LRU cache when opening / decompressing each record JSON file, so combined with the choice of record size this gives the user fairly precise control over memory usage as well.
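
For illustration, here is a highly simplified sketch of the kind of lookup the mapper performs; the directory layout, record-file naming and per-variable chunk-grid shapes here are my own assumptions for the sake of a runnable example, and the actual implementation is in the gist linked above.

import json
from functools import lru_cache

import numcodecs
import numpy as np

RECORD_SIZE = 1000                       # references per record file (the default I use)
CHUNK_GRID = {'air_temp': (100, 4, 4)}   # assumed chunk-grid shape for each variable
codec = numcodecs.Blosc(cname='zstd', clevel=1)

@lru_cache(maxsize=8)                    # keep a few decompressed records in memory
def load_record(var, record_index):
    # each record file holds RECORD_SIZE chunk references for one variable
    with open(f'refs_test/{var}/record_{record_index}', 'rb') as f:
        return json.loads(codec.decode(f.read()))

def lookup(key):
    # e.g. key = 'air_temp/3.1.2' -> variable name plus '.'-separated chunk indices
    var, chunk = key.rsplit('/', 1)
    idx = tuple(int(i) for i in chunk.split('.'))
    # flatten the chunk indices against the variable's chunk grid, then use floor
    # division to pick the record file and the remainder to pick the entry inside it
    linear = int(np.ravel_multi_index(idx, CHUNK_GRID[var]))
    return load_record(var, linear // RECORD_SIZE)[linear % RECORD_SIZE]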

Here is a working example:

import fsspec
import xarray as xr

# make_records_store and ReferenceRecordMapper are defined in the gist above.
# refs is the original references JSON dict with a consolidated metadata key.
# Note: I ran this test with the same dataset available locally
# instead of via Earthdata cloud S3 access.
make_records_store('refs_test', refs, cname=b'zstd', clevel=1)
mapper = ReferenceRecordMapper('refs_test/')
fs = fsspec.filesystem('reference', fo=mapper, remote_protocol='file')
ds = xr.open_zarr(fs.get_mapper(''))

Please let me know if you have any questions. I would really like to get more feedback.

You can do exactly this, splitting the data into what parquet calls “row-groups” (there can be many within a file).

You are generally right that holding all of the references of a variable in memory might not be optimal, even when that storage is more efficient than dicts. Iterating over row-groups is an option.
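
For example, something along these lines streams one row-group's worth of references at a time rather than materialising the whole dataframe (a sketch, assuming the references parquet can be opened directly as a fastparquet ParquetFile):

from fastparquet import ParquetFile

pf = ParquetFile('refs_lazy.parq')
n_refs = 0
for rg_df in pf.iter_row_groups():   # one row group in memory at a time
    n_refs += len(rg_df)             # filter / process the references here
print(n_refs)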

I like your trick of figuring out, from the ints in a key name, which record it must be in. I guess zarr could already do this if you used index notation (getitem with tuple(int(_) for _ in key.split("."))). Parquet stores max/min values for each column per row-group, so it can do this efficiently too.

I think the main difference between our approaches is that yours targets the fastest random access while mine targets bulk batch operations. Yours is certainly simpler, and reusing zarr, which is already in the loop, is good.

Thanks! I decided to make one more variant of my method that saves the references for each variable as parquet files instead of compressed JSON, and I was honestly very impressed by the results!

Read performance was mostly similar to the JSON version, with just a tiny bit more overhead when opening the parquet files for the first time (I am guessing it needs to get info for each row group), but that probably won’t be a big deal unless the number of row groups is very large. Using zstandard compression instead of snappy I managed to cut the file size in half relative to the JSON counterpart, so the total compression ratio was nearly 50x with respect to the original uncompressed JSON. A few other notes:

  • I pad the end of the dataframe with empty rows so that the total number of rows is evenly divisible by the row-group size. This is necessary to ensure the number of rows in each row group matches the provided size and to allow my fast lookup strategy to work.
  • Keys are looked up by floor division of the chunk’s linear index by the row-group size (as I did previously with the separate JSON records), but since the metadata are still stored separately, as in a zarr store, I can determine the exact row within the row group that corresponds to a given key by taking the remainder of the chunk index divided by the row-group size (see the sketch after this list). This also lets me avoid explicitly saving each key, reducing the file size even further.
  • Data are saved as bytes rather than having separate columns for path/byte range vs raw base64 reference types.
  • I use an LRU cache like before, this time to store the contents of each row group.
  • Like before I just implement a custom MutableMapping to read the references and then just plug it into the original references filesystem. In this sense it feels a little hacky but also convenient. On the other hand your method implements the logic as a separate fsspec filesystem class. I am not really sure which is better here…
  • I haven’t tried eliminating common URLs so maybe the file sizes could be reduced even further? I believe your method currently does that through a separate metadata parquet file that has an index of all the URLs. I also noticed there is an option to save entire coordinate variables in this file so that they do not need to be extracted from separate chunks.
  • Lastly, as you may have noticed, I ended up using pyarrow instead of fastparquet in my example. Unfortunately I could not get the code to run with fastparquet using smaller row groups. The failure was happening for one specific variable in my dataset with very long base64 strings. When attempting to write the parquet file out I would get a segfault, so perhaps I should raise this issue separately in the fastparquet repo.
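
To make the padding and lookup scheme above concrete, here is a minimal sketch of the idea using pyarrow; the column name, file path and row-group size are illustrative rather than the actual gist code:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

ROW_GROUP_SIZE = 1000   # illustrative; the writer and reader must agree on this

# --- writing: one dataframe of references per variable, in chunk order ---
df = pd.DataFrame({'data': [b'...'] * 2500})   # reference payloads stored as bytes
# pad with empty rows so every row group holds exactly ROW_GROUP_SIZE rows
pad = -len(df) % ROW_GROUP_SIZE
df = pd.concat([df, pd.DataFrame({'data': [b''] * pad})], ignore_index=True)
pq.write_table(pa.Table.from_pandas(df), 'air_temp.parq',
               row_group_size=ROW_GROUP_SIZE, compression='zstd')

# --- reading: random access to the reference for a single chunk ---
def get_reference(linear_chunk_index):
    pf = pq.ParquetFile('air_temp.parq')
    # floor division picks the row group, the remainder picks the row within it
    row_group = pf.read_row_group(linear_chunk_index // ROW_GROUP_SIZE)
    return row_group.column('data')[linear_chunk_index % ROW_GROUP_SIZE]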

I would be happy to help incorporate some of these ideas into fsspec / kerchunk. Is that something you would be interested in?

Well, you are completely right that there’s no need to store the key value if all the keys are present, or if most keys are present and we pad for the ones that aren’t. This is certainly a good optimization.

Like before I just implement a custom MutableMapping to read the references and then just plug it into the original references filesystem.

Yes, there is definitely a benefit to having just one implementation. I skipped that only for the sake of getting something up and running quickly. I don’t yet know if there is a downside (for example, we must make sure never to list all the keys in the store and accidentally load the references).

I haven’t tried eliminating common URLs so maybe the file sizes could be reduced even further?

I am not certain that removing prefixes helps that much, since zstd should be able to find the repeated strings efficiently. Dict/categorical encoding, on the other hand, will be a big win both on disk and in memory.
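
As a small illustration of what that can buy (column name assumed): converting the repeated URL column to a pandas categorical already shows the effect in memory, and parquet's dictionary encoding does the equivalent on disk.

import pandas as pd

# assumed layout: a 'path' column repeating a small set of granule URLs
df = pd.DataFrame({'path': ['s3://bucket/granule_0001.nc'] * 1000
                           + ['s3://bucket/granule_0002.nc'] * 1000})

print(df.memory_usage(deep=True)['path'])    # every URL stored in full
df['path'] = df['path'].astype('category')   # each distinct URL kept once,
print(df.memory_usage(deep=True)['path'])    # rows hold small integer codes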

Unfortunately I could not get the code to run with fastparquet using smaller row groups. The failure was happening for one specific variable in my dataset with very long base64 strings.

Please try with the latest main branch and report the issue if it persists.

I would be happy to help incorporate some of these ideas into fsspec / kerchunk. Is that something you would be interested in?

Yes, definitely. I appreciate you taking the time to investigate, and there are good ideas here that should be integrated. In fact, we seem to have converged somewhat. It’s worth doing this right!

Sounds good to me! Upgrading fastparquet to the latest development version resolved the issue I was having, so I have updated the code in my gist to support reading and writing with both pyarrow and fastparquet. They both seem to work equally well for my use-case so far.

To follow up on this, maybe we should move further discussion back onto GitHub. Would you prefer the kerchunk or the fsspec repo?