Best practices to go from 1000s of netcdf files to analyses on a HPC cluster?

What?
8759 netcdf files totalling 17TB of HYCOM ocean model (u,v) velocity data at two depth levels (and bottom velocity), at hourly time steps. So the total data arrays of interest are [9000 (X) by 7055 (Y) by 8759 (time) by 2 (Depth)] for both u and v; Using nco tools, I can reduce these files as an example to 365 netcdf files, totaling 4.3TB, one file per day (24 hourly steps), for one depth level only, so data arrays are 9000 (X) by 7055 (Y) by 8759 (time) for the u,v components.

How?
Using python, dask, xarray, etc on brand new (beta) supercomputer/HPC at the University of Miami (https://acs-docs.readthedocs.io/triton/README.html) that runs a JupytherHub, and distributes jobs across its nodes using the LSF scheduler. Using the scheduler I can distribute jobs on individual nodes, each having ~250GB of memory, and 40 cores, perhaps which means that it physically has 8 CPUs with 5 threads each, but that’s a major source of confusion (for me). I can typically access 10 nodes at a time, so secure about 2.5TB of RAM, with potentially 80 processes and 400 threads. I have been told than I can get up to 500 “workers” (threads?) so that would be on 12 nodes of the “normal” queue. Alternatively, I could work with only the interactive node that runs Jupyter Lab, with ~250GB of memory and up to 40 workers.

Why?
We want to conduct analyses along the time axis, at every grid point. Ultimately, we want to compute average rotary velocity spectra following the study of Yu et al. 2019 … but I can hardly do simple things right now.

What am I looking for?
Advice to make these HYCOM data analysis-ready for pangeo tools, favoring the time dimension/coordinates. Conducting some tests and visualizing attempts at computation, it looks like the I/O is a major issue. Forget about lazily opening an xarray dataset from the 8759 hourly netcdf files (however this appears to work with 365 daily files). We have also made 19 zarr arrays containing about 20-day worth of data, which seem to load in a notebook, and manage to launch computations but these eventually failed, typically with killed workers because of memory issues. Sizing dask chunks is also something I have been playing with …

6 Likes

@aimeeb worked on the zarr-ification of MUR SST, I think she might be able to help.

2 Likes

Hi Shane,

Hopefully this can be helpful? I am also on the journey of understanding how to make things work better myself and I’m thankful to have gleaned knowledge from others.

We generate and work with 6D (3D + ensemble + start date + lead time) forecasts and 5D (3D + ensemble + time) coupled ocean-atmosphere output where single variables can be ~5TB+, complete 6D forecast experiments are order 50+TB range, and 5D reanlyses are in the multi-100TB range. :boom:

We went on a journey to figure out how to deal with this data tsunami. Luckily we found the Pangeo community and have been making progress with lots of help. In our team I work with @dougiesquire and we’ve taken what we’ve learned from our Pangeo colleagues to build an HPC workflow internal to our organisation.

Here are the main dot-points from my POV.

  1. conversion from NetCDF to zarr has really enabled our work
  2. chunking (and re-chunking depending on your calculation) matters
  3. there are still hurdles I don’t fully understand wrt horizontal scaling in some calculations

I’ve mainly taken my directions and absorbed knowledge from folks like @rabernat, @dougiesquire, @andersy005, @TomAugspurger, @mrocklin, @jhamman, @kaedonkers, @pbranson and others in the community.

WRT #3 we’ve had issues with horizontal scaling using our org’s new super-fast parallel storage. My very ignorant POV is that it has something fundamental to do with how Dask bundles up tasks and the timing at which the workers communicate with each other? I think it’s related to:
https://github.com/dask/distributed/issues/2602 ?

I also note this image from a recent @andersy005 presentation: https://pbs.twimg.com/media/EWs6LCXUYAEqbZc?format=jpg&name=large

Our current solution is vertical scaling using the largest node we can get as a single worker with as many threads & RAM as the largest node can provide. :woman_shrugging:t4:

To sum up this ramble - from my POV:

  1. work on converting NetCDF to zarr
  2. chunking (and re-chunking depending on situation) matters to get your calculations to finish quickly or at all
  3. there may be some underlying software engineering challenges that could be holding you/us back

Very much welcome being corrected on my point of view, BTW

4 Likes

@selipot I would echo a lot of what @Thomas_Moore said, but specifically:

Our current solution is vertical scaling using the largest node we can get as a single worker with as many threads & RAM as the largest node can provide.

My solution for MUR SST data (sourced from https://podaac.jpl.nasa.gov/Multi-scale_Ultra-high_Resolution_MUR-SST) was to:

  1. Configure AWS FSx on a powerful EC2 instance so that I can use S3 for persistence storage
  2. Mount the data to an EC2 using the PODAAC drive and davfs2 Questions/Comments - Earthdata Forum
  3. Copy the mounted NetCDF files to S3 using FSx “repository sync”
  4. Run the zarr conversion for an initial time slice (say 60 days). Continue to append data and copy the zarr store to S3 using FSx repository sync

Another problem was that if you encounter an error while generating and appending to the final zarr store, it’s very difficult (impossible?) to recover from. My system for this was time-consuming but for every year that I appended I did another repository sync and created a backup to duplicate where I had successfully got to - in case something goes wrong in the subsequent year. Fortunately, FSx seems to keep some metadata about already copied data so it should only add the new zarr files if you are syncing to the same S3 location.

3 Likes

Thanks for tagging me @cgentemann see response above (can’t tag more than two users in a post apparently)

1 Like

Thank you @aimeeb @Thomas_Moore @cgentemann for connecting me and taking the time to share your experience and recommendations. I have a lot to digest but perhaps a few clarifications needed:

  1. What does vertical scaling mean in your instances?
  2. About zarr: would you create a single zarr array/store with the model global dataset or should I divide it up in geographical regions? From what you are describing @aimeeb it seems that you create a single array and append to it along the time dimension? In the end I want to analyze/use my entire global model data.
  3. Right now I am lazily loading the netcdf files to form a xarray dataset which I convert to zarr using the xarray.to_zarr method, setting chunks in whatever way I think should work. With my probably-not-adequate cluster configuration, creating a zarr array for 1/20-th of my model geographical area has now been going on for 8 hours.
1 Like

I have some comments on the netCDF side of things. I agree that converting to zarr is the way to go.

This should not be an issue if you use the right combination of commands (:frowning: I need to work on making the defaults more sane). See https://xarray.pydata.org/en/stable/io.html#reading-multi-file-datasets . I do this routinely, so if it does not complete in a reasonable amount of time, please ping me.

The second point is that the HDF-5 C library does not support multithreading AFAICT (for e.g. Multi-threaded reads use only 100% CPU? · Issue #591 · h5py/h5py · GitHub). This is used by the netCDF4 python library so there’s no sidestepping it. So you should set up your dask cluster to be one thread per process. This should speed up reading though amount of improvement will depend on how you’ve configured your cluster currently. A small test reading the same dataset using all threads or all processes should show the difference.

3 Likes

Here’s a gist https://nbviewer.jupyter.org/gist/willirath/ecfc3adfa89b82e31c91b40360194b45 that has hints on chunking strategies for, e.g., time filtering. It does not touch on the netCDF-IO issue, but might help figuring out chunk sizes and cluster config.

It uses an artificial dataset full of normally distributed random numbers that is created on the fly with chunks that look like they were opened from many different files containing only one time step. Then, re-chunking to a more time-filter friendly form is done, and a simple 6h box-shaped running-mean filter is applied. Finally some calculation (time variance of zonal mean plotted against y) is performed.

By playing with the chunk sizes, you can easily find choices that make it very difficult for either the Dask scheduler (because the graph just got too complex) or for the cluster to not run out of memory.

3 Likes

Hi Shane, welcome to Pangeo, and thanks for posting such an interesting and popular first question!

Above several folks have given very useful practical suggestions. I would like to digress into a more abstract, conceptual discussion about why what you are doing is a hard computational problem. I have personally wrestled with this problem for quite a while. In fact, I wrote the original code that Yu started with for the LLC4320 model here. So I thought I would take this as a chance to write down some thoughts–forgive me for taking things a bit off topic.

All array data is chunked somehow

All array data is “chunked” to begin with, whether we recognize it or not. Our models (also satellites) tend to produce one 2D or 3D spatial field per timestep, which is why you are starting with 8759 netCDF files. When that data is stored on a hard disk, this means that the bits from the same day are physically close to each other, and can efficiently be read into memory in a single block a.k.a. chunk. (If the netCDF files use actual netCDF4 chunking, it means that a whole chunk has to be read and decompressed at once.) When you want to analyze the data along the time dimension, e.g. computing temporal spectra, you need to get the bits from each point in space into memory, which means bringing together bits that are stored very far away from each other. For a dataset of your size, this is just really hard any way you approach it.

Most of us can imagine a naive approach to the problem, where we simply loop over each point. In pseudocode, it would be something like this

for j in range(Ny):
    for i in range(Nx):
        timeseries = []
        for n in range(Nt):
            data = open_file(n)
            timeseries.append(data[j, i])
        # maybe analyze it now
        analyze_timeseries(timeseries)
        # or maybe store it for later
        store_timeseries(timeseries, j, i)

This would involve opening each of the 8759 files 9000x7055 times. It will be extremely slow. If each file can be opened and read in 1 ms (pretty fast), it will take 17 years! :scream: So if we can beat 17 years, we have made progress. :grin:

Parallelization is clearly needed, but it’s not obvious it will always help. We will face at several constraints:

  • how fast can the storage system read data in parallel
  • how much memory (RAM) is available to cache data
  • how fast can data be passed between parallel actors over the network

On the first point (read throughput), it’s easy to imaging we might actually degrade performance by trying to read data in parallel. Our hard disk (and our sysadmin) might be very unhappy if we issue 8759 simultaneous requests to open different files, or 9000 simultaneous reads from the same file. It might backfire, and take even more than 17 years!

Let’s bypass that issue for a moment, and consider the idealized case where the disk is not a limiting factor. (If you’re lucky, your HPC system has a fancy parallel filesystem and this is the case.) In that case, we can replace the reading with randomly generated data.

Example with Random Data

I created a simple notebook to explore the rechunking problem with random data:

In this example, I create some arrays that have the traditional one-chunk-per-timestep structure, i.e something like this (only 4 timesteps)


…and rechunk it to something like this

I do this in such a way that the total final chunk size (128 MB) matches the original chunk size…this determines the final chunk spatial dimensions.

An important parameter to look at here is the number of tasks: it took 28 tasks to rechunk these 4 chunks. That represents the computational complexity of the operation. For such a small graph, we can examine it directly (data.visualize()):

At the bottom are the four chunks of random data we read, and at the top are the final four outputs chunks. In between are some operations involve splitting, and recombining the chunks. An important observation is dask, by default, will not regenerate the same data twice. It will only do it once and them pass the data around between workers. I believe this can be suboptimal in many cases, but I don’t know how to change it.

How does this scale? To check this out, I tried this operation for increasing Nt (number of timesteps).


etc.

The scaling looks like this

image

It is super-linear on a log-log scale (slope ~ 1.3), meaning that the complexity is going up pretty fast. For the largest array I tried, Nt=4096


we are already at > 250,000 tasks. Once we get close to a million tasks, we start to reach the limits of the dask scheduler.

An Actual Computation

I tried computing some of these arrays on ocean.pangeo.io, using a cluster of 20 workers (each with 2 cores and 11.5 GB RAM; 230 GB RAM total). I was able to compute up to size Nt=512, 65 GB total. For larger arrays than that, my workers started running out of memory before they were able to start storing the output chunks. (Dask performance report for Nt=512 is here.)

The conclusion from my unscientific study is that you probably need significantly more memory in your cluster than the size of your array. So if you desired output array is 4 TB, your cluster may need to have something like 12 TB of memory.

It would be really nice if Dask could figure out how to not fail on this computation without requiring so much memory. After all, our naive example would never blow up our memory (even if it did take 17 years). Is there something in between. I believe that the fundamental issue here is a known challenge scenario for dask involving the concept of “memory backpressure”:

In the long term, we hope dask will improve its scheduling strategies and eventually become more adept at these workloads. But that will not happen immediately.

So what should you do?

The obvious workaround is to do everything you can to make your problem smaller. Some ideas:

  • Definitely process each variable (e.g. u, v) separately. Your cluster is already overwhelmed.
  • Do you really need every point in the horizontal? Based on my experience, the answer is no. There is very little valuable information at the grid scale in these models. The effective resolution is much lower. Maybe you could you get by with subsampling, say, every 4, 8, or even more, in the x and y dimensions. Subsampling lowers the memory footprint.
  • Do you really need a “full” rechunk? Or could you get by rechunking to an intermediate level of contiguity, say 60 days? This lowers the complexity.
  • Experiment with idealized examples like the one I showed in my notebook to get a feel for what your cluster can handle.
  • Just throw more resources at it. More memory, more workers, etc.

If this is your first real experience with the Pangeo stack, you may be feeling frustrated by now. Why doesn’t this “just work”? I think it’s important to recognize that you have picked one of the hardest problems out there. Our stack excels at mostly-embarassingly-parallel problems, but this is not one of them. It’s a genuine computational challenge, and a massive amount of data, however you slice it.

16 Likes

p.s. Please consider sharing your actual code (e.g. via github gist), as there may be a few obvious tricks and optimizations we can suggest after inspecting it.

1 Like

@rabernat thank you for spending the time to write this long reply. I need time to ponder all that and will keep you updated of my progress.

1 Like

Ryan’s post does a good job describing Dask’s issues with this. I’ll summarize them, and then provide a potential solution.

The basic idea is to help out Dask by doing the rechunking in two passes:

  1. A split only pass, which rechunk on the dimensions you aren’t merging (1 and 2 in this case). This output is written to disk
  2. A merge pass, which reads the output from step 1 and merges the merge dimension (0 or time in this case)

With this, we should be able to efficiently repartition larger than memory datasets on a cluster (see some caveats at the end).

Let’s generate a random dataset that’s chunked along the first dimension (think time) and write that to disk (using Zarr, but everything I’m talking about here applies equally well to HDF5). This represents the raw data as handed to us; we don’t have any control over it.

import dask.array as da

a = da.random.random((4, 4000, 4000), chunks=(1, 4000, 4000))

da.to_zarr(a, "/tmp/data/original.zarr", overwrite=True)

Read it into a Dask Array:

data = da.from_zarr("/tmp/data/original.zarr")
data

Screen Shot 2020-05-06 at 3.56.29 PM

For analysis, we’d love to merge the first dimension (time). To keep the blocks of reasonable size we’ll also split the other dimensions. We’ll rechunk such that the size of each block is the same.

desired = data.rechunk({0: 4, 1: 2000, 2: 2000})
desired

Screen Shot 2020-05-06 at 3.56.33 PM

Inspecting this task graph shows the problem from Dask’s point of view: a global communication pattern where each output chunk depends on each input chunk.

desired.visualize()

As written, our workers would need a lot of memory to finish this job, at least without a lot of spilling to disk. And Dask tends to struggle with workloads that push the memory limits of the cluster, leading to crashing workers and sad users.

A two pass solution

We can work around these issues in Dask by processing the data in two passes

  1. A split only pass, which rechunk on the dimensions you aren’t merging (1 and 2 in this case). This output is written to disk
  2. A merge pass, which reads the output from step 1 and merges the merge dimension (0 or time in this case)
# split only
data2 = data.rechunk({1: 2000, 2: 2000})
# write
data2.to_zarr("/tmp/data/temp.zarr", overwrite=True)
# read output of step 1. Identical to `data2`
data3 = da.from_zarr("/tmp/data/temp.zarr")
data3

Screen Shot 2020-05-06 at 3.56.45 PM

# merge pass
data4 = data3.rechunk({0: 4})
data4

Screen Shot 2020-05-06 at 3.56.56 PM

data4 has a communication pattern that’s much friendlier to Dask.

data4.visualize()

Each of the 4 rechunk-merge output chunks depends on just 4 input chunks each: the “split” versions of the original chunks. So our peak memory use for this stage is just the size of rechunk-merge output, which we know fits in memory since it’s the size of the original chunks given to us.

That can be written to disk, ready for analysis. At least, that’s the dream. For the single-machine threaded scheduler we actually achieve that. On my laptop I was able to scale this workload up to a 16 GB dataset without issue (finished in ~1 minute. Most of the time the memory usage was ~1.5Gb).

But with the distributed scheduler we run into some issues. Follow https://github.com/dask/dask/issues/5105 for updates, but we potentially end up scheduling (from-zarr, 3,0,0) on a different machine than (from-zarr, 1, 0, 0). That means we need to communicate between machines / processes, which blows up our memory budget. I’m hoping to have a fix for this (by fusing those 4 from-zarr tasks into one, so that the distributed scheduler can’t make the “wrong” choice). If that fails I’ll share a trick to “manually” schedule these, but I hope it doesn’t come to that.

Anyway, at this point we should be able to write data4.to_zarr() to disk in the analysis-ready format.

Notebook at https://gist.github.com/6b548371b60d9b397f24c9464933fad3

6 Likes

Tom, this is really awesome! :grinning: Thanks for your creative thinking here!

I guess the only downside is that it potentially requires lots of scratch space for the temporary dataset. The temporary dataset might also have very many chunks (potentially millions of individual files), which could stress some HPC filesystems. However, I think it is definitely the best path forward for Shane.

@kmpaul - I know you worked on this problem from the perspective of NetCDF chunking. Do you have any insight to add?

2 Likes

Exacty, that could be the biggest pain point with Tom’s strategy, it might prevent it to work in some HPC facilities.

This point seems really important to me, it would be worth spending some time to see if something could be done.

Could we think of something intermediate between the two approaches : some sort of iterative rechunking. Say if you’ve got to rechunk over a year, first you rechunk by month, save the output, and then start from there to rechunk along the full year reducing again spatial dimension of the chunks? Could this lead to graphs a little simpler and easier to execute?

1 Like

On the number of intermediate tasks / files: yes that is a problem. @geyard’s suggestion to do the rechunking in multiple passes (e.g. daily -> monthly -> yearly) seems like it should work.

Still thinking through the data regeneration point. My initial reaction is that it may not be terribly important here. That typically comes up when you have a large input and a small output (e.g. .from_zarr(...).mean()).

In this case I think the issue is primarily poor task fusion (https://github.com/dask/dask/issues/5105#issuecomment-625282112), coupled with distributed’s memory issues in communication heavy workloads (still a problem to be fixed, but much much harder).

2 Likes

Hi @selipot. You’ve already got fab advice here and hopefully seen how this is a challenge that faces much of the community. Encase it helps my colleague @kaedonkers has written a post about our recent attempt to do the same on a medium-sized dataset. To the cloud and back again,
Building a Zarr from bespoke data formats using Xarray and Iris
.

1 Like

At the risk of being controversial, I would recommend against dask+xarray for
these kind of ETL pipelines, although they do work better in HPC than the
cloud. When faced with a similar netCDF-to-zarr task, we we ultimately
decided that dask+xarray is not the right tool for this task, and that
“dumbing” our code down to lower-level libraries like netCDF and zarr was
necessary.

Our data consists of highly structured data with the following pattern:

gs://bucket/data/{time in YYYYMMDD.HHMMSS}/data.tile{tile}.nc

Each netCDF file contains O(10) variables and has identical dims and coords.
The tile dimension is a spatial dimension for a subdomain of a cubed-sphere
atmospheric model. There are 6 tiles, 3800 time steps, and each file is 270
MB in size (so 6 TB in total).

Ultimately, we could not use xarray+dask to process this data into zarr. We
tried several different strategies that seemed interesting, but all
deadlocked on rechunking/out of memory issues. We fixed this by subtracting
the dask layer, which is rightfully focused on a much larger set of problems,
and less optimized for these simple data transformation tasks, which usually
have a very structured form.

The basic idea is to

  1. open a single file. Use this is a template to
    initialize a zarr group and zarr array with the correct size to fit all the
    files. This only uses zarr.open_group and zarr.create.
  2. loop over all files and insert into the pre-initialized array.
    The loop can be done in
    parallel—we used Google Dataflow, but dask distributed would work too—but
    the chunking per worker has to be correct if you want to avoid race
    conditions when inserting data. The easiest way to accomplish this is to use
    a chunksize of 1 along time or tile dimensions. The data can be rechunked
    later using xarray+dask. For HPC writes to a shared filesystem you could
    probably use zarr’s locking
    primitives
    to avoid
    race-conditions even if workers need to write to the same chunk.

This more manual approach is scalable and robust. You could do this for 10 or
million files since there is no global dask graph that needs to fit in
memory. You can easily add retry logic for I/O failures, making the pipeline
fault tolerant.

A disadvantage is that it requires reverse engineering how xarray interprets
zarr stores using attributes like _ARRAY_DIMENSIONS, fill_values, and time
encodings.

tl;dr; xarray+dask is a heavy layer for these highly structure I/O
operations. Use a parellel for-loop instead.

3 Likes

Noah, your suggestion is most appreciated. It’s certainly not controversial–most of us have been aware this “fail case” for dask for some time. The last thing we want here is to become dogmatic about using certain tools. Your workaround sounds like a very useful reference point.

Could you share a few more details, such as the source and target chunk structure? Are you trying to do a “full shuffle” rechunk like Shane’s example? If so, I don’t quite see how you avoid a combinatorial explosion of source file reads.

I think this statement is very wise:

We fixed this by subtracting the dask layer, which is rightfully focused on a much larger set of problems, and less optimized for these simple data transformation tasks, which usually have a very structured form.

My view is that, in the long term, it would be useful to collaborate around a dedicated rechunking package, focused solely on this problem. Dask may still play a role in the solution, but perhaps not using the out-of-the-box graphs generated by dask.array.rechunk.

Btw, this xarray PR might help you optimize your pipeline further, perhaps making it easier to use Xarray rather than the lower-level zarr interface for your I/O.

The source and target chunk structure in our case are the same 1 chunk per tile and time point. This does make things much simpler than for Shane’s case. For rechunking, I’d be more worried about multiple writes to the same chunk rather than overlapping reads. Apache beam is more of “push” style compared to dask+ xarray which has a “pull” like structure. Basically, we load a bunch of netCDFs and push their variables through a DAG, rather than building up an dask graph and pulling data from it.

I am currently working on a case with some rechunking and computations mixed in, so I’ll let you know how it goes! In principle, Google’s managed dataflow runner has a some intriguiging shuffle capabilities that offload data intelligently to a shared file system. Could be an interesting way to rechunk/transpose extremely large datasets.

Thanks @Theo_McCaie. The method in that blogpost was heavily inspired by this gist from @rsignell:

For what it’s worth the Met Office Informatics Lab have also played with constructing a Zarr without converting the data from netCDF, but simply pointing Zarr at the netCDF files as if they were chunks -> https://medium.com/informatics-lab/arrays-on-the-fly-9509b6b2e46a.

And to answer your question on vertical scaling @selipot, it means to increase the size of the one node you are running your computation on (e.g. upgrading your node to the beefiest one available, as @Thomas_Moore does). This is different to horizontal scaling which involved splitting your computation up into tasks and farming those out to lots of nodes which perform those tasks separately and in parallel (e.g. as something like dask-distributed does).

I hope that’s useful. Good luck, as @rabernat said - you’ve found a difficult problem to solve!