Best way to scale an S3 Zarr store to handle massive amounts of S3 ingress?

I have a large cluster of EC2 instances doing some processing and then saving the results to a petabyte-scale Xarray dataset backed by a Zarr store on S3. The dataset has lat, lon, and other dimensions.

Right now, we’re running into S3 SlowDown errors because of the sheer number of PUT requests coming from all of the EC2 instances. S3 allows roughly 3,500 PUT/COPY/POST/DELETE requests per second per prefix, so we are looking into restructuring our Zarr store to better handle the throughput by partitioning along the spatial dims (lat, lon), since for our use case those are the best dims to work with. We’ve looked at a few options so far:

  • zCollection: seems promising, but we can only scale along a single dim (we’d prefer to scale across both lat and lon), and I don’t know about its compatibility with Xarray

  • NestedDirectoryStore: This seems like the obvious solution, but I’m unsure whether it’s compatible with S3. Can someone shed some light on this? (See the sketch after this list for the key layout I have in mind.)

  • Having a separate layer that creates multiple chunks below it as separate Zarr objects. This adds a lot more overhead, so it’s a solution I’d like to avoid if possible
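To make the key-layout question concrete, here’s a rough sketch of what I mean (the bucket name and chunk sizes are made up; assumes xarray, dask, zarr 2.x, and s3fs). With the default "." separator every chunk of a variable lands under a single prefix (precip/0.0.0, precip/0.0.1, ...), whereas dimension_separator="/" nests the chunk indices into the key path (precip/0/0/0), spreading writes across more prefixes:

import numpy as np
import xarray as xr
from zarr.storage import FSStore

# Hypothetical dataset with time/lat/lon dims, chunked along the spatial dims.
ds = xr.Dataset(
    {"precip": (("time", "lat", "lon"), np.random.rand(10, 180, 360))}
).chunk({"time": 10, "lat": 45, "lon": 90})

# dimension_separator="/" produces chunk keys like precip/0/1/2 instead of
# precip/0.1.2, so PUTs are spread over nested S3 key prefixes.
store = FSStore("s3://my-bucket/demo.zarr", dimension_separator="/")  # made-up bucket
ds.to_zarr(store, mode="w")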


I ran into something similar before (too many requests per second going to S3 storage), and a simple fix for us was to change the chunk spec: increasing the chunk size reduces the number of objects being uploaded.
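For example (a rough sketch with made-up chunk sizes and store paths, assuming an xarray/dask pipeline), rechunking to larger chunks before the write cuts the number of objects, and therefore PUTs, per write:

import xarray as xr

ds = xr.open_zarr("s3://my-bucket/input.zarr")  # hypothetical source store

# Fewer, larger chunks => fewer objects per write burst.
# e.g. going from 45x90 to 180x360 spatial chunks reduces the number of
# chunk objects per time step by a factor of 16.
ds = ds.chunk({"time": 24, "lat": 180, "lon": 360})

# Drop any stale chunk encoding carried over from the source store so the
# new chunking takes effect on write.
for var in ds.variables:
    ds[var].encoding.pop("chunks", None)

ds.to_zarr("s3://my-bucket/output.zarr", mode="w")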


We went down a similar route of using the Nested Directory Store. In our case we still ended up with prefix hotspotting due to the dot separator in the dimensions (IIRC).

Our solution was to write our own Store class where we could control the prefix structure.

from zarr.storage import FSStore, normalize_storage_path


class S3Store(FSStore):
    """A Zarr store that uses S3 as a backend with '/' as the dimension separator."""

    def __init__(self, url, **kwargs):
        super().__init__(url, key_separator="/", dimension_separator="/", **kwargs)

    def _normalize_key(self, key):
        """Overrides the parent method.

        The parent method does not check for the .zmetadata key. It would change the
        dot in '.zmetadata' to a slash and break things.

        """
        key = normalize_storage_path(key).lstrip("/")
        if key:
            *bits, end = key.split("/")

            if end not in (self._array_meta_key, self._group_meta_key, self._attrs_key, ".zmetadata"):
                end = end.replace(".", self.key_separator)
                key = "/".join(bits + [end])

        return key.lower() if self.normalize_keys else key

    def getitems(self, keys, **kwargs):
        """Overrides the parent method to be backwards compatible for reading both stores with
        '.' and '/' as the dimension separator.
        """
        results = self.map.getitems(keys, on_error="omit")
        # The function calling this method may not recognize the transformed keys
        # So we send the values returned by self.map.getitems back into the original key space.
        return {keys[keys.index(rk)]: rv for rk, rv in results.items()}
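For anyone who wants to try the class above, here’s a hedged usage sketch (the bucket path and chunking are made up; assumes zarr 2.x, s3fs, xarray, and dask):

import numpy as np
import xarray as xr

# Write a small dataset through the custom store, then read it back.
store = S3Store("s3://my-bucket/results.zarr")  # made-up location

ds = xr.Dataset(
    {"temp": (("lat", "lon"), np.random.rand(180, 360))}
).chunk({"lat": 45, "lon": 90})

ds.to_zarr(store, mode="w", consolidated=True)
ds_roundtrip = xr.open_zarr(store, consolidated=True)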

Hopefully this is helpful; it works for us at petabyte scale.


In addition to what Kyle said, we have also had good luck with setting AWS_RETRY_MODE=adaptive and AWS_MAX_RETRIES=10, which seems to smooth over any remaining SlowDown errors. No matter what you do, you will still likely hit some SlowDown errors occasionally as S3 rebalances partitions on the backend. With adaptive backoff/retry enabled (plus using '/' as the key/dimension separator), these errors are handled at a lower level and only seem to manifest as longer response times during massively parallel reads/writes.
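If your writes go through s3fs/botocore, the same retry behaviour can also be configured in code rather than via environment variables; a rough sketch (the path is made up, and config_kwargs assumes you’re using s3fs):

import s3fs

# Adaptive retry mode with up to 10 attempts, passed through to botocore's Config.
fs = s3fs.S3FileSystem(
    config_kwargs={"retries": {"mode": "adaptive", "max_attempts": 10}},
)
store = fs.get_mapper("s3://my-bucket/results.zarr")  # made-up path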


Although this wasn’t our primary design goal, the chunk-key indirection and content-addressable storage scheme we implemented in Arraylake solves this problem pretty well.

We’ll be giving an overview of our solution this coming Wednesday at the Pangeo showcase.
