Accessing GRIB2 files as a single cloud-friendly dataset in xarray through kerchunk

My question might sound confused and perhaps not well defined, because I am confused and a bit lost! :slight_smile:

I report all the details below, but essentially TLDR:
Q1: How do I adapt the kerchunk example for grib2 files to my case, and specifically, to a simple case?
In particular:
syntax seems to have changed from the linked example, and I am not sure where to get the details to update kerchunk and fsspec calls.

I know these two packages are very much in flux and constantly moving along, and I am very thankful for their development! :slight_smile: I just really want to try to use this trick, but I am lost.

Long Version:

I found this notebook that is some type of previous/further iteration of the example on kerchunk documentation page

In this notebook - based on my limited understanding - we create json files for each grib files, then we concatenate the json files using kerchunk.combine.MultiZarrToZarr and then we magically read them in xarray

This is when i list all the json files previously created and then concatenate them

flist2 = fs2.ls(json_dir)
furls = sorted(['s3://'+f for f in flist2])
mzz = MultiZarrToZarr(furls, 
            storage_options={'anon':False},
            remote_protocol='s3', 
            remote_options={'anon': True},
            xarray_concat_args={'dim': 'valid_time'})

mzz.translate('hrrr_best.json')

This code doesn’t work straight away because of changes in fsspec syntax which I somewhat pinned down.

Then the cloud friendly dataset is created as:

rpath = 's3://esip-qhub-public/noaa/hrrr/hrrr_best.json'
s_opts = {'requester_pays':True, 'skip_instance_cache':True}
r_opts = {'anon':True}
fs = fsspec.filesystem("reference", fo=rpath, ref_storage_args=s_opts,
                       remote_protocol='s3', remote_options=r_opts)
m = fs.get_mapper("")
ds2 = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False), 
                      chunks={'valid_time':1})

Now I overall follow the process, but I am completely lost when it comes to details (and some of the syntax that has changed that I don’t even know where to begin with looking for it).

My example
I want to extract some files from another aws dataset, GEFSv12 reforecast.
My case is simpler, I don’t have to update the time/day of the files, they are retrospective.
I am operating on the Pangeo AWS deployment (aws-uswest2.pangeo).

!pip install kerchunk

then

import xarray as xr
import hvplot.xarray
import datetime as dt
import pandas as pd
import dask
import panel as pn
import json
import fsspec
from kerchunk.grib2 import scan_grib
from kerchunk.combine import MultiZarrToZarr
import os
fs = fsspec.filesystem('s3', anon=True, skip_instance_cache=True)
dates = pd.date_range(start='2000-01-01',end='2001-01-2', freq='1D')
files = [date.strftime('s3://noaa-gefs-retrospective/GEFSv12/reforecast/%Y/%Y%m%d00/c00/Days:1-10/acpcp_sfc_%Y%m%d00_c00.grib2') for date in dates]

I create just two json files, for the first and second item of the list.

so = {"anon": True, "default_cache_type": "readahead"}
outfname = 'jsonfiles/'+files[0].split('/')[-1][:-6]+'.json'
out = scan_grib(files[0],common_vars = ['time', 'step', 'latitude', 'longitude', 'valid_time'], storage_options=so )
with open(outfname, "w") as f:
    f.write(json.dumps(out))

and

so = {"anon": True, "default_cache_type": "readahead"}
outfname = 'jsonfiles/'+files[1].split('/')[-1][:-6]+'.json'
out = scan_grib(files[1],common_vars = ['time', 'step', 'latitude', 'longitude', 'valid_time'], storage_options=so )
with open(outfname, "w") as f:
    f.write(json.dumps(out))

NOTE: Differently from Rich’s example, I don’t write my json file to a bucket, I simply save them in my directory (is this the problem? i don’t think it should, because the json file has the location of the grib file)
I then locate/list the files, mimicking the notebook linked above

flist2 = os.listdir('jsonfiles/')
furls = sorted(['jsonfiles/'+f for f in flist2])
print(flist2)
print(furls)

I then get to the last part and there i am completely lost.
the way to pass the keyword arguments has changed, and I am not sure how to pass the various values to fsspec

Note that I am not using the kerchunk.combine.MultiZarrToZarr here but simply i am passing one json file.

The syntax directly from the example notebook looks like:

