Many netcdf to single zarr store using concurrent.futures

Hi all !

thank you for making this knowledge source available, very useful. I’ve heard about pangeo since I’m a user of CNES facilities.

Shortly said, when I try to fill a single zarr store from multiple netcdf files using concurrent.futures, it fails to write the store correctly, ending up in an error about different dimension size when I try to load the zarr store.
I read some related topics here and there, including in this forum, yet not succeeded in apply the solution to my specific problem. My apologies if this question is already answered.

Below, I’ll give you the generic context then explain my issue.

Here is the context of my question:
S3 altimetry files are distributed as netcdf files, one file per orbit.
The structure of a netcdf file is quite simple: some one-dimensional variables with a time dimension.
The dimension of the time varies from one file to another.
My whole environment relies on xarray so I would like to remaing using xarray for I/O.
Each file is easily read with xarray.open_dataset

<xarray.Dataset>
Dimensions:                                        (time: 2553)
Coordinates:
  * time                                           (time) datetime64[ns] 2017...
    lat                                            (time) float64 ...
    lon                                            (time) float64 ...
Data variables: (12/53)
    surf_type_01                                   (time) float32 ...
    dist_coast_01                                  (time) float64 ...
    range_ocean_rms_01_ku                          (time) float32 ...

Loading a large bunch of individual netcdf files with open_mfdataset is sometimes very long so I’m trying to improve the performances using zarr
To do so, I’m trying to write each individual netcdf files in a single zarr store using concurent.futures.

I found somewhere

  • how to initialize the store with a compute=false
  • how to set the append mode and declare the dimension to expand
  • that it could be necessary to use a Zarr synchronizer
ZARR_SYNC=zarr.ProcessSynchronizer('zarr/sync_zarr.sync')

def load_netcdf_write_zarr_1store(fname):
    
    ds = xr.open_dataset(os.path.join(datadir,fname))

    fname = 'zarr_store_from_netcdf.zarr'
    fname = os.path.join(zarrdir,fname)
    ds.to_zarr(fname,mode='a',append_dim='time',synchronizer=ZARR_SYNC)

    return 1
    

flist = glob.glob(os.path.join(datadir,'S3A_SR_2_WAT____*_*_*_*_01[0]_???_*'))
flist.sort()
print('#',len(flist))

# The values of this dask array are entirely irrelevant; only the dtype,
# shape and chunks are used
fname = flist[0]
ds = xr.open_dataset(os.path.join(datadir,fname))
# Now we write the metadata without computing any array values
fname = 'zarr_store_from_netcdf.zarr'
fname = os.path.join(zarrdir,fname)
ds.to_zarr(fname,compute=False,synchronizer=ZARR_SYNC)
# parallel
with_threads_start = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
    out = executor.map(load_netcdf_write_zarr_1store,flist)
print("With threads time:",time.time() - with_threads_start)

everything seems fine when this code is executed over about 1000 netcdf files.

but when I try to load the file using xarray.open_zarr, it seems that the zarr store is not properly filled

ValueError: conflicting sizes for dimension 'time': length 31616 on 'wtc' and length 39802 on {'time': 'att_ku'}

So here it is. This is probably a common situation but I could not find a way to fix this.

I’ve seen some topics related to ‘region’ option but since the time dimension varies from one file to another + I use concurrent writing, I can’t see how it would apply here.

Any suggestions is more than welcomed, thank you !

1 Like

Welcome @bpicard_fluctus!

Have you tried just using xarray + Dask to handle this, rather than rolling your own multiprocessing? Have you read the instructions on the main Pangeo site? - Pangeo and Data — Pangeo documentation

Thank you @rabernat !

I have no choice than dealing with this large amount of netcdf files and, yes, I’m currently using xarray + open_mfdataset + under the hood dask capability.

The thing is, I’m not happy with the performance of
open_mfdataset over a (very) large number of netcdf files and my goal is to verify if zarr provides better performance.

