Collecting problematic workloads

In the Pangeo Distributed Arrays Working Group we’re interested in trying out tricky Pangeo workloads on various distributed backends, e.g. Dask, Cubed, Beam…

We want your problematic workloads! If you have any xarray/Pangeo workflow that runs poorly or suboptimally at scale, we would love to see it. We’re aiming to collect a range of such examples in this repository, to eventually turn them into benchmarks. (Feel free to post things that have already been posted elsewhere, too!)

cc @tomwhite @keewis @dcherian

While many of these workflows probably relate to large single datasets, I wanted to emphasize the importance of being able to process many similar datasets. The prime example for this is of course CMIP: a “typical” CMIP analysis involves running a similar analysis over often hundreds of small to medium-sized xarray Datasets.

Consider this simple example:

# Load all monthly sea surface temperature datasets from CMIP6
from tqdm.auto import tqdm
from xmip.utils import google_cmip_col
from xmip.preprocessing import combined_preprocessing
col = google_cmip_col()
cat = col.search(
    variable_id='tos',
    experiment_id='historical',
    table_id='Omon',
    grid_label='gn',
    source_id=['GFDL-ESM4', 'GFDL-CM4', 'IPSL-CM6A-LR'], # select a subset for demo 
)
ddict = cat.to_dataset_dict(
    preprocess=combined_preprocessing,
    aggregate=False,
)
# apply analysis, this is lazy and fast
ddict_derived = {k:ds.mean(['x', 'y']) for k, ds in ddict.items()}

The way I usually do this is with a loop:

output_bucket = 'gs://leap-persistent/jbusecke/demo'  # no trailing slash, to avoid '//' in the paths below

for k, ds in ddict_derived.items():
    path = f"{output_bucket}/traditional/{k}.zarr"
    # each to_zarr call blocks until that dataset has been computed and written
    ds.to_zarr(path, mode='w')

This, however, is very inefficient: the task graph shows many gaps.

One way I have found around this is to write some custom dask logic:

from distributed import Client, wait, worker_client

client = Client()

def write_wrapper(k, ds):
    path = f"{output_bucket}/client_map/{k}.zarr"
    # worker_client lets the task submit the underlying to_zarr chunk tasks
    # back to the scheduler from inside a worker
    with worker_client() as client:
        ds.to_zarr(store=path, mode='w')
    return k

futures = client.map(
    write_wrapper,
    ddict_derived.keys(),
    ddict_derived.values(),
)
wait(futures)

This works really well in this case (~70s vs. 340s runtime), showing a much more efficient task graph.

This problem has come up several times for me (see e.g. here), but I have also seen performance issues with this approach using dask. When submitting on the order of 100 datasets, the scheduler’s memory use is high and the scheduler itself seems to become fairly unstable. The ideal approach is probably some sort of streaming submission queue (using as_completed?) to keep the workers utilized without overwhelming the scheduler.
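A rough sketch of what such a streaming queue could look like, reusing write_wrapper and ddict_derived from above (the max_in_flight value of 10 is an arbitrary assumption, not something I have benchmarked):

import itertools
from distributed import as_completed

max_in_flight = 10  # assumed cap on concurrently submitted datasets
items = iter(ddict_derived.items())

# prime the queue with an initial batch of futures
seed = [
    client.submit(write_wrapper, k, ds)
    for k, ds in itertools.islice(items, max_in_flight)
]
ac = as_completed(seed)

# each time a write finishes, submit one more dataset, so the scheduler
# only ever tracks about max_in_flight datasets at once
for future in ac:
    future.result()  # surface any write errors
    try:
        k, ds = next(items)
    except StopIteration:
        continue
    ac.add(client.submit(write_wrapper, k, ds))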
While I am very curious to discuss practical improvements to this workflow using dask (happy to open another issue for that somewhere), I think the main point here is that other frameworks might be more naturally suited to this important type of workflow, and that optimizing it would benefit many researchers in the climate science community.

Yes, to_zarr blocks by default. Try tasks.append(ds.to_zarr(path, mode="w", compute=False)) and then later dask.compute(tasks)
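Applied to the example above, that suggestion would look roughly like this (the 'delayed' path prefix here is just a placeholder for the sketch):

import dask

tasks = []
for k, ds in ddict_derived.items():
    path = f"{output_bucket}/delayed/{k}.zarr"  # placeholder prefix for this sketch
    # compute=False returns a delayed object instead of writing the data immediately
    tasks.append(ds.to_zarr(path, mode='w', compute=False))

# trigger all of the writes as a single computation
dask.compute(tasks)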

@dcherian thanks. Even with compute=False this will take quite long in serial (I estimated ~1 hour for the number of datasets in that particular workflow). I think whatever is happening in the background (init of the zarr store?) is still taking quite long. With the approach above, this is nicely ‘folded’ in between the actual store tasks.

Interesting. It’d be nice to profile that and open an issue!

With compute=False, you still compute and store the dataset “template” eagerly. As we know, the cost of initializing Zarr stores in cloud storage can be pretty high, particularly for datasets with many variables to create. That could be part of the story here.
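A minimal local sketch of that behaviour, with a toy dataset and path standing in for the real ones: to_zarr(..., compute=False) creates the store and all array metadata right away, and only the chunk writes are deferred.

import os

import dask.array as da
import xarray as xr

# toy stand-in for one of the derived CMIP datasets above
ds = xr.Dataset({"tos": (("time", "x"), da.zeros((12, 100), chunks=(1, 100)))})

delayed = ds.to_zarr("template-demo.zarr", mode="w", compute=False)

# the group and array metadata already exist on disk before anything is computed
print(os.listdir("template-demo.zarr"))
print(os.listdir("template-demo.zarr/tos"))  # metadata only, no chunk files yet

delayed.compute()  # now the actual chunk data is written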

@jbusecke see Add `metadata_only` param to `.to_zarr`? · Issue #8343 · pydata/xarray · GitHub.