rpath = 's3://esip-qhub-public/noaa/hrrr/hrrr_best.json'
s_opts = {'requester_pays':True, 'skip_instance_cache':True}
r_opts = {'anon':True}
fs = fsspec.filesystem("reference", fo=rpath, ref_storage_args=s_opts,
                       remote_protocol='s3', remote_options=r_opts)
m = fs.get_mapper("")
ds2 = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False))

The merged json file is in a bucket, mine is in a local directory, and then there are many arguments and keyword arguments that are now not recognized.

When I try to add this part to my notebook, I got to the following step:

rpath = furls[1]
s_opts = {'requester_pays':True, 'skip_instance_cache':True}
r_opts = {'anon':True}
fs = fsspec.filesystem('s3', fo=rpath, ref_storage_args=s_opts,
                       remote_protocol='s3', remote_options=r_opts)
m = fs.get_mapper("")
ds2 = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False), 
                      chunks={'valid_time':1})

that yells at me (among other things):

TypeError: __init__() got an unexpected keyword argument 'fo'

which tells me that the keyword arguments have changed in syntax.

I guess my overall goal would be to do what Rich does in his notebook, but without having to write to a bucket the json files, and of course updating the necessary syntax.

Does it make any sense?
Can anyone help with ironing out the details?

1 Like

I think in the last code block you have to use the "reference" filesystem (that’s what defines the fo kwarg):

fs = fsspec.filesystem("reference", fo=rpath, ...)
m = fs.get_mapper("")
ds = xr.open_dataset(m, engine="zarr", ...)

There is also a way to pass every argument to open_dataset without going through fsspec explicitly, but I don’t know that much about this part of xarray and I can’t find the comment on github where @martindurant mentioned how to do that.

In any case, this definitely does make sense (in fact, I think that was the workflow used by the blog post that introduced the library), and is just a matter of getting the details right. Unfortunately, I don’t have any experience with the new API yet so I can’t really help with that.

1 Like

Thanks!

I did try that argument - as it is in the example notebook -

fs = fsspec.filesystem("reference", fo=rpath, ref_storage_args=s_opts,
                       remote_protocol='s3', remote_options=r_opts)

and I get this error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-96-b0652aa32b31> in <module>
----> 9 fs = fsspec.filesystem("reference", fo=rpath, ref_storage_args=s_opts,
     10                        remote_protocol='s3', remote_options=r_opts)

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/registry.py in filesystem(protocol, **storage_options)
    238     """
    239     cls = get_filesystem_class(protocol)
--> 240     return cls(**storage_options)

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/spec.py in __call__(cls, *args, **kwargs)
     58             return cls._cache[token]
     59         else:
---> 60             obj = super().__call__(*args, **kwargs)
     61             # Setting _fs_token here causes some static linters to complain.
     62             obj._fs_token_ = token

TypeError: __init__() missing 1 required positional argument: 'references'

I tried also

fs = fsspec.filesystem(reference="reference", fo=rpath, ref_storage_args=s_opts,
                       remote_protocol='s3', remote_options=r_opts)

but then it yells at me because protocol is missing, and I just stopped there because clearly i can’t guess my way through :slight_smile:

it’s just not quite working, probably an easy fix due to changes in API syntax?

Example I had recently is like this

m = fsspec.get_mapper("reference://", fo=out, remote_protocol="s3", remote_options=r_opts)
z = zarr.open(
1 Like

Looking at the docstring of ReferenceFileSystem it seems that ref_storage_args is used to access the metadata file so I agree that you probably need to remove that.

If it still doesn’t work you can also try loading the references into memory yourself and passing that as fo:

with fsspec.open(rpath) as f:
    references = ujson.loads(f.read())
fs = fsspec.filesystem("reference", fo=references, remote_protocol="s3", remote_options=r_opts)
1 Like

Thanks!

This gives me once again the missing reference error. Did you save your json files in a bucket? or locally? I wonder if that makes a difference.
my fo is simply a ‘local’ directory in my account on the pangeo deployment. Thanks a lot!
chiara

This

rpath = furls[1]
s_opts = {'requester_pays':True, 'skip_instance_cache':True}
r_opts = {'anon':True}

with fsspec.open(rpath) as f:
    references = ujson.loads(f.read())
fs = fsspec.filesystem( "reference", fo=references, ref_storage_args=s_opts, remote_protocol="s3", remote_options=r_opts)

with or w/o ref_storage_args gives me the same error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-30-893506b03045> in <module>
     20 with fsspec.open(rpath) as f:
     21     references = ujson.loads(f.read())
---> 22 fs = fsspec.filesystem( "reference", fo=references, ref_storage_args=s_opts, remote_protocol="s3", remote_options=r_opts)

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/registry.py in filesystem(protocol, **storage_options)
    238     """
    239     cls = get_filesystem_class(protocol)
