Thanks for sharing! I'm trying this out on a fairly common workload that should stress exactly this situation: making a cloud-free mosaic of many satellite images by taking a median over time. We have a 4-D array (`time`, `y`, `x`, `band`) and want to take the median over `time`. With satellite imagery, the source array almost always has a chunksize of `time=1`, and may or may not be chunked along the other dimensions.
To do the median computation, `dask.array` needs to rechunk the array to be contiguous in time. Essentially, the chunking on disk is exactly wrong for this operation, so we need to rechunk a lot of data.
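My first couple of attempts have consistently failed with a `RuntimeError` from `rechunk_transfer`. Its root cause is a `TimeoutError` (itself raised from a `CancelledError`) during the handshake with another worker: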
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
File /srv/conda/envs/notebook/lib/python3.10/asyncio/tasks.py:418, in wait_for()
417 try:
--> 418 return fut.result()
419 except exceptions.CancelledError as exc:
CancelledError:
The above exception was the direct cause of the following exception:
TimeoutError Traceback (most recent call last)
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/comm/core.py:330, in connect()
329 handshake = await wait_for(comm.read(), time_left())
--> 330 await wait_for(comm.write(local_info), time_left())
331 except Exception as exc:
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/utils.py:1812, in wait_for()
1811 async def wait_for(fut: Awaitable[T], timeout: float) -> T:
-> 1812 return await asyncio.wait_for(fut, timeout)
File /srv/conda/envs/notebook/lib/python3.10/asyncio/tasks.py:420, in wait_for()
419 except exceptions.CancelledError as exc:
--> 420 raise exceptions.TimeoutError() from exc
422 waiter = loop.create_future()
TimeoutError:
The above exception was the direct cause of the following exception:
OSError Traceback (most recent call last)
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/shuffle/_rechunk.py:41, in rechunk_transfer()
40 try:
---> 41 return _get_worker_extension().add_partition(
42 input,
43 input_partition=input_chunk,
44 shuffle_id=id,
45 type=ShuffleType.ARRAY_RECHUNK,
46 new=new,
47 old=old,
48 )
49 except Exception as e:
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py:632, in add_partition()
631 shuffle = self.get_or_create_shuffle(shuffle_id, type=type, **kwargs)
--> 632 return sync(
633 self.worker.loop,
634 shuffle.add_partition,
635 data=data,
636 input_partition=input_partition,
637 )
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/utils.py:412, in sync()
411 typ, exc, tb = error
--> 412 raise exc.with_traceback(tb)
413 else:
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/utils.py:385, in f()
384 future = asyncio.ensure_future(future)
--> 385 result = yield future
386 except Exception:
File /srv/conda/envs/notebook/lib/python3.10/site-packages/tornado/gen.py:769, in run()
768 try:
--> 769 value = future.result()
770 except Exception:
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py:354, in add_partition()
353 out = await self.offload(_)
--> 354 await self._write_to_comm(out)
355 return self.run_id
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py:152, in _write_to_comm()
151 self.raise_if_closed()
--> 152 await self._comm_buffer.write(data)
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/shuffle/_buffer.py:189, in write()
188 if self._exception:
--> 189 raise self._exception
190 if not self._accepts_input or self._inputs_done:
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/shuffle/_buffer.py:107, in process()
106 try:
--> 107 await self._process(id, shards)
108 self.bytes_written += size
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/shuffle/_comms.py:71, in _process()
70 with self.time("send"):
---> 71 await self.send(address, shards)
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py:123, in send()
122 self.raise_if_closed()
--> 123 return await self.rpc(address).shuffle_receive(
124 data=to_serialize(shards),
125 shuffle_id=self.id,
126 run_id=self.run_id,
127 )
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/core.py:1231, in send_recv_from_rpc()
1230 kwargs["deserializers"] = self.deserializers
-> 1231 comm = await self.pool.connect(self.addr)
1232 prev_name, comm.name = comm.name, "ConnectionPool." + key
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/core.py:1475, in connect()
1474 raise
-> 1475 return await connect_attempt
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/core.py:1396, in _connect()
1395 self._connecting_count += 1
-> 1396 comm = await connect(
1397 addr,
1398 timeout=timeout or self.timeout,
1399 deserialize=self.deserialize,
1400 **self.connection_args,
1401 )
1402 comm.name = "ConnectionPool"
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/comm/core.py:334, in connect()
333 await comm.close()
--> 334 raise OSError(
335 f"Timed out during handshake while connecting to {addr} after {timeout} s"
336 ) from exc
338 comm.remote_info = handshake
OSError: Timed out during handshake while connecting to tls://10.244.9.6:42079 after 30 s
The above exception was the direct cause of the following exception:
RuntimeError Traceback (most recent call last)
Cell In[6], line 8
6 with distributed.performance_report("new.html"):
7 with dask.config.set({"array.rechunk.method": "p2p", "optimization.fuse.active": False}):
----> 8 median = data.median(dim="time").compute()
File /srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataarray.py:1089, in DataArray.compute(self, **kwargs)
1070 """Manually trigger loading of this array's data from disk or a
1071 remote source into memory and return a new array. The original is
1072 left unaltered.
(...)
1086 dask.compute
1087 """
1088 new = self.copy(deep=False)
-> 1089 return new.load(**kwargs)
File /srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataarray.py:1063, in DataArray.load(self, **kwargs)
1045 def load(self: T_DataArray, **kwargs) -> T_DataArray:
1046 """Manually trigger loading of this array's data from disk or a
1047 remote source into memory and return this array.
1048
(...)
1061 dask.compute
1062 """
-> 1063 ds = self._to_temp_dataset().load(**kwargs)
1064 new = self._from_temp_dataset(ds)
1065 self._variable = new._variable
File /srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:746, in Dataset.load(self, **kwargs)
743 import dask.array as da
745 # evaluate all the dask arrays simultaneously
--> 746 evaluated_data = da.compute(*lazy_data.values(), **kwargs)
748 for k, data in zip(lazy_data, evaluated_data):
749 self.variables[k].data = data
File /srv/conda/envs/notebook/lib/python3.10/site-packages/dask/base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
596 keys.append(x.__dask_keys__())
597 postcomputes.append(x.__dask_postcompute__())
--> 599 results = schedule(dsk, keys, **kwargs)
600 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/client.py:3168, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
3166 should_rejoin = False
3167 try:
-> 3168 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
3169 finally:
3170 for f in futures.values():
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/client.py:2328, in Client.gather(self, futures, errors, direct, asynchronous)
2326 else:
2327 local_worker = None
-> 2328 return self.sync(
2329 self._gather,
2330 futures,
2331 errors=errors,
2332 direct=direct,
2333 local_worker=local_worker,
2334 asynchronous=asynchronous,
2335 )
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/utils.py:345, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
343 return future
344 else:
--> 345 return sync(
346 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
347 )
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/utils.py:412, in sync(loop, func, callback_timeout, *args, **kwargs)
410 if error:
411 typ, exc, tb = error
--> 412 raise exc.with_traceback(tb)
413 else:
414 return result
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/utils.py:385, in sync.<locals>.f()
383 future = wait_for(future, callback_timeout)
384 future = asyncio.ensure_future(future)
--> 385 result = yield future
386 except Exception:
387 error = sys.exc_info()
File /srv/conda/envs/notebook/lib/python3.10/site-packages/tornado/gen.py:769, in Runner.run(self)
766 exc_info = None
768 try:
--> 769 value = future.result()
770 except Exception:
771 exc_info = sys.exc_info()
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/client.py:2191, in Client._gather(self, futures, errors, direct, local_worker)
2189 exc = CancelledError(key)
2190 else:
-> 2191 raise exception.with_traceback(traceback)
2192 raise exc
2193 if errors == "skip":
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/shuffle/_rechunk.py:50, in rechunk_transfer()
41 return _get_worker_extension().add_partition(
42 input,
43 input_partition=input_chunk,
(...)
47 old=old,
48 )
49 except Exception as e:
---> 50 raise RuntimeError(f"rechunk_transfer failed during shuffle {id}") from e
RuntimeError: rechunk_transfer failed during shuffle cc9531b7820cc766da633788babf811c
I'm using this notebook. I suspect I'll be able to simplify things. I'm going to dig through the logs first to see what's going on (I don't think a worker is dying, but will confirm that).
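Edit: Here are the logs from the worker that seemed to run into issues first: shuffle-logs.txt · GitHub. Things start OK. Then we get some warnings that the event loop was unresponsive (which the messages attribute to long-running GIL-holding functions or moving large chunks of data). After that comes a note that a TCP connection was closed before the handshake completed: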
2023-03-20 15:16:29,763 - distributed.comm.tcp - INFO - Connection from tls://10.244.10.9:44336 closed before handshake completed
2023-03-20 15:14:46,236 - distributed.core - INFO - Starting established connection to tls://dask-351d74e2f7f644beb571a17408e81902.staging:8786
2023-03-20 15:14:59,341 - distributed.core - INFO - Event loop was unresponsive in Worker for 9.18s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-03-20 15:15:00,410 - distributed.utils_perf - INFO - full garbage collection released 47.80 MiB from 0 reference cycles (threshold: 9.54 MiB)
2023-03-20 15:15:03,907 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.53s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-03-20 15:15:43,799 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.59s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-03-20 15:15:48,464 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.66s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-03-20 15:15:51,847 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.38s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-03-20 15:16:29,763 - distributed.comm.tcp - INFO - Connection from tls://10.244.10.9:44336 closed before handshake completed
2023-03-20 15:18:05,967 - distributed.shuffle._comms - ERROR - Shuffle cc9531b7820cc766da633788babf811c forgotten
Traceback (most recent call last):
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/shuffle/_comms.py", line 71, in _process
await self.send(address, shards)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py", line 122, in send
self.raise_if_closed()
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py", line 163, in raise_if_closed
raise self._exception
RuntimeError: Shuffle cc9531b7820cc766da633788babf811c forgotten