Strange issue with .compute() of Dask array values

Hello all,

I am currently trying to subset and then load, using Dask chunking, the same ocean temperature variable from two different yearly files, with the following code:

Dimensions of subset files:
T_1993_2D.nc [365,32,65,48] (original global file dimensions: [365,32,4031,3057], ~400 GB in size)
T_1994_2D.nc [365,32,65,48]

Code for loading the subset temperature:

%%time 
chunks = {'time_counter':41,'deptht':5,'y':340,'x':481}
dataset = xr.open_dataset('/home/folder/T_1993_2D.nc',chunks=chunks,engine='h5netcdf')
TEMP = dataset.votemper.isel(deptht=slice(1,33),y=slice(lat1,lat2),x=slice(lon1,lon2))
temp = TEMP.compute()
Insert Temperature
CPU times: user 1.75 s, sys: 11.8 s, total: 13.6 s
Wall time: 12.7 s

and the 2nd file like this:

%%time 
chunks = {'time_counter':41,'deptht':5,'y':340,'x':481}
dataset = xr.open_dataset('/home/folder/T_1994_2D.nc',chunks=chunks,engine='h5netcdf')
TEMP = dataset.votemper.isel(deptht=slice(1,33),y=slice(lat1,lat2),x=slice(lon1,lon2))
temp = TEMP.compute()
Insert Temperature
CPU times: user 6min 7s, sys: 31.2 s, total: 6min 38s
Wall time: 10min 10s

These are two files from different years, from which I am trying to load the same variable (temperature) into Python memory, using the same chunking and the same subsetting dimensions, in two different IPython sessions. So far, I suspect the only difference between the two loading times is that I have already loaded the first file (1993) many times while looking for the optimal way to load a subset of that year's 4D matrix without having to read the entire global file first.

Does anyone have any ideas as to why loading a subset of a small 4D matrix might be so slow? Does subsetting a big data file and then loading the subset mean that Python has to parse the entire global file first?

Thank you in advance for your time and help,
Kind regards,
Sofi

@oceanusofi - just a note that I edited your post to use code blocks to make your code a bit more readable. Please consider using this option in future posts. :smiley:


Some things to consider:

  • Are you using a fancy filesystem that does caching? This can make a huge difference.
  • Do the files internally use hdf chunking? If so, do they use the same chunking?

Chunking files will not generally speed things up unless your chunks are somehow aligned with the underlying file storage on disk.
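If you want to check whether (and how) a variable is chunked on disk, something like this should work, assuming the files are netCDF-4/HDF5 (the path and variable name are just the ones from the post above):

import h5py

# Inspect the on-disk (HDF5) layout of the temperature variable
with h5py.File('/home/folder/T_1993_2D.nc', 'r') as f:
    dset = f['votemper']
    print('on-disk chunk shape:', dset.chunks)  # None means contiguous storage
    print('compression:', dset.compression)     # e.g. 'gzip' or None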

Have you considered removing the chunks option completely from open_dataset? I recommend trying this and seeing how it goes.

Hello Rabernat,

Thanks for editing my post to make the code blocks more visible (I did not know how to do this).

To be honest, I am not familiar with all this terminology (chunks, HDF chunking, filesystem caching, …), and that is why I do not really know the answers to these questions, or even how to look for them. Any advice on how to do this would be great.

In the meantime, yes, I will try removing the chunks as you suggested. But does that mean I will no longer be using Dask?

What I would like to do is parallelize the loading of my 4D matrix over the depth dimension, i.e. use 32 cores to load slices of [365,56,48] each. That is what I have been trying to achieve by reading the online documentation, without success. Can you perhaps help me with that? I need Dask to do this, don't I? In which case I need to open the dataset with chunks, right?

Kind regards,
Sofi

Why? Why do you think you want to parallelize? Parallelization complicates things.

Can you give an example of non-parallelized (i.e. serial) code you are using? Is this too slow? How slow is it?

Without this information, it is possible you are engaged in “premature optimization”.

Hi Rabernat,

OK, so the reason I want to parallelize is that, for a single run, I have to load 5 variables that are 4D matrices with dimensions 365 * 32 * 56 * 48 and 3 variables that are 3D matrices with dimensions 365 * 56 * 48, all of them as xarrays. Loading all of these data represents just one year for a very small region of the ocean. I have to repeat the same procedure for 26 years for this particular location, and then do the same thing for more than one location in the ocean (a lot more).

Currently, it takes about 20-30 minutes (or even more, depending on how many people are using the server) to load just one year of all these data for a small piece of the ocean, and that does not count the processing of the data afterwards. This means I will need half a day just to load the whole 26 years of data for a small piece of the ocean (on a good day for the server). I would like to extend my research to at least 100 small areas of the ocean if possible.

Here I copy the time it took to load the 4D temperature [365 * 32 * 56 * 48] for this small location for one year without Dask, just plain xarray, by calling the loading through a function:

%%time
temp = insert_files2.load_octemp(yearsel,dz,lon1-1,lon2+1,lat1-1,lat2+1,time)
Insert Temperature

Eventually, I had to kill this command in IPython after 30 minutes of waiting, with the loading still not finished. This was during a very busy time on the server I am working on, so it took considerably longer for just one variable.

The data loading inside that function is written like this:

dataset = xr.open_dataset('/home/folder/T_'+str(year)+'_2D.nc',engine='h5netcdf')
TEMP = dataset.votemper.isel(deptht=slice(1,33),y=slice(lat1,lat2),x=slice(lon1,lon2))
temp = TEMP.load()

I have also tried lazily loading the data with Dask and then using delayed functions to perform various computations on my data afterwards (e.g. multiplications of 4D matrices, vector indexing, etc.), even running the .compute() at a later stage of my calculations, but that took even more time, if it succeeded at all.

In general, I noticed that once my data are fully loaded, the computation is considerably faster, so I would like to optimize, if possible, the reading time of my variables (which cover a really small area of the ocean).
Any ideas?

Sofi

Hello Rabernat,

I think I might have some more information about the netcdf files I am using.

They were originally daily 3D files of dimensions 1 * 33 * 3057 * 4321 (global ocean files), and the ncdump -h of one of them is:

netcdf U_1994-09-24_2D {
dimensions:
	y = 3057 ;
	x = 4321 ;
	deptht = 33 ;
	time_counter = 1 ;
variables:
	float nav_lon(y, x) ;
		nav_lon:_FillValue = NaNf ;
		nav_lon:units = "degrees_east" ;
		nav_lon:valid_min = -179.999847540022 ;
		nav_lon:valid_max = 179.999842386314 ;
		nav_lon:long_name = "Longitude" ;
		nav_lon:nav_model = "Default grid" ;
		nav_lon:standard_name = "longitude" ;
		nav_lon:_ChunkSizes = 765LL, 1081LL ;
	float nav_lat(y, x) ;
		nav_lat:_FillValue = NaNf ;
		nav_lat:units = "degrees_north" ;
		nav_lat:valid_min = -77.0104751586914 ;
		nav_lat:valid_max = 89.9591064453125 ;
		nav_lat:long_name = "Latitude" ;
		nav_lat:nav_model = "Default grid" ;
		nav_lat:standard_name = "latitude" ;
		nav_lat:_ChunkSizes = 765LL, 1081LL ;
	float deptht(deptht) ;
		deptht:_FillValue = NaNf ;
		deptht:units = "m" ;
		deptht:positive = "down" ;
		deptht:valid_min = 0.494025379419327 ;
		deptht:valid_max = 5727.91650390625 ;
		deptht:long_name = "Vertical T levels" ;
		deptht:standard_name = "depth" ;
		deptht:axis = "Z" ;
		deptht:_ChunkSizes = 50LL ;
	int x(x) ;
		x:standard_name = "projection_x_coordinate" ;
		x:axis = "X" ;
		x:units = "1" ;
		x:_ChunkSizes = 4322LL ;
	int y(y) ;
		y:standard_name = "projection_y_coordinate" ;
		y:axis = "Y" ;
		y:units = "1" ;
		y:_ChunkSizes = 3059LL ;
	float time_counter(time_counter) ;
		time_counter:_FillValue = NaNf ;
		time_counter:standard_name = "time" ;
		time_counter:long_name = "Time axis" ;
		time_counter:axis = "T" ;
		time_counter:time_origin = "1950-JAN-01 00:00:00" ;
		time_counter:_ChunkSizes = 1LL ;
		time_counter:units = "hours since 1950-01-01" ;
		time_counter:calendar = "gregorian" ;
	float vozocrtx(time_counter, deptht, y, x) ;
		vozocrtx:_FillValue = 9.96921e+36f ;
		vozocrtx:units = "m s-1" ;
		vozocrtx:valid_min = -10. ;
		vozocrtx:valid_max = 10. ;
		vozocrtx:long_name = "Zonal velocity" ;
		vozocrtx:standard_name = "sea_water_x_velocity" ;
		vozocrtx:short_name = "vozocrtx" ;
		vozocrtx:online_operation = "N/A" ;
		vozocrtx:interval_operation = 86400LL ;
		vozocrtx:interval_write = 86400LL ;
		vozocrtx:associate = "time_counter deptht nav_lat nav_lon" ;
		vozocrtx:_ChunkSizes = 1LL, 5LL, 340LL, 481LL ;
		vozocrtx:coordinates = "nav_lon nav_lat" ;
		vozocrtx:add_offset = 0. ;
		vozocrtx:scale_factor = 1. ;
		vozocrtx:missing_value = 9.96921e+36 ;	
// global attributes:
		:Conventions = "CF-1.0" ;
		:file_name = "GLORYS12V1_y2019m06d24_gridU.nc" ;
		:institution = "MERCATOR OCEAN" ;
		:source = "NEMO" ;
		:TimeStamp = "2020-MAY-03 18:43:28 GMT-0000" ;
		:references = "http://www.mercator-ocean.eu" ;
}

Then I concatenated these daily files along the time dimension using ncrcat (in general, I process all netCDF files with NCO and CDO tools before opening and reading them with xarray). I am not sure whether the NCO tools are doing something fancy with caching, but the final ~500 GB global ocean file, with dimensions 365 * 33 * 3057 * 4321, seems to be stored contiguously. When I then try to load only a small part of this file (365 * 33 * 56 * 48) using xarray, it takes a lot of time.

I also did some experiments where I made the storage of the final file chunked (again using NCO tools) and then repeated the same procedure (cut the global file down to a small region and load it with xarray or Dask), and it took even more time, or never loaded at all.

Does that help?

Sofi

The reason this takes so long may be that reads are not aligned well with your file’s chunks.

I do not recommend you use nco for processing your files. The way we tend to handle this type of data in Pangeo is the following.

  1. Open the original files in xarray using xr.open_mfdataset
  2. Maybe change the chunk structure to work better with your analysis using .chunk
  3. Write the data to zarr using .to_zarr

You could also investigate the rechunker package.
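A minimal sketch of that recipe, assuming daily files like the ones above (the glob pattern, chunk sizes, and output path here are illustrative, not prescriptive):

import xarray as xr

# 1. Open all the daily files as one lazy, time-concatenated dataset
ds = xr.open_mfdataset('/home/folder/U_1994-*_2D.nc',
                       combine='by_coords', engine='h5netcdf')

# 2. Rechunk to match the intended access pattern: the full year per chunk,
#    one depth level at a time, spatial tiles matching the file layout
ds = ds.chunk({'time_counter': 365, 'deptht': 1, 'y': 340, 'x': 481})

# 3. Write once to zarr; later reads touch only the chunks they need
ds.to_zarr('/home/folder/U_1994.zarr', mode='w')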

Thank you for your answer.

What do you mean by “reads are not aligned with my file’s chunks”? What are the “reads”?

I am trying to understand how to chunk the files. I do not quite understand what “not too many and not too few” chunks means. How does someone decide how many is “too many” and how few is “too few”? It is confusing to me, and that is why I am also not familiar with the rechunk command.

At the end of my calculations I really need a netCDF output (not .to_zarr) that I will keep processing afterwards with other tools.
xr.open_mfdataset is for opening a lot of (let’s say daily) files and then concatenating them, right? I have one big file of [time,depth,lat,lon] that I open, so should I first split it into smaller files (e.g. daily ones) and then use this command to concatenate them along the time dimension? I have tried this before, and it is much slower just to open the files, let alone load them afterwards.

Sofi

I came up with a different idea and I would like to ask your advice on it.

I have the file with dimensions: temp = 365 * 32 * 56 * 48 (time,deptht,lat,lon)

To load a subset of data like this: temp = 365 * 1 * 56 * 48 with .compute() takes about 9 seconds.

I was wondering whether I can load the 32 levels simultaneously from different processors with Python.
Something like:

temp1 = temp[:,0,:,:].compute()
temp2 = temp[:,1,:,:].compute()
temp3 = temp[:,2,:,:].compute()
...

and then concatenate/stack all of them together to make the final array of 365 * 32 * 56 * 48.

Is this possible in Python?

Sofi

That is exactly the same as calling temp.chunk({'deptht': 1}).compute().

But keep in mind that your serial performance will not necessarily scale as you make it parallel. Your hard disk can only deliver data at a certain rate. Trying to parallelize the reads may actually slow things down as the disk has to work harder. You need to experiment to see.
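To make the equivalence concrete, here is a sketch, assuming temp is the Dask-backed DataArray from earlier in the thread:

import xarray as xr

# Manual version: compute each depth level separately, then re-stack
levels = [temp.isel(deptht=k).compute() for k in range(temp.sizes['deptht'])]
manual = xr.concat(levels, dim='deptht')

# Equivalent single call: one chunk per depth level, and the scheduler
# is free to read those chunks in parallel
parallel = temp.chunk({'deptht': 1}).compute()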

This syntax
temp.chunk({'deptht': 1}).compute()
means that it will deal with the depth dimension one level per chunk, right? But why is this equivalent to what I want to do? Usually reading the 3D matrices [time,lat,lon] takes a few seconds with Python. So I wanted to parallelize their reading with 32 cores or threads, corresponding to my 32 vertical levels. I thought parallelization with Dask could help with this and speed it up. No?

Sofi


Also I have another question regarding the distributed scheduler:

I only need 32 cores to load my data into memory, because that is the task that takes time. After the data are loaded, I can keep processing my variables with just one core. Is there a way to tell Python that? To stop using all the cores once the loading is finished? The workers seem to stay active after the loading, and I have not found a way to stop them without stopping my whole script. I am working on a server with a lot of other users, and I would not like to “block” a lot of cores all the time.

Here is my current code, following your suggestions:

import xarray as xr
from dask.distributed import Client, LocalCluster

# 16 workers x 2 threads each = 32 threads
cluster = LocalCluster(threads_per_worker=2, n_workers=16)
client = Client(cluster)

chunks = {'deptht': 1}
dataset = xr.open_dataset('/folder/U_2005_2D.nc', chunks=chunks, engine='h5netcdf')
UO = dataset.vozocrtx.isel(deptht=slice(1, 33), y=slice(lat1, lat2), x=slice(lon1, lon2))
uo = UO.chunk({'deptht': 1}).compute()  # re-chunking here is redundant: already chunked at open

Can somebody verify that I am actually parallelising my file loading with this code?
Is it normal for a 4D matrix of dimensions 365 * 32 * 56 * 48 (time,deptht,lat,lon) to take ~57 s to be loaded by 16 workers * 2 threads each?
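For what it’s worth, one way to check the parallelism, assuming the distributed client from the snippet above, is to watch the Dask dashboard while .compute() runs; its task stream panel shows whether tasks really run on many workers at once:

print(client.dashboard_link)  # open this URL in a browser during the load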

Any recommendations about caching in Dask/xarray that might make this faster?

Thanks in advance for your help,
Sofi

Some cluster managers support the scale method (https://distributed.dask.org/en/latest/api.html#distributed.SpecCluster.scale), but I don’t recall whether LocalCluster supports it or not.

So cluster.scale(1) might do it.
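If it does, then something like this should release the workers once the data is in memory (a sketch; cluster, client, and UO are the objects from the code above):

uo = UO.chunk({'deptht': 1}).compute()  # after this, the data are plain arrays in local memory

cluster.scale(1)   # shrink the cluster to a single worker...
# ...or, if no further parallelism is needed, release everything:
client.close()
cluster.close()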

OK, thanks for your answer; I did not know that syntax.
As I try to read the documentation, can you please explain what
cluster.scale(1)
might do? Does it specify the number of workers? Haven’t I already done that by writing
client = Client(threads_per_worker=2, n_workers=8)
? Or is this another thing?

I tried adding this line of code after my loading finishes. When I type top in the terminal afterwards, though, it seems that 17 processes are still active under my username.

Sofi

Also, can you tell me whether it would be better to use a Python multiprocessing Pool instead of Dask and clusters to parallelize the reading of my files?

Thanks,
Sofi
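For reference, a sketch of what the multiprocessing version might look like (the path, variable name, and worker count are illustrative; each process must open the file itself, since file handles cannot be shared across processes):

import multiprocessing as mp
import xarray as xr

PATH = '/folder/U_2005_2D.nc'  # illustrative: the file from the snippet above

def load_level(k):
    # open inside the worker process and read a single depth level
    with xr.open_dataset(PATH, engine='h5netcdf') as ds:
        return ds.vozocrtx.isel(deptht=k).load()

if __name__ == '__main__':
    with mp.Pool(processes=8) as pool:
        levels = pool.map(load_level, range(1, 33))
    uo = xr.concat(levels, dim='deptht')

The same caveat from earlier in the thread applies: more reader processes do not help once the disk bandwidth is saturated.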