--> 240     return cls(**storage_options)

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/spec.py in __call__(cls, *args, **kwargs)
     58             return cls._cache[token]
     59         else:
---> 60             obj = super().__call__(*args, **kwargs)
     61             # Setting _fs_token here causes some static linters to complain.
     62             obj._fs_token_ = token

TypeError: __init__() missing 1 required positional argument: 'references'

the references coming from reading the json file looks like:

{'version': 1,
 'refs': {'.zgroup': '{\n    "zarr_format": 2\n}',
  '.zattrs': '{\n    "Conventions": "CF-1.7",\n    "GRIB_centre": "kwbc",\n    "GRIB_centreDescription": "US National Weather Service - NCEP ",\n    "GRIB_edition": 2,\n    "GRIB_subCentre": 2,\n    "history": "2022-04-13T02:44:12 GRIB to CDM+CF via cfgrib-0.9.8.5/ecCodes-2.20.0 with {\\"source\\": \\"/tmp/tmpymur3lelgrib2\\", \\"filter_by_keys\\": {}, \\"encode_cf\\": [\\"parameter\\", \\"time\\", \\"geography\\", \\"vertical\\"]}",\n    "institution": "US National Weather Service - NCEP "\n}',
  'time/.zarray': '{\n    "chunks": [],\n    "compressor": null,\n    "dtype": "<i8",\n    "fill_value": 0,\n    "filters": null,\n    "order": "C",\n    "shape": [],\n    "zarr_format": 2\n}',
  'time/0': 'base64:gENtOAAAAAA=',
  'time/.zattrs': '{\n    "_ARRAY_DIMENSIONS": [],\n    "calendar": "proleptic_gregorian",\n    "long_name": "initial time of forecast",\n    "standard_name": "forecast_reference_time",\n    "units": "seconds since 1970-01-01T00:00:00"\n}',
  'step/.zarray': '{\n    "chunks": [],\n    "compressor": null,\n    "dtype": "<f8",\n    "fill_value": 0.0,\n    "filters": null,\n    "order": "C",\n    "shape": [],\n    "zarr_format": 2\n}',
  'step/0': '\x00\x00\x00\x00\x00\x00\x08@',
  'step/.zattrs': '{\n    "_ARRAY_DIMENSIONS": [],\n    "long_name": "time since forecast_reference_time",\n    "standard_name": "forecast_period",\n    "units": "hours"\n}',
  'latitude/.zarray': '{\n    "chunks": [\n        721\n    ],\n    "compressor": null,\n    "dtype": "<f8",\n    "fill_value": 0.0,\n    "filters": [\n        {\n            "id": "grib",\n            "var": "latitude"\n        }\n    ],\n    "order": "C",\n    "shape": [\n        721\n    ],\n    "zarr_format": 2\n}',
  'latitude/0': ['{{u}}', 0, 325317],
  'latitude/.zattrs': '{\n    "_ARRAY_DIMENSIONS": [\n        "latitude"\n    ],\n    "long_name": "latitude",\n    "standard_name": "latitude",\n    "stored_direction": "decreasing",\n    "units": "degrees_north"\n}',
  'longitude/.zarray': '{\n    "chunks": [\n        1440\n    ],\n    "compressor": null,\n    "dtype": "<f8",\n    "fill_value": 0.0,\n    "filters": [\n        {\n            "id": "grib",\n            "var": "longitude"\n        }\n    ],\n    "order": "C",\n    "shape": [\n        1440\n    ],\n    "zarr_format": 2\n}',
  'longitude/0': ['{{u}}', 0, 325317],
  'longitude/.zattrs': '{\n    "_ARRAY_DIMENSIONS": [\n        "longitude"\n    ],\n    "long_name": "longitude",\n    "standard_name": "longitude",\n    "units": "degrees_east"\n}',
  'valid_time/.zarray': '{\n    "chunks": [],\n    "compressor": null,\n    "dtype": "<f8",\n    "fill_value": 0.0,\n    "filters": null,\n    "order": "C",\n    "shape": [],\n    "zarr_format": 2\n}',
  'valid_time/0': 'base64:AAAA2LY2zEE=',
  'valid_time/.zattrs': '{\n    "_ARRAY_DIMENSIONS": [],\n    "calendar": "proleptic_gregorian",\n    "long_name": "time",\n    "standard_name": "time",\n    "units": "seconds since 1970-01-01T00:00:00"\n}',
  'acpcp/.zarray': '{\n    "chunks": [\n        721,\n        1440\n    ],\n    "compressor": null,\n    "dtype": "<f4",\n    "fill_value": 9999.0,\n    "filters": [\n        {\n            "id": "grib",\n            "var": "acpcp"\n        }\n    ],\n    "order": "C",\n    "shape": [\n        721,\n        1440\n    ],\n    "zarr_format": 2\n}',
  'acpcp/0.0': ['{{u}}', 29740164, 413329],
  'acpcp/.zattrs': '{\n    "GRIB_NV": 0,\n    "GRIB_Nx": 1440,\n    "GRIB_Ny": 721,\n    "GRIB_cfName": "lwe_thickness_of_convective_precipitation_amount",\n    "GRIB_cfVarName": "acpcp",\n    "GRIB_dataType": "cf",\n    "GRIB_gridDefinitionDescription": "Latitude/longitude. Also called equidistant cylindrical, or Plate Carree",\n    "GRIB_gridType": "regular_ll",\n    "GRIB_iDirectionIncrementInDegrees": 0.25,\n    "GRIB_iScansNegatively": 0,\n    "GRIB_jDirectionIncrementInDegrees": 0.25,\n    "GRIB_jPointsAreConsecutive": 0,\n    "GRIB_jScansPositively": 0,\n    "GRIB_latitudeOfFirstGridPointInDegrees": 90.0,\n    "GRIB_latitudeOfLastGridPointInDegrees": -90.0,\n    "GRIB_longitudeOfFirstGridPointInDegrees": 0.0,\n    "GRIB_longitudeOfLastGridPointInDegrees": 359.75,\n    "GRIB_missingValue": 9999,\n    "GRIB_name": "Convective precipitation (water)",\n    "GRIB_numberOfPoints": 1038240,\n    "GRIB_paramId": 3063,\n    "GRIB_shortName": "acpcp",\n    "GRIB_stepType": "accum",\n    "GRIB_stepUnits": 1,\n    "GRIB_totalNumber": 10,\n    "GRIB_typeOfLevel": "surface",\n    "GRIB_units": "kg m**-2",\n    "_ARRAY_DIMENSIONS": [\n        "latitude",\n        "longitude"\n    ],\n    "coordinates": "number time step surface latitude longitude valid_time",\n    "long_name": "Convective precipitation (water)",\n    "standard_name": "lwe_thickness_of_convective_precipitation_amount",\n    "units": "kg m**-2"\n}'},
 'templates': {'u': 's3://noaa-gefs-retrospective/GEFSv12/reforecast/2000/2000010100/c00/Days:1-10/acpcp_sfc_2000010100_c00.grib2'}}

