Extract small data cubes around observation points in datacube

Hello everyone,

My name is Gwenaël Caër and I’m a research engineer. I’m relatively new to the Pangeo community, and this is my first request.

I work with a 30-year daily dataset in NetCDF format. From these NetCDF files I used Pangeo Forge Recipes to build a datacube, which I load with xarray through the intake-xarray library. In pseudo-code, this gives:

cat = intake.open_catalog("reference.yaml")
datacube = cat["data"](chunks={"time": 1, "latitude": -1, "longitude": -1}).to_dask()

The result is a datacube with the following dimensions: (time: 11115, longitude: 1440, latitude: 720).

Alongside this datacube, I have a set of 4,000,000 observation points distributed more or less evenly across the dataset.

I’d like to extract data from the datacube around these observation points. In other words, I’m not just looking for the datacube value closest to each observation point, but also the values around it: I want to extract a small cube of data centred on each observation. The size of this cube varies from one observation to another: the time window dt is always +/- 2 days around the observation, but the longitude and latitude half-widths (dlon, dlat) vary.

The datacube and small cubes can be represented schematically as follows:

So I’m looking to optimize this operation. I’ve already tried several methods.

  1. First, I extracted each cube one by one, sequentially, in a for loop, but it is extremely slow. In pseudo-code, this gives:
results = []
for obs in ds.obs:

    # Time window of +/- dt days, spatial box of +/- dlon / dlat degrees
    start, end = obs.time + pd.Timedelta(days=-dt), obs.time + pd.Timedelta(days=dt + 1)
    lon_min, lon_max = obs.lon - dlon, obs.lon + dlon
    lat_min, lat_max = obs.lat - dlat, obs.lat + dlat

    results.append(
        datacube.sel(
            time=slice(start, end),
            longitude=slice(lon_min, lon_max),
            latitude=slice(lat_min, lat_max),
        ).compute()
    )

Then I tried to use vectorized indexing.

  2. So I tried to create a DataArray of slices. In pseudo-code, this gives:

times, longitudes, latitudes = [], [], []
for obs in ds.obs:

    # One slice per observation along each dimension
    times.append(slice(obs.time + pd.Timedelta(days=-dt), obs.time + pd.Timedelta(days=dt + 1)))
    longitudes.append(slice(obs.lon - dlon, obs.lon + dlon))
    latitudes.append(slice(obs.lat - dlat, obs.lat + dlat))

ds_slice = xr.Dataset(
    coords=dict(
        lon=(['obs'], longitudes),
        lat=(['obs'], latitudes),
        time=(['obs'], times),
        obs=ds.obs.values,
    ),
)

results = datacube.sel(
    time=ds_slice.time,
    longitude=ds_slice.lon,
    latitude=ds_slice.lat,
).compute()

But xarray and NumPy don’t support indexing with an array of slices.
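As far as I understand it, the vectorized indexing xarray does support takes DataArrays of scalar labels sharing a common dimension, one label per point. A minimal sketch (the point values here are purely illustrative):

import numpy as np
import xarray as xr

# One scalar label per point along a shared "points" dimension
sel_time = xr.DataArray(np.array(["2000-01-01", "2000-01-03"], dtype="datetime64[ns]"), dims="points")
sel_lon = xr.DataArray([10.0, 250.5], dims="points")
sel_lat = xr.DataArray([-45.0, 12.25], dims="points")

# Each point is selected independently; the result gains a "points" dimension
points = datacube.sel(time=sel_time, longitude=sel_lon, latitude=sel_lat, method="nearest")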

  3. So I tried to enumerate every coordinate value of each cube, to get rid of the slices, but once again, since the cubes aren’t all the same size, I couldn’t build a single NumPy array with a different shape for each cube.

  4. So the last solution I’ve come up with is to flatten all the indices. In pseudo-code, this gives:

from datetime import timedelta

f_t, f_lo, f_la = [], [], []
for obs in ds.obs:

    start, end = obs.time + pd.Timedelta(days=-dt), obs.time + pd.Timedelta(days=dt + 1)
    lon_min, lon_max = obs.lon - dlon, obs.lon + dlon
    lat_min, lat_max = obs.lat - dlat, obs.lat + dlat

    s_t = slice(start.values, end.values, timedelta(days=1))
    s_lo = slice(lon_min, lon_max)
    s_la = slice(lat_min, lat_max)

    # Generate the coordinate values covered by each slice
    it, ilo, ila = np.r_[s_t], np.r_[s_lo], np.r_[s_la]

    # Create multidimensional grids
    gt, glo, gla = np.meshgrid(it, ilo, ila, indexing='ij')

    # Flatten the indices
    ft, flo, fla = gt.flatten(), glo.flatten(), gla.flatten()

    # Append this cube's flat indices to the global lists
    f_t.extend(ft)
    f_lo.extend(flo)
    f_la.extend(fla)

results = datacube.sel(
    time=xr.DataArray(f_t, dims="points"),
    longitude=xr.DataArray(f_lo, dims="points"),
    latitude=xr.DataArray(f_la, dims="points"),
    method="nearest",
).compute()
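To map the flattened points back to individual cubes afterwards, one option I considered is to carry an extra per-point label and group on it. A rough sketch; the obs_ids list is something I would build alongside f_t / f_lo / f_la, it is not shown in the loop above:

# obs_ids: for each flattened point, the id of the observation it belongs to
# (extended in the loop above with [obs_id] * len(ft))
points = results.assign_coords(obs_id=("points", np.asarray(obs_ids)))

# One small (flattened) cube per observation
cubes = dict(points.groupby("obs_id"))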

Whatever method I try, execution times are still far too long, and I feel like I’m missing something: when I run my script on an HPC, I get almost the same computation times whatever resources I request. So I have the feeling that the operation isn’t being parallelized.

Thank you for your help and your time.

Gwenaël Caër


Hello @gcaer,

I will try to answer your question.

I think the first step of your computation (or computation batch) would be to locate where your observations fit in the cube. You could use numpy.searchsorted to get the indexes:

idx_time = np.searchsorted(datacube.time.values, obs.time)
idx_lon = np.searchsorted(datacube.longitude.values, obs.lon)
idx_lat = np.searchsorted(datacube.latitude.values, obs.lat)

How to use these indexes depends on what needs to be done with each extracted cube. For a first, naïve approach, I would go with a for loop in Numba:

import numba

@numba.njit
def batch_process(values, idx_time, dt, idx_lon, dlon, idx_lat, dlat):
    for ii in range(len(idx_time)):
        # Integer index window around the ii-th observation along each axis
        sub = values[
            idx_time[ii] - dt[ii]:idx_time[ii] + dt[ii] + 1,
            idx_lon[ii] - dlon[ii]:idx_lon[ii] + dlon[ii] + 1,
            idx_lat[ii] - dlat[ii]:idx_lat[ii] + dlat[ii] + 1,
        ]
        # Do the computation on the extracted sub-cube here

batch_process(datacube.values, idx_time, dt, idx_lon, dlon, idx_lat, dlat)
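One detail to watch: dt, dlon and dlat in this call are per-observation half-widths expressed in numbers of grid cells / time steps, not in days or degrees. A minimal sketch of that conversion, assuming a regular grid (0.25° given the 1440 × 720 shape) and using dlon_deg / dlat_deg as placeholder arrays holding the physical half-widths in degrees:

# Assumed regular grid spacing along longitude/latitude
lon_step = float(datacube.longitude[1] - datacube.longitude[0])
lat_step = float(datacube.latitude[1] - datacube.latitude[0])

# Half-widths per observation, in index space
dt = np.full(obs.time.shape, 2)                  # +/- 2 daily time steps
dlon = np.ceil(dlon_deg / lon_step).astype(int)  # dlon_deg: placeholder, half-widths in degrees
dlat = np.ceil(dlat_deg / lat_step).astype(int)  # dlat_deg: placeholder, half-widths in degrees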

On the other hand, assuming your datacube contains 64-bit float data, it is around 86 GiB, and loading it all into memory could be difficult. If you don’t have enough RAM or want to reduce your memory consumption, I would suggest binning your observations into each chunk and doing the computation chunk-wise, as sketched below.
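A minimal sketch of that binning, assuming the {"time": 1} chunking from your catalog (one chunk per daily time step) and reusing the searchsorted indexes from above; obs_bins is just an illustrative name:

import pandas as pd

# With chunks={"time": 1, ...} each chunk holds a single time step, so binning the
# observations per chunk amounts to grouping them by their time index.
obs_bins = pd.DataFrame({"time_idx": idx_time, "lon_idx": idx_lon, "lat_idx": idx_lat})

for time_idx, group in obs_bins.groupby("time_idx"):
    # Load only the few daily chunks needed for this batch of observations
    block = datacube.isel(time=slice(max(time_idx - 2, 0), time_idx + 3)).values
    # ... process all observations in `group` against `block`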

Hello @robin-cls,

Thank you very much for your reply and for taking the time to answer me.
I didn’t know about the np.searchsorted function, and I’d never used Numba before. The method you presented is really interesting.
For my part, I’ve made a bit of progress and have adopted the following method:
I proceed file by file: in other words, I select all the observations to be extracted for a given day (i.e. one file), extract them, and then move on to the next file. At the end, all I have to do is gather all the extracted cubes into a dataset. Performance is pretty good. In pseudo-code, it looks like this:

@dask.delayed
def colocalisation(datacube, obs):
    # Nearest-neighbour selection of all observations of one day
    return datacube.sel(
        longitude=obs.lon,
        latitude=obs.lat,
        method="nearest",
    )

results = []
for time in datacube.time:

    results.append(
        colocalisation(
            datacube.sel(time=time),
            obs.where(obs.time == time, drop=True),
        )
    )

r = dask.compute(*results)
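For the final step of gathering the extracted cubes into one dataset, a minimal sketch (assuming each per-day result keeps the obs dimension coming from obs.lon / obs.lat):

# Concatenate the per-day results along the observation dimension and
# restore the original observation order
extracted = xr.concat(r, dim="obs").sortby("obs")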