Hi folks,
I am trying to create a CI-enabled check for the Pangeo CMIP6 Cloud Data, which simply opens every store and checks whether an error is raised (this was motivated by a user report suggesting that some stores might have been corrupted or not written properly).
This is generally a pretty simple task and can be achieved by something like this:
from xmip.utils import google_cmip_col
import gcsfs
import xarray as xr

col = google_cmip_col()
stores = col.df['zstore'].tolist()  # results in a simple list of cloud zarr stores

filesystem = gcsfs.GCSFileSystem(token='anon', access='read_only')

def failcheck(store):
    """Try to open a single zarr store and report any exception."""
    mapper = filesystem.get_mapper(store)
    try:
        xr.open_dataset(mapper, engine='zarr', consolidated=True, use_cftime=True)
        return ('success', None)
    except Exception as e:
        return (store, e)

b_computed = []
for s in stores:
    b_computed.append(failcheck(s))

fails = [b for b in b_computed if b[0] != 'success']

with open('report.txt', 'a') as file:
    for fail in fails:
        file.write(f"{','.join([str(f) for f in fail])}\n")

print(f'Found {len(fails)} failed stores')
I have tested this and it works well for small numbers of stores, but my goal is to run it on all the stores we provide (> 500k individual stores), and with a simple for loop that would take on the order of days. So I am curious whether there is a way to speed this process up. I first tried to parallelize it with dask by replacing
b_computed = []
for s in stores:
    b_computed.append(failcheck(s))
with
import dask.bag as db
b = db.from_sequence(stores, partition_size=25).map(failcheck)
b_computed = list(b)
and this gives me a nice speedup when I have many cores available (e.g. on the larger Pangeo Cloud hub instances), but it still takes very long when I run it as a GitHub Action (which I think has only 2 or 4 cores available).
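Since the check is mostly waiting on network I/O rather than doing CPU work, one variant I have been meaning to try (just a sketch, not benchmarked) is to force dask's threaded scheduler and oversubscribe the thread count relative to the available cores; the num_workers=32 value below is an arbitrary guess:

import dask.bag as db

b = db.from_sequence(stores, partition_size=25).map(failcheck)
# Opening consolidated zarr metadata spends most of its time waiting on the
# network (with the GIL released), so more threads than cores can still help
# on a small CI runner.
b_computed = b.compute(scheduler='threads', num_workers=32)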
I am just curious whether there is a way to use async to open multiple stores concurrently, similar to issuing many HTTP requests at once. Has anyone had experience with this sort of task?
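For reference, here is roughly the pattern I had in mind (a sketch, untested at scale): since xr.open_dataset itself is not an async API, the calls would still run in a thread pool, with asyncio only coordinating them; the max_workers=32 value is again an arbitrary guess:

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def check_all(stores, max_workers=32):
    loop = asyncio.get_running_loop()
    # failcheck is a blocking call, so run each one in a thread pool;
    # the pool size controls how many stores are opened concurrently.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        tasks = [loop.run_in_executor(pool, failcheck, s) for s in stores]
        return await asyncio.gather(*tasks)

b_computed = asyncio.run(check_all(stores))

So effectively this is still just a thread pool rather than "true" async, and I am wondering whether there is a better way, e.g. going through gcsfs's async machinery directly.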