If you use explicit figure and axes handles, this works fine with threads; otherwise, you'll get a mess:
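```python
import dask
import matplotlib.pyplot as plt
import xarray as xr


def plot(ds):
    time = ds["time"]
    # map_blocks first calls the function on a zero-sized mock object to
    # infer the output, so skip plotting in that case.
    if sum(ds.shape) > 0:
        # Explicit figure/axes handles avoid pyplot's shared "current figure".
        f, ax = plt.subplots(1, 1)
        ds.plot(ax=ax)
        f.savefig(str(time.values[0])[:16])
        plt.close(f)
    return time


ds = xr.tutorial.open_dataset("air_temperature").isel(time=slice(0, 10))
# One chunk (and therefore one plotting task) per time step.
tasks = ds["air"].chunk({"time": 1, "lat": -1, "lon": -1}).map_blocks(plot)
tasks.compute(num_workers=4, scheduler="threads")
```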
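The fragile part with threads is pyplot's global "current figure" state. A minimal sketch of going one step further, assuming you can drop pyplot entirely: build each `matplotlib.figure.Figure` directly, so no figure is ever registered with pyplot. The random `frames` array and the `plot_frame` helper are hypothetical stand-ins for the air temperature time steps, and plain `concurrent.futures` threads stand in for the dask threaded scheduler.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor

import numpy as np
from matplotlib.figure import Figure


def plot_frame(frame, name, outdir):
    # Hypothetical helper: create the Figure directly rather than through
    # pyplot, so no global "current figure" state is shared between threads.
    fig = Figure()
    ax = fig.subplots()
    ax.pcolormesh(frame)
    path = os.path.join(outdir, f"{name}.png")
    fig.savefig(path)  # no plt.close() needed: pyplot never tracked this figure
    return path


# Hypothetical stand-in for 10 time steps on a 25 x 53 lat/lon grid.
frames = np.random.default_rng(0).random((10, 25, 53))
names = [f"frame_{i:02d}" for i in range(len(frames))]
outdir = tempfile.mkdtemp()

with ThreadPoolExecutor(max_workers=4) as pool:
    # Iterating over a 3D array yields one 2D frame per time step.
    paths = list(pool.map(plot_frame, frames, names, [outdir] * len(frames)))
```

This is the same pattern matplotlib suggests for web servers, where figures are also created from many threads.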
With this, I don't see any issues in the output with either threads or processes.
I'm using:

```
INSTALLED VERSIONS
------------------
commit: fe036ae443ecc202a04877b67526133a48963b43
python: 3.8.6 | packaged by conda-forge | (default, Jan 25 2021, 23:21:18)
[GCC 9.3.0]
python-bits: 64
OS: Linux
OS-release: 5.8.0-40-generic
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en.UTF-8
LOCALE: en_US.UTF-8
libhdf5: 1.10.6
libnetcdf: 4.7.4
xarray: 0.16.3.dev119+g8a3912c72.d20210202
pandas: 1.2.1
numpy: 1.20.0
scipy: 1.5.3
netCDF4: 1.5.5.1
pydap: installed
h5netcdf: 0.8.1
h5py: 3.1.0
Nio: None
zarr: 2.6.1
cftime: 1.4.1
nc_time_axis: 1.2.0
PseudoNetCDF: None
rasterio: 1.2.0
cfgrib: None
iris: 2.4.0
bottleneck: 1.3.2
dask: 2021.01.1
distributed: 2021.01.1
matplotlib: 3.3.4
cartopy: 0.18.0
seaborn: 0.11.1
numbagg: None
pint: 0.16.1
setuptools: 49.6.0.post20210108
pip: 21.0
conda: 4.9.2
pytest: 6.2.2
IPython: 7.20.0
sphinx: 3.4.3
```
Re: the delayed solution, I have generally found that you shouldn't pass dask collections to delayed functions. I think dask computes the whole collection first and passes the in-memory result to the function. For example:
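```python
import dask
import matplotlib.pyplot as plt
import xarray as xr


@dask.delayed
def plot(ds, time):
    # If dask passed the collection through lazily, ds would still be
    # dask-backed here; instead it arrives already computed.
    if not dask.base.is_dask_collection(ds):
        raise ValueError("ds was computed before reaching the delayed function")
    plt.figure()
    ds.sel(time=time)["air"].plot()
    plt.savefig(str(time)[:16])
    plt.close()


# Chunk the dataset so that it actually is a dask collection.
ds = xr.tutorial.open_dataset("air_temperature").isel(time=slice(0, 10)).chunk({"time": 1})
tasks = [plot(ds, time) for time in ds["time"].values]
dask.compute(tasks, scheduler="processes", num_workers=4)
```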
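This raises the ValueError: the dataset has already been computed by the time it reaches the function, so this approach only works if your dataset is small enough to fit in memory. Note that `dask.array.Array.to_delayed()` exists; this use case would be better served by implementing a `Dataset.to_delayed()`.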
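Until something like `Dataset.to_delayed()` exists, a rough sketch of the per-chunk alternative is to call `dask.array.Array.to_delayed()` on the underlying dask array. Each resulting `Delayed` computes only its own chunk, so the plotting function receives one small in-memory block rather than the whole dataset. This assumes a random dask array as a stand-in for `ds["air"]` chunked along time, and `plot_block` is a hypothetical helper, not an xarray or dask API. Note that the block is a bare NumPy array, so the xarray coordinates and labels are lost.

```python
import os
import tempfile

import matplotlib

matplotlib.use("Agg")  # non-interactive backend, suitable for worker threads

import dask
import dask.array as da
import matplotlib.pyplot as plt
import numpy as np


@dask.delayed
def plot_block(block, index, outdir):
    # Hypothetical helper. Each Delayed from to_delayed() is computed on its
    # own, so `block` arrives as one NumPy chunk, not a dask collection.
    assert isinstance(block, np.ndarray)
    assert not dask.base.is_dask_collection(block)
    f, ax = plt.subplots(1, 1)  # explicit handles, as recommended above
    ax.pcolormesh(block[0])  # each chunk holds a single time step
    path = os.path.join(outdir, f"block_{index:02d}.png")
    f.savefig(path)
    plt.close(f)
    return path


# Hypothetical stand-in for ds["air"].chunk({"time": 1, "lat": -1, "lon": -1}).data
arr = da.random.random((10, 25, 53), chunks=(1, 25, 53))
blocks = arr.to_delayed()  # NumPy object array of Delayed, shape (10, 1, 1)
outdir = tempfile.mkdtemp()
tasks = [plot_block(blocks[i, 0, 0], i, outdir) for i in range(blocks.shape[0])]
# dask.compute returns one result per argument; the single list comes back whole.
(paths,) = dask.compute(tasks, scheduler="threads", num_workers=4)
```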