I have picked a “schema” for a tiled xarray feature store and roughly landed on (tiles, time, x, y). When I process tiles in parallel, with one time element (and probably with a few more), I get a rate-limit exception from GCS. Here is a snippet to show what I am doing:
import datetime

import dask.array
import numpy as np
import xarray as xr
years = [2019]
tiles = [f'{t}_876_10' for t in range(0, 1000)]
final_width = 3
final_height = 3
target_features = ['a','b','c']
years_to_datetime = [datetime.datetime(year=year, month=1, day=1) for year in years]
# establish the shape of each feature
dummies = dask.array.zeros(
    (len(tiles), len(years), final_width, final_height),
    chunks=[1, 1, final_width, final_height],
    dtype=np.float32,
)
# establish the shape of the lat and lon coordinates
lon_dummies = dask.array.zeros(
    (len(tiles), final_width), dtype="float64", chunks=[1, final_width]
)
lat_dummies = dask.array.zeros(
    (len(tiles), final_height), dtype="float64", chunks=[1, final_height]
)
tile_array = dask.array.from_array(np.array(list(tiles)).astype("U17"), chunks=[1])
target = xr.Dataset(
    data_vars={nm: (["tile", "time", "x", "y"], dummies) for nm in target_features},
    coords={
        "lon": (["tile", "x"], lon_dummies),
        "lat": (["tile", "y"], lat_dummies),
        "tiles": (("tile"), tile_array),
        "times": (("time"), years_to_datetime),
    },
)
# compute=False defers the dask-backed data variables; the metadata (and, I
# believe, the in-memory times coordinate) is written up front
target.to_zarr("gs://blah_datastore/test_tile_fstore", group='tiles', compute=False)
import gcsfs

def write_tile(i):
    """In real life, this opens GeoTIFFs with rioxarray (potentially a few, for multiple features)."""
    target_features = ['a', 'b', 'c']
    data = np.random.normal(0, 1, 9).reshape((1, 1, 3, 3))
    dset = xr.Dataset(
        data_vars={nm: (["tile", "time", "x", "y"], data) for nm in target_features},
        coords={
            "lon": (["tile", "x"], [[1, 2, 3]]),
            "lat": (["tile", "y"], [[4, 5, 6]]),
            "tiles": (("tile"), [str(i)]),
            "times": (("time"), years_to_datetime),
        },
    )
    gcs = gcsfs.GCSFileSystem()
    store = gcsfs.GCSMap(root="gs://blah_datastore/test_tile_fstore", gcs=gcs, check=False)
    dset.to_zarr(
        store,
        group='tiles',
        region={
            "time": slice(0, 1),
            "tile": slice(i, i + 1),
            "x": slice(0, dset.dims["x"]),
            "y": slice(0, dset.dims["y"]),
        },
    )
    return dset
# assumes a dask.distributed Client is already running, e.g.
# from dask.distributed import Client; client = Client()
futures = client.map(write_tile, range(1000))
results = client.gather(futures)
# gives me
Exception: HttpError('The rate of change requests to the object blah_datastore/test_tile_fstore/tiles/times/0 exceeds the rate limit. Please reduce the rate of create, update, and delete requests., 429')
Question:
How can I avoid rewriting the time coordinate with every single task, and so avoid this rate-limit exception? Can I write the coordinate data for time in advance and avoid having every task write to the same chunk-size-1 file?
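For what it's worth, the direction I'm imagining (untested, so treat this as a guess) is to lean on the fact that the initial to_zarr call above should already have written the time coordinate values, since they are an in-memory array, and then drop that coordinate from each regional write so no task touches times/0:

# Hypothetical variant of the to_zarr call inside write_tile: drop "times"
# (every task currently rewrites the same times/0 chunk), so each task only
# writes chunks belonging to its own tile.
dset.drop_vars("times").to_zarr(
    store,
    group='tiles',
    region={
        "time": slice(0, 1),
        "tile": slice(i, i + 1),
        "x": slice(0, dset.dims["x"]),
        "y": slice(0, dset.dims["y"]),
    },
)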
Other notes:
- Something odd in the actual code (not this reproduction) is that the exception can't be serialized, and I get the whole
  str does not have a .get method
  thing, which comes from the gcsfs.retries module. It took a while to discover that this was actually even the issue.
- I have tried not using a dask array with chunks for the target, and I have tried dropping variables.
- I have not tried a synchronizer or lock. Maybe that would help, but it would also slow down the writes, which are designed around non-overlapping chunks (see the lock sketch after this list).
- I have not tried appending instead of using region… (see the append sketch after this list).
- To actually get this to work for the real example, I have to run client.map in batches of 100 to get through all 1000 tiles; otherwise dask seems to get sad from all the tasks. I also do client.restart between batches, because rioxarray/gdal seems to love leaving memory around (maybe its chunk-cache thing), which otherwise ends up with killed workers and a sad user lol. (Roughly what that looks like is sketched after this list.)
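If I did try a lock, I imagine it would be a cluster-wide dask lock around the store write, something like the sketch below (hypothetical and untested; the rest of write_tile stays as above). It would protect times/0, but at the cost of serializing every write, including the non-overlapping ones:

from dask.distributed import Lock

# Inside write_tile: one named lock shared by all tasks on the cluster, so
# only one write to the store happens at a time.
with Lock("tile-fstore-write"):
    dset.to_zarr(
        store,
        group='tiles',
        region={
            "time": slice(0, 1),
            "tile": slice(i, i + 1),
            "x": slice(0, dset.dims["x"]),
            "y": slice(0, dset.dims["y"]),
        },
    )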
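The append version I'd imagine (again untested) is below; build_tile_dataset is a stand-in for the Dataset construction inside write_tile. My understanding is that each append resizes the arrays, so this would have to run sequentially rather than fan out across workers:

# Hypothetical: append each tile along the tile dimension instead of
# pre-allocating the target and writing regions.
for i in range(1000):
    build_tile_dataset(i).to_zarr(store, group='tiles', mode="a", append_dim="tile")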
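Concretely, the batching I ended up with looks roughly like this:

# 100 tasks at a time, restarting workers between batches to claw back the
# memory that rioxarray/GDAL leaves behind.
results = []
for start in range(0, len(tiles), 100):
    futures = client.map(write_tile, range(start, start + 100))
    results.extend(client.gather(futures))
    client.restart()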
Thanks for any thoughts!