Thanks, here is some sample code. I’m taking 10 variables that each need to go through a slow function, which I’m mocking up with a 5-second sleep. So the worst case, fully serial, would take 50 seconds.
import xarray as xr
import dask.array as da
import time
import dask
from dask.distributed import Client
client = Client(n_workers=5)  # local cluster with 5 workers
def double_fun(da):
    # stand-in for the slow per-variable computation
    time.sleep(5)
    return da * 2

@dask.delayed
def delayed_double_fun(da):
    # same function, wrapped as a lazy dask task
    time.sleep(5)
    return da * 2
# create a dataset
ds = xr.Dataset(
    {
        "A": (('y', 'x'), da.ones(shape=(10, 10), chunks=(10, 10))),
        "B": (('y', 'x'), da.ones(shape=(10, 10), chunks=(10, 10))),
        "C": (('y', 'x'), da.ones(shape=(10, 10), chunks=(10, 10))),
        "D": (('y', 'x'), da.ones(shape=(10, 10), chunks=(10, 10))),
        "E": (('y', 'x'), da.ones(shape=(10, 10), chunks=(10, 10))),
        "F": (('y', 'x'), da.ones(shape=(10, 10), chunks=(10, 10))),
        "G": (('y', 'x'), da.ones(shape=(10, 10), chunks=(10, 10))),
        "H": (('y', 'x'), da.ones(shape=(10, 10), chunks=(10, 10))),
        "I": (('y', 'x'), da.ones(shape=(10, 10), chunks=(10, 10))),
        "J": (('y', 'x'), da.ones(shape=(10, 10), chunks=(10, 10))),
    }
)
st = time.time()
# Dataset.map applies double_fun to each variable one at a time
ds_double = ds.map(double_fun)
et = time.time()
elapsed_time = et - st
print('Map() total time:', elapsed_time, 'seconds')
st = time.time()
results = []
for var in ds.data_vars:
    # each variable becomes an independent delayed task
    results.append(delayed_double_fun(ds[var]))
# compute all tasks in parallel, then merge back into a Dataset
ds_double = xr.merge(dask.compute(results)[0])
et = time.time()
elapsed_time = et - st
print('Dask.compute total time:', elapsed_time, 'seconds')
client.close()
As expected, I’m getting 50 s with ds.map() but only 5.6 s with the Dask delayed version. The real scenario is spatial-averaging code, which seems particularly slow when I have 40-something small variables. I was just hoping for some way to process these variables in parallel. I tried the Dask delayed option, obviously, but it seems to blow out memory or drop workers, so the Dask client becomes unstable. I’m not sure why the memory footprint is so large; it looks like each worker is trying to pre-load all of the data assigned to it (i.e. multiple variables at once within a worker, rather than loading them serially within the worker).
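To make the real case a bit more concrete: one thing I’m considering is batching the delayed calls so that only a handful of variables are in flight at once. Below is a minimal sketch of what I mean. spatial_mean here is just a placeholder for my actual averaging code, and batch_size = 5 is an assumption (matching n_workers above), not something I know to be right:

@dask.delayed
def spatial_mean(da):
    # placeholder for the real spatial-averaging code
    return da.mean(dim=('y', 'x'))

# compute in batches so at most batch_size variables are loaded at once
batch_size = 5  # assumed: match n_workers
names = list(ds.data_vars)
computed = []
for i in range(0, len(names), batch_size):
    batch = [spatial_mean(ds[v]) for v in names[i:i + batch_size]]
    computed.extend(dask.compute(*batch))
result = xr.merge(computed)

I don’t know whether this actually bounds per-worker memory the way I’d expect, which is really what I’m asking.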