Hitting memory limit converting CMIP6 to numpy array

I have what likely boils down to a simple question about how to read large xarray datasets via dask. I am trying to convert a CMIP6 daily temperature zarr store to a numpy array and then run subsequent calculations on it. The code works for models with lower spatial resolution but fails for higher-resolution ones, so I believe it is exceeding a memory limit. The code below requests 10 workers, but it also fails with 40 workers. Regardless, the dataset size is 17GB, so 10 workers (each with 2 cores sharing 4GB) should be sufficient.

I have set up a Pangeo Binder environment using a copy of one of the environment files from the Pangeo GitHub through my repo here. The error arises when calling np.array() on the lazily loaded xr.Dataset.

Code snippet:

import pandas as pd
import numpy as np
import gcsfs
import xarray as xr
import zarr
import intake
import time

# Set up dask cluster

from dask.distributed import Client
from dask_gateway import Gateway

gateway = Gateway()
cluster = gateway.new_cluster()
cluster.scale(10)
client = Client(cluster)

variable_id = 'tasmax'
table_id = 'day'
gcs = gcsfs.GCSFileSystem(token = 'anon')

# Read in file data (daily)

zstore = 'gs://cmip6/CMIP/AWI/AWI-CM-1-1-MR/historical/r1i1p1f1/day/tasmax/gn/'

mapper = gcs.get_mapper(zstore)
ds_historical = xr.open_zarr(mapper, consolidated = True)

# Calculate 98th percentile temperature between 1980 and 2010 at each grid point

ds_baseline = ds_historical.sel(time = slice(str(1980), str(2010)))
ds_baseline = np.array(ds_baseline[variable_id]) ### Code fails here
threshold_grid = np.nanquantile(ds_baseline, q = 0.98, axis = 0)

The abbreviated error message is:

Exception in callback None()
handle:
Traceback (most recent call last):
File “/srv/conda/envs/notebook/lib/python3.6/site-packages/tornado/iostream.py”, line 1391, in _do_ssl_handshake
self.socket.do_handshake()
File “/srv/conda/envs/notebook/lib/python3.6/ssl.py”, line 1077, in do_handshake
self._sslobj.do_handshake()
File “/srv/conda/envs/notebook/lib/python3.6/ssl.py”, line 689, in do_handshake
self._sslobj.do_handshake()
OSError: [Errno 0] Error

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File “/srv/conda/envs/notebook/lib/python3.6/site-packages/tornado/iostream.py”, line 702, in _handle_events
self._handle_read()
File “/srv/conda/envs/notebook/lib/python3.6/site-packages/tornado/iostream.py”, line 1472, in _handle_read
self._do_ssl_handshake()
File “/srv/conda/envs/notebook/lib/python3.6/site-packages/tornado/iostream.py”, line 1423, in _do_ssl_handshake
return self.close(exc_info=err)
File “/srv/conda/envs/notebook/lib/python3.6/site-packages/tornado/iostream.py”, line 617, in close
self._signal_closed()
File “/srv/conda/envs/notebook/lib/python3.6/site-packages/tornado/iostream.py”, line 647, in _signal_closed
self._ssl_connect_future.exception()
concurrent.futures._base.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File “/srv/conda/envs/notebook/lib/python3.6/asyncio/events.py”, line 145, in _run
self._callback(*self._args)
File “/srv/conda/envs/notebook/lib/python3.6/site-packages/tornado/platform/asyncio.py”, line 138, in _handle_events
handler_func(fileobj, events)
File “/srv/conda/envs/notebook/lib/python3.6/site-packages/tornado/iostream.py”, line 738, in _handle_events
self.close(exc_info=e)
File “/srv/conda/envs/notebook/lib/python3.6/site-packages/tornado/iostream.py”, line 617, in close
self._signal_closed()
File “/srv/conda/envs/notebook/lib/python3.6/site-packages/tornado/iostream.py”, line 647, in _signal_closed
self._ssl_connect_future.exception()
concurrent.futures._base.CancelledError

StreamClosedError Traceback (most recent call last)
/srv/conda/envs/notebook/lib/python3.6/site-packages/distributed/comm/tcp.py in read(self, deserializers)
183 try:
→ 184 n_frames = await stream.read_bytes(8)
185 n_frames = struct.unpack(“Q”, n_frames)[0]

StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

CommClosedError Traceback (most recent call last)
in
23 ## Caculate threshold temperature for heatwave occurrence (varies spatially)
24 ds_baseline = ds_historical.sel(time = time_slice_baseline)
—> 25 ds_baseline = np.array(ds_baseline[variable_id])
26 hw_threshold_grid = np.nanquantile(ds_baseline, q = hw_baseline_percentile, axis = 0)
27 del ds_baseline

/srv/conda/envs/notebook/lib/python3.6/site-packages/xarray/core/common.py in __array__(self, dtype)
130
131 def __array__(self: Any, dtype: DTypeLike = None) -> np.ndarray:
-> 132 return np.asarray(self.values, dtype=dtype)
133
134 def __repr__(self) -> str:

/srv/conda/envs/notebook/lib/python3.6/site-packages/xarray/core/dataarray.py in values(self)
568 def values(self) → np.ndarray:
569 “”“The array’s data as a numpy.ndarray”“”
→ 570 return self.variable.values
571
572 @values.setter

/srv/conda/envs/notebook/lib/python3.6/site-packages/xarray/core/variable.py in values(self)
455 def values(self):
456 “”“The variable’s data as a numpy.ndarray”“”
→ 457 return _as_array_or_item(self._data)
458
459 @values.setter

/srv/conda/envs/notebook/lib/python3.6/site-packages/xarray/core/variable.py in _as_array_or_item(data)
258 TODO: remove this (replace with np.asarray) once these issues are fixed
259 “”"
→ 260 data = np.asarray(data)
261 if data.ndim == 0:
262 if data.dtype.kind == “M”:

/srv/conda/envs/notebook/lib/python3.6/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order)
81
82 “”"
—> 83 return array(a, dtype, copy=False, order=order)
84
85

/srv/conda/envs/notebook/lib/python3.6/site-packages/dask/array/core.py in __array__(self, dtype, **kwargs)
1338
1339 def __array__(self, dtype=None, **kwargs):
-> 1340 x = self.compute()
1341 if dtype and x.dtype != dtype:
1342 x = x.astype(dtype)

/srv/conda/envs/notebook/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
165 dask.base.compute
166 “”"
→ 167 (result,) = compute(self, traverse=False, **kwargs)
168 return result
169

/srv/conda/envs/notebook/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
445 postcomputes.append(x.__dask_postcompute__())
446
→ 447 results = schedule(dsk, keys, **kwargs)
448 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
449

/srv/conda/envs/notebook/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2686 should_rejoin = False
2687 try:
→ 2688 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2689 finally:
2690 for f in futures.values():

/srv/conda/envs/notebook/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1986 direct=direct,
1987 local_worker=local_worker,
→ 1988 asynchronous=asynchronous,
1989 )
1990

/srv/conda/envs/notebook/lib/python3.6/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
831 else:
832 return sync(
→ 833 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
834 )
835

/srv/conda/envs/notebook/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
337 if error[0]:
338 typ, exc, tb = error[0]
→ 339 raise exc.with_traceback(tb)
340 else:
341 return result[0]

/srv/conda/envs/notebook/lib/python3.6/site-packages/distributed/utils.py in f()
321 if callback_timeout is not None:
322 future = asyncio.wait_for(future, callback_timeout)
→ 323 result[0] = yield future
324 except Exception as exc:
325 error[0] = sys.exc_info()

/srv/conda/envs/notebook/lib/python3.6/site-packages/tornado/gen.py in run(self)
733
734 try:
→ 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()

/srv/conda/envs/notebook/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1874 else:
1875 self._gather_future = future
→ 1876 response = await future
1877
1878 if response[“status”] == “error”:

/srv/conda/envs/notebook/lib/python3.6/site-packages/distributed/client.py in _gather_remote(self, direct, local_worker)
1925
1926 else: # ask scheduler to gather data for us
→ 1927 response = await retry_operation(self.scheduler.gather, keys=keys)
1928
1929 return response

/srv/conda/envs/notebook/lib/python3.6/site-packages/distributed/utils_comm.py in retry_operation(coro, operation, *args, **kwargs)
388 delay_min=retry_delay_min,
389 delay_max=retry_delay_max,
→ 390 operation=operation,
391 )

/srv/conda/envs/notebook/lib/python3.6/site-packages/distributed/utils_comm.py in retry(coro, count, delay_min, delay_max, jitter_fraction, retry_on_exceptions, operation)
368 delay *= 1 + random.random() * jitter_fraction
369 await asyncio.sleep(delay)
→ 370 return await coro()
371
372

/srv/conda/envs/notebook/lib/python3.6/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
859 name, comm.name = comm.name, “ConnectionPool.” + key
860 try:
→ 861 result = await send_recv(comm=comm, op=key, **kwargs)
862 finally:
863 self.pool.reuse(self.addr, comm)

/srv/conda/envs/notebook/lib/python3.6/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
642 await comm.write(msg, serializers=serializers, on_error=“raise”)
643 if reply:
→ 644 response = await comm.read(deserializers=deserializers)
645 else:
646 response = None

/srv/conda/envs/notebook/lib/python3.6/site-packages/distributed/comm/tcp.py in read(self, deserializers)
197 self.stream = None
198 if not shutting_down():
→ 199 convert_stream_closed_error(self, e)
200 else:
201 try:

/srv/conda/envs/notebook/lib/python3.6/site-packages/distributed/comm/tcp.py in convert_stream_closed_error(obj, exc)
121 raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
122 else:
-> 123 raise CommClosedError("in %s: %s" % (obj, exc))
124
125

CommClosedError: in : Stream is closed
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>


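A general tip with large datasets is to avoid converting to numpy arrays at all costs. Calling np.array() on a dask-backed array computes the whole thing and gathers the result into the memory of your notebook process, no matter how many workers the cluster has.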

In this case, xarray has a quantile method that wraps np.nanquantile, so just use that instead of calling np.nanquantile directly. You may need to rechunk so that there is a single chunk along axis=0 (the time dimension).
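
A minimal sketch of that suggestion, using a small synthetic array as a stand-in for the real tasmax variable (the array shape, the chunk sizes, and the values are assumptions chosen only for illustration). The time dimension is merged into one chunk and the data is split along space instead, so the quantile stays lazy until compute() is called:

```python
import numpy as np
import pandas as pd
import xarray as xr

# Synthetic stand-in for ds_historical[variable_id]: dims (time, lat, lon).
# Shape and chunk sizes here are illustrative assumptions, not the real grid.
time = pd.date_range("1980-01-01", periods=365, freq="D")
values = np.random.default_rng(0).normal(300.0, 5.0, size=(365, 8, 16))
tasmax = xr.DataArray(
    values, dims=("time", "lat", "lon"), coords={"time": time}, name="tasmax"
).chunk({"time": 73})  # chunked along time, as a zarr store typically is

# Select the baseline period, then rechunk: one chunk along time,
# several chunks along space so the work is still spread over workers.
baseline = tasmax.sel(time=slice("1980", "2010"))
baseline = baseline.chunk({"time": -1, "lat": 4, "lon": 8})

# Lazy: nothing is loaded or computed yet.
threshold_lazy = baseline.quantile(0.98, dim="time")

# Only the reduced (lat, lon) grid is brought back to the notebook.
threshold_grid = threshold_lazy.compute()
print(threshold_grid.shape)
```

The result of compute() has one value per grid point, so it is small even when the input is many gigabytes.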

If you need to apply a function that expects numpy arrays, use apply_ufunc with dask="parallelized": https://xarray.pydata.org/en/stable/generated/xarray.apply_ufunc.html#xarray.apply_ufunc (this is how xarray wraps nanquantile)

https://xarray.pydata.org/en/stable/dask.html#automatic-parallelization-with-apply-ufunc-and-map-blocks
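
As a rough illustration of the apply_ufunc route, here is a sketch that applies np.nanquantile to each dask block. The synthetic data, its shape, and the chunk sizes are assumptions for the example. Note that apply_ufunc moves the core dimension ("time") to the last axis, which is why axis=-1 is passed:

```python
import numpy as np
import xarray as xr

# Synthetic stand-in data with dims (time, lat, lon); sizes are assumptions.
values = np.random.default_rng(1).normal(300.0, 5.0, size=(120, 6, 10))
tasmax = xr.DataArray(values, dims=("time", "lat", "lon"))

# The core dimension must live in a single chunk; split along lat instead.
tasmax = tasmax.chunk({"time": -1, "lat": 3})

# np.nanquantile only sees plain numpy blocks of shape (lat, lon, time).
threshold_lazy = xr.apply_ufunc(
    np.nanquantile,
    tasmax,
    kwargs={"q": 0.98, "axis": -1},
    input_core_dims=[["time"]],   # "time" is consumed by the function
    dask="parallelized",          # run the function block by block
    output_dtypes=[tasmax.dtype],
)

threshold_grid = threshold_lazy.compute()
print(threshold_grid.dims)
```

Any other function that takes and returns numpy arrays can be substituted for np.nanquantile in the same way, as long as input_core_dims names the dimensions it reduces over.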

If lazy rechunking doesn’t work for you, you might want to check out rechunker, which can help create a copy of your data more suited to your analysis.

You might also want to read through this issue, as it is very similar to yours: