Loading ensembles using intake

I’m trying to use intake-esm to load ensemble Zarr stores into xarray. The ensemble data lives in S3-compatible object storage (MinIO). I can load it directly with the xarray.open_zarr() method. For example,

import s3fs
import xarray as xr

fs = s3fs.S3FileSystem(
    client_kwargs={"endpoint_url": "https://wifire-data.sdsc.edu:9000", "verify": False},
    anon=True,
)

mapper = fs.get_mapper('/burnpro3d/d/80/aa/run_80aa1808-adc0-42c1-88e5-abfe3cfc5c41/quicfire.zarr')

ds = xr.open_zarr(mapper, consolidated=False, drop_variables=['surfEnergy'])
ds

However, I’m not sure which parameters to pass to load the same Zarr stores through intake. Here’s a snippet:

col = intake.open_esm_datastore('quicfire_ensemble_collection.json')
col_subset = col.search(run_id='80aa1808-adc0-42c1-88e5-abfe3cfc5c41')
col_subset.df.head()

dsets = col_subset.to_dataset_dict(zarr_kwargs={"consolidated": False, "drop_variables":['surfEnergy']},
                                   storage_options={"token": "anon", "client_kwargs":{"endpoint_url": 'https://wifire-data.sdsc.edu:9000',"verify": False}})
This fails with:

OSError:
            Failed to open zarr store.

            *** Arguments passed to xarray.open_zarr() ***:

            - store: /burnpro3d/d/80/aa/run_80aa1808-adc0-42c1-88e5-abfe3cfc5c41/quicfire.zarr
            - kwargs: {'consolidated': False, 'drop_variables': ['surfEnergy']}

            *** fsspec options used ***:

            - root: /burnpro3d/d/80/aa/run_80aa1808-adc0-42c1-88e5-abfe3cfc5c41/quicfire.zarr
            - protocol: None

            ********************************************

Any thoughts on how to configure storage_options for a custom endpoint_url?

@ben, are you able to modify the path in the catalog to include the s3:// prefix? For example, change

/burnpro3d/d/80/aa/run_80aa1808-adc0-42c1-88e5-abfe3cfc5c41/quicfire.zarr

to

s3://burnpro3d/d/80/aa/run_80aa1808-adc0-42c1-88e5-abfe3cfc5c41/quicfire.zarr

and rerun your code.
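If the catalog points at a CSV of assets, a sketch like this could do the rewrite (the CSV filename and the "path" column name are assumptions here; the actual names are defined in quicfire_ensemble_collection.json):

import pandas as pd

# Hypothetical sketch: prepend the s3:// protocol to every asset path.
# "quicfire_ensemble_collection.csv" and the "path" column are assumed;
# check the catalog_file and asset column set in the collection JSON.
df = pd.read_csv("quicfire_ensemble_collection.csv")
df["path"] = "s3://" + df["path"].str.lstrip("/")
df.to_csv("quicfire_ensemble_collection.csv", index=False)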


Also, can you try the main branch of intake-esm:

python -m pip install git+https://github.com/intake/intake-esm@main

There are some recent bug fixes/enhancements there that address some of the fsspec-related issues.

When using the main branch, this call

dsets = col_subset.to_dataset_dict(zarr_kwargs={"consolidated": False, "drop_variables":['surfEnergy']},
                                   storage_options={"token": "anon", "client_kwargs":{"endpoint_url": 'https://wifire-data.sdsc.edu:9000',"verify": False}})

becomes

dsets = col_subset.to_dataset_dict(xarray_open_kwargs={"consolidated": False, "drop_variables":['surfEnergy']},
                                   storage_options={"token": "anon", "client_kwargs":{"endpoint_url": 'https://wifire-data.sdsc.edu:9000',"verify": False}})

Prepending s3:// to the path and changing "token": "anon" → "anon": True in storage_options got it working! Thank you!
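For anyone who lands here later, the combined working call (with the s3:// prefix in the catalog and intake-esm from the main branch) looked roughly like this:

dsets = col_subset.to_dataset_dict(
    xarray_open_kwargs={"consolidated": False, "drop_variables": ["surfEnergy"]},
    storage_options={
        "anon": True,  # was "token": "anon"
        "client_kwargs": {"endpoint_url": "https://wifire-data.sdsc.edu:9000", "verify": False},
    },
)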


I have also encountered a similar problem and the same error. I installed a new library, and after that I could no longer access the zarr store. Before that, it was working perfectly well!

I used the following code:

import xarray as xr
import dask
xr.set_options(display_style='html')
import intake
from xmip.preprocessing import rename_cmip6,promote_empty_dims,combined_preprocessing
import cftime

cat_url = "https://storage.googleapis.com/cmip6/pangeo-cmip6.json"  # not a local file, but object storage
dataframe = intake.open_esm_datastore(cat_url)

#Using SIT and SIC from CESM2:

cat1  = dataframe.search(experiment_id=['piControl'], 
                         table_id=['SImon'], 
                         variable_id=['sithick','siconc'],
                         source_id=['CESM2'],
                         member_id = ['r1i1p1f1'], 
                         grid_label=['gn'])

z_kwargs = {'consolidated': True, 'use_cftime': True}

with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    dset_dict1 = cat1.to_dataset_dict(zarr_kwargs=z_kwargs)

I get the following error:

--> The keys in the returned dictionary of datasets are constructed as follows:
	'activity_id.institution_id.source_id.experiment_id.table_id.grid_label'
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
~/miniconda3/lib/python3.7/site-packages/intake_esm/merge_util.py in _open_asset(path, data_format, zarr_kwargs, cdf_kwargs, preprocess, varname, requested_variables)
    269         try:
--> 270             ds = xr.open_zarr(path, **zarr_kwargs)
    271         except Exception as exc:

~/miniconda3/lib/python3.7/site-packages/xarray/backends/zarr.py in open_zarr(store, group, synchronizer, chunks, decode_cf, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, consolidated, overwrite_encoded_chunks, chunk_store, storage_options, decode_timedelta, use_cftime, **kwargs)
    780         decode_timedelta=decode_timedelta,
--> 781         use_cftime=use_cftime,
    782     )

~/miniconda3/lib/python3.7/site-packages/xarray/backends/api.py in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, backend_kwargs, *args, **kwargs)
    480 
--> 481     backend = plugins.get_backend(engine)
    482 

~/miniconda3/lib/python3.7/site-packages/xarray/backends/plugins.py in get_backend(engine)
    160     if isinstance(engine, str):
--> 161         engines = list_engines()
    162         if engine not in engines:

~/miniconda3/lib/python3.7/site-packages/xarray/backends/plugins.py in list_engines()
    104 def list_engines():
--> 105     entrypoints = entry_points().get("xarray.backends", ())
    106     return build_engines(entrypoints)

AttributeError: 'EntryPoints' object has no attribute 'get'

The above exception was the direct cause of the following exception:

OSError                                   Traceback (most recent call last)
<ipython-input-4-f44b00d949b0> in <module>
      8 
      9 with dask.config.set(**{'array.slicing.split_large_chunks': True}):
---> 10     dset_dict1 = cat1.to_dataset_dict(zarr_kwargs=z_kwargs)

~/miniconda3/lib/python3.7/site-packages/intake_esm/core.py in to_dataset_dict(self, zarr_kwargs, cdf_kwargs, preprocess, storage_options, progressbar, aggregate)
    920             ]
    921             for i, task in enumerate(concurrent.futures.as_completed(future_tasks)):
--> 922                 key, ds = task.result()
    923                 self._datasets[key] = ds
    924                 if self.progressbar:

~/miniconda3/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    426                 raise CancelledError()
    427             elif self._state == FINISHED:
--> 428                 return self.__get_result()
    429 
    430             self._condition.wait(timeout)

~/miniconda3/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

~/miniconda3/lib/python3.7/concurrent/futures/thread.py in run(self)
     55 
     56         try:
---> 57             result = self.fn(*self.args, **self.kwargs)
     58         except BaseException as exc:
     59             self.future.set_exception(exc)

~/miniconda3/lib/python3.7/site-packages/intake_esm/core.py in _load_source(key, source)
    906 
    907         def _load_source(key, source):
--> 908             return key, source.to_dask()
    909 
    910         sources = {key: source(**source_kwargs) for key, source in self.items()}

~/miniconda3/lib/python3.7/site-packages/intake_esm/source.py in to_dask(self)
    243     def to_dask(self):
    244         """Return xarray object (which will have chunks)"""
--> 245         self._load_metadata()
    246         return self._ds
    247 

~/miniconda3/lib/python3.7/site-packages/intake/source/base.py in _load_metadata(self)
    234         """load metadata only if needed"""
    235         if self._schema is None:
--> 236             self._schema = self._get_schema()
    237             self.dtype = self._schema.dtype
    238             self.shape = self._schema.shape

~/miniconda3/lib/python3.7/site-packages/intake_esm/source.py in _get_schema(self)
    172 
    173         if self._ds is None:
--> 174             self._open_dataset()
    175 
    176             metadata = {

~/miniconda3/lib/python3.7/site-packages/intake_esm/source.py in _open_dataset(self)
    224             for _, row in self.df.iterrows()
    225         ]
--> 226         datasets = dask.compute(*datasets)
    227         mapper_dict = dict(datasets)
    228         nd = create_nested_dict(self.df, self.path_column, self.aggregation_columns)

~/miniconda3/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    566         postcomputes.append(x.__dask_postcompute__())
    567 
--> 568     results = schedule(dsk, keys, **kwargs)
    569     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    570 

~/miniconda3/lib/python3.7/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     85         get_id=_thread_get_id,
     86         pack_exception=pack_exception,
---> 87         **kwargs
     88     )
     89 

~/miniconda3/lib/python3.7/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    512                             _execute_task(task, data)  # Re-execute locally
    513                         else:
--> 514                             raise_exception(exc, tb)
    515                     res, worker_id = loads(res_info)
    516                     state["cache"][key] = res

~/miniconda3/lib/python3.7/site-packages/dask/local.py in reraise(exc, tb)
    323     if exc.__traceback__ is not tb:
    324         raise exc.with_traceback(tb)
--> 325     raise exc
    326 
    327 

~/miniconda3/lib/python3.7/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    221     try:
    222         task, data = loads(task_info)
--> 223         result = _execute_task(task, data)
    224         id = get_id()
    225         result = dumps((result, id))

~/miniconda3/lib/python3.7/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/miniconda3/lib/python3.7/site-packages/intake_esm/source.py in read_dataset(path, data_format, storage_options, cdf_kwargs, zarr_kwargs, preprocess, varname)
    208                 preprocess=preprocess,
    209                 varname=varname,
--> 210                 requested_variables=self.requested_variables,
    211             )
    212             return (path, ds)

~/miniconda3/lib/python3.7/site-packages/intake_esm/merge_util.py in _open_asset(path, data_format, zarr_kwargs, cdf_kwargs, preprocess, varname, requested_variables)
    286             """
    287 
--> 288             raise IOError(message) from exc
    289 
    290     else:


OSError: 
            Failed to open zarr store.

            *** Arguments passed to xarray.open_zarr() ***:

            - store: <fsspec.mapping.FSMap object at 0x7fe7d859af50>
            - kwargs: {'consolidated': True, 'use_cftime': True}

            *** fsspec options used ***:

            - root: cmip6/CMIP6/CMIP/NCAR/CESM2/piControl/r1i1p1f1/SImon/siconc/gn/v20190320
            - protocol: ('gcs', 'gs')

            ********************************************

I installed jupyter-resource-usage with pip install jupyter-resource-usage, and after that I started getting this error.
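For what it’s worth, the failing call in the traceback is xarray’s list_engines(), which expects entry_points() to return a dict-like object with a .get() method. A minimal check of which entry-points API the environment exposes (assuming xarray picks up the importlib_metadata backport on Python 3.7):

import importlib_metadata

# Older versions of the backport return a dict from entry_points() (has .get);
# newer versions return an EntryPoints object without it, which is exactly
# what the AttributeError above complains about.
print(importlib_metadata.version("importlib_metadata"))
eps = importlib_metadata.entry_points()
print(type(eps))
print(hasattr(eps, "get"))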

Any help would be greatly appreciated!
Thank you very much!