Issue with dask.distributed in multiple nodes of a cluster

Hi all!

I am working on a cluster with 3 nodes/machines. Each node/machine has its own cores, CPUs and RAM. I am trying to run a script which requires some parallel computing in order to open the files that I want to use. Therefore, on the first machine I set up a dask.distributed LocalCluster and Client, in order to perform the parallel computing across its cores, like this:

Set up the dask scheduler

from dask.distributed import Client, LocalCluster
# one local cluster on this node: 16 worker processes with 2 threads each
cluster = LocalCluster(n_workers=16, threads_per_worker=2, processes=True)
client  = Client(cluster)
print(client.dashboard_link)

Now open the files with parallel computing

import xarray as xr

# lat1, lat2, lon1, lon2 are the region bounds set earlier in the script
chunks  = {'time_counter': 1, 'y': 3057, 'x': 4321}
dataset = xr.open_dataset('/home/files/ATM_FLUXES/HF_1995_2D.nc', chunks=chunks, engine='h5netcdf')
mld     = dataset.somxlt02.isel(y=slice(lat1, lat2), x=slice(lon1, lon2))
MLD     = mld.compute()

Then I run the rest of my program normally on the 1st machine.

Once I try to run the same script (but with different lons and lats), setting up dask.distributed and a client as described above on the 2nd machine/node, I notice two things:

  1. The script running on the 1st machine slows down.
  2. The script running on the 2nd machine slows down and almost every time crashes or breaks down eventually.

The datasets that I am handling are the same on the 2 nodes, the Python libraries are the same, and I can connect to each machine with SSH without passwords (and I have to connect to each machine manually, as I want to control the lons and lats of my script every time).
Basically, I am trying to run the same script for different regions of the global ocean, and I would like to be able to run my script (and therefore perform the required parallel computing) for each region on a different node.

Any ideas as to how I can run the same script, each with its own dask.distributed client, on different nodes without crashing everything?

Kind regards,
Sofi

Does your cluster have a batch scheduling system (e.g. PBS or SLURM)? If so, I strongly recommend you use Dask-Jobqueue: https://jobqueue.dask.org/
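
As a rough sketch of what that could look like (the queue name, core/memory numbers and walltime below are placeholders you would adapt to your cluster's batch system):

from dask_jobqueue import SLURMCluster   # or PBSCluster, depending on the scheduler
from dask.distributed import Client

# one "job" here is one batch job that runs Dask workers on a compute node
cluster = SLURMCluster(
    queue='normal',        # placeholder partition/queue name
    cores=16,              # cores per job
    memory='64GB',         # memory per job
    walltime='02:00:00',
)
cluster.scale(jobs=2)      # ask the batch system for two worker jobs
client = Client(cluster)
print(client.dashboard_link)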

Hi @rabernat.

Thanks for your answer. If I am not mistaken, Dask-Jobqueue is for centrally assigning jobs to different nodes from a central node of the cluster, no? I do not want to do this. I need to go to each node separately and specify the initial conditions of my script there. I cannot do that while submitting jobs from a centrally-defined instance like Dask-Jobqueue, can I?

Why? Why not let Dask distribute your work for you?

Maybe I have not understood it correctly, but I think that I can only tell Dask distributed to run a single script as it is on the different nodes, without being able to provide arguments that will change the initial conditions of the script on each node. Is that the case? I was under the impression that it can only run the same script (and therefore give the same results), whereas I would like to run my script with different initial conditions on each node. Can this be done with Dask distributed?

Dask does not run scripts. It executes tasks, which you define from Python. Sometimes these tasks can be generated via the high-level APIs like dask.array (used by xarray) or dask.dataframe. The various distributed schedulers allow these tasks to be executed over many nodes in a cluster.
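
For example (a minimal sketch; the function mean_mld, the region bounds and the .mean() reduction are just made-up placeholders for your per-region work), the "different initial conditions" can be arguments to an ordinary Python function, and each call becomes a task that a single distributed client spreads over its workers:

import xarray as xr
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=4, threads_per_worker=2)
client = Client(cluster)

def mean_mld(lat1, lat2, lon1, lon2):
    # open the file on the worker and subset one region; no chunks= here,
    # so there is no Dask-within-Dask nesting inside the task
    with xr.open_dataset('/home/files/ATM_FLUXES/HF_1995_2D.nc',
                         engine='h5netcdf') as ds:
        return float(ds.somxlt02.isel(y=slice(lat1, lat2),
                                      x=slice(lon1, lon2)).mean())

# the same function submitted as independent tasks with different
# "initial conditions"; the region bounds below are made-up examples
regions = [(0, 1000, 0, 2000), (1000, 2000, 0, 2000)]
futures = [client.submit(mean_mld, *r) for r in regions]
print(client.gather(futures))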

I recommend going through the Dask tutorial to gain a better understanding of the fundamentals of Dask: https://tutorial.dask.org/

Perhaps I did not express myself correctly. Yes, I am aware that Dask does not run scripts; I have been using Dask for quite some time now. What I mean is that Dask-Jobqueue is there to hand tasks to multiple workers/nodes centrally (if I understood correctly) with Dask commands, etc. What I want is to run my script on each different node/machine with different initial conditions.
Within this script, however, I am using dask.distributed and a Client in order to open quickly (with parallel computing) the multiple files that are needed for the script to run. Therefore, when I run my script with IPython on 2 different nodes, it means that dask.distributed and a Client are used twice. So in the end I have dask.distributed and a Client running two times on two different machines, which are however part of the same cluster.
And the problem is that, usually when I set up the second client on the second node, it always fails in the end and slows down my script on the 1st node.
I am using dask.distributed and a Client only to open files, because there are many files and they are very high resolution. So every time I run my script, I need to have a dask.distributed Client running that opens these files.

I was wondering whether I am doing something wrong here.

Hi, if I understood correctly, you are running a separate 'local' Dask cluster on each node (node-A runs the script with initial condition A, and node-B with initial condition B)?
But is your /home/files a shared disk? Do both script A and script B use the 'same' set of files?
If so, you might have a file-locking issue, and you could be overloading the shared file server.
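
If it is locking, one common thing to try (a sketch, assuming read-only access and a reasonably recent HDF5, >= 1.10, which I have not verified for your setup) is to disable HDF5's file locks before anything opens the files:

import os

# Disable HDF5's file locks for read-only access on the shared filesystem.
# Set this before h5netcdf/h5py first touch the HDF5 library, to be safe.
os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE'

import xarray as xr
dataset = xr.open_dataset('/home/files/ATM_FLUXES/HF_1995_2D.nc', engine='h5netcdf')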

Hello,

Yes, that is exactly what is happening and exactly what I am doing: I am running script A on node-A and script B on node-B, and yes, /home/files is a shared disk, so script A and script B use the "same" set of files. How do I find out whether I have a file-locking issue? Can I solve it?

What would be the time difference in running version 1 on all 3 machines, then 2, then 3?

Script A runs fine on the 1st machine when it is on its own. Once I start to run either script A or script B on a 2nd machine, the run on the 1st machine becomes slower, and the script (whichever one) on the 2nd machine gets stuck indefinitely while opening the files (where I am using dask.distributed and the Client to open the files). It does not move on at all. So there is no point in testing a 3rd machine.

All these machines are connected/mounted somehow to a central server, as they are part of the same cluster, which is used by other people as well. So it can get busy even without my scripts.

I meant run A on 1, 2, 3 until done, then B, C, etc.

Oh yes. So I have not yet tried to run one script on multiple nodes, and I am not sure how to do this. As I am new to Dask and this cluster is used by many people, I am not sure how efficient this could be. But it is an idea, I guess. Could this be done with Dask-Jobqueue, then?

I think the short answer might be: if your machines are sharing the same network disk, and your script does a lot of IO, then you may be saturating access to that disk, hitting its maximum throughput. It would not be surprising in that situation if two sets of tasks on two machines do their IO half as efficiently as one set of tasks on one machine only.


Yeah, that is another possibility I thought about. Is there a solution to that?

Get a better network filesystem? You should be able to speed test whether you are hitting this limit or not.
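
A rough way to check, assuming Python is available on each node (the file path is the one from your earlier snippet, and the 64 MiB block size is arbitrary): time a sequential read of one large file from a single node, then repeat it from two nodes at once and compare the MB/s.

import time

path = '/home/files/ATM_FLUXES/HF_1995_2D.nc'   # one of the large input files
block = 64 * 1024 * 1024                         # read in 64 MiB blocks
nbytes = 0
start = time.perf_counter()
with open(path, 'rb') as f:
    while True:
        data = f.read(block)
        if not data:
            break
        nbytes += len(data)
elapsed = time.perf_counter() - start
print(f"{nbytes/1e6:.0f} MB in {elapsed:.1f} s -> {nbytes/1e6/elapsed:.0f} MB/s")

Note that repeated runs may be served from the operating system's page cache, so use a file that has not just been read for each measurement.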

As I am working on a cluster that is used by several other people, I am not sure how to do this. But I will raise a request with the people managing the system. Thanks a lot for this.