Reading a Larger than RAM NetCDF4 using Xarray

Hey there Pangeo community,

I’m dealing with an unwieldy 40GB NetCDF4 file that is a collection of various in-situ snow cover datasets. There are 10,118 stations reporting at varying temporal frequencies between 1979 and 2022. My mentor and I are comparing this file to two NOAA climate data records to analyze how in-situ observations compare to satellite data. We’re only interested in looking at the period between 2001 and 2022.

I downloaded this file and then uploaded it to our AWS server, hoping to be able to access it programmatically from there. I keep running into a problem where it either tries to open the entire dataset at once (40GB of file into 16GB of RAM, so it blows up instantly, either crashing the EC2 environment or throwing a memory error) or it gets stuck loading, slowly ticking up the RAM use until it has to be force-quit.

[attachment: combined_norswe_debug] With one of our smaller comparison datasets (~30MB), the top code cell works without any issues. But for the 40GB file it throws the memory error.

The second iteration is the slow-crash version. If I try to set chunks={'time': 1, 'station_id': 10118} then it runs out of RAM nearly instantaneously. With just one station it never crashes, but it never makes it to the second debug statement either. I ran it for 15 minutes and the RAM usage peaked at around 4GB before I killed the process. With chunks='auto' it ballooned up to around 13GB of RAM in 8 minutes before I killed it.

Those two code cells were the two best attempts, but I sat here banging my head against it for a couple of hours with no success.

This link is what brought me to this website. From there I found my way to the xarray documentation, and all signs are leading me to turning the .NetCDF file into a .Zarr file, then reuploading it to the cloud. Does that sound right or am I barking up the wrong tree?

Thanks everybody!

Does it work without .read()? What was the reasoning for adding that?

When I run it without .read() I get a type error: "TypeError: a bytes-like object is required, not 'StreamingBody'".

I may be misunderstanding it. The metaphor I’ve been using to explain what it’s doing is that the NetCDF file can be compared to a physical package:

s3_client.get_object: hands me the package, a sealed box
['Body']: opens the sealed box and removes from it a sealed envelope
.read(): opens the envelope and gets the contents
io.BytesIO(…): translates the contents of the envelope
xr.open_dataset: reads the contents of the envelope

So without .read() there's nobody to open the envelope, and the other steps fail. Or, as the type error said, it hasn't been turned into a bytes-like object yet. The StreamingBody it's referencing is (I think) just the raw stream of data, not the actual data itself.
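
For concreteness, the pattern I'm describing looks roughly like this (a sketch rather than the exact cell; bucket and key names match the ones in my code below):

# Sketch of the in-memory approach described above -- works for the ~30MB
# file, blows up on the 40GB one because .read() pulls every byte into RAM.
import io

import boto3
import xarray as xr

s3_client = boto3.client("s3")

# get_object hands back the "sealed box"
response = s3_client.get_object(
    Bucket="norswe-sce-hp-project",
    Key="NorSWE-NorEEN_1979-2021_v2.nc",
)

# ["Body"] is the StreamingBody (the "sealed envelope"); .read() opens it
# by loading the entire object into memory as bytes
raw_bytes = response["Body"].read()

# io.BytesIO wraps those bytes so xarray can treat them like a file on disk
ds = xr.open_dataset(io.BytesIO(raw_bytes), engine="h5netcdf")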

NetCDF was designed to be used with a local filesystem. By "AWS server", what do you mean? Are you writing this to object storage or some other storage medium in the cloud?

From there I found my way to the xarray documentation, and all signs are leading me to turning the .NetCDF file into a .Zarr file, then reuploading it to the cloud. Does that sound right or am I barking up the wrong tree?

If your aim is to make the contents of the entire file (or multiple such files) available to many people then yes this (or virtual zarr) would be easier. But it doesn’t sound like disseminating the entire file is your aim?

Zarr should be a lot better but if you wanted to hit the netCDF file directly:

xr.open_dataset("s3://...", engine="h5netcdf")

should work provided you have s3fs installed.

With this you should get Xarray’s lazy indexing, which will avoid loading the whole thing in memory.
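
For example, something along these lines (an untested sketch; the bucket path, variable names, and date are placeholders to adapt):

import xarray as xr  # s3fs and h5netcdf must be installed

# Only metadata is read at this point; the data itself stays in S3
ds = xr.open_dataset(
    "s3://your-bucket/NorSWE-NorEEN_1979-2021_v2.nc",
    engine="h5netcdf",
)

# Lazily select just the period you care about...
subset = ds.sel(time=slice("2001-01-01", None))

# ...and only the variables you need, then load that much smaller piece
subset[["snw", "snd"]].load()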


Right! So, I thought that too. And it’s worked really well for me in the past with files that are smaller than my system’s available RAM.

### Code cell trying to access NorSWE from S3 with chunking
### All engines and chunks fail to reach 2nd debug statement
import xarray as xr  # needs s3fs (and h5netcdf) installed

NorSWE_bucket = "norswe-sce-hp-project"
NorSWE_key = "NorSWE-NorEEN_1979-2021_v2.nc"
print("\033[32m#DEBUG\033[0m: ✅ NorSWE bucket & key init")

ds = xr.open_dataset(f"s3://{NorSWE_bucket}/{NorSWE_key}", engine='h5netcdf', chunks='auto')
print("\033[32m#DEBUG\033[0m: ✅ ds init")
print(ds.variables.keys())

That’s what I was trying and it failed every time, with and without chunking. No matter the variation on the code it kept stalling out and never reaching the second debug statement, or it tried to read too much and crashed.

Huh! I didn’t know that NetCDF was designed for use with a local filesystem! That’s really good to know!

So, organizationally we keep nearly all of our files either in virtual AWS EC2 environments or in AWS S3 buckets. Keeping things ephemeral makes it easier for everybody to have access to any data they may need, without having to work through email chains to gain access. At least I suppose that's one possible reason; there may be others, but I haven't really dug too deep into why everything's cloud-based.

But yeah, the goal is to have this 40GB file (or at least the parts of it we need for our project) accessible from the AWS S3 bucket that it resides in. We’d like to be able to access it and the slices of it that we need programmatically from our EC2 environments. The two climate data records (CDRs) we’re comparing it to are both in S3 buckets too. The one CDR (SCE) is a single .nc file that’s updated monthly and it’s only a couple hundred MB. The other CDR (HP) is a collection of weekly .nc files going from 2001 to 2024 that are each a few hundred MB.

The first part of the analysis was just to compare the SCE and HP CDRs. I was able to do this using a handful of loops in Python using s3fs. I opened the single SCE .nc file and compared it to each HP .nc file, closing the HP file and opening the next one.

In that phase of the project I ran into a problem similar to what I’m currently having. When I first wrote the loop I wasn’t closing the HP file at the end of each iteration. Before I realized what was going on it was just leaving them all open in RAM and eventually crashing the EC2 environment before processing finished.
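
For what it's worth, that loop boiled down to something like this (a simplified sketch; bucket names, filenames, and the comparison step are placeholders). The fix was the context managers, which close each weekly HP file before the next one opens and keep RAM use flat:

import s3fs
import xarray as xr

fs = s3fs.S3FileSystem()

# The single SCE file stays open for the whole comparison
with fs.open("s3://sce-bucket/sce_monthly.nc") as sce_file:
    sce = xr.open_dataset(sce_file, engine="h5netcdf")

    for hp_key in sorted(fs.glob("hp-bucket/hp_weekly_*.nc")):
        # Each HP file is opened, compared, and closed before the next iteration
        with fs.open(hp_key) as hp_file, xr.open_dataset(hp_file, engine="h5netcdf") as hp:
            # ... compare sce against this week's hp data here ...
            pass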

Awesome! Since my original post I’ve tried to convert it to .Zarr and ran into the same issue.
Here was my stab at it:

### Code cell trying to convert from .nc into .zarr
### Nope. Can't write from a read-only file, can't open in anything other than read.
import xarray as xr
from dask.diagnostics import ProgressBar

ProgressBar().register()

ds = xr.open_dataset("NorSWE-NorEEN_1979-2021_v2.nc", engine="netcdf4", chunks={"time": 1, "station_id": 10118})

# Initialize Zarr path with the first time step
zarr_path = "NorSWE-NorEEN_1979-2021_v2.nc.zarr"
ds.isel(time=slice(0, 1)).to_zarr(zarr_path, mode="w")

# Start at 1: time step 0 was already written above
for t in range(1, ds.sizes["time"]):
    print(f"\033[34m#DEBUG\033[0m: Writing time slice {t} of {ds.sizes['time']}")

    # Slice one time step lazily
    chunk = ds.isel(time=slice(t, t+1))

    # Append slice to the Zarr store
    chunk.to_zarr(zarr_path, mode="a", append_dim="time")

EDIT (re the code block above): Oh! Whoops! I have the file saved locally, so I can try to convert it to .Zarr from there instead of trying to do it from the S3 bucket. :zany_face:

EDIT 2: Nope, scratch that, that's what I was already doing in this code. I thought I saw an S3 in front of the filepath in the xr.open_dataset call, but I just wasn't looking closely enough.
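
One variant I haven't actually tried yet would be to skip the per-time-step loop and let dask stream the writes in a single to_zarr call (untested sketch, with a guessed chunk size and output path):

# Untested alternative: open lazily with dask chunks and let to_zarr write
# chunk by chunk, instead of appending one time step at a time.
import xarray as xr

ds = xr.open_dataset(
    "NorSWE-NorEEN_1979-2021_v2.nc",
    engine="netcdf4",
    chunks={"time": 365},  # guessed chunk size: ~1 year per chunk, all stations
)

# Drop the pre-2001 part we don't need before writing anything
ds = ds.sel(time=slice("2001-03-19", None))

# Dask computes and writes a few chunks at a time, so peak RAM should stay
# well below the full 40GB
ds.to_zarr("NorSWE_2001-2021.zarr", mode="w")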

Right now I'm looping through the parts of the file I'm interested in, in read-only mode, and turning them into a .parquet file instead. It seems to be a good alternative to .Zarr or to accessing the .nc file itself.

##### Code cell constructing one big .parquet from HP start to NorSWE end

# Imports (pyarrow or fastparquet is needed later for to_parquet)
import netCDF4
import numpy as np
import pandas as pd

# Reopen dataset (local copy of the 40GB file)
nc_file = "NorSWE-NorEEN_1979-2021_v2.nc"
nc = netCDF4.Dataset(nc_file, "r")
print("\033[32m#DEBUG\033[0m: ✅ NetCDF file opened")

# Time and dimensions
time_units = nc.variables['time'].units
calendar = getattr(nc.variables['time'], 'calendar', 'standard')
dates = netCDF4.num2date(nc.variables['time'][:], units=time_units, calendar=calendar)
num_stations = len(nc.dimensions['station_id'])

# Confirm station count
if num_stations == 10118:
    print(f"\033[32m#DEBUG\033[0m: ✅ num_stations = {num_stations}.")
else:
    print(f"\033[31m#DEBUG\033[0m: ❌ ERROR! num_stations = {num_stations} instead of 10118.")

### Define get_var function to get 1-D station metadata without blowing anything up
# It also decodes byte strings to readable text PRN
# loops through num_stations to make sure that even non-reporting stations are included in every time slice
def get_var(varname, decode=False): ### for variables that need to be decoded they're passed into the function with decode=True
    try:
        data = nc.variables[varname][:]
        if decode:
            data = [b.decode('utf-8').strip() if isinstance(b, bytes) else str(b) for b in data]
        return data
    except Exception:
        return [np.nan] * num_stations # exceptions return nan * num_stations because pandas DataFrame columns all have to be the same length

station_ids = get_var('station_id', decode=True)
lats = get_var('lat')
lons = get_var('lon')
elevations = get_var('elevation')
sources = get_var('source', decode=True)
station_names = get_var('station_name', decode=True)
type_mes = get_var('type_mes', decode=True)
mmask = get_var('mmask')
NorSWE_index = list(range(num_stations))

### Define a function to extract per-time values for reporting stations
# Why? Some vars (lat, lon, elevation, etc) only have 1 dimension, the station to which they're associated. For them we use get_var.
# The rest of them (SNW, SND, QC flags) are 2 dimensional, associated with both the time and station. All floats & ints, no decode req
### Does the same thing as get_var; loops through num_stations to make sure that even when there's no data we document it
def get_time_slice(varname, t_index):
    try:
        return nc.variables[varname][t_index, :]
    except Exception:
        return [np.nan] * num_stations

# Container for all days
all_days = []


start_date = pd.to_datetime("2001-03-19")
end_date = pd.to_datetime("2022-01-01")


# Loop over date range
for date in pd.date_range(start_date, end_date):
    if date not in dates:
        print(f"\033[33m#DEBUG\033[0m: ⚠️ Skipping {date.strftime('%Y-%m-%d')} (not in dataset)")
        continue

    date_index = np.where(dates == date)[0][0]

    # Time-dependent values
    snw_daily = get_time_slice('snw', date_index)
    snd_daily = get_time_slice('snd', date_index)
    den_daily = get_time_slice('den', date_index)
    data_flag_snw = get_time_slice('data_flag_snw', date_index)
    data_flag_snd = get_time_slice('data_flag_snd', date_index)
    qc_flag_snw = get_time_slice('qc_flag_snw', date_index)
    qc_flag_snd = get_time_slice('qc_flag_snd', date_index)
    date_column = [date.strftime('%Y-%m-%d')] * num_stations

    # Daily dataframe
    df = pd.DataFrame({
        'index': NorSWE_index,
        'date': date_column,
        'station_id': station_ids,
        'station_name': station_names,
        'lat': lats,
        'lon': lons,
        'elevation': elevations,
        'mmask': mmask,
        'source': sources,
        'type_mes': type_mes,
        'data_flag_snw': data_flag_snw,
        'data_flag_snd': data_flag_snd,
        'qc_flag_snw': qc_flag_snw,
        'qc_flag_snd': qc_flag_snd,
        'snw': snw_daily,
        'snd': snd_daily,
        'den': den_daily,
    })

    # Boolean flags
    df['reported'] = df['snw'].notna() | df['snd'].notna()
    df['reported_snw_only'] = df['snw'].notna() & df['snd'].isna()
    df['reported_snd_only'] = df['snw'].isna() & df['snd'].notna()
    df['reported_both'] = df['snw'].notna() & df['snd'].notna()

    # Daily summary counts
    df['count_reported'] = df['reported'].sum()
    df['count_reported_snw_only'] = df['reported_snw_only'].sum()
    df['count_reported_snd_only'] = df['reported_snd_only'].sum()
    df['count_reported_both'] = df['reported_both'].sum()

    all_days.append(df)

    # Progress update on the first day, the last day, and roughly every month
    if date == start_date or date == end_date or (date - start_date).days % 31 == 0:
        print(f"\033[32m#DEBUG\033[0m: ✅ Processed {date.strftime('%Y-%m-%d')}")

# Combine all days
full_df = pd.concat(all_days, ignore_index=True)
print(f"\033[32m#DEBUG\033[0m: ✅ Combined all {len(all_days)} days into full_df with {len(full_df)} rows")

# Save one big Parquet file
full_parquet = f"NorSWE_annual_slice_{start_date.date()}_{end_date.date()}.parquet"  # .date() keeps times and ':' out of the filename
full_df.to_parquet(full_parquet, index=False)
print(f"\033[32m#DEBUG\033[0m: ✅ Saved full annual Parquet file: {full_parquet}")

# Cleanup
nc.close()
print("\033[32m#DEBUG\033[0m: ✅ NetCDF file closed")