Hello folks,
I need some help determining the ideal way to read the following data.
I have 151 netCDF files with a combined size of ~35 GB.
Dimensions of each file are:
time = UNLIMITED ; // (2920 currently)
rlon = 155 ;
rlat = 130 ;
bnds = 2 ;
I need to open these, extract a single grid point and write it to a csv file for some further work (not ideal, but I am bound by operational constraints).
I have been using xr.open_mfdataset for a while for other workflows and it has performed really well, but for this particular problem it seems to be extremely slow.
I tested it by performing the operation on a subset of just 3 files, and the following code took over 4 minutes to complete.
import glob
import re
import pandas as pd
import xarray as xr

fls = glob.glob(f"raw_netcdf/*{var}*{mrun}*.nc")  # Read only 3 files for test
with xr.open_mfdataset(fls, parallel=True) as ds:
    data = ds[var]
    dsel = data.isel({'rlon': irlon, 'rlat': irlat})
    # Convert to dataframe to write to csv
    df = pd.DataFrame({'time': ds['time'].values, var: dsel.values})
    # Build output name from the first and last file names (Windows-style paths)
    end_year = re.findall(r'\d{8}', fls[-1])[1]
    outfile = re.sub(r'-\d{8}', f"-{end_year}", re.search(r'(?<=\\).*', fls[0]).group(0)).split(".")[0]
    # Write to disk
    df.to_csv(f"{out_path}{outfile}.csv", index=False)
What am I doing wrong here? I am running this on a system with 64GB of RAM and 40 logical processors.
Also, a related question: since xr.open_mfdataset is using Dask in the background, is there a way to access the Dask dashboard for the cluster that is running xarray, similar to if we had created a cluster manually?
Thanks for the question. I have some ideas, but first I’d like you to run a baseline experiment.
Could you just do:
ds = xr.open_dataset(one_single_file)
data = ds[var]
dsel = data.isel({'rlon': irlon, 'rlat': irlat})
%time _ = dsel.values
And then could you repeat the experiment with
ds = xr.open_mfdataset([one_single_file])
This will give a measurement of how fast you can select a single timeseries from one file.
Yes. You just create a Dask LocalCluster and Client, and it will automatically be used.
from dask.distributed import Client
client = Client()
display(client)
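If you are working outside a notebook (so display(client) does not render the widget), the dashboard URL can also be printed directly. A standalone sketch, assuming the default LocalCluster:
from dask.distributed import Client

client = Client()                 # starts a LocalCluster by default
print(client.dashboard_link)      # dashboard URL, e.g. http://127.0.0.1:8787/status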
Thank you Ryan!
I was able to make some progress on my own but got stuck again.
I abandoned xr.open_mfdataset and instead used xr.open_dataset with Dask.
Here’s the code:
def get_griddata_from_nc(f):
    with xr.open_dataset(f) as ds:
        data = ds[var]
        dsel = data.isel({'rlon': irlon, 'rlat': irlat})
        d = pd.DataFrame({'time': ds['time'].values, var: dsel.values})
    return d

fls = glob.glob(f"raw_netcdf/*{var}*{mrun}*.nc")  # Set of 151 files
futures = client.map(get_griddata_from_nc, fls)
dls = client.gather(futures)  # List of dataframes, one for each year
# Then concatenate them into a single dataframe and write to disk (see sketch below)
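For reference, the elided concatenate-and-write step looks roughly like this; a sketch only, with a placeholder output filename rather than the actual naming logic from my first post:
# Stack the per-file dataframes and write a single CSV per ensemble member.
# The output filename below is a placeholder.
df = pd.concat(dls, ignore_index=True).sort_values('time')
df.to_csv(f"{out_path}{var}_{mrun}.csv", index=False)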
This was surprisingly fast! It processed all 151 files in under 4 seconds, including writing the csv to disk.
But now there’s another problem.
I have a 35-member ensemble, with 151 such files per member. That means I need to run that code (which processes 151 files) a total of 35 times.
for mrun in mruns:  # len(mruns) = 35
    fls = glob.glob(f"raw_netcdf/*{var}*{mrun}*.nc")  # Set of 151 files
    futures = client.map(get_griddata_from_nc, fls)
    dls = client.gather(futures)  # List of dataframes, one for each year
    # Then concatenate them into a single dataframe and write to disk
As I mentioned before, the first ensemble member gets written to disk in under 4 seconds, but the next iteration is currently in progress and it's been over 9 minutes now.
One of the clues I am getting from my task manager is that my disk usage sits at a constant 100% during this time. I was thinking that maybe the netCDF files are not being closed, but I am opening them in a with block, which means that should not be the case.
To summarize, this is the workflow:
- Initialize a Dask cluster.
- Use client.map to parallelize reading data from the 151 netCDF files with xr.open_dataset.
- Use a regular for loop to run step #2 35 times (once for each ensemble member).
Results: the first ensemble member was processed in under 4 seconds; the second has now been in progress for ~11 minutes.
Glad you made progress. Your new approach suggests that the issue with the first approach may be that Dask is not effectively inlining the __getitem__ operation.
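To illustrate what I mean, here is a rough sketch of the two access patterns (not the exact internals of either library):
import xarray as xr

# Dask-backed route: the variable is split into chunk tasks; selecting one grid
# point still depends on tasks that read whole chunks from disk, unless the
# indexing gets fused/inlined into the read.
ds_lazy = xr.open_mfdataset(fls, parallel=True)
ts_lazy = ds_lazy[var].isel({'rlon': irlon, 'rlat': irlat})
_ = ts_lazy.values  # may pull far more bytes than the single point it returns

# Plain open_dataset route: the netCDF backend reads only the requested slice.
ds_plain = xr.open_dataset(fls[0])
_ = ds_plain[var].isel({'rlon': irlon, 'rlat': irlat}).values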
I’m wondering if you could just try the experiment I proposed in my previous question. I am very curious to see the results.
Yes of course.
Here are the results:
ds = xr.open_dataset(f)
data = ds[var]
dsel = data.isel({'rlon':irlon,'rlat':irlat})
%time _ = dsel.values
# CPU times: total: 219 ms
# Wall time: 5.18 s
and
ds = xr.open_mfdataset(f)
data = ds[var]
dsel = data.isel({'rlon':irlon,'rlat':irlat})
%time _ = dsel.values
# CPU times: total: 62.5 ms
# Wall time: 66.6 ms
But here’s the catch. I ran the first command again because I had a hunch that my disk usage was slowing it down. I stopped the running Dask operation and waited for my disk usage to drop back to normal levels. The second time, it gave me the following timing:
CPU times: total: 62.5 ms
Wall time: 69.4 ms
So, I am pretty sure it's disk I/O, but I am not sure why it would persist if I am using the with block notation when opening files.