Hey folks,
I have been playing around with numerical weather forecasting using deep learning for a while. I mainly use the hourly reanalysis ERA5 datasets. The size of ERA5 datasets is huge, especially if you prefer a high-resolution model with multiple atmosphere levels. Below, I will summarize what I think is the best practice for handling such a huge amount of data.
Storage, Format Conversion and Efficient I/O
To put it in short, the best format for fast I/O is zarr. However, many datasets are not available in zarr format (there exists some cloud version). For weather datasets, NetCDF and Grib are commonly adopted data formats. If you intend to convert between NetCDF and Grib, you may simply use ecCodes or climate data operator.
Pay attention, if you want to convert NetCDF file, especially combining multiple NetCDF files over time/level dimension, into zarr. Make sure to drop the encoding of each data variable first. Failure to do so will lead to nan value and it will be too late when your loss and gradient become nan. The NetCDF encodes each variable by default, with FillValue, scale factor and add offsets. When you concat/merge multiple NetCDF files using xarray, only the encodings from the first file will be kept, leading to data inconsistency and nan value. The following code will do the job:
for key in ds.data_vars.keys():
ds[key].encoding = {}
There are also efforts to create logical zarr from NetCDF/Grib files using kerchunk. I’ve tested the performance of kerchunk with a large amount of NetCDF files, but it does not scale well when the number of files increases. The kerchunk implementation for Grib file seems buggy and the generated datasets tend to have missing time steps. But if you are interested in using cloud storage, you should definitely read the following posts 1, 2.
Chunking, Xarray, Dask and pytorch dataloader
The bottleneck of my training loop is the slow read of data from disks. Since my training objective is to predict the weather state at a particular time step, each chunk of the data containing the entire weather state for a single time step will give the fastest I/O over the time dimension. The ERA5 weather datasets contain multiple variables, each stored (or chunked) separately. If you are familiar with xarray, you will notice that xarray automatically uses dask to load chunks in parallel if you intend to use the chunking feature. Using dask will, in general, increase the reading speed when each reading operation involves multiple chunks. But when coupled with distributed training, dask may cause a significant amount of problems. Below, I summarize the problem I encountered with dask, xarray and distributed training.
-
Avoid using xarray.open_mfdataset when there are too many files
-
If you use dask and pytorch dataloader with multiple workers, be sure to read this post.
-
If you use dask and pytorch dataloader with multiple workers, you may encounter memory leak and should set the environmental variable MALLOC_MMAP_MAX_=40960
-
If you use xarray to read the data with a chunking strategy, and you intend to use distributed training (let’s say pytorch DDP), you should avoid using dask. pytorch and dask will race for CPU and significantly slow down the reading speed. you can avoid this by setting DASK_NUM_WORKERS=1 and DASK_THREADS_PER_WORKER=1. dask.config.set(scheduler=‘sync’) may also do the work.
-
Further optimization could be done by directly reading from zarr. you will have to decode the variable yourself using the encoding information, but I prefer simply drop all the encodings before converting to zarr
Now after all this, I managed to load 11 variables for a single time step on the 0.25 resolution grid for 32 levels within 0.8s. Combined with multiple dataloader workers, the GPU is almost always fully utilized! There are also some other posts worth reading on this topic, like the following one:
There are tools like xBatcher, publicly available datasets weatherBench and library like kvikio for direct disk to GPU loading if you are interested.
I will share my implementation after I have more time.