How to be more efficient with subsetting data from multiple netcdf files?

Hello folks,

I need some help determining the ideal way to read the following data.
I have 151 netCDF files with a combined size of ~35 GB.

Dimensions of each file are:

time = UNLIMITED ; // (2920 currently)
rlon = 155 ;
rlat = 130 ;
bnds = 2 ;

I need to open these, extract a single grid point, and write it to a csv file for some further work (not ideal, but I am bound by operational constraints).
I have been using xr.open_mfdataset for other workflows for a while and it has performed really well, but for this particular problem it seems to be extremely slow.

I tested it by performing the operation on a subset of just 3 files, and the following code took over 4 minutes to complete.

import glob
import re

import pandas as pd
import xarray as xr

fls = glob.glob(f"raw_netcdf/*{var}*{mrun}*.nc") # Read only 3 files for test

with xr.open_mfdataset(fls, parallel=True) as ds:
    data = ds[var]
    dsel = data.isel({'rlon':irlon,'rlat':irlat})
    # Convert to dataframe to write to csv
    df = pd.DataFrame({'time': ds['time'].values, var: dsel.values})
    # Build output name: drop the directory prefix and replace the first file's
    # end date with the end date of the last file
    end_year = re.findall(r'\d{8}',fls[-1])[1]
    outfile = re.sub(r'-\d{8}',f"-{end_year}",re.search(r'(?<=\\).*',fls[0]).group(0)).split(".")[0]
    # Write to disk
    df.to_csv(f"{out_path}{outfile}.csv", index=False)

What am I doing wrong here? I am running this on a system with 64GB of RAM and 40 logical processors.

Also, a related question: since xr.open_mfdataset uses Dask in the background, is there a way to access the Dask dashboard for the cluster that xarray is using, similar to when we create a cluster manually?

Thanks for the question. I have some ideas, but first I’d like you to run a baseline experiment.

Could you just do:

ds = xr.open_dataset(one_single_file)
data = ds[var]
dsel = data.isel({'rlon':irlon,'rlat':irlat})
%time _ = dsel.values

And then could you repeat the experiment with

ds = xr.open_mfdataset([one_single_file])

This will give a measurement of how fast you can select a single timeseries from one file.
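
If you happen to run this as a plain script rather than in IPython (so the %time magic is not available), a stopwatch around the .values call measures the same thing. A minimal equivalent sketch:

import time

t0 = time.perf_counter()
_ = dsel.values  # force the read of the selected grid point
print(f"selection took {time.perf_counter() - t0:.2f} s")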

Yes. You just create a Dask LocalCluster and Client, and it will automatically be used.

from dask.distributed import Client
client = Client()
display(client)
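
If you are not in a notebook (so display(client) does not render the widget), the dashboard URL is also available directly on the client object:

# Typically http://127.0.0.1:8787/status for a default LocalCluster
print(client.dashboard_link)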

Thank you Ryan!
I was able to make some progress on my own but got stuck again.

I abandoned xr.open_mfdataset and instead used xr.open_dataset with dask.
Here’s the code:

def get_griddata_from_nc(f):
    # Extract the timeseries at one grid point from a single netCDF file
    with xr.open_dataset(f) as ds:
        data = ds[var]
        dsel = data.isel({'rlon':irlon,'rlat':irlat})
        d = pd.DataFrame({'time': ds['time'].values, var: dsel.values})
    return d

fls = glob.glob(f"raw_netcdf/*{var}*{mrun}*.nc") # Set of 151 files
futures = client.map(get_griddata_from_nc, fls)
dls = client.gather(futures) # List of dataframes, one for each year
# Then concatenate them into a single dataframe and write to disk (see the sketch below)
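
For completeness, the concatenation step is roughly the following (reusing out_path and the outfile naming from my first post; the sort is just to keep things safe):

df = pd.concat(dls, ignore_index=True)  # stack the 151 per-file dataframes
df = df.sort_values('time')             # keep the timeseries in chronological order
df.to_csv(f"{out_path}{outfile}.csv", index=False)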

This was surprisingly fast! It processed all 151 files in under 4 seconds, including writing the csv to disk.
But now there’s another problem.

I have a 35-member ensemble, with 151 such files per member, which means I need to run that code (which processes 151 files) a total of 35 times.

for mrun in mruns: # len(mruns) = 35
    fls = glob.glob(f"raw_netcdf/*{var}*{mrun}*.nc") # Set of 151 files
    futures = client.map(get_griddata_from_nc, fls)
    dls = client.gather(futures) # List of dataframes, one for each year
    # Then concatenate them into a single dataframe and write to disk

As I mentioned before, the first ensemble member gets written to disk in under 4 seconds, but the next iteration is currently in progress and it’s been over 9 minutes now.
One of the clues I am getting from my task manager is that my disk usage sits at a constant 100% during this time. I was thinking that maybe the netCDF files are not being closed, but I am using the with ... as context manager, so that should not be the case.

To summarize, this is the workflow:

  1. Initialize the Dask cluster.
  2. Use client.map to parallelize reading data from 151 netCDF files with xr.open_dataset.
  3. Use a regular for loop to run step #2 35 times (once for each ensemble member).

Results: the first ensemble member was processed in under 4 seconds; the second has now been in progress for ~11 minutes.

Glad you made progress. Your new approach suggests that the issue with the first approach may be that dask is not effectively inlining the __getitem__ operation.

I’m wondering if you could just try the experiment I proposed in my previous question. I am very curious to see the results.
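
For reference, recent xarray versions expose a knob related to inlining. I have not tested it on your data, so treat this as a sketch to check against the docs for your installed xarray:

# inline_array asks xarray/dask to inline the on-disk array into each task
# (only available in newer xarray releases; check your version's documentation)
ds = xr.open_mfdataset(fls, parallel=True, inline_array=True)
dsel = ds[var].isel({'rlon':irlon,'rlat':irlat})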

Yes of course.
Here are the results:

ds = xr.open_dataset(f)
data = ds[var]
dsel = data.isel({'rlon':irlon,'rlat':irlat})
%time _ = dsel.values

# CPU times: total: 219 ms
# Wall time: 5.18 s

and

ds = xr.open_mfdataset(f)
data = ds[var]
dsel = data.isel({'rlon':irlon,'rlat':irlat})
%time _ = dsel.values

CPU times: total: 62.5 ms
Wall time: 66.6 ms

But here’s the catch. I ran the first command again because I had a hunch that my disk usage was slowing it down. I stopped the running Dask operation and waited for my disk usage to drop back to normal levels. The second time around, it gave me the following timings:

CPU times: total: 62.5 ms
Wall time: 69.4 ms

So, I am pretty sure it’s disk I/O, but I am not sure why it would persist if I use the with ... as context manager when opening files.
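
One way I could double-check whether the workers are actually releasing their file handles (a rough sketch; it assumes psutil is installed in the worker environment):

def n_open_files():
    # Runs on each worker: count the file handles currently held by that worker process
    import psutil
    return len(psutil.Process().open_files())

# Dict of worker address -> open-handle count; if the counts keep climbing
# between ensemble members, the netCDF files are not being released.
print(client.run(n_open_files))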