Rate limited with `.to_zarr(region=)`

I have picked a “schema” for a tile xarray feature store and roughly landed on (tile, time, x, y). When I process tiles in parallel, even with just 1 time element (and probably with a few more), I get a rate-limit Exception from GCS. Here is a snippet showing what I am doing:

import datetime

import dask.array
import numpy as np
import xarray as xr

years = [2019]
tiles = [f'{t}_876_10' for t in range(0, 1000)]
final_width = 3
final_height = 3
target_features = ['a','b','c']

years_to_datetime = [datetime.datetime(year=year, month=1, day=1) for year in years]

# establish the shape of each feature
dummies = dask.array.zeros(
    (len(tiles), len(years), final_width, final_height),
    chunks=[1, 1, final_width, final_height],
    dtype=np.float32,
)

# establish the shape of the lat and lon coordinates
lon_dummies = dask.array.zeros(
    (len(tiles), final_width), dtype="float64", chunks=[1, final_width]
)
lat_dummies = dask.array.zeros(
    (len(tiles), final_height), dtype="float64", chunks=[1, final_height]
)
tile_array = dask.array.from_array(np.array(list(tiles)).astype("U17"), chunks=[1])

target = xr.Dataset(
    data_vars={nm: (["tile", "time", "x", "y"], dummies) for nm in target_features},
    coords={
        "lon": (["tile", "x"], lon_dummies),
        "lat": (["tile", "y"], lat_dummies),
        "tiles": (("tile"), tile_array),
        "times": (("time"), years_to_datetime),
    },
)

# write the target store up front; compute=False leaves the dask-backed arrays unwritten
target.to_zarr("gs://blah_datastore/test_tile_fstore", group='tiles', compute=False)

import gcsfs

def write_tile(i):
    """In real life, open geotiffs with rioxarray (potentially a few for multiple features)."""
    target_features = ['a','b','c']
    data = np.random.normal(0, 1, 9).reshape((1,1,3,3))
    
    dset = xr.Dataset(
        data_vars={nm: (["tile", "time", "x", "y"], data) for nm in target_features},
        coords={
            "lon": (["tile", "x"], [[1,2,3]]),
            "lat": (["tile", "y"], [[4,5,6]]),
            "tiles": (("tile"), [str(i)]),
            "times": (("time"), years_to_datetime),
        }
    )
    
    gcs = gcsfs.GCSFileSystem()
    store = gcsfs.GCSMap(root="gs://blah_datastore/test_tile_fstore", gcs=gcs, check=False)

    dset.to_zarr(
        store, 
        group='tiles', 
        region={
            "time": slice(0, 1),
            "tile": slice(i, i + 1),
            "x": slice(0, dset.dims["x"]),
            "y": slice(0, dset.dims["y"]),
        }
    )
    return dset

from dask.distributed import Client

client = Client()  # assumes a dask.distributed cluster; a local one works for this repro

futures = client.map(write_tile, range(1000))
results = client.gather(futures)


# gives me
Exception: HttpError('The rate of change requests to the object blah_datastore/test_tile_fstore/tiles/times/0 exceeds the rate limit. Please reduce the rate of create, update, and delete requests., 429')

Question:
How can I avoid rewriting the tile coordinate with every single task and thus avoid this rate-limit Exception? Can I write the coord data for time in advance and avoid writing to the same chunk-size-1 file from every task?
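The only idea I have come up with so far (not sure it is the intended pattern) is to drop the already-written times coordinate from each per-tile dataset before the region write, so no task ever touches times/0 again. A rough sketch of what that would look like inside write_tile:

    # "times" is a plain list in the target above, so (I believe) it already gets
    # written when the template is created with compute=False; drop it here so this
    # task only writes its own data variables plus its lat/lon/tiles chunks
    dset.drop_vars("times").to_zarr(
        store,
        group='tiles',
        region={
            "time": slice(0, 1),
            "tile": slice(i, i + 1),
            "x": slice(0, dset.dims["x"]),
            "y": slice(0, dset.dims["y"]),
        },
    )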

Other notes:

  1. Something odd in the actual code (not in this reproduction) is that the Exception can’t be serialized, and instead I get the whole “str does not have a .get method” error, which comes from the gcsfs.retries module. It took a while to discover that this was actually the issue.
  2. I have tried not using a dask array with chunks for the target, and I have tried dropping variables.
  3. I have not tried using a synchronizer or lock; maybe that would help, but it would also slow down the writes, which are designed around non-overlapping chunks.
  4. I have not tried appending and not using region…
  5. To actually get this to work for the real example, I have to run client.map in batch sizes of 100 to get through all 1000 tiles; otherwise dask seems to get sad from all the tasks. I also do client.restart between batches, because rioxarray/gdal seems to love leaving memory around (maybe its chunk cache thing), which ends up with killed workers and a sad user lol.

Thanks for any thoughts!


What are the total task numbers like?

I am calling .compute on each of the tile tasks, which keeps the task count 1:1 with the number of tiles. So the total number of tasks sits at about 100, since I am doing batches of 100 at a time.
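Roughly, the batching looks like this (the batch size of 100 is just what I landed on empirically, nothing scientific):

    # process tiles in batches of 100 and restart workers between batches
    batch_size = 100
    all_tiles = list(range(1000))

    for start in range(0, len(all_tiles), batch_size):
        batch = all_tiles[start:start + batch_size]
        futures = client.map(write_tile, batch)
        client.gather(futures)
        # rioxarray/gdal tends to hold on to memory, so restart between batches
        client.restart()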

I have added a synchronizer to help prevent a burst of file updates all at once:

from dask.distributed import Lock


class DaskSynchronizer:
    """Distributed lock ..."""

    def __init__(self, client=None):
        self.client = client

    def __getitem__(self, item):
        # zarr asks for a lock keyed by the array/chunk path;
        # hand back a distributed Lock with that name
        lock = Lock(name=item)
        return lock


synchronizer = DaskSynchronizer()

This helps and does not actually add much time to the entire process. So, this will be my solution for the time being.
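For completeness, the synchronizer just gets handed to the write call inside write_tile; a sketch, assuming nothing else about the call changes:

    # inside write_tile: the same region write as above, now with the synchronizer
    dset.to_zarr(
        store,
        group='tiles',
        synchronizer=synchronizer,
        region={
            "time": slice(0, 1),
            "tile": slice(i, i + 1),
            "x": slice(0, dset.dims["x"]),
            "y": slice(0, dset.dims["y"]),
        },
    )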

It still seems ideal to be able to specify which coordinates get updated in to_zarr. Some coords can be known up front while building the target, while other coords need to be discovered and written during the write. I sort of assumed that when specifying a target datastore with compute=False, you could use a dask array for coords that you plan to write into later and a numpy array or list for coords that will remain constant. In any case, when I read the round-tripped target metadata back, I see delayed dask.array objects. Logic around pinning these down would be nice (I think; unless I am missing something).

For example, the lat and lon coords above will not be known in advance (these could be dask.arrays with dummies in the target), but the years/time will be known in advance (these could be a list or numpy array in the target). This approach could help prevent the same file from being rewritten with the same values over and over. Just some random thoughts that probably are not totally sensible, as I am not a pro here lol.
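To make the distinction concrete, this is what I mean by the two kinds of coords in the target dataset from the first post (a quick inspection, nothing more):

    # times is a plain list (known up front, backed by numpy after construction),
    # while lat/lon/tiles are dask dummies meant to be filled in per tile
    for name, coord in target.coords.items():
        print(name, type(coord.variable.data))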