(Each file covers about 45min to 90min and I sometimes need to compute daily mean over 10 years)

In that case, I would strongly encourage you to check out Pangeo Forge. It is designed to solve this exact problem!

https://pangeo-forge.readthedocs.io/en/latest/tutorials/xarray_zarr/netcdf_zarr_sequential.html

Edit to clarify:

There are two common ways of generating a single zarr store from many netcdf files:

  1. The approach described in Pangeo and Data — Pangeo documentation
    ds = xr.open_mfdataset(list_of_many_netcdf_files)
    ds.to_zarr('one_big_store.zarr')
    
  2. Pangeo Forge

In my comment above, I was suggesting you try option 1. I was not suggesting that you just forget about Zarr and work directly with the netCDF files. If option 1 doesn’t work, then I would look into Pangeo Forge.

Ok, great ! Thank you for pointing out this particular pangeo example: I’ve seen those pages these past days but I was not sure which one will apply to my case.

So, I will definitely compare the two approaches and then come back on this topic with some metrics.

Talk soon !

1 Like

So, I had a look to the different recipe presented in pangeo forge

  1. some feedbacks on pangeo-forge install
  • pip install pangeo-forge worked fine
  • pip install pangeo-forge-recipe worked fine
  • I needed to add netcdf4 and zarr package because xarray did not recognize the engine (would be nice if it was already included ?)
  • I needed to add mypy_extensions
  • you are probably aware that fsspec should be force to fsspec=2021.11.0 because there is a bug on blocksizeerror at the import on 2011.11.1
  1. from OISST example, I learned how to create a pattern

  2. from CMIP6 example, I learned how to compute chunksize and preprocess the data

but from what I understood, all those examples (even TerraClimate) seems to assume a regular sampling in time (like 1 month).

my particular case is that each netcdf files contains a different duration time corresponding to half an orbit with a temporal sampling of 1s (about 2700 records, could be more, could be less)

I guess that the tricky point is to set nitems_per_input=None, as in the TerraClimate example, but still, in this example, we are talking of regular monthly data.

Am I in the right direction ?
below the main outputs and the script I have executed

I tried the following chain, inspired by the different examples, but without success…
some encouraging

[2021-12-18 20:15:02+0100] INFO - prefect.TaskRunner | Task 'cache_input[888]': Finished task run for task with final state: 'Success'

was fine but followed by a lot of

[2021-12-18 20:15:04+0100] INFO - prefect.TaskRunner | Task 'store_chunk[7]': Finished task run for task with final state: 'TriggerFailed'
import os,sys
import glob

import xarray as xr

from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.recipes import XarrayZarrRecipe
from pangeo_forge_recipes.patterns import FilePattern, ConcatDim, MergeDim


# the netcdf lists some of the coordinate variables as data variables. This is a fix which we want to apply to each chunk.
def preprocess(ds):
    ds = ds.rename({'n_profiles':'time','time_01':'time','lat_01':'latitude','lon_01':'longitude'})
    ds = ds.assign_coords({'time':ds.time,'latitude':ds.latitude,'longitude':ds.longitude})
    return ds


datadir = '/DATA/ETUDES/AMTROC/1DVAR/v15/S3A/'

fname = 'S3A_SR_2_WAT____20210101T133145_20210101T141802_20210127T055314_2777_067_010______MAR_O_NT_004.1dvar.nc.no_wind'
ds = xr.open_dataset(os.path.join(datadir,fname))
ds = preprocess(ds)
print(f"File size is {ds.nbytes/1e6} MB")

input_urls = glob.glob(os.path.join(datadir,'S3A_SR_2_WAT____*_*_*_*_01[0]_???_*'))
input_urls.sort()
print(f"Found {len(input_urls)} files!")

pattern = pattern_from_file_sequence(input_urls, "time")

for key in pattern:
    break
print(key)


