Many netcdf to single zarr store using concurrent.futures

So, I had a look at the different recipes presented in Pangeo Forge.

  1. Some feedback on the pangeo-forge install:
  • pip install pangeo-forge worked fine
  • pip install pangeo-forge-recipes worked fine
  • I needed to add the netcdf4 and zarr packages because xarray did not recognize the engine (it would be nice if they were already included?)
  • I needed to add mypy_extensions
  • you are probably aware that fsspec should be pinned to fsspec==2021.11.0 because there is a BlockSizeError bug at import in 2021.11.1
  2. From the OISST example, I learned how to create a pattern.

  3. From the CMIP6 example, I learned how to compute the chunk size and preprocess the data.

But from what I understood, all those examples (even TerraClimate) seem to assume a regular sampling in time (like 1 month).

My particular case is that each netCDF file covers a different duration, corresponding to half an orbit with a temporal sampling of 1 s (about 2700 records; could be more, could be less).

I guess that the tricky point is to set nitems_per_input=None, as in the TerraClimate example, but even in that example we are talking about regular monthly data.
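
To make the irregular-length issue concrete: when every file holds the same number of records, the position of each file along the concatenated time axis follows from its index alone, but with variable-length files the record counts have to be read from each file first. As far as I understand, that is what the metadata caching triggered by nitems_per_input=None is for. A minimal sketch of that bookkeeping in plain Python (the record counts are made-up values and time_slices is a hypothetical helper, not part of pangeo-forge-recipes):

```python
from itertools import accumulate


def time_slices(nitems_per_file):
    """Return the (start, stop) range each file occupies on the concatenated time axis."""
    stops = list(accumulate(nitems_per_file))  # running total of records
    starts = [0] + stops[:-1]                  # each file starts where the previous one stops
    return list(zip(starts, stops))


# Hypothetical record counts for three half-orbit files (illustrative values only).
nitems = [2700, 2650, 2812]
print(time_slices(nitems))  # [(0, 2700), (2700, 5350), (5350, 8162)]
```

With a fixed count per file the same ranges could be computed without opening anything, which is why the regular monthly examples do not need this step.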

Am I heading in the right direction?
Below are the main outputs and the script I executed.

I tried the following chain, inspired by the different examples, but without success…
Some encouraging output:

[2021-12-18 20:15:02+0100] INFO - prefect.TaskRunner | Task 'cache_input[888]': Finished task run for task with final state: 'Success'

was fine, but it was followed by a lot of

[2021-12-18 20:15:04+0100] INFO - prefect.TaskRunner | Task 'store_chunk[7]': Finished task run for task with final state: 'TriggerFailed'
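
A note on reading these logs: in Prefect 1.x, a final state of 'TriggerFailed' usually means the task never ran because an upstream task did not finish successfully, so the actual error is probably reported by an earlier task with final state 'Failed'. A minimal sketch for summarizing a captured log by task name and final state, assuming the line format shown above (summarize_states is a hypothetical helper, not a Prefect function):

```python
import re
from collections import Counter

# Matches the "Finished task run" lines in the format quoted above.
LOG_RE = re.compile(
    r"Task '(?P<task>[^']+)': Finished task run for task "
    r"with final state: '(?P<state>\w+)'"
)


def summarize_states(log_text):
    """Count final states per task name (the mapped index in brackets is dropped)."""
    counts = Counter()
    for match in LOG_RE.finditer(log_text):
        name = match.group("task").split("[")[0]
        counts[(name, match.group("state"))] += 1
    return counts


sample = """\
[2021-12-18 20:15:02+0100] INFO - prefect.TaskRunner | Task 'cache_input[888]': Finished task run for task with final state: 'Success'
[2021-12-18 20:15:04+0100] INFO - prefect.TaskRunner | Task 'store_chunk[7]': Finished task run for task with final state: 'TriggerFailed'
"""

for (task, state), n in sorted(summarize_states(sample).items()):
    print(task, state, n)
```

Any task name that shows up with state 'Failed' in such a summary is the place to look for the real traceback.
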
import os
import sys
import glob

import xarray as xr

from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.recipes import XarrayZarrRecipe
from pangeo_forge_recipes.patterns import FilePattern, ConcatDim, MergeDim

# the netcdf lists some of the coordinate variables as data variables. This is a fix which we want to apply to each chunk.
def preprocess(ds):
    ds = ds.rename({'n_profiles':'time','time_01':'time','lat_01':'latitude','lon_01':'longitude'})
    ds = ds.assign_coords({'time':ds.time,'latitude':ds.latitude,'longitude':ds.longitude})
    return ds

datadir = '/DATA/ETUDES/AMTROC/1DVAR/v15/S3A/'

fname = ''
ds = xr.open_dataset(os.path.join(datadir,fname))
ds = preprocess(ds)
print(f"File size is {ds.nbytes/1e6} MB")

input_urls = glob.glob(os.path.join(datadir,'S3A_SR_2_WAT____*_*_*_*_01[0]_???_*'))
print(f"Found {len(input_urls)} files!")

pattern = pattern_from_file_sequence(input_urls, "time")

for key in pattern:

# preprocess + chunk size
# ~
# =======================================================================

# ~ Step 2: Deciding how to chunk the dataset
# ~ Here we set the desired chunk size to 50 MB, but something between 50-100 MB is usually alright

# ~ Now let’s think about the Zarr chunks that this recipe will produce. 
# ~ Each target chunk corresponds to one input. So each variable chunk will only be a few MB. 
# ~ That is too small. Let’s increase inputs_per_chunk to 10. 
# ~ This means that we will need to be able to hold 10 files like the one we examined above in memory at once. 
# ~ That’s 16MB * 10 = 160MB. Not a problem!
# ~ recipe = XarrayZarrRecipe(pattern, inputs_per_chunk=10)
ntime = len(ds.time)       # the number of time slices
chunksize_optimal = 50e6  # desired chunk size in bytes
ncfile_size = ds.nbytes    # the netcdf file size
chunksize = max(int(ntime * chunksize_optimal / ncfile_size), 1)
print("chunksize is", chunksize)
# copy the dimension sizes into a plain dict so the dataset itself is not modified
target_chunks = dict(ds.dims)
target_chunks['time'] = chunksize

# ~ Step 4: Create a recipe
# ~ Time to make the recipe!
# ~ In its most basic form, XarrayZarrRecipe can be instantiated using a file pattern as the only argument. 
# ~ Here we’ll be using some of the optional arguments to specify a few additional preferences:

# nitems_per_input=None
# ~
# =======================================================================

# ~ What makes it tricky
# ~ This is an advanced example that illustrates the following concepts
    # ~ Multiple variables in different files: There is one file per year for a dozen different variables.
    # ~ Complex Preprocessing: We want to apply different preprocessing depending on the variable. This example shows how.
    # ~ Inconsistent size of data in input files: This means we have to scan each input file and cache its metadata before we can start writing the target.
# ~ This recipe requires a new storage target, a metadata_cache. 
# ~ In this example, this is just another directory. You could hypothetically use a database or other key/value store for this.

# ~ We are now ready to define the recipe. We also specify the desired chunks of the target dataset.
# ~ A key property of this recipe is nitems_per_input=None, which triggers caching of input metadata.
# ~ chunks = {"lat": 1024, "lon": 1024, "time": 12}

# ~ We plan to update these data periodically (annually).
# ~ target_chunks = {"lat": 1024, "lon": 1024, "time": 12}

recipe = XarrayZarrRecipe(

# ~ Since we did not specify nitems_per_file in our ConcatDim, the recipe needs to cache input metadata. So we need to supply a metadata_cache target.
import tempfile
from fsspec.implementations.local import LocalFileSystem
from pangeo_forge_recipes.storage import FSSpecTarget, CacheFSSpecTarget, MetadataTarget

fs_local = LocalFileSystem()

target_dir = tempfile.TemporaryDirectory()
target = FSSpecTarget(fs_local, target_dir.name)

cache_dir = tempfile.TemporaryDirectory()
cache_target = CacheFSSpecTarget(fs_local, cache_dir.name)

meta_dir = tempfile.TemporaryDirectory()
meta_store = MetadataTarget(fs_local, meta_dir.name)

# attach the three storage locations to the recipe
recipe.target = target
recipe.input_cache = cache_target
recipe.metadata_cache = meta_store

# logging will display some interesting information about our recipe during execution
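import logging

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(name)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,  # INFO level so the recipe's progress messages are shown
)
logger = logging.getLogger("pangeo_forge_recipes")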
flow = recipe.to_prefect()
# run the flow (this is what produced the Prefect log lines quoted above)
flow.run()

ds_target = xr.open_zarr(target.get_mapper(), consolidated=True)
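
The chunk-size arithmetic used in the script can be checked in isolation. A minimal sketch, assuming one representative file is enough to estimate the bytes per time record (time_chunk_length is a hypothetical helper; the 16 MB file size is the figure quoted in the tutorial comments, not a measurement of the altimetry files):

```python
def time_chunk_length(ntime, file_nbytes, target_bytes=50e6):
    """Number of time records whose combined size is roughly target_bytes.

    Same formula as in the script: scale the record count of one sample
    file by the ratio of the desired chunk size to the file size.
    """
    return max(int(ntime * target_bytes / file_nbytes), 1)


# About 2700 records in a (hypothetical) 16 MB file, aiming for 50 MB chunks.
print(time_chunk_length(2700, 16e6))  # 8437

# A very large file with few records still yields at least one record per chunk.
print(time_chunk_length(10, 1e9))  # 1
```

Since the files have different lengths, a time chunk of this size will generally span a non-integer number of input files (here a little over three), so the result is only as good as the sample file is representative.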