so I really wonder if the location of the json file matters…
Or it is something on the json coming from the GRIB2 file. Perhaps is presenting differently from what expected, but it is not obvious to me.
c

If you are recreating several FSs with very similar arguments which experimenting, it might be good policy to include the extra argument skip_instance_cache=True in filesystem() or get_mapper(), because fsspec caches instances by default, and maybe you keep getting back the same broken instance each time. The default of reusing instances is generally a good thing (you don’t re-download the refs file each time), but not if things got broken!

To make the dataset in one line, you can do

ds = xr.open_dataset("reference://", engine="zarr",
    backend_kwargs={
        "consolidated": False,
        "storage_options": dict(fo=references, ref_storage_args=s_opts, remote_protocol="s3", remote_options=r_opts, skip_instance_cache=True)
    }
)

so you end up using all the same bits in a sightly more convoluted block. There’s no real reason to do this when writing a script or notebook except that this form is what Intake will use if you want to make your dataset a member of a catalog.

2 Likes

Thanks @martindurant so much for this, especially the trick about skip_instance_cache good to know.
It doesn’t seem to work, and I think it is a version issue.
I ran this:

rpath ='jsonfiles/acpcp_sfc_2000010100_c00.json'

s_opts = {'requester_pays':True, 'skip_instance_cache':True}
r_opts = {'anon':True}

with fsspec.open(rpath) as f:
    references = ujson.loads(f.read())

