I would like to compute statistics for 30 years' worth of gridded data, where the "time" axis must be preserved since I'm calculating quantile statistics. The data is stored in an S3 bucket, and I need to save the statistics out as a netCDF file. I'm hitting a memory error despite using dask, so I think I'm not understanding something fundamental about how dask operates.
The workflow is as follows:
Process the data locally, flattening each year's grids to 1-D so I can store them as a dataframe in an S3 bucket. Time is saved separately.
import numpy as np
import pandas as pd

for year in years:
    time, hs, tp, dp = dl.load_bulk_params(year)
    bathy, lat, lon = dl.load_bathy()
    # flip upside down, then flatten to 1-D
    hs = np.flipud(hs).ravel()
    tp = np.flipud(tp).ravel()
    dp = np.flipud(dp).ravel()
    dataframe = pd.DataFrame(data={'hs': hs, 'tp': tp, 'dp': dp})
    time_dataframe = pd.DataFrame({'time': time})
    # local_path and remote_path are built per year
    dataframe.to_parquet(local_path)
    s3.upload(local_path, remote_path)
    # time_dataframe is saved and uploaded the same way (as time*.parquet)
Load the data from the remote:
import dask.dataframe as dd

ddf = dd.read_parquet('s3://.../params*.parquet')
time_ddf = dd.read_parquet('s3://.../time*.parquet')
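Before going further, it may help to check how the parquet files were partitioned, since each partition has to fit comfortably in a worker's memory. A small diagnostic sketch (nothing beyond the dataframes above is assumed):

    # number of partitions dask built from the parquet files
    print(ddf.npartitions, time_ddf.npartitions)
    # rows per partition
    print(ddf.map_partitions(len).compute())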
Turn each parameter from a dask dataframe into a dask array, reshape it, rechunk it, and wrap it in xarray (is this the memory-intensive part where I should be careful about chunk sizes?), so I can use xarray's functionality to group by years, seasons, etc. and to save out as a netCDF:
import xarray as xr

timevec = time_ddf['time'].values.compute()   # full time coordinate in memory
arr = (ddf[param].to_dask_array(lengths=True)
       .reshape((len(timevec), len(lat), len(lon)))
       .rechunk((len(timevec), 5, 5)))
hs_xr = xr.DataArray(arr, dims=['time', 'lat', 'lon'],
                     coords={'time': timevec, 'lat': lat, 'lon': lon}
                     ).chunk({'time': -1})
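To answer my own question about whether this is the memory-intensive part, I can at least inspect the chunk layout before computing anything. A quick diagnostic (the per-chunk estimate assumes float64 data):

    print(arr.chunks)       # chunk sizes along each dimension
    print(arr.numblocks)    # number of blocks along each dimension
    # one (time, 5, 5) block, assuming 8-byte floats
    chunk_bytes = len(timevec) * 5 * 5 * 8
    print(f"~{chunk_bytes / 1e6:.1f} MB per chunk, {arr.nbytes / 1e9:.1f} GB total")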
Calculate the necessary statistics using map_blocks and a defined function:
import dask.array as da

def dask_percentile(arr, q, axis=0):
    # np.percentile needs the whole reduction axis in a single block
    if len(arr.chunks[axis]) > 1:
        msg = ('Input array cannot be chunked along the percentile '
               'dimension.')
        raise ValueError(msg)
    return da.map_blocks(np.percentile, arr, axis=axis, q=q,
                         drop_axis=axis)
computed_stats_ds = hs_xr.reduce(dask_percentile, dim='time', q=90).compute()
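If the compute succeeded, the final step would be writing the (now small, time-collapsed) result to netCDF and uploading it, something like this (hs_p90.nc and remote_stats_path are placeholder names):

    # result is only (lat, lon) after reducing over 'time'
    computed_stats_ds.rename('hs_p90').to_netcdf('hs_p90.nc')
    s3.upload('hs_p90.nc', remote_stats_path)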
Instead, at this point the process is killed (I just get "Killed" in the terminal).
Should I use a distributed dask cluster (start a client)? I have also tried this, but I'm working on a remote machine, so I cannot access the dask dashboard (or can I?). When I use the client, the memory overflows and the workers are killed. I could reconfigure the dask parameters to allow for higher memory usage.
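For reference, this is roughly how I start the client; the worker counts and memory limits are placeholder values, and the SSH port forward is how I understand the dashboard can be reached from my local machine:

    from dask.distributed import Client, LocalCluster

    # placeholder values; real limits depend on the machine's RAM
    cluster = LocalCluster(n_workers=4, threads_per_worker=1,
                           memory_limit='8GB')
    client = Client(cluster)
    print(client.dashboard_link)
    # from my laptop: ssh -L 8787:localhost:8787 user@remote-machine
    # then open http://localhost:8787 in a local browser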
I thought, though, that each worker would process one chunk at a time. Is the rechunking and reshaping taking too much memory? What am I misunderstanding about how dask works?
Another way forward would be to save everything as grids locally and then load them with xarray using open_mfdataset, but this is not desirable because I'd have to increase the disk space of the remote machine. Perhaps the whole workflow should be changed so that I save the data out in a gridded format such as Zarr?
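If I went the Zarr route, I imagine something like this per year, writing straight to the bucket so no extra local disk is needed (the bucket path and chunk sizes are placeholders, and I have not tested this):

    import fsspec
    import xarray as xr

    # hs_grid is the un-flattened (time, lat, lon) array for one year
    ds = xr.Dataset(
        {'hs': (('time', 'lat', 'lon'), hs_grid)},
        coords={'time': time, 'lat': lat, 'lon': lon},
    ).chunk({'time': -1, 'lat': 5, 'lon': 5})

    store = fsspec.get_mapper('s3://my-bucket/waves.zarr')   # placeholder path
    # first year: mode='w'; later years append along time
    ds.to_zarr(store, mode='a', append_dim='time')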
Thank you so much for taking the time to respond and help me!