Best practices to go from 1000s of netCDF files to analyses on an HPC cluster?

The source and target chunk structure in our case are the same: 1 chunk per tile and time point. This does make things much simpler than for Shane’s case. For rechunking, I’d be more worried about multiple writes to the same chunk rather than overlapping reads. Apache Beam is more of a “push” style compared to Dask + xarray, which has a “pull”-like structure. Basically, we load a bunch of netCDFs and push their variables through a DAG, rather than building up a Dask graph and pulling data from it.

I am currently working on a case with some rechunking and computations mixed in, so I’ll let you know how it goes! In principle, Google’s managed Dataflow runner has some intriguing shuffle capabilities that offload data intelligently to a shared file system. Could be an interesting way to rechunk/transpose extremely large datasets.

Thanks @Theo_McCaie. The method in that blogpost was heavily inspired by this gist from @rsignell:

For what it’s worth, the Met Office Informatics Lab have also played with constructing a Zarr without converting the data from netCDF, but simply pointing Zarr at the netCDF files as if they were chunks -> https://medium.com/informatics-lab/arrays-on-the-fly-9509b6b2e46a.

And to answer your question on vertical scaling @selipot, it means increasing the size of the one node you are running your computation on (e.g. upgrading your node to the beefiest one available, as @Thomas_Moore does). This is different from horizontal scaling, which involves splitting your computation up into tasks and farming those out to lots of nodes which perform those tasks separately and in parallel (e.g. as something like dask-distributed does).

I hope that’s useful. Good luck, as @rabernat said - you’ve found a difficult problem to solve!

Ok, this is a very important detail. Data pipelines that don’t change the chunk structure and are suitably local to each chunk are “embarrassingly parallel”. There are many task execution frameworks that would probably handle this well, and it would be great if Dask got better at these very large but relatively simple transformations.

This is a choice you can make in your algorithm. In your approach (“push”), you would loop over files, reading each one only once and potentially writing to multiple target chunks. You would have to worry about write locks or else face potential data corruption from asynchronous writes. Alternatively, in a “pull” approach, you would write each target chunk out only once, but might have to read your input files over and over many times. As you said, especially on cloud, the concurrent reads should perform just fine.
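To make the “pull” option concrete, here is a minimal sketch assuming one netCDF file per time step, a single variable, and a pre-created Zarr array chunked along time (the paths, variable name, and executor are all hypothetical):

import glob
import xarray as xr
import zarr

input_files = sorted(glob.glob("data/*.nc"))       # one file per time step (assumed)
target = zarr.open("target.zarr", mode="r+")       # target array created ahead of time
time_chunk = target.chunks[0]                      # e.g. 100 time steps per target chunk

def write_target_chunk(i):
    """Write exactly one target chunk, so no write locks are needed,
    at the cost of re-opening input files that other tasks may also read."""
    files = input_files[i * time_chunk : (i + 1) * time_chunk]
    with xr.open_mfdataset(files, combine="by_coords") as ds:
        target[i * time_chunk : (i + 1) * time_chunk] = ds["temperature"].values

# Embarrassingly parallel: one task per target chunk, handed to any executor.
for i in range(target.shape[0] // time_chunk):     # assumes an even split, for simplicity
    write_target_chunk(i)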

For large, “full-shuffle” style rechunking, you’re likely to hit memory limits with an embarrassingly parallel approach. Some more computational complexity, either task dependencies or temporary storage, is likely required.

A Proposal

This problem is driving me crazy. I would say I have thought about it most nights as I try to fall asleep for the past year or longer. The worst part about this is that it’s not even an interesting problem. It’s a straightforward problem in blocked linear algebra for which an optimal algorithm, given I/O and memory limitations, is likely already known to computer science. Or it’s a problem that a student in the right sub-field would be able to solve very quickly, in terms of algorithm design. However, it can perhaps be made interesting by combining it with a modern, asynchronous, serverless parallel execution framework.

We should collaborate with a computer scientist to solve this problem well. The end result should be a simple utility that can rechunk any dataset and, given a scheduler, execute the operation. We will be able to write a paper about how well our algorithm scales, and we will get a very useful tool out of the process.

As a CLI, it might look like

zarr_rechunk --from gs://bucket/group/array1 --to gs://bucket/group/array2 --target-chunks (-1, 1000, 2000)

I would love to investigate Pywren / numpywren as an execution framework, as it would enable us to do things in a truly serverless way, using AWS Lambda / Cloud Functions.

(also numpywren)

If there are any computer scientists out there who find this problem compelling, please help us!

2 Likes

I think that Apache Beam does this automatically. From what I can tell, “shuffle” or groupby operations (e.g. grouping by new chunks) will be offloaded either to node-local disk or to a Google file store instance. I believe Hadoop MapReduce is similar. Keeping the data in memory was actually a key selling point of Apache Spark.

This problem is driving me crazy. I would say I have thought about it most nights as I try to fall asleep for the past year or longer. The worst part about this is that it’s not even an interesting problem.

Haha. I couldn’t agree more. Really uninteresting from the scientific perspective, but extremely challenging. Lately, I’ve been very envious of the tabular data folks (i.e. the entire rest of the world) with their slick SQL queries, and extremely robust solutions.

For HPC, I wonder if we could leverage NCAR/PyReshaper (https://github.com/NCAR/PyReshaper, a package for converting netCDF files from time-slice/history format to time-series/single-variable format) to create Zarr data? The algorithms must be similar.

For cloud, as you say, we could implement zarr_rechunk with 1) any task execution framework (whatever is easiest to deploy) and 2) some caching mechanism. Some tools come with both of these out of the box (e.g. Apache Spark/Beam/Flink), but it should be relatively easy to DIY. Either way, this two-phase algorithm could work (a rough sketch follows the list):

  1. For each input chunk, read it, find its intersection with each output chunk, and persist each piece into some cache. The keys of this cache will be output chunks; the values will be lists of array data.
  2. For each key in the cache, combine the sequence of arrays into a single array and persist it. This is the Zarr data at this point.
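A minimal, single-machine sketch of that two-phase idea for a 1-D array, assuming both the source and target already exist as Zarr arrays and using a plain directory of .npy files as the cache (paths and chunk sizes are made up; the N-D intersection logic is where the tricky indexing lives):

import os
import numpy as np
import zarr

source = zarr.open("source.zarr", mode="r")    # e.g. chunked (3,)
target = zarr.open("target.zarr", mode="r+")   # e.g. chunked (4,), same shape
cache_dir = "rechunk-cache"
os.makedirs(cache_dir, exist_ok=True)

src_chunk, tgt_chunk = source.chunks[0], target.chunks[0]
n = source.shape[0]

# Phase 1: read each source chunk once and persist every intersection with a
# target chunk, keyed by (target chunk index, start offset).
for s in range(0, n, src_chunk):
    block = source[s : s + src_chunk]
    stop = s + len(block)
    j = s // tgt_chunk
    while j * tgt_chunk < stop:
        lo, hi = max(s, j * tgt_chunk), min(stop, (j + 1) * tgt_chunk)
        np.save(os.path.join(cache_dir, f"{j}-{lo}.npy"), block[lo - s : hi - s])
        j += 1

# Phase 2: for each target chunk, gather its pieces in order, concatenate,
# and write that chunk exactly once.
for j in range((n + tgt_chunk - 1) // tgt_chunk):
    pieces = sorted(
        (f for f in os.listdir(cache_dir) if f.startswith(f"{j}-")),
        key=lambda f: int(f.split("-")[1].split(".")[0]),
    )
    data = np.concatenate([np.load(os.path.join(cache_dir, p)) for p in pieces])
    target[j * tgt_chunk : j * tgt_chunk + len(data)] = data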

It’s pretty simple in principle, but has some tricky indexing logic. The main difficulty is probably finding a performant and easy to deploy task execution back-end. Google Dataflow/Beam is great, but it can be a pain to install dependencies into, and won’t work on HPC.

1 Like

@kmpaul is one of the developers of this tool. It would be great to get his input.

Hey, @rabernat! Thanks for the ping! Yes, the PyReshaper is mine. I’m pretty much the sole author on it. It’s actually been a while since I’ve touched it. (It’s still Python 2.7!)

PyReshaper Design

The PyReshaper was designed to convert synoptic data (1 time-step of all variables in a single file) to time-series data (1 variable over all time-steps in a single file). In other words, it’s like a transpose in the “variable” (or “field”) and “time” “dimensions” (not really dimensions, just conceptually).

The algorithm is very simple, what my group referred to as “Simple Task Parallelism”. Each “Task” in this context is “the writing of a single time-series file”. Hence, using MPI, we parallelize over “output files”. In practice, it is what @rabernat referred to as the “pull” approach, above. Every MPI rank (“task”) opens and reads a single variable from every input file, and then writes the data to its own output file, as depicted in this image:

In this picture, each different color represents a different time-step, and each “slice” is a “time-slice” or “history” file containing multiple fields.
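For what it’s worth, a rough sketch of that pattern (not the actual PyReshaper code) using mpi4py and netCDF4 might look like this, assuming one record per history file and made-up file and variable names:

from mpi4py import MPI
import glob
import netCDF4

comm = MPI.COMM_WORLD
rank, nranks = comm.Get_rank(), comm.Get_size()

input_files = sorted(glob.glob("history.*.nc"))    # one time step per file (assumed)
fields = ["T", "U", "V"]                           # hypothetical time-varying variables

# Each rank owns a subset of the output (time-series) files, one per variable.
for varname in fields[rank::nranks]:
    with netCDF4.Dataset(f"{varname}.timeseries.nc", "w") as out:
        out.createDimension("time", len(input_files))
        with netCDF4.Dataset(input_files[0]) as first:
            dims = first.variables[varname].dimensions        # e.g. ("time", "lat", "lon")
            for d in dims[1:]:
                out.createDimension(d, first.dimensions[d].size)
            tsvar = out.createVariable(varname, first.variables[varname].dtype, dims)
        # The "pull" pattern: every rank opens and reads every input file.
        for t, path in enumerate(input_files):
            with netCDF4.Dataset(path) as src:
                tsvar[t, ...] = src.variables[varname][0, ...]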

We talked about a more sophisticated algorithm, one that dedicated 1 MPI rank to each input file (i.e., open each input file only once) and 1 MPI rank to each output file, and then used MPI send/recvs to shuffle the data to its appropriate destination. But since we considered this an optimization step, and since premature optimization is the root of all evil, we decided to revisit this idea later.

Interestingly, on HPC machines with a good parallel file system (PFS), we never had to implement this more advanced algorithm. The PFS dealt with the multiple simultaneous open+read+close of the same file beautifully.

By default, the PyReshaper reads the entire data array from the input file (i.e., a single “chunk”) and writes that to its appropriate time-series file. At some point, when requested by a user, I added the ability to read smaller chunks from the input file and write each chunk separately. That way memory could be controlled a bit.

Problems with the PyReshaper

We take advantage of (abuse?) the optimized performance of the filesystem to allow simultaneous opens+reads+closes of the same file. I’m certain that object storage systems can do the same thing efficiently, too. Maybe that’s not a problem, but just a design choice to not over-engineer the solution.

We greatly depend on the low-level I/O libraries, in this case netCDF4 (originally, PyNIO). We have had problems with the I/O libraries keeping data in memory until the file is closed, even if flushed, which can lead to memory blow-out. We never implemented a fix for this, since the workaround was always to control the number of input slices you read (i.e., more slices leading to longer time-series files). We also often had to run in inefficient modes, such as running only 4 MPI tasks on a 16-core node, to prevent memory blow-out. In this sense, the algorithm is fairly inflexible.

Also, the parallelism is over output files (i.e., variables in the input files). If there are not enough variables in the input files, then you cannot achieve much parallelism. We discussed implementing a more sophisticated algorithm to allow more general scaling, but again, we never found the need.

Xreshaper

When @andersy005 joined us here at NCAR, I told him about the PyReshaper and that if I had written the PyReshaper a few years later than I did, I would have written it with Xarray. So, as an early task for him, I asked him to create an Xarray “port” of the PyReshaper. It never got finished, mainly because there was no demand for it, but you can see the WIP version here:

2 Likes

Yeah, the project didn’t go far :). We didn’t even test it with dask’s distributed scheduler :slightly_frowning_face: I am happy to revisit this in the future (provided there’s a need) and/or help out with addressing this issue.

2 Likes

FYI, I’ve put together the bones of a small package encoding the two-pass rechunk idea at https://github.com/TomAugspurger/rechunk.

As an example of the performance on my laptop, for the 32 GB array:

import dask.array as da
from dask.distributed import Client, performance_report
import rechunk  # the package above

client = Client()  # performance_report needs a distributed scheduler

original = "/tmp/data/original.zarr"   # source store
split = "/tmp/data/split.zarr"         # intermediate ("split") store
final = "/tmp/data/final.zarr"         # rechunked output
nslices = 256

# ~32 GB of float64 data, one time slice per chunk
a = da.random.random(size=(nslices, 4000, 4000), chunks=(1, 4000, 4000))
a.to_zarr(original)  # the source store presumably has to be written first
display(a)           # notebook display of the array

with performance_report():
    rechunk.rechunk(original, split, final)

gives https://gistcdn.rawgit.org/TomAugspurger/7ff1a55c68886086c0d763045f06a0aa/9ec45684738eeeaffe758c7bfc448b2a0eec4d46/dask-report.html.

I’m essentially limited by the speed of my file system (a plain SSD in this case). We spend ~15% of the time reading from disk and ~85% writing.

It’s using just Dask and Zarr right now, and I’m avoiding dask.array.rechunk. It’s more complex than we need, and by writing the reading, rechunking, and writing ourselves we get a smaller task graph and guaranteed no communication between workers. Memory usage is just about ideal: the size of a chunk times the number of threads per worker.
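As a back-of-the-envelope check of that bound for the example above (the thread count here is just an assumption):

chunk_bytes = 1 * 4000 * 4000 * 8                 # one float64 chunk, ~128 MB
threads_per_worker = 4                            # hypothetical
print(chunk_bytes * threads_per_worker / 2**30)   # ~0.48 GiB peak per worker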

There are tons of limitations right now (only works with Dask Arrays, not xarray objects, strict limitations on which axis can be rechunked and the chunks on the other axes), but they don’t reflect fundamental issues. If people find the idea promising then I can continue to make progress on it.

The biggest design issue that might make this a bad idea is probably the number of intermediate files. Using Zarr we have a file per chunk. In my example, that’s 65,536 files for the 32GB array. Avoiding that will require some careful thought.

3 Likes

Tom, I wish we had coordinated a bit better, because I spent the whole weekend doing basically the exact same thing, with nearly the exact same name! :grinning:

However, I think mine goes a bit deeper. I have figured out an algorithm to define the minimum needed tasks, and the largest possible intermediate chunks, given a specified worker memory limitation.

Here are some slides I made in the process:

And I have a pretty rigorous test suite of this basic algorithm:

3 Likes

@rabernat naively, should I be able to try this on my dataset starting from the daily netcdf files?

Give us a few days to refine and merge our approaches.

1 Like

What great slides. Looks like y’all have been busy!

What if the GCD of the chunk sizes is 1, though? Imagine some horrific chunk sizes like (3, 4) and (4, 3). The chunk size of the intermediate data would then be (1, 1), if I understand your method correctly. By allowing irregular chunks in the intermediate store, one could do significantly better.

This can be mitigated by the “consolidating reads” step. The more memory you have, the larger the intermediate chunks can be.

You could go even farther and make this algorithm recursive, with many stages of intermediate files. That would allow you to limit the total number of intermediate chunks.

Actually, I realized it can be a lot simpler. I think you can just take the min of read_chunks and target_chunks. Testing this now.
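To put numbers on the two candidate intermediate chunkings for that pathological example (purely illustrative):

import math

source_chunks, target_chunks = (3, 4), (4, 3)

gcd_chunks = tuple(math.gcd(s, t) for s, t in zip(source_chunks, target_chunks))
min_chunks = tuple(min(s, t) for s, t in zip(source_chunks, target_chunks))

print(gcd_chunks)  # (1, 1) -- tiny intermediate chunks, many intermediate files
print(min_chunks)  # (3, 3) -- far coarser intermediate chunks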

1 Like

The mention of Pywren piqued my interest as I’ve recently been experimenting with getting Dask running with Pywren on Google Cloud (note that it uses the newer IBM version of Pywren):

I think rechunker looks like a very elegant solution (I particularly like the fact that memory constraints are explicitly accounted for, often the bane of distributed systems). It could be a very nice fit for Dask on Pywren.

1 Like

FYI I managed to get rechunker running using Dask and Pywren - for a toy example at least - https://github.com/tomwhite/dask-executor-scheduler/commit/343882c14fc7bd4010bc7c77b20e8f65c5ea2f44

1 Like

@tomwhite - this sounds amazing! We’ve been a bit swamped this week with our cloud refactor (see Migration of ocean.pangeo.io User Accounts). I’m looking forward to going through your code. Thanks so much for working on this problem.

1 Like

@selipot - as you noticed on twitter, the rechunker library is now released. We wrote about it here:

Please give it a try using the documentation and report back on any problems you have. I anticipate there will be bugs as this is the very first release. I’d love to work with you to get your workflow optimized well and use this as a case study for why we need rechunker.
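For anyone looking for a starting point, a minimal zarr-to-zarr call looked roughly like this at the time (paths, chunk sizes, and the memory budget are placeholders; defer to the documentation for the exact API):

import zarr
from rechunker import rechunk

source = zarr.open("source.zarr")       # existing array, e.g. chunked (1, 4000, 4000)

plan = rechunk(
    source,
    target_chunks=(256, 500, 500),      # hypothetical target chunking
    max_mem="1GB",                      # explicit per-worker memory budget
    target_store="target.zarr",
    temp_store="temp.zarr",             # intermediate store for the two-pass shuffle
)
plan.execute()                          # runs on the active Dask scheduler by default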

1 Like

That’s fantastic and great timing. I am planning on working again on this project in a few days. I am definitely keeping you updated.

1 Like

If you want to rechunk directly from dask arrays (e.g. ones you make by reading your netCDF files with xarray), you will need this feature that was just merged:

Otherwise, it only does zarr to zarr rechunking.

In order to use the latest version (which is not yet released) you need to install directly from git:

pip install git+https://github.com/pangeo-data/rechunker.git
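A hedged sketch of what the dask-array path might look like with that development version (the variable name, file pattern, and chunk sizes are invented):

import xarray as xr
from rechunker import rechunk

ds = xr.open_mfdataset("daily/*.nc", combine="by_coords", chunks={"time": 1})
source = ds["sst"].data                 # the underlying dask array

plan = rechunk(
    source,
    target_chunks=(365, 100, 100),
    max_mem="2GB",
    target_store="sst_rechunked.zarr",
    temp_store="sst_temp.zarr",         # may be needed for the intermediate pass
)
plan.execute()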