I’ve been experiencing some curious behaviour when using xarray and dask-jobqueue to parallelise the computation of a monthly climatology from zarr-format data stored on an ultra-fast BeeGFS parallel file system (https://www.beegfs.io/content/).
The memory load on each worker increases very rapidly throughout the operation, to values substantially larger than any single worker should need to hold at one time given the chunking. This does not occur (or at least occurs to a much lesser degree) when the same data sits on a different file system.
It’s almost as though the workers are unable to reduce and process the data at a sufficient rate to keep up with how quickly they read the input - though that explanation probably makes no sense.
Given that the issue seems to be related to the hardware I’m using, it’s difficult for me to provide a reproducible example. So, I’m seeking input regarding how to debug and understand this issue. My apologies in advance if this is not the right forum for this type of question.
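For what it’s worth, the computation is roughly of the following shape (a minimal, non-reproducible sketch only; the store path, chunking and cluster settings below are placeholders, not my real setup):

import xarray as xr
from dask.distributed import Client
from dask_jobqueue import SLURMCluster  # illustrative; I launch workers with dask-jobqueue

# Placeholder cluster sizing - my real cores/memory/queue settings differ
cluster = SLURMCluster(cores=8, memory="32GB")
cluster.scale(jobs=4)
client = Client(cluster)

# Open the zarr store lazily; chunks follow the store's own chunking
ds = xr.open_zarr("/path/to/store.zarr")

# Monthly climatology: group by calendar month and average over time
clim = ds.groupby("time.month").mean("time")
clim.load()  # this is where per-worker memory climbs far beyond what the chunking suggests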
Hi @dougiesquire - welcome to the forum and thanks for your message!
What you have encountered is a long-standing issue with Dask. There is a GitHub issue tracking it, and a lot of active development is happening in this area.
I strongly encourage you to engage on that github issue and describe your experience. Particularly useful would be to see the actual code you are using to trigger this memory overload. The dask gurus might have some useful workarounds.
Hi @dougiesquire - just wanted to follow up. If you could weigh in on the linked GitHub issue, it would probably add a lot to the discussion and motivate the Dask developers to continue to focus on this issue. The more people they see chiming in, the higher priority it might receive.
Let me know if there is any way I can help you with this.
Forgot one thing: you may want to use the Dask Dashboard to check whether your guess is correct. It will tell you a lot about how much data is being held in memory on the Dask side!
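For example (assuming you are already creating a dask.distributed Client from your dask-jobqueue cluster), the dashboard URL is available directly from the client, and the Workers tab shows per-worker memory:

from dask.distributed import Client

client = Client(cluster)      # `cluster` here stands in for your dask-jobqueue cluster object
print(client.dashboard_link)  # open this URL in a browser and keep an eye on the Workers tab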
I’ve posted this issue on the recommended github thread.
I should have been clearer in my original post - my references to “memory load” are as diagnosed by the Dask Dashboard, so I’m pretty confident that my case is indeed a Dask issue.
@dougiesquire I have had similar issues, and the workaround I have used on the CSIRO HPC is to allow spilling to disk, since the nodes have local storage accessible at $TMPDIR - I believe this is better than using a location on the parallel file system because of the many small reads and writes that spilling and caching generate. The workers will go orange in the dashboard, but your calculation may still complete without killing the workers. (See the cluster sketch after the config below for pointing workers at $TMPDIR.)
Here is the jobqueue.yml that I use:
distributed:
  worker:
    memory:
      target: 0.6     # fraction of managed memory at which workers start spilling to disk
      spill: 0.75     # fraction of process memory at which workers spill data to disk
      pause: 0.80     # fraction at which we pause worker threads
      terminate: 0.95 # fraction at which we terminate the worker
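And here is a sketch of pointing the workers’ scratch space at $TMPDIR when building the cluster. I’m assuming a SLURM scheduler and the cores/memory numbers are placeholders; the other dask-jobqueue cluster classes accept the same local_directory argument:

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    cores=8,                    # placeholder worker size, not a recommendation
    memory="32GB",              # placeholder worker size, not a recommendation
    local_directory="$TMPDIR",  # left as a literal so the shell expands it on the compute node
)
# cluster.job_script() shows how $TMPDIR ends up in the generated submission script
cluster.scale(jobs=4)
client = Client(cluster)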
When working on the Pawsey HPC, the nodes don’t have local storage, but if you are happy to use containers with Shifter you can create a so-called ‘per-node-cache’ that is overlaid into the container filesystem and reduces the metadata load on the parallel file system; example here:
Increase the size=4G part to suit your needs; I have used up to 40G per worker without issue.
Hopefully some of the ideas that @TomAugspurger and @rabernat have discussed in those github issues help in a future dask release.
Thanks @pbranson! I’m hopeful that solutions will eventually be implemented within Dask itself. In the meantime, spilling to disk at least means the jobs will finish, and these are great tips for Australian HPC systems.
Not surprisingly, within a similar CSIRO Pangeo HPC environment that includes our new fast BeeGFS storage, I seem to be hitting similar issues. @pbranson, my attempt to set the local-directory to $TMPDIR doesn’t appear to help, though this is anecdotal as I’ve not had time to do many comparisons. As someone not well versed in digging into Dask internals, I’m wondering what I could be logging or documenting that might help others explore a fix in Dask?
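One thing I’m considering (a sketch, assuming `ds` is the already-opened dataset from my first post) is wrapping the computation in dask.distributed.performance_report, which saves a snapshot of the dashboard diagnostics to a standalone HTML file that I could share:

from dask.distributed import performance_report

# The report captures the dashboard's task stream, profiling and summary panels
# for everything computed inside the context manager.
with performance_report(filename="climatology-report.html"):
    clim = ds.groupby("time.month").mean("time")
    clim.load()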