#
# preprocess + chunk size
#
# ~ https://pangeo-forge.readthedocs.io/en/latest/tutorials/xarray_zarr/cmip6-recipe.html
# =======================================================================

# ~ Step 2: Deciding how to chunk the dataset
# ~ Here we set the desired chunk size to 50 Mb, but something between 50-100 Mb is usually alright

# ~ Now let’s think about the Zarr chunks that this recipe will produce. 
# ~ Each target chunk corresponds to one input. So each variable chunk will only be a few MB. 
# ~ That is too small. Let’s increase inputs_per_chunk to 20. 
# ~ This means that we will need to be able to hold 10 files like the one we examined above in memory at once. 
# ~ That’s 16MB * 10 = 160MB. Not a problem!
# ~ recipe = XarrayZarrRecipe(pattern, inputs_per_chunk=10)
ntime = len(ds.time)       # the number of time slices
chunksize_optimal = 50e6  # desired chunk size in bytes
ncfile_size = ds.nbytes    # the netcdf file size
chunksize = max(int(ntime* chunksize_optimal/ ncfile_size),1)
print("chunksize is ",chunksize)
target_chunks = ds.dims.mapping
target_chunks['time'] = chunksize

# ~ Step 4: Create a recipe
# ~ Time to make the recipe!
# ~ In it’s most basic form, XarrayZarrRecipe can be instantiated using a file pattern as the only argument. 
# ~ Here we’ll be using some of the optional arguments to specify a few additional preferences:


#
# nitems_per_input=None
# ~ https://pangeo-forge.readthedocs.io/en/latest/tutorials/xarray_zarr/terraclimate.html
#
# =======================================================================


# ~ What makes it tricky
# ~ This is an advanced example that illustrates the following concepts
    # ~ Multiple variables in different files: There is one file per year for a dozen different variables.
    # ~ Complex Preprocessing: We want to apply different preprocessing depending on the variable. This example shows how.
    # ~ Inconsistent size of data in input files: This means we have to scan each input file and cache its metadata before we can start writing the target.
# ~ This recipe requires a new storage target, a metadata_cache. 
# ~ In this example, this is just another directory. You could hypothetically use a database or other key/value store for this.


# ~ We are now ready to define the recipe. We also specify the desired chunks of the target dataset.
# ~ A key property of this recipe is nitems_per_input=None, which triggers caching of input metadata.
# ~ chunks = {"lat": 1024, "lon": 1024, "time": 12}

# ~ We plan to update these data periodically (annually).
# ~ target_chunks = {"lat": 1024, "lon": 1024, "time": 12}

recipe = XarrayZarrRecipe(
    file_pattern=pattern,
    target_chunks=target_chunks,
    process_chunk=preprocess
)
print(recipe)

# ~ Since we did not specify nitems_per_file in our ConcatDim, the recipe needs to cache input metadata. So we need to suply a metadata_cache target.
import tempfile
from fsspec.implementations.local import LocalFileSystem
from pangeo_forge_recipes.storage import FSSpecTarget, CacheFSSpecTarget, MetadataTarget

fs_local = LocalFileSystem()

target_dir = tempfile.TemporaryDirectory()
target = FSSpecTarget(fs_local, target_dir.name)

cache_dir = tempfile.TemporaryDirectory()
cache_target = CacheFSSpecTarget(fs_local, cache_dir.name)

meta_dir = tempfile.TemporaryDirectory()
meta_store = MetadataTarget(fs_local, meta_dir.name)

recipe.target = target
recipe.input_cache = cache_target
recipe.metadata_cache = meta_store
print(recipe)

# logging will display some interesting information about our recipe during execution
import logging

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(name)s - %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S',
    stream=sys.stdout,
)
logger = logging.getLogger("pangeo_forge_recipes")
logger.setLevel(logging.INFO)
flow = recipe.to_prefect()
flow.run()


ds_target = xr.open_zarr(target.get_mapper(), consolidated=True)

1 Like