While many of these workflows are probably related to large single datasets, I wanted to emphasize the importance of being able to process many similar datasets. Of course, the prime example for this is CMIP: a “typical” CMIP analysis involves running a similar analysis over often 100s of small to medium-sized xarray Datasets.
Consider this simple example:
# Load all monthly sea surface temperature datasets from CMIP6
from tqdm.auto import tqdm
from xmip.utils import google_cmip_col
from xmip.preprocessing import combined_preprocessing

col = google_cmip_col()
cat = col.search(
    variable_id='tos',
    experiment_id='historical',
    table_id='Omon',
    grid_label='gn',
    source_id=['GFDL-ESM4', 'GFDL-CM4', 'IPSL-CM6A-LR'],  # select a subset for demo
)
ddict = cat.to_dataset_dict(
    preprocess=combined_preprocessing,
    aggregate=False,
)

# apply the analysis; this is lazy and fast
ddict_derived = {k: ds.mean(['x', 'y']) for k, ds in ddict.items()}
The way I usually do this is with a simple loop:

output_bucket = 'gs://leap-persistent/jbusecke/demo/'

for k, ds in ddict_derived.items():
    path = f"{output_bucket}/traditional/{k}.zarr"
    ds.to_zarr(path, mode='w')
This, however, is very inefficient: the stores are written one after the other, and the task graph shows many gaps.
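(As an aside, and just a rough sketch for comparison rather than part of my actual workflow: one partial mitigation is to collect delayed writes with `compute=False` and trigger them in a single `dask.compute` call, so the scheduler can overlap the writes. The `delayed_writes` subdirectory name below is only a placeholder.)

import dask

# Build all writes lazily, then let dask schedule them together.
# Reuses `ddict_derived` and `output_bucket` from above.
delayed_writes = [
    ds.to_zarr(f"{output_bucket}/delayed_writes/{k}.zarr", mode='w', compute=False)
    for k, ds in ddict_derived.items()
]
dask.compute(*delayed_writes)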
One way I have found around this is to write some custom dask logic:

from distributed import Client, wait, worker_client

client = Client()

def write_wrapper(k, ds):
    path = f"{output_bucket}/client_map/{k}.zarr"
    # open a client from within the task so the lazy dataset is computed on the cluster
    with worker_client() as client:
        ds.to_zarr(store=path, mode='w')
    return k

futures = client.map(
    write_wrapper,
    ddict_derived.keys(),
    ddict_derived.values(),
)
wait(futures)
This works really well in this case (~70s vs 340s runtime) and produces a much more efficient task graph:
This problem has come up several times for me (see e.g. here), but I have also seen performance issues with this approach using dask. When submitting many (100s of) datasets, scheduler memory use is high and the scheduler itself seems to become fairly unstable. The ideal approach is probably some sort of streaming submission queue (using as_completed?) that keeps the workers busy without overwhelming the scheduler.
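Something like this rough, untested sketch (reusing `write_wrapper` and `client` from above; `max_inflight` is an arbitrary cap, not a tuned value):

from distributed import as_completed

# Keep at most `max_inflight` write tasks on the scheduler at any time,
# topping up the queue as futures complete.
max_inflight = 10
items = iter(ddict_derived.items())

def submit_next():
    try:
        k, ds = next(items)
    except StopIteration:
        return None
    return client.submit(write_wrapper, k, ds)

seed = [f for f in (submit_next() for _ in range(max_inflight)) if f is not None]
queue = as_completed(seed)

for future in queue:
    finished_key = future.result()  # write_wrapper returns the dataset key
    nxt = submit_next()
    if nxt is not None:
        queue.add(nxt)  # stream in the next dataset as soon as a slot frees up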
While I am very curious to discuss practical improvements to this workflow using dask (happy to open another issue for that somewhere), I think the main point here is that other frameworks might be more naturally suited to this important type of workflow, and that optimizing it would benefit many researchers in the climate science community.