Huh! I didn’t know that NetCDF was designed for use with a local filesystem! That’s really good to know!
So, organizationally we keep nearly all of our files either in virtual AWS EC2 environments or in AWS S3 buckets. Keeping things ephemeral makes it easier for everybody to access whatever data they need without having to chase permissions through long email chains. At least I assume that's the reason; there may be others, but I haven't dug too deep into why everything's cloud based.
But yeah, the goal is to have this 40GB file (or at least the parts of it we need for our project) accessible from the AWS S3 bucket it resides in. We'd like to be able to access it, and the slices of it we need, programmatically from our EC2 environments. The two climate data records (CDRs) we're comparing it to are both in S3 buckets too. One CDR (SCE) is a single .nc file that's updated monthly and is only a couple hundred MB. The other (HP) is a collection of weekly .nc files running from 2001 to 2024 that are each a few hundred MB.
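Just to make the goal concrete, this is roughly the access pattern we're after. The bucket path is a placeholder and I haven't timed this exact snippet; s3fs hands xarray a file-like object, which needs the h5netcdf engine (the netcdf4 engine can't read from one), and in practice every read turns into a pile of small byte-range requests against S3, which I suspect is why this has been so painful:
### Sketch: opening the NorSWE .nc straight from S3 (bucket name is a placeholder)
import s3fs
import xarray as xr

fs = s3fs.S3FileSystem()
with fs.open("s3://our-project-bucket/NorSWE-NorEEN_1979-2021_v2.nc", "rb") as f:
    ds = xr.open_dataset(f, engine="h5netcdf")
    # Pull one day of SWE into memory while the file object is still open
    swe_day = ds["snw"].isel(time=0).load()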
The first part of the analysis was just to compare the SCE and HP CDRs. I was able to do that with a handful of Python loops using s3fs: open the single SCE .nc file, compare it against each HP .nc file in turn, and close each HP file before opening the next one.
In that phase of the project I ran into a problem similar to the one I'm having now. When I first wrote the loop I wasn't closing the HP file at the end of each iteration, so before I realized what was going on it was leaving them all open in RAM and eventually crashed the EC2 environment before processing finished.
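For what it's worth, the fix was basically just making sure each file gets closed before the next one opens. A minimal sketch of the shape of that loop, from memory rather than the exact cell (the S3 paths are placeholders and the actual comparison logic is omitted):
### Sketch: SCE vs. HP comparison loop that closes each HP file per iteration
import s3fs
import xarray as xr

fs = s3fs.S3FileSystem()
sce_path = "s3://our-project-bucket/sce/sce_cdr.nc"       # placeholder path
hp_paths = sorted(fs.glob("our-project-bucket/hp/*.nc"))  # placeholder path

# The SCE file is small, so load it fully into memory once
with fs.open(sce_path, "rb") as f_sce:
    sce = xr.open_dataset(f_sce, engine="h5netcdf").load()

for hp_path in hp_paths:
    # Both context managers exit before the next iteration,
    # so the weekly HP files never pile up in RAM
    with fs.open(hp_path, "rb") as f_hp:
        with xr.open_dataset(f_hp, engine="h5netcdf") as hp:
            pass  # ... per-file SCE vs. HP comparison goes here ...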
Awesome! Since my original post I’ve tried to convert it to .Zarr and ran into the same issue.
Here was my stab at it:
### Code cell trying to convert from .nc into .zarr ### Nope. Can't write from a read-only file, can't open in anything other than read.
import xarray as xr
from dask.diagnostics import ProgressBar

ProgressBar().register()
ds = xr.open_dataset("NorSWE-NorEEN_1979-2021_v2.nc", engine="netcdf4", chunks={"time": 1, "station_id": 10118})
# Initialize Zarr store with the first time step
zarr_path = "NorSWE-NorEEN_1979-2021_v2.nc.zarr"
ds.isel(time=slice(0, 1)).to_zarr(zarr_path, mode="w")
# Append the remaining time steps one at a time (starting at 1 so slice 0 isn't written twice)
for t in range(1, ds.sizes["time"]):
    print(f"\033[34m#DEBUG\033[0m: Writing time slice {t} of {ds.sizes['time']}")
    # Slice one time step lazily
    chunk = ds.isel(time=slice(t, t+1))
    # Write slice to Zarr
    chunk.to_zarr(zarr_path, mode="a", append_dim="time")
###EDIT: RE that code block above: Oh! Whoops! I have the file saved locally, I can try to convert it to .Zarr from there instead of trying to do it from the S3 bucket.
###EDIT 2: Nope, scratch that, that’s what I was already doing in this code. I thought I saw an S3 in front of the filepath for the xr.open_dataset, but I just wasn’t looking closely enough.
Right now I'm looping through the parts of the file I'm interested in (in read-only mode) and turning them into a .parquet file instead. It seems to be a good alternative to .Zarr or working with the .nc file directly.
##### Code cell constructing one big .parquet from HP start to NorSWE end
import netCDF4
import numpy as np
import pandas as pd

# Reopen dataset (local copy of the NorSWE file)
nc_file = "NorSWE-NorEEN_1979-2021_v2.nc"
nc = netCDF4.Dataset(nc_file, "r")
print("\033[32m#DEBUG\033[0m: ✅ NetCDF file opened")
# Time and dimensions
time_units = nc.variables['time'].units
calendar = getattr(nc.variables['time'], 'calendar', 'standard')
dates = netCDF4.num2date(nc.variables['time'][:], units=time_units, calendar=calendar)
num_stations = len(nc.dimensions['station_id'])
# Confirm station count
if num_stations == 10118:
    print(f"\033[32m#DEBUG\033[0m: ✅ num_stations = {num_stations}.")
else:
    print(f"\033[31m#DEBUG\033[0m: ❌ ERROR! num_stations = {num_stations} instead of 10118.")
### Define get_var function to get 1-D station metadata without blowing anything up
# It also decodes byte strings to readable text PRN
# loops through num_stations to make sure that even non-reporting stations are included in every time slice
def get_var(varname, decode=False):  ### variables that need decoding are passed in with decode=True
    try:
        data = nc.variables[varname][:]
        if decode:
            data = [b.decode('utf-8').strip() if isinstance(b, bytes) else str(b) for b in data]
        return data
    except Exception:
        return [np.nan] * num_stations  # exceptions return nan * num_stations because pandas dataframe columns all have to be the same length
station_ids = get_var('station_id', decode=True)
lats = get_var('lat')
lons = get_var('lon')
elevations = get_var('elevation')
sources = get_var('source', decode=True)
station_names = get_var('station_name', decode=True)
type_mes = get_var('type_mes', decode=True)
mmask = get_var('mmask')
NorSWE_index = list(range(num_stations))
### Define a function to extract per-time values for reporting stations
# Why? Some vars (lat, lon, elevation, etc) only have 1 dimension, the station to which they're associated. For them we use get_var.
# The rest of them (SNW, SND, QC flags) are 2 dimensional, associated with both the time and station. All floats & ints, no decode req
### Does the same thing as get_var; loops through num_stations to make sure that even when there's no data we document it
def get_time_slice(varname, t_index):
    try:
        return nc.variables[varname][t_index, :]
    except Exception:
        return [np.nan] * num_stations
# Container for all days
all_days = []
start_date = pd.to_datetime("2001-03-19")
end_date = pd.to_datetime("2022-01-01")
# Loop over date range
for date in pd.date_range(start_date, end_date):
    if date not in dates:
        print(f"\033[33m#DEBUG\033[0m: ⚠️ Skipping {date.strftime('%Y-%m-%d')} (not in dataset)")
        continue
    date_index = np.where(dates == date)[0][0]
    # Time-dependent values
    snw_daily = get_time_slice('snw', date_index)
    snd_daily = get_time_slice('snd', date_index)
    den_daily = get_time_slice('den', date_index)
    data_flag_snw = get_time_slice('data_flag_snw', date_index)
    data_flag_snd = get_time_slice('data_flag_snd', date_index)
    qc_flag_snw = get_time_slice('qc_flag_snw', date_index)
    qc_flag_snd = get_time_slice('qc_flag_snd', date_index)
    date_column = [date.strftime('%Y-%m-%d')] * num_stations
    # Daily dataframe
    df = pd.DataFrame({
        'index': NorSWE_index,
        'date': date_column,
        'station_id': station_ids,
        'station_name': station_names,
        'lat': lats,
        'lon': lons,
        'elevation': elevations,
        'mmask': mmask,
        'source': sources,
        'type_mes': type_mes,
        'data_flag_snw': data_flag_snw,
        'data_flag_snd': data_flag_snd,
        'qc_flag_snw': qc_flag_snw,
        'qc_flag_snd': qc_flag_snd,
        'snw': snw_daily,
        'snd': snd_daily,
        'den': den_daily,
    })
    # Boolean flags
    df['reported'] = df['snw'].notna() | df['snd'].notna()
    df['reported_snw_only'] = df['snw'].notna() & df['snd'].isna()
    df['reported_snd_only'] = df['snw'].isna() & df['snd'].notna()
    df['reported_both'] = df['snw'].notna() & df['snd'].notna()
    # Daily summary counts
    df['count_reported'] = df['reported'].sum()
    df['count_reported_snw_only'] = df['reported_snw_only'].sum()
    df['count_reported_snd_only'] = df['reported_snd_only'].sum()
    df['count_reported_both'] = df['reported_both'].sum()
    all_days.append(df)
    # Progress print at the start, the end, and roughly monthly in between
    if date == start_date or date == end_date or (date - start_date).days % 31 == 0:
        print(f"\033[32m#DEBUG\033[0m: ✅ Processed {date.strftime('%Y-%m-%d')}")
# Combine all days
full_df = pd.concat(all_days, ignore_index=True)
print(f"\033[32m#DEBUG\033[0m: ✅ Combined all {len(all_days)} days into full_df with {len(full_df)} rows")
# Save one big Parquet file
full_parquet = f"NorSWE_annual_slice_{start_date.date()}_{end_date.date()}.parquet"
full_df.to_parquet(full_parquet, index=False)
print(f"\033[32m#DEBUG\033[0m: ✅ Saved full annual Parquet file: {full_parquet}")
# Cleanup
nc.close()
print("\033[32m#DEBUG\033[0m: ✅ NetCDF file closed")