ds = xr.open_dataset("reference://", engine="zarr",
    backend_kwargs={
        "consolidated": False,
        "storage_options": dict(fo=references, ref_storage_args=s_opts, remote_protocol="s3", 
                                remote_options=r_opts, skip_instance_cache=True)
    }
)

I get this error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-15-c221b796d813> in <module>
      7     references = ujson.loads(f.read())
      8 
----> 9 ds = xr.open_dataset("reference://", engine="zarr",
     10     backend_kwargs={
     11         "consolidated": False,

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/api.py in open_dataset(filename_or_obj, group, decode_cf, mask_and_scale, decode_times, autoclose, concat_characters, decode_coords, engine, chunks, lock, cache, drop_variables, backend_kwargs, use_cftime, decode_timedelta)
    570 
    571         opener = _get_backend_cls(engine)
--> 572         store = opener(filename_or_obj, **extra_kwargs, **backend_kwargs)
    573 
    574     with close_on_error(store):

TypeError: open_group() got an unexpected keyword argument 'storage_options'

fsspec.__version__ is 0.8.5 - that seems to be REALLY old I guess newer than the example I found, but definitely not up to date.
This is the version on the AWS Pangeo deployment. I will try and run it somewhere with a newer version, and it will probably fix the issue. Thanks for chiming in, this was helpful.

Oh, you certainly need a very recent fsspec, and also recent zarr/xarray - sorry. This is cutting edge code :expressionless:

Absolutely - it was very silly of me not to check the version right away!

I successfully loaded the one file following your suggestion! thanks!
I am now using the GCS deployment, which has
fsspec.__version__ '2021.11.1'
but didn’t quite work:

TLDR:
the json file is not created correctly

rpath ='jsonfiles1/acpcp_sfc_2000010100_c00.json'

s_opts = {'requester_pays':True, 'skip_instance_cache':True}
r_opts = {'anon':True}

with fsspec.open(rpath) as f:
    references = ujson.loads(f.read())

ds = xr.open_dataset("reference://", engine="zarr",
    backend_kwargs={
        "consolidated": False,
        "storage_options": dict(fo=references, ref_storage_args=s_opts, remote_protocol="s3", 
                                remote_options=r_opts, skip_instance_cache=True)
    }
)
ds

but the dataset is wrong because the json file - i believe - is wrong

Note how step, time, and valid time have become variables and not coordinates, and more importantly, it loaded only one of the values along the dimension step there should be 80.

This is how the actual file looks if I download it:

!wget https://noaa-gefs-retrospective.s3.amazonaws.com/GEFSv12/reforecast/2000/2000010100/c00/Days:1-10/acpcp_sfc_2000010100_c00.grib2
ds1 = xr.open_dataset('acpcp_sfc_2000010100_c00.grib2', engine = 'cfgrib')

When I create the json files I define step as a common_vars various coordinate values, like:

so = {"anon": True, "default_cache_type": "readahead"}
out = scan_grib(files[0],common_vars = ['time', 'step', 'latitude', 'longitude', 'valid_time'], storage_options=so )
outfname = 'jsonfiles1/'+files[0].split('/')[-1][:-6]+'.json'
with open(outfname, "w") as f:
    f.write(json.dumps(out))

however when I inspect the json file I indeed have empty
chunks
ARRAY_DIMENSIONS under refs for time and step (see blue squares) compared to latitude (see green squares).

But also step/0 is empty, and other relevant fields.

Does this have to do - probably - with how this grib file is defined?

In the example linked above (the HRRR file) I didn’t pay attention to that, because it concatenates them along another dimension, but it looks like the step variable is equal to 1 so you don’t notice that.
I in fact downloaded one HRRR file and when I loaded with cfgrib the variable step had only one entry, which is very common for real time grib files.

I thought it was worth to mention it, FYI, and to know if there are ways to create the json correctly.
I tried to hack it but i am missing the
"step/0": "\u0000\u0000\u0000\u0000\u0000\u0000\b@"
which instead for latitutde has something like:
"latitude/0": ["{{u}}", 0, 325317],

I should add - that real time forecast products usually are organized like HRRR, which means they have one step per grib files. These are reforecast products (retrospective runs, used to create a reanalysis to calibrate realtime forecast), which are always a different beast, and the files can be ad-hoc. So probably the scan_grib module doesn’t know what to do with whatever way the step variable is presented in this grib file.

thanks for your time!

The kerchunk Multizarr part has a coordinate mapping input…coo_map? Given the rest seems ok, that might do it?

1 Like