Best practices to go from 1000s of netcdf files to analyses on a HPC cluster?

Hi Shane, welcome to Pangeo, and thanks for posting such an interesting and popular first question!

Above several folks have given very useful practical suggestions. I would like to digress into a more abstract, conceptual discussion about why what you are doing is a hard computational problem. I have personally wrestled with this problem for quite a while; in fact, I wrote the original code that Yu started with for the LLC4320 model here. So I thought I would take this as a chance to write down some thoughts. Forgive me for taking things a bit off topic.

All array data is chunked somehow

All array data is “chunked” to begin with, whether we recognize it or not. Our models (also satellites) tend to produce one 2D or 3D spatial field per timestep, which is why you are starting with 8759 netCDF files. When that data is stored on a hard disk, this means that the bits from the same day are physically close to each other, and can efficiently be read into memory in a single block a.k.a. chunk. (If the netCDF files use actual netCDF4 chunking, it means that a whole chunk has to be read and decompressed at once.) When you want to analyze the data along the time dimension, e.g. computing temporal spectra, you need to get the bits from each point in space into memory, which means bringing together bits that are stored very far away from each other. For a dataset of your size, this is just really hard any way you approach it.
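
To make that concrete, here is roughly what the starting point looks like when the files are opened lazily with xarray (a sketch: the file pattern and variable name below are placeholders for whatever your dataset uses):

import xarray as xr

# Open all ~8759 hourly files lazily; each file becomes one chunk along time.
# "model_output_*.nc" and the variable name "u" are placeholders.
ds = xr.open_mfdataset("model_output_*.nc", combine="by_coords", parallel=True)
print(ds["u"].data.chunksize)  # something like (1, 9000, 7055): one timestep per chunk

Reading a single time series at one point therefore has to touch every one of those chunks.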

Most of us can imagine a naive approach to the problem, where we simply loop over each point. In pseudocode, it would be something like this

# Ny, Nx = horizontal grid dimensions; Nt = number of timesteps / files.
# open_file, analyze_timeseries, and store_timeseries are placeholders.
for j in range(Ny):
    for i in range(Nx):
        timeseries = []
        for n in range(Nt):
            # open the n-th file and pull out a single point
            data = open_file(n)
            timeseries.append(data[j, i])
        # maybe analyze it now
        analyze_timeseries(timeseries)
        # or maybe store it for later
        store_timeseries(timeseries, j, i)

This would involve opening each of the 8759 files 9000x7055 times. It will be extremely slow. If each file can be opened and read in 1 ms (pretty fast), it will take 17 years! :scream: So if we can beat 17 years, we have made progress. :grin:
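
For the record, here is the back-of-the-envelope arithmetic behind that number:

# one file open + read per point per file, at an optimistic 1 ms each
n_files = 8759
ny, nx = 9000, 7055
total_seconds = n_files * ny * nx * 1e-3
print(total_seconds / (365 * 24 * 3600))  # ~17.6 years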

Parallelization is clearly needed, but it’s not obvious it will always help. We will face several constraints:

  • how fast the storage system can read data in parallel
  • how much memory (RAM) is available to cache data
  • how fast data can be passed between parallel actors over the network

On the first point (read throughput), it’s easy to imagine we might actually degrade performance by trying to read data in parallel. Our hard disk (and our sysadmin) might be very unhappy if we issue 8759 simultaneous requests to open different files, or 9000 simultaneous reads from the same file. It might backfire and take even more than 17 years!

Let’s bypass that issue for a moment, and consider the idealized case where the disk is not a limiting factor. (If you’re lucky, your HPC system has a fancy parallel filesystem and this is the case.) In that case, we can replace the reading with randomly generated data.

Example with Random Data

I created a simple notebook to explore the rechunking problem with random data:

In this example, I create some arrays that have the traditional one-chunk-per-timestep structure, i.e. something like this (only 4 timesteps)


…and rechunk it to something like this

I do this in such a way that the total size of each final chunk (128 MB) matches the original chunk size; this determines the spatial dimensions of the final chunks.
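
In code, the setup looks roughly like this (a sketch: I picked illustrative dimensions so that each float64 chunk works out to 128 MB; the shapes in the notebook may differ):

import dask.array as dsa

# illustrative sizes: 4000 * 4000 * 8 bytes = 128 MB per timestep chunk
nt, ny, nx = 4, 4000, 4000

# one chunk per timestep, mimicking the one-file-per-timestep layout
data = dsa.random.random((nt, ny, nx), chunks=(1, ny, nx))

# rechunk to full contiguity in time, splitting in space so the
# total chunk size stays at 128 MB
rechunked = data.rechunk((nt, ny // 2, nx // 2))

# total number of tasks in the graph
print(len(rechunked.__dask_graph__()))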

An important parameter to look at here is the number of tasks: it took 28 tasks to rechunk these 4 chunks. That represents the computational complexity of the operation. For such a small graph, we can examine it directly (data.visualize()):

At the bottom are the four chunks of random data we read, and at the top are the four final output chunks. In between are operations that split and recombine the chunks. An important observation is that dask, by default, will not generate the same data twice: it generates each chunk once and then passes the pieces around between workers. I believe this can be suboptimal in many cases, but I don’t know how to change it.

How does this scale? To check this out, I tried this operation for increasing Nt (number of timesteps).


etc.
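
For reference, the experiment boils down to a loop like this (a sketch: here I let dask choose the spatial chunks with "auto" so each chunk stays near its default size target; the notebook may set them explicitly):

import dask.array as dsa

ny, nx = 4000, 4000  # illustrative spatial size
for nt in [4, 16, 64, 256, 1024, 4096]:
    data = dsa.random.random((nt, ny, nx), chunks=(1, ny, nx))
    # full contiguity in time; "auto" shrinks the spatial chunks
    # to keep the chunk size roughly constant
    rechunked = data.rechunk((nt, "auto", "auto"))
    print(f"Nt = {nt:5d}   tasks = {len(rechunked.__dask_graph__())}")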

The scaling looks like this

[figure: number of tasks vs. Nt, log-log scale]

The scaling is super-linear: the slope on the log-log plot is ~1.3, i.e. the number of tasks grows roughly as Nt^1.3, so the complexity goes up pretty fast. For the largest array I tried, Nt=4096, we are already at > 250,000 tasks. Once we get close to a million tasks, we start to reach the limits of the dask scheduler.

An Actual Computation

I tried computing some of these arrays on ocean.pangeo.io, using a cluster of 20 workers (each with 2 cores and 11.5 GB RAM; 230 GB RAM total). I was able to compute up to size Nt=512, 65 GB total. For larger arrays than that, my workers started running out of memory before they were able to start storing the output chunks. (Dask performance report for Nt=512 is here.)
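
For completeness, here is a sketch of the kind of computation involved (the scheduler address is a placeholder, and I assume the output is written to a zarr store):

import dask.array as dsa
from dask.distributed import Client

# connect to an already-running dask cluster (the address is a placeholder)
client = Client("tcp://scheduler-address:8786")

# same illustrative 128 MB chunks as above; Nt=512 gives ~65 GB total
nt, ny, nx = 512, 4000, 4000
data = dsa.random.random((nt, ny, nx), chunks=(1, ny, nx))
rechunked = data.rechunk((nt, "auto", "auto"))

# store the result rather than pulling it back to the client
rechunked.to_zarr("rechunked_example.zarr")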

The conclusion from my unscientific study is that you probably need significantly more memory in your cluster than the size of your array. So if your desired output array is 4 TB, your cluster may need something like 12 TB of memory.

It would be really nice if Dask could complete this computation without requiring so much memory. After all, our naive example would never blow up our memory (even if it did take 17 years). Is there something in between? I believe the fundamental issue here is a known challenge scenario for dask involving the concept of “memory backpressure”:

In the long term, we hope dask will improve its scheduling strategies and eventually become more adept at these workloads. But that will not happen immediately.

So what should you do?

The obvious workaround is to do everything you can to make your problem smaller. Some ideas:

  • Definitely process each variable (e.g. u, v) separately. Your cluster is already overwhelmed.
  • Do you really need every point in the horizontal? Based on my experience, the answer is no. There is very little valuable information at the grid scale in these models; the effective resolution is much lower. Maybe you could get by with subsampling every 4th or 8th point (or even more) in the x and y dimensions. Subsampling lowers the memory footprint.
  • Do you really need a “full” rechunk? Or could you get by rechunking to an intermediate level of contiguity, say 60 days? This lowers the complexity. (There is a rough sketch of this, combined with subsampling, just after this list.)
  • Experiment with idealized examples like the one I showed in my notebook to get a feel for what your cluster can handle.
  • Just throw more resources at it. More memory, more workers, etc.
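
To make the subsampling and intermediate-rechunk ideas concrete, here is a rough sketch (the file pattern, variable name, and dimension names are all assumptions; adjust them to your dataset):

import xarray as xr

# placeholders: adjust the glob, variable name, and dimension names to your data
ds = xr.open_mfdataset("model_output_*.nc", combine="by_coords", parallel=True)

# process one variable at a time
u = ds["u"]

# subsample every 4th point in the horizontal
u_sub = u.isel(x=slice(None, None, 4), y=slice(None, None, 4))

# intermediate rechunk: ~60 days of hourly output per chunk instead of the
# whole year; tune the spatial chunks to keep each chunk around 100-200 MB
u_rc = u_sub.chunk({"time": 60 * 24, "x": 128, "y": 128})

# write to a zarr store, then do the time-series analysis from there
u_rc.to_dataset().to_zarr("u_rechunked.zarr")

Even this reduced problem will generate a large task graph, so it is worth testing on a short time range first.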

If this is your first real experience with the Pangeo stack, you may be feeling frustrated by now. Why doesn’t this “just work”? I think it’s important to recognize that you have picked one of the hardest problems out there. Our stack excels at mostly embarrassingly parallel problems, but this is not one of them. It’s a genuine computational challenge, and a massive amount of data, however you slice it.
