Issues getting started with Xarray and Dask on Pangeo

I’ve been trying to parallelize a high-resolution (2 m DEM) Xarray workflow using Dask and run it on Pangeo. I’ve encountered a host of issues, ranging from memory leakage crashing the cluster (changing my chunk size seemed to help with that) to cancellation errors. I think the cancellation error is ultimately a memory problem (the final exception is asyncio.exceptions.CancelledError, but the stack trace also includes ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate (_ssl.c:1124), distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client, and _GatheringFuture exception was never retrieved). I’m happy to share the whole trace - it’s quite long and not my primary question at the moment.

I encountered the above errors after setting up my cluster as:

from dask_gateway import GatewayCluster
cluster = GatewayCluster()
cluster.adapt(minimum=2, maximum=10)  # or cluster.scale(n) to a fixed size.
client = cluster.get_client()
client

In trying to troubleshoot this I wanted to play with my cluster settings, but I wasn’t getting real-time info from the Dask dashboard in a separate browser window (I’m on a tired, 8-year-old computer while my new one is out for repairs). I’ve seen the Pangeo + Dask integration demoed a few times and wanted to launch my cluster through JupyterLab instead so I could use those features. I’m able to start a Dask cluster (though it takes a while - is this normal? I remember it being faster in workshops), but when I inject the client code:

from dask.distributed import Client
client = Client("gateway://traefik-icesat2-prod-dask-gateway.icesat2-prod:80/icesat2-prod.c09995cf7b1340609256f1c8460b5e0b")
client

into any notebook and try to run it, I get a ValueError (full stack trace at the end of this post). I wanted to report this as unexpected behavior, and also to ask: how can I get the Dask dashboard as a panel within my JupyterLab environment? I’m working on the stable Pangeo image.

Full stack trace:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-4-5de97e35304a> in <module>
      1 from dask.distributed import Client
      2 
----> 3 client = Client("gateway://traefik-icesat2-prod-dask-gateway.icesat2-prod:80/icesat2-prod.ef3014285d52450084bc4f1fbfde6f94")
      4 client

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in __init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
    746             ext(self)
    747 
--> 748         self.start(timeout=timeout)
    749         Client._instances.add(self)
    750 

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in start(self, **kwargs)
    954             self._started = asyncio.ensure_future(self._start(**kwargs))
    955         else:
--> 956             sync(self.loop, self._start, **kwargs)
    957 
    958     def __await__(self):

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in _start(self, timeout, **kwargs)
   1044 
   1045         try:
-> 1046             await self._ensure_connected(timeout=timeout)
   1047         except (OSError, ImportError):
   1048             await self._close()

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in _ensure_connected(self, timeout)
   1101 
   1102         try:
-> 1103             comm = await connect(
   1104                 self.scheduler.address, timeout=timeout, **self.connection_args
   1105             )

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    264 
    265     scheme, loc = parse_address(addr)
--> 266     backend = registry.get_backend(scheme)
    267     connector = backend.get_connector()
    268     comm = None

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/registry.py in get_backend(scheme)
     79         )
     80         if backend is None:
---> 81             raise ValueError(
     82                 "unknown address scheme %r (known schemes: %s)"
     83                 % (scheme, sorted(backends))

ValueError: unknown address scheme 'gateway' (known schemes: ['inproc', 'tcp', 'tls', 'ucx'])

Hi!

I am a heavy user of Pangeo, but not a hard-core developer. My first, zeroth-order solution to this issue is: do not use adapt.

For some reason, adapt() does not work as it should. Based on a LOT of frustration accumulated over the past months, my explanation for these issues is that the adapt() scaling closes/kills workers too soon, so parts of your computation are lost before they can be handed off to other workers.

Another observation I had is that even with a simple scale() - so no adapt - there is sometimes some “miscommunication” (for lack of a better word) in the workflow, but I believe you will see a huge improvement if you stop using adapt().
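
For example, a minimal sketch of a fixed-size cluster (the worker count here is just an illustration):

from dask_gateway import GatewayCluster
cluster = GatewayCluster()
cluster.scale(8)  # a fixed pool of workers instead of cluster.adapt(...)
client = cluster.get_client()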

The other important thing I want to point out is to make sure that the clusters you are losing connection with are correctly terminated and don’t just hang in the background.

I have noticed that whenever I had this issue - meaning I lost connection with my cluster while using adapt() - even if I did cluster.close(), restarted the kernel, or even restarted the server (logged out and in), the clusters were still there as zombie clusters, often still with a number of workers assigned (holding memory that cannot be used by others). In theory, unused clusters should terminate themselves after a certain amount of idle time, but sometimes they don’t.

I invite you to read up on the issue linked below and check whether you have many clusters hanging around; if you do, 1) scale them to zero after connecting to them, and then 2) close them. In case you are unable to close them (there is a delay, but eventually they should die), please open an issue and report them.

issue talking about zombie clusters

So, to summarize: open another notebook and run:

from dask_gateway import Gateway
g = Gateway()
g.list_clusters()

This will list all the clusters still hanging around. Hopefully you have none - unless you are actually running something.
You can then connect to any of them by doing

cluster = g.connect(g.list_clusters()[0].name)
cluster

(or [1], [2], and so on, depending on how many clusters you have). Then, if you are certain a cluster shouldn’t be there, you can scale it to zero so at least it doesn’t hold memory:

cluster.scale(0)

and then try to kill it:
cluster.close()
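
Putting those steps together, a rough sketch (assuming every listed cluster really is a zombie you want gone) would be:

from dask_gateway import Gateway
g = Gateway()
for report in g.list_clusters():
    cluster = g.connect(report.name)
    cluster.scale(0)   # release the workers (and their memory) first
    cluster.close()    # then ask the gateway to shut the cluster down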

Please let me know if any of what I wrote is not clear. I have been there! Happy to help.

Chiara

Sorry, I meant to also respond about the Dask dashboard.

This trick of connecting to a cluster from a separate notebook works essentially like a Dask dashboard.
In fact, if I have to scale a cluster running in another notebook up or down, this is what I do:

  1. connect to cluster as indicated above
  2. scale up/down

and you can switch across your clusters by simply connecting to each one, one at a time.
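
In other words, something like this (assuming the cluster you want is the first one listed):

from dask_gateway import Gateway
g = Gateway()
cluster = g.connect(g.list_clusters()[0].name)  # attach to the cluster started elsewhere
cluster.scale(20)  # scale it up (or down) without touching the original notebook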


The code injected by the labextension to connect a client is incorrect (there’s an issue tracking this on the dask-labextension repo I think).

You should be able to copy the URL of the dashboard into the labextension window and press Enter. There’s an incompatibility with the latest dask-labextension that’s being fixed, but I think the Pangeo hubs are pinned to older versions.
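
If it helps, the URL to paste can be read straight off the cluster or client object (a small sketch, assuming a dask-gateway cluster):

from dask_gateway import GatewayCluster
cluster = GatewayCluster()      # or the cluster you already created
client = cluster.get_client()
print(cluster.dashboard_link)   # paste this into the labextension search box
print(client.dashboard_link)    # the client exposes the same URL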


Thanks @chiaral for these helpful tips. Knowing how to list my open clusters has been particularly helpful. I also do not see them shut down when I use cluster.close(), but when I checked back a few hours later they were no longer active, so at least I don’t seem to have zombies.

I’ve made some progress, though my kernel keeps crashing, so I’m clearly still having some issue! The dashboard isn’t showing particularly high resource usage (I’m using a relatively small dummy example to test my conversion of the code to Dask), but I have the feeling it’s a memory issue. I’ll be digging into this more next week.


Thanks @TomAugspurger. I wondered if that was the case but didn’t know where in the Pangeo/Dask/JupyterLab ecosystem to look for an issue (with your suggestion I found a few reporting similar things, so I’ll wait for the patches to see whether I have a separate problem)!


I thought I’d figured it out (I was reading in a large netCDF and THEN rechunking it later on when I interpolated it, rather than chunking it on read-in, so I was accidentally loading it all into RAM). So it makes some sense that before I added chunking I got a CancelledError (presumably the cluster running out of memory, though the Dask dashboard showed next to no usage and the same step had worked fine earlier - it seems like there’s a lot of variability in cluster performance, though).

However, now I’m completely baffled. If I use chunking,

newdset = xr.open_dataset(newfile, chunks={'x': 500, 'y': 500})

then later on in my code (as best as I can tell, at the point where dask actually tries to do the calculations that use the variable added from newdset, which until then has been lazily loaded, as far as I understand) I get a
FileNotFoundError: [Errno 2] No such file or directory: b'/home/jovyan/icebath/notebooks/supporting_docs/160281892/BedMachineGreenland-2017-09-20.nc'.

I cannot for the life of me figure out why (and if I copy-paste the path from the error into my notebook and run xr.open_dataset('/home/jovyan/icebath/notebooks/supporting_docs/160281892/BedMachineGreenland-2017-09-20.nc'), it works fine). This happens whether I just open (but don’t close) the file within my function or whether I open it within a with statement (which I know is better practice, but which I took out to try to debug). If I view the xarray dataset, the variable added from newdset is there. So I guess my questions are:

  1. When/why is it trying to re-access the file?
  2. Why can’t it find it/what can I do about this error?
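
To make the timing concrete, this is roughly the pattern I’m running (the variable name 'bed' is just illustrative):

import xarray as xr

newfile = '/home/jovyan/icebath/notebooks/supporting_docs/160281892/BedMachineGreenland-2017-09-20.nc'

# Opening with chunks= is lazy: only metadata is read here, on the notebook pod.
newdset = xr.open_dataset(newfile, chunks={'x': 500, 'y': 500})

# Nothing touches the data on disk until a computation is triggered...
result = newdset['bed'].mean()

# ...and only then do the dask workers try to (re)open the file to read the
# chunks they need - which is when the FileNotFoundError shows up.
result.compute()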

Questions:

  • Are you running this in the cloud on our GCP or AWS clusters?
  • Are you using dask gateway?

If the answer to both is “yes”, then the problem is simple: the dask workers cannot see your home directory, so they can’t open the file! Your home directory is not part of a shared filesystem. It only exists on the notebook pod.

The solution is to put your data into object storage (S3 or GCS) instead of your home directory.

We are lacking documentation of lots of things, but this part about home directories / files is in fact well documented here:

https://pangeo.io/cloud.html#cloud-object-storage
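
One way to do that, sketched roughly (the bucket name is hypothetical, and this assumes the file fits in the notebook pod’s memory for the one-time conversion):

import gcsfs
import xarray as xr

fs = gcsfs.GCSFileSystem()
store = fs.get_mapper('gs://my-pangeo-bucket/BedMachineGreenland-2017-09-20.zarr')

# One-time copy, run on the notebook pod (which *can* see the home directory):
# load the netCDF into memory and write it to object storage as zarr.
local = xr.open_dataset('/home/jovyan/icebath/notebooks/supporting_docs/160281892/BedMachineGreenland-2017-09-20.nc').load()
local.to_zarr(store)

# From then on, every dask worker can read the data directly from GCS.
newdset = xr.open_zarr(store).chunk({'x': 500, 'y': 500})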

Thanks @rabernat! I do believe the answer to both questions is “yes”, so your explanation makes perfect sense (and thanks for the link to the right part of the docs to move forward). Was I not seeing this error previously because I wasn’t using chunking (and thus dask) during the actual file read-in, so the cluster wasn’t trying to access the file, only data already read into memory?


:bell: Bingo! You’ve got it.
