What?
8759 NetCDF files totalling 17 TB of HYCOM ocean model (u, v) velocity data at two depth levels (plus bottom velocity), at hourly time steps. The data arrays of interest are therefore 9000 (X) by 7055 (Y) by 8759 (time) by 2 (depth) for each of u and v. Using NCO tools I can, for example, reduce these to 365 daily NetCDF files (24 hourly steps each) totalling 4.3 TB for a single depth level, so that the arrays become 9000 (X) by 7055 (Y) by 8759 (time) for the u and v components.
How?
Using Python, dask, xarray, etc. on a brand new (beta) supercomputer/HPC at the University of Miami (https://acs-docs.readthedocs.io/triton/README.html) that runs a JupyterHub and distributes jobs across its nodes using the LSF scheduler. Using the scheduler I can distribute jobs to individual nodes, each having ~250 GB of memory and 40 cores; perhaps that means each node physically has 8 CPUs with 5 threads each, but that's a major source of confusion for me. I can typically access 10 nodes at a time, so about 2.5 TB of RAM, with potentially 80 processes and 400 threads. I have been told that I can get up to 500 "workers" (threads?), which would be 12 nodes of the "normal" queue. Alternatively, I could work with only the interactive node that runs JupyterLab, with ~250 GB of memory and up to 40 workers.
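For reference, a minimal dask-jobqueue sketch of the kind of cluster setup described above; the queue name, walltime, and process split below are placeholders, not Triton's actual configuration.

from dask_jobqueue import LSFCluster
from dask.distributed import Client

# one LSF job per 40-core, ~250 GB node; queue and walltime are placeholders
cluster = LSFCluster(
    cores=40,
    processes=8,        # e.g. 8 worker processes x 5 threads per node
    memory="250GB",
    queue="normal",
    walltime="08:00",
)
cluster.scale(jobs=10)  # ask for 10 nodes -> ~80 processes / 400 threads
client = Client(cluster)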
Why?
We want to conduct analyses along the time axis, at every grid point. Ultimately, we want to compute average rotary velocity spectra following the study of Yu et al. 2019 ... but I can hardly do simple things right now.
What am I looking for?
Advice to make these HYCOM data analysis-ready for Pangeo tools, favoring the time dimension/coordinates. From some tests and attempted computations, it looks like I/O is a major issue. Forget about lazily opening an xarray dataset from the 8759 hourly NetCDF files (this does appear to work with the 365 daily files, however). We have also made 19 zarr arrays containing about 20 days of data each; these seem to load in a notebook and let me launch computations, but those eventually failed, typically with killed workers because of memory issues. Sizing dask chunks is also something I have been playing with ...
Hopefully this can be helpful? I am also on the journey of understanding how to make things work better myself, and I'm thankful to have gleaned knowledge from others.
We generate and work with 6D (3D + ensemble + start date + lead time) forecasts and 5D (3D + ensemble + time) coupled ocean-atmosphere output, where single variables can be ~5 TB+, complete 6D forecast experiments are in the 50+ TB range, and 5D reanalyses are in the multi-100 TB range.
We went on a journey to figure out how to deal with this data tsunami. Luckily we found the Pangeo community and have been making progress with lots of help. In our team I work with @dougiesquire, and we've taken what we've learned from our Pangeo colleagues to build an HPC workflow internal to our organisation.
Here are the main dot-points from my POV.
conversion from NetCDF to zarr has really enabled our work
chunking (and re-chunking depending on your calculation) matters
there are still hurdles I don't fully understand wrt horizontal scaling in some calculations
Regarding point 3, we've had issues with horizontal scaling using our org's new super-fast parallel storage. My very ignorant take is that it has something fundamental to do with how Dask bundles up tasks and the timing at which the workers communicate with each other? I think it's related to https://github.com/dask/distributed/issues/2602.
Our current solution is vertical scaling: using the largest node we can get as a single worker, with as many threads and as much RAM as that node can provide.
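As a rough sketch of what that looks like in practice (the thread count and memory limit below are placeholders for whatever the biggest node actually offers):

from dask.distributed import Client, LocalCluster

# "vertical scaling": one big worker process using the whole node
cluster = LocalCluster(n_workers=1, threads_per_worker=48, memory_limit="380GB")
client = Client(cluster)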
To sum up this ramble - from my POV:
work on converting NetCDF to zarr
chunking (and re-chunking depending on situation) matters to get your calculations to finish quickly or at all
there may be some underlying software engineering challenges that could be holding you/us back
Our current solution is vertical scaling: the largest node we can get, used as a single worker with as many threads and as much RAM as that node can provide.
Copy the mounted NetCDF files to S3 using FSx "repository sync"
Run the zarr conversion for an initial time slice (say 60 days). Continue to append data and copy the zarr store to S3 using FSx repository sync
Another problem was that if you encounter an error while generating and appending to the final zarr store, it's very difficult (impossible?) to recover from. My system for this was time-consuming: for every year that I appended, I did another repository sync and created a backup duplicating where I had successfully got to, in case something went wrong in the subsequent year. Fortunately, FSx seems to keep some metadata about already-copied data, so it should only add the new zarr files if you are syncing to the same S3 location.
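A minimal sketch of that initialize-then-append pattern, assuming the hourly NetCDF files are already grouped into batches (the file lists, chunk size, and store path are placeholders):

import xarray as xr

# initial ~60-day slice creates the store
ds = xr.open_mfdataset(first_batch, combine="by_coords")
ds.chunk({"time": 24}).to_zarr("store.zarr", mode="w")

# subsequent batches are appended along the time dimension
for batch in remaining_batches:
    ds = xr.open_mfdataset(batch, combine="by_coords")
    ds.chunk({"time": 24}).to_zarr("store.zarr", append_dim="time")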
Thank you @aimeeb, @Thomas_Moore, @cgentemann for connecting me and taking the time to share your experience and recommendations. I have a lot to digest, but perhaps a few clarifications are needed:
What does vertical scaling mean in your instances?
About zarr: would you create a single zarr array/store for the global model dataset, or should I divide it up into geographical regions? From what you are describing, @aimeeb, it seems that you create a single array and append to it along the time dimension? In the end I want to analyze/use my entire global model data.
Right now I am lazily loading the NetCDF files to form an xarray dataset, which I convert to zarr using the xarray to_zarr method, setting chunks in whatever way I think should work. With my probably-not-adequate cluster configuration, creating a zarr array for 1/20th of my model's geographical area has now been going on for 8 hours.
I have some comments on the netCDF side of things. I agree that converting to zarr is the way to go.
This should not be an issue if you use the right combination of arguments (I need to work on making the defaults more sane). See https://xarray.pydata.org/en/stable/io.html#reading-multi-file-datasets. I do this routinely, so if it does not complete in a reasonable amount of time, please ping me.
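For example, something along these lines (the file pattern is a placeholder, and the exact kwargs depend on how the HYCOM files are laid out):

import xarray as xr

ds = xr.open_mfdataset(
    "hycom_hourly_*.nc",      # placeholder file pattern
    combine="nested",
    concat_dim="time",
    data_vars="minimal",
    coords="minimal",
    compat="override",
    parallel=True,            # open files in parallel using dask.delayed
)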
The second point is that the HDF5 C library does not support multithreading AFAICT (see, e.g., "Multi-threaded reads use only 100% CPU?" · Issue #591 · h5py/h5py · GitHub). It is used by the netCDF4 Python library, so there's no sidestepping it. You should therefore set up your dask cluster with one thread per process. This should speed up reading, though the amount of improvement will depend on how you've configured your cluster currently. A small test reading the same dataset using all threads versus all processes should show the difference.
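A minimal sketch of such a configuration on a single node (the worker count simply mirrors the 40 cores mentioned above):

from dask.distributed import Client, LocalCluster

# many single-threaded worker processes, so HDF5/netCDF4 reads are not
# serialized by the library's lack of multithreading support
cluster = LocalCluster(n_workers=40, threads_per_worker=1, processes=True)
client = Client(cluster)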
It uses an artificial dataset full of normally distributed random numbers that is created on the fly with chunks that look like they were opened from many different files containing only one time step. Then, re-chunking to a more time-filter friendly form is done, and a simple 6h box-shaped running-mean filter is applied. Finally some calculation (time variance of zonal mean plotted against y) is performed.
By playing with the chunk sizes, you can easily find choices that either make life very difficult for the Dask scheduler (because the graph gets too complex) or make the cluster run out of memory.
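A hedged sketch of that kind of synthetic test (the array sizes, chunking, and filter length below are my own placeholder choices, not the notebook's):

import dask.array as da
import pandas as pd
import xarray as xr

# synthetic normally distributed data, one chunk per time step,
# as if opened from many single-time-step files
nt, ny, nx = 240, 500, 1000
data = da.random.normal(size=(nt, ny, nx), chunks=(1, ny, nx))
u = xr.DataArray(
    data,
    dims=("time", "y", "x"),
    coords={"time": pd.date_range("2020-01-01", periods=nt, freq="h")},
    name="u",
)

# re-chunk to a time-filter-friendly layout, then apply a 6 h running mean
u = u.chunk({"time": -1, "y": 100, "x": 100})
filtered = u.rolling(time=6, center=True).mean()

# some calculation: time variance of the zonal mean as a function of y
result = filtered.mean("x").var("time")
result.compute()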
Hi Shane, welcome to Pangeo, and thanks for posting such an interesting and popular first question!
Above, several folks have given very useful practical suggestions. I would like to digress into a more abstract, conceptual discussion of why what you are doing is a hard computational problem. I have personally wrestled with this problem for quite a while; in fact, I wrote the original code that Yu started with for the LLC4320 model here. So I thought I would take this as a chance to write down some thoughts; forgive me for taking things a bit off topic.
All array data is chunked somehow
All array data is "chunked" to begin with, whether we recognize it or not. Our models (also satellites) tend to produce one 2D or 3D spatial field per timestep, which is why you are starting with 8759 netCDF files. When that data is stored on a hard disk, it means that the bits from the same day are physically close to each other and can efficiently be read into memory in a single block, a.k.a. chunk. (If the netCDF files use actual netCDF4 chunking, it means that a whole chunk has to be read and decompressed at once.) When you want to analyze the data along the time dimension, e.g. computing temporal spectra, you need to get the bits from each point in space into memory, which means bringing together bits that are stored very far away from each other. For a dataset of your size, this is just really hard any way you approach it.
Most of us can imagine a naive approach to the problem, where we simply loop over each point. In pseudocode, it would be something like this
for j in range(Ny):
    for i in range(Nx):
        timeseries = []
        for n in range(Nt):
            data = open_file(n)
            timeseries.append(data[j, i])
        # maybe analyze it now
        analyze_timeseries(timeseries)
        # or maybe store it for later
        store_timeseries(timeseries, j, i)
This would involve opening each of the 8759 files 9000x7055 times. It will be extremely slow. If each file can be opened and read in 1 ms (pretty fast), it will take 17 years! So if we can beat 17 years, we have made progress.
Parallelization is clearly needed, but it's not obvious it will always help. We will face several constraints:
how fast can the storage system read data in parallel
how much memory (RAM) is available to cache data
how fast can data be passed between parallel actors over the network
On the first point (read throughput), it's easy to imagine we might actually degrade performance by trying to read data in parallel. Our hard disk (and our sysadmin) might be very unhappy if we issue 8759 simultaneous requests to open different files, or 9000 simultaneous reads from the same file. It might backfire and take even more than 17 years!
Let's bypass that issue for a moment and consider the idealized case where the disk is not a limiting factor. (If you're lucky, your HPC system has a fancy parallel filesystem and this is the case.) In that case, we can replace the reading with randomly generated data.
Example with Random Data
I created a simple notebook to explore the rechunking problem with random data:
In this example, I create some arrays that have the traditional one-chunk-per-timestep structure, i.e. something like this (only 4 timesteps).
I do this in such a way that the total final chunk size (128 MB) matches the original chunk size; this determines the final chunk spatial dimensions.
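A rough sketch of that setup, assuming 4 timesteps of double-precision data with ~128 MB chunks (these particular sizes are my guess at the notebook's choices, not taken from it):

import dask.array as da

# four time steps, one ~128 MB chunk per time step
data = da.random.random((4, 4000, 4000), chunks=(1, 4000, 4000))

# merge the time dimension, splitting space so each chunk stays ~128 MB
data_rechunked = data.rechunk((4, 2000, 2000))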
An important parameter to look at here is the number of tasks: it took 28 tasks to rechunk these 4 chunks. That represents the computational complexity of the operation. For such a small graph, we can examine it directly (data.visualize()):
At the bottom are the four chunks of random data we read, and at the top are the four final output chunks. In between are operations involving splitting and recombining the chunks. An important observation is that dask, by default, will not regenerate the same data twice: it will only do it once and then pass the data around between workers. I believe this can be suboptimal in many cases, but I don't know how to change it.
How does this scale? To check this out, I tried this operation for increasing Nt (number of timesteps).
we are already at > 250,000 tasks. Once we get close to a million tasks, we start to reach the limits of the dask scheduler.
An Actual Computation
I tried computing some of these arrays on ocean.pangeo.io, using a cluster of 20 workers (each with 2 cores and 11.5 GB RAM; 230 GB RAM total). I was able to compute up to size Nt=512, 65 GB total. For larger arrays than that, my workers started running out of memory before they were able to start storing the output chunks. (Dask performance report for Nt=512 is here.)
The conclusion from my unscientific study is that you probably need significantly more memory in your cluster than the size of your array. So if your desired output array is 4 TB, your cluster may need to have something like 12 TB of memory.
It would be really nice if Dask could figure out how to not fail on this computation without requiring so much memory. After all, our naive example would never blow up our memory (even if it did take 17 years). Is there something in between? I believe the fundamental issue here is a known challenge scenario for dask involving the concept of "memory backpressure":
In the long term, we hope dask will improve its scheduling strategies and eventually become more adept at these workloads. But that will not happen immediately.
So what should you do?
The obvious workaround is to do everything you can to make your problem smaller. Some ideas:
Definitely process each variable (e.g. u, v) separately. Your cluster is already overwhelmed.
Do you really need every point in the horizontal? Based on my experience, the answer is no. There is very little valuable information at the grid scale in these models; the effective resolution is much lower. Maybe you could get by with subsampling every 4, 8, or even more points in the x and y dimensions. Subsampling lowers the memory footprint.
Do you really need a "full" rechunk? Or could you get by with rechunking to an intermediate level of contiguity, say 60 days? This lowers the complexity. (See the sketch after this list.)
Experiment with idealized examples like the one I showed in my notebook to get a feel for what your cluster can handle.
Just throw more resources at it. More memory, more workers, etc.
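To make the subsampling and partial-rechunk ideas concrete, a hedged sketch (the dimension names and strides are assumptions about your dataset, not known values):

# assuming `ds` is the lazily opened hourly dataset with dims (time, Y, X)
ds_sub = ds.isel(X=slice(None, None, 8), Y=slice(None, None, 8))

# intermediate contiguity: ~60 days of hourly data per chunk,
# rather than rechunking the full time axis in one go
ds_sub = ds_sub.chunk({"time": 24 * 60, "Y": 500, "X": 500})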
If this is your first real experience with the Pangeo stack, you may be feeling frustrated by now. Why doesn't this "just work"? I think it's important to recognize that you have picked one of the hardest problems out there. Our stack excels at mostly-embarrassingly-parallel problems, but this is not one of them. It's a genuine computational challenge, and a massive amount of data, however you slice it.
p.s. Please consider sharing your actual code (e.g. via github gist), as there may be a few obvious tricks and optimizations we can suggest after inspecting it.
Ryan's post does a good job describing Dask's issues with this. I'll summarize them, and then provide a potential solution.
The basic idea is to help out Dask by doing the rechunking in two passes:
A split-only pass, which rechunks on the dimensions you aren't merging (1 and 2 in this case). This output is written to disk.
A merge pass, which reads the output from step 1 and merges along the merge dimension (0, or time, in this case).
With this, we should be able to efficiently repartition larger than memory datasets on a cluster (see some caveats at the end).
Let's generate a random dataset that's chunked along the first dimension (think time) and write that to disk (using Zarr, but everything I'm talking about here applies equally well to HDF5). This represents the raw data as handed to us; we don't have any control over it.
import dask.array as da
a = da.random.random((4, 4000, 4000), chunks=(1, 4000, 4000))
da.to_zarr(a, "/tmp/data/original.zarr", overwrite=True)
Read it into a Dask Array:
data = da.from_zarr("/tmp/data/original.zarr")
data
For analysis, we'd love to merge the first dimension (time). To keep the blocks a reasonable size, we'll also split the other dimensions, rechunking such that the size of each block stays the same.
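Presumably the naive, single-pass version of that rechunk looks something like this (the variable name is mine, not from the original post):

# merge time, split space, keeping each block the same size as before
rechunked = data.rechunk((4, 2000, 2000))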
Inspecting this task graph shows the problem from Dask's point of view: a global communication pattern where each output chunk depends on each input chunk.
As written, our workers would need a lot of memory to finish this job, at least without a lot of spilling to disk. And Dask tends to struggle with workloads that push the memory limits of the cluster, leading to crashing workers and sad users.
A two pass solution
We can work around these issues in Dask by processing the data in two passes
A split-only pass, which rechunks on the dimensions you aren't merging (1 and 2 in this case). This output is written to disk.
A merge pass, which reads the output from step 1 and merges along the merge dimension (0, or time, in this case).
# split only
data2 = data.rechunk({1: 2000, 2: 2000})
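The follow-up steps are presumably something like the sketch below: write the split-only result out, read it back, and only then merge along time. The names data3 and data4 are chosen to match the data4.to_zarr() mentioned further down, and the path is a placeholder.

# write the split-only result to intermediate storage, then read it back
da.to_zarr(data2, "/tmp/data/split.zarr", overwrite=True)
data3 = da.from_zarr("/tmp/data/split.zarr")

# merge pass: combine along the time dimension only
data4 = data3.rechunk({0: 4})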
Each of the 4 rechunk-merge output chunks depends on just 4 input chunks: the "split" versions of the original chunks. So our peak memory use for this stage is just the size of the rechunk-merge output, which we know fits in memory since it's the size of the original chunks given to us.
That can be written to disk, ready for analysis. At least, that's the dream. With the single-machine threaded scheduler we actually achieve that: on my laptop I was able to scale this workload up to a 16 GB dataset without issue (it finished in ~1 minute, and most of the time the memory usage was ~1.5 GB).
But with the distributed scheduler we run into some issues. Follow https://github.com/dask/dask/issues/5105 for updates, but we potentially end up scheduling (from-zarr, 3, 0, 0) on a different machine than (from-zarr, 1, 0, 0). That means we need to communicate between machines/processes, which blows up our memory budget. I'm hoping to have a fix for this (by fusing those 4 from-zarr tasks into one, so that the distributed scheduler can't make the "wrong" choice). If that fails I'll share a trick to "manually" schedule these, but I hope it doesn't come to that.
Anyway, at this point we should be able to write data4.to_zarr() to disk in the analysis-ready format.
Tom, this is really awesome! Thanks for your creative thinking here!
I guess the only downside is that it potentially requires lots of scratch space for the temporary dataset. The temporary dataset might also have very many chunks (potentially millions of individual files), which could stress some HPC filesystems. However, I think it is definitely the best path forward for Shane.
@kmpaul - I know you worked on this problem from the perspective of NetCDF chunking. Do you have any insight to add?
Exactly; that could be the biggest pain point with Tom's strategy, and it might prevent it from working at some HPC facilities.
This point seems really important to me; it would be worth spending some time to see if something could be done.
Could we think of something intermediate between the two approaches, some sort of iterative rechunking? Say you've got to rechunk over a year: first you rechunk by month and save the output, and then start from there to rechunk along the full year, reducing the spatial dimensions of the chunks again. Could this lead to graphs that are a little simpler and easier to execute?
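For concreteness, such a staged rechunk might look roughly like this (chunk sizes and paths are placeholders, and `data` is assumed to be hourly data with one chunk per time step):

import dask.array as da

# first pass: per-time-step chunks -> roughly monthly chunks
monthly = data.rechunk({0: 24 * 30, 1: 1000, 2: 1000})
da.to_zarr(monthly, "intermediate.zarr", overwrite=True)

# second pass: monthly chunks -> yearly chunks with smaller spatial extent
yearly = da.from_zarr("intermediate.zarr").rechunk({0: 24 * 365, 1: 250, 2: 250})
da.to_zarr(yearly, "final.zarr", overwrite=True)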
On the number of intermediate tasks/files: yes, that is a problem. @geyard's suggestion to do the rechunking in multiple passes (e.g. daily -> monthly -> yearly) seems like it should work.
Still thinking through the data regeneration point. My initial reaction is that it may not be terribly important here. That typically comes up when you have a large input and a small output (e.g. .from_zarr(...).mean()).
In this case I think the issue is primarily poor task fusion (https://github.com/dask/dask/issues/5105#issuecomment-625282112), coupled with distributed's memory issues in communication-heavy workloads (still a problem to be fixed, but much, much harder).
At the risk of being controversial, I would recommend against dask+xarray for these kinds of ETL pipelines, although they do work better on HPC than in the cloud. When faced with a similar netCDF-to-zarr task, we ultimately decided that dask+xarray is not the right tool for the job, and that "dumbing" our code down to lower-level libraries like netCDF and zarr was necessary.
Our data consists of highly structured data with the following pattern:
gs://bucket/data/{time in YYYYMMDD.HHMMSS}/data.tile{tile}.nc
Each netCDF file contains O(10) variables and has identical dims and coords. The tile dimension is a spatial dimension for a subdomain of a cubed-sphere atmospheric model. There are 6 tiles, 3800 time steps, and each file is 270 MB in size (so 6 TB in total).
Ultimately, we could not use xarray+dask to process this data into zarr. We tried several different strategies that seemed interesting, but all deadlocked on rechunking/out-of-memory issues. We fixed this by subtracting the dask layer, which is rightfully focused on a much larger set of problems and less optimized for these simple data transformation tasks, which usually have a very structured form.
The basic idea is to:
Open a single file. Use this as a template to initialize a zarr group and zarr array with the correct size to fit all the files. This only uses zarr.open_group and zarr.create.
Loop over all files and insert each one into the pre-initialized array.
The loop can be done in parallel (we used Google Dataflow, but dask distributed would work too), but the chunking per worker has to be correct if you want to avoid race conditions when inserting data. The easiest way to accomplish this is to use a chunk size of 1 along the time or tile dimensions. The data can be rechunked later using xarray+dask. For HPC writes to a shared filesystem you could probably use zarr's locking primitives to avoid race conditions even if workers need to write to the same chunk.
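A hedged sketch of that pattern (the file pattern, output path, and the one-time-step-per-file assumption are all placeholders; a real pipeline would also need to set xarray-compatible attributes such as _ARRAY_DIMENSIONS, as noted below):

import glob
import xarray as xr
import zarr

files = sorted(glob.glob("data/*.nc"))  # placeholder: assumes one time step per file

# 1. open a single file as a template and pre-create the zarr arrays
template = xr.open_dataset(files[0])
group = zarr.open_group("output.zarr", mode="w")
for name, var in template.data_vars.items():
    group.create_dataset(
        name,
        shape=(len(files), *var.shape),  # full size along time
        chunks=(1, *var.shape),          # one chunk per time step avoids write races
        dtype=var.dtype,
    )

# 2. loop over all files and insert into the pre-initialized arrays
# (serial here; in practice each iteration becomes one parallel task)
for n, path in enumerate(files):
    ds = xr.open_dataset(path)
    for name, var in ds.data_vars.items():
        group[name][n, ...] = var.values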
This more manual approach is scalable and robust. You could do this for 10 files or a million files, since there is no global dask graph that needs to fit in memory. You can easily add retry logic for I/O failures, making the pipeline fault tolerant.
A disadvantage is that it requires reverse-engineering how xarray interprets zarr stores using attributes like _ARRAY_DIMENSIONS, fill_values, and time encodings.
tl;dr: xarray+dask is a heavy layer for these highly structured I/O operations. Use a parallel for-loop instead.
Noah, your suggestion is most appreciated. It's certainly not controversial; most of us have been aware of this "fail case" for dask for some time. The last thing we want here is to become dogmatic about using certain tools. Your workaround sounds like a very useful reference point.
Could you share a few more details, such as the source and target chunk structure? Are you trying to do a "full shuffle" rechunk like Shane's example? If so, I don't quite see how you avoid a combinatorial explosion of source file reads.
I think this statement is very wise:
We fixed this by subtracting the dask layer, which is rightfully focused on a much larger set of problems, and less optimized for these simple data transformation tasks, which usually have a very structured form.
My view is that, in the long term, it would be useful to collaborate around a dedicated rechunking package, focused solely on this problem. Dask may still play a role in the solution, but perhaps not using the out-of-the-box graphs generated by dask.array.rechunk.
Btw, this xarray PR might help you optimize your pipeline further, perhaps making it easier to use Xarray rather than the lower-level zarr interface for your I/O.
The source and target chunk structures in our case are the same: 1 chunk per tile and time point. This does make things much simpler than in Shane's case. For rechunking, I'd be more worried about multiple writes to the same chunk than about overlapping reads. Apache Beam has more of a "push" style compared to dask+xarray, which has a "pull"-like structure: basically, we load a bunch of netCDFs and push their variables through a DAG, rather than building up a dask graph and pulling data from it.
I am currently working on a case with some rechunking and computations mixed in, so I'll let you know how it goes! In principle, Google's managed Dataflow runner has some intriguing shuffle capabilities that offload data intelligently to a shared file system. It could be an interesting way to rechunk/transpose extremely large datasets.
Thanks @Theo_McCaie. The method in that blogpost was heavily inspired by this gist from @rsignell:
For what it's worth, the Met Office Informatics Lab have also played with constructing a Zarr without converting the data from netCDF, simply pointing Zarr at the netCDF files as if they were chunks: https://medium.com/informatics-lab/arrays-on-the-fly-9509b6b2e46a.
And to answer your question on vertical scaling, @selipot: it means increasing the size of the one node you are running your computation on (e.g. upgrading your node to the beefiest one available, as @Thomas_Moore does). This is different from horizontal scaling, which involves splitting your computation up into tasks and farming those out to lots of nodes that perform the tasks separately and in parallel (e.g. as something like dask-distributed does).
I hope that's useful. Good luck; as @rabernat said, you've found a difficult problem to solve!