Many NetCDF files to a single Zarr store using concurrent.futures

So, I had a look at the different recipes presented in Pangeo Forge.

  1. Some feedback on the pangeo-forge install (see the pip commands just after this list):
  • pip install pangeo-forge worked fine
  • pip install pangeo-forge-recipes worked fine
  • I needed to add the netcdf4 and zarr packages because xarray did not recognize the engine (would be nice if they were already included?)
  • I needed to add mypy_extensions
  • you are probably aware that fsspec should be pinned to 2021.11.0, because there is a bug with a BlockSizeError at import time in 2021.11.1
  2. From the OISST example, I learned how to create a file pattern.

  3. From the CMIP6 example, I learned how to compute the chunk size and preprocess the data.
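
For reference, the whole environment setup boiled down to something like this (the fsspec pin being the workaround mentioned in the list above):

pip install pangeo-forge pangeo-forge-recipes
pip install netcdf4 zarr mypy_extensions
pip install "fsspec==2021.11.0"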

But from what I understood, all those examples (even TerraClimate) seem to assume regular sampling in time (e.g. one month).

My particular case is that each NetCDF file covers a different time span, corresponding to half an orbit with a temporal sampling of 1 s (about 2700 records per file, sometimes more, sometimes less).
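
To illustrate, here is a quick sketch (hypothetical, not from my actual run) that checks how many records each file holds; it assumes the raw along-track dimension is called n_profiles, as in the preprocess function of the script below:

import os, glob
import xarray as xr

datadir = '/DATA/ETUDES/AMTROC/1DVAR/v15/S3A/'  # same directory as in the script below
for path in sorted(glob.glob(os.path.join(datadir, 'S3A_SR_2_WAT____*')))[:5]:
    with xr.open_dataset(path) as ds:
        # each half-orbit file has its own number of 1 s records
        print(os.path.basename(path), ds.sizes['n_profiles'])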

I guess the tricky point is to set nitems_per_input=None, as in the TerraClimate example, but even that example deals with regular monthly data.
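
If I read the patterns documentation correctly, pattern_from_file_sequence with no nitems_per_file argument is just shorthand for a ConcatDim whose nitems_per_file is None, and that None is what triggers the metadata caching. A sketch of the equivalent explicit pattern (reusing the input_urls list built in the script below):

from pangeo_forge_recipes.patterns import FilePattern, ConcatDim

# nitems_per_file=None: the recipe must open each input to learn its length
concat_dim = ConcatDim("time", keys=list(range(len(input_urls))), nitems_per_file=None)

def make_path(time):
    return input_urls[time]

pattern = FilePattern(make_path, concat_dim)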

Am I going in the right direction?
Below are the main outputs and the script I executed.

I tried the following chain, inspired by the different examples, but without success…
Some output was encouraging:

[2021-12-18 20:15:02+0100] INFO - prefect.TaskRunner | Task 'cache_input[888]': Finished task run for task with final state: 'Success'

but it was followed by a lot of

[2021-12-18 20:15:04+0100] INFO - prefect.TaskRunner | Task 'store_chunk[7]': Finished task run for task with final state: 'TriggerFailed'

(In Prefect, TriggerFailed means that an upstream task failed, so I suppose the real error happens earlier in the run.)

Here is the full script:
import os,sys
import glob

import xarray as xr

from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.recipes import XarrayZarrRecipe
from pangeo_forge_recipes.patterns import FilePattern, ConcatDim, MergeDim


# The netcdf files list some of the coordinate variables as data variables.
# This is a fix we want to apply to each chunk.
def preprocess(ds):
    # promote time/latitude/longitude to proper dimension coordinates
    ds = ds.rename({'n_profiles': 'time', 'time_01': 'time', 'lat_01': 'latitude', 'lon_01': 'longitude'})
    ds = ds.assign_coords({'time': ds.time, 'latitude': ds.latitude, 'longitude': ds.longitude})
    return ds


datadir = '/DATA/ETUDES/AMTROC/1DVAR/v15/S3A/'

fname = 'S3A_SR_2_WAT____20210101T133145_20210101T141802_20210127T055314_2777_067_010______MAR_O_NT_004.1dvar.nc.no_wind'
ds = xr.open_dataset(os.path.join(datadir,fname))
ds = preprocess(ds)
print(f"File size is {ds.nbytes/1e6} MB")

input_urls = glob.glob(os.path.join(datadir,'S3A_SR_2_WAT____*_*_*_*_01[0]_???_*'))
input_urls.sort()
print(f"Found {len(input_urls)} files!")

pattern = pattern_from_file_sequence(input_urls, "time")

# grab the first key of the pattern to check what it looks like
for key in pattern:
    break
print(key)


#
# preprocess + chunk size
#
# ~ https://pangeo-forge.readthedocs.io/en/latest/tutorials/xarray_zarr/cmip6-recipe.html
# =======================================================================

# ~ Step 2: Deciding how to chunk the dataset
# ~ Here we set the desired chunk size to 50 Mb, but something between 50-100 Mb is usually alright

# ~ Now let’s think about the Zarr chunks that this recipe will produce. 
# ~ Each target chunk corresponds to one input. So each variable chunk will only be a few MB. 
# ~ That is too small. Let’s increase inputs_per_chunk to 10. 
# ~ This means that we will need to be able to hold 10 files like the one we examined above in memory at once. 
# ~ That’s 16MB * 10 = 160MB. Not a problem!
# ~ recipe = XarrayZarrRecipe(pattern, inputs_per_chunk=10)
ntime = len(ds.time)       # the number of time slices
chunksize_optimal = 50e6  # desired chunk size in bytes
ncfile_size = ds.nbytes    # the netcdf file size
chunksize = max(int(ntime* chunksize_optimal/ ncfile_size),1)
print("chunksize is ",chunksize)
target_chunks = dict(ds.dims)  # take a copy: mutating ds.dims.mapping would alter the dataset itself
target_chunks['time'] = chunksize

# ~ Step 4: Create a recipe
# ~ Time to make the recipe!
# ~ In its most basic form, XarrayZarrRecipe can be instantiated using a file pattern as the only argument. 
# ~ Here we’ll be using some of the optional arguments to specify a few additional preferences:


#
# nitems_per_input=None
# ~ https://pangeo-forge.readthedocs.io/en/latest/tutorials/xarray_zarr/terraclimate.html
#
# =======================================================================


# ~ What makes it tricky
# ~ This is an advanced example that illustrates the following concepts
    # ~ Multiple variables in different files: There is one file per year for a dozen different variables.
    # ~ Complex Preprocessing: We want to apply different preprocessing depending on the variable. This example shows how.
    # ~ Inconsistent size of data in input files: This means we have to scan each input file and cache its metadata before we can start writing the target.
# ~ This recipe requires a new storage target, a metadata_cache. 
# ~ In this example, this is just another directory. You could hypothetically use a database or other key/value store for this.


# ~ We are now ready to define the recipe. We also specify the desired chunks of the target dataset.
# ~ A key property of this recipe is nitems_per_input=None, which triggers caching of input metadata.
# ~ chunks = {"lat": 1024, "lon": 1024, "time": 12}

# ~ We plan to update these data periodically (annually).
# ~ target_chunks = {"lat": 1024, "lon": 1024, "time": 12}

recipe = XarrayZarrRecipe(
    file_pattern=pattern,
    target_chunks=target_chunks,
    process_chunk=preprocess
)
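
# One doubt (just a hypothesis, not something the docs state for this case):
# since preprocess renames the raw n_profiles dimension to the "time" concat
# dim, maybe it has to run when each input is opened rather than on the
# combined chunk; XarrayZarrRecipe also accepts a process_input argument:
# recipe = XarrayZarrRecipe(
#     file_pattern=pattern,
#     target_chunks=target_chunks,
#     process_input=preprocess,
# )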
print(recipe)

# ~ Since we did not specify nitems_per_file in our ConcatDim, the recipe needs to cache input metadata. So we need to supply a metadata_cache target.
import tempfile
from fsspec.implementations.local import LocalFileSystem
from pangeo_forge_recipes.storage import FSSpecTarget, CacheFSSpecTarget, MetadataTarget

fs_local = LocalFileSystem()

target_dir = tempfile.TemporaryDirectory()
target = FSSpecTarget(fs_local, target_dir.name)

cache_dir = tempfile.TemporaryDirectory()
cache_target = CacheFSSpecTarget(fs_local, cache_dir.name)

meta_dir = tempfile.TemporaryDirectory()
meta_store = MetadataTarget(fs_local, meta_dir.name)

recipe.target = target
recipe.input_cache = cache_target
recipe.metadata_cache = meta_store
print(recipe)

# logging will display some interesting information about our recipe during execution
import logging

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(name)s - %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S',
    stream=sys.stdout,
)
logger = logging.getLogger("pangeo_forge_recipes")
logger.setLevel(logging.INFO)
flow = recipe.to_prefect()
flow.run()


ds_target = xr.open_zarr(target.get_mapper(), consolidated=True)
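
For what it’s worth, my actual goal (hence the title) is to execute the recipe with concurrent.futures instead of Prefect. Would something like this work? It is an untested sketch following the manual-execution pattern from the pangeo-forge docs (cache inputs, prepare target, store chunks, finalize):

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=8) as pool:
    # list() forces the map to finish before moving to the next stage
    list(pool.map(recipe.cache_input, recipe.iter_inputs()))
    recipe.prepare_target()
    list(pool.map(recipe.store_chunk, recipe.iter_chunks()))
recipe.finalize_target()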
