Extracting pixel values at points distributed over a large area

Main task:
I have ~100,000 points with x/y coordinates distributed over almost the entire African continent (bounds = (-20, -35, 52, 30)). As predictor variables I am using quarterly band values calculated from Sentinel-2 images for 2022. I want to extract the pixel values at the points for species distribution modelling.

Step 1: Set up a dask cluster for parallel computing.

import dask_gateway

cluster = dask_gateway.GatewayCluster()
client = cluster.get_client()
cluster.adapt(minimum=8, maximum=100)

Step 2: Data access in planetary computer

import pystac_client
import planetary_computer
import geopandas as gp
from shapely.geometry import box

catalog = pystac_client.Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1",
    modifier=planetary_computer.sign_inplace,
)

bbox = gp.GeoDataFrame(
    geometry=gp.GeoSeries([box(13.00, -21.50, 35.20, -20.00)]),
    crs="epsg:4326",
)

search = catalog.search(
    collections=["sentinel-2-l2a"],
    bbox=bbox.total_bounds.tolist(),
    datetime="2022-01-01/2022-12-31",
    query={"eo:cloud_cover": {"lt": 10}},
)

items = search.item_collection()

Step 3: Lazy load of the data

from odc.stac import stac_load

ds = stac_load(
    items,
    crs=epsg,  # target EPSG code (defined elsewhere)
    resolution=10,
    bands=["red", "green", "blue", "nir"],
    chunks={"x": 2048, "y": 2048},
    bbox=bbox.total_bounds.tolist(),
)
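
The quarterly band values mentioned above then come from something roughly like this (just a sketch, not my exact code; I am assuming a per-quarter median here):

# Sketch only: collapse the time dimension into quarterly medians,
# giving one set of band values per quarter of 2022 as predictors.
quarterly = ds.resample(time="QS").median()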

Step 4: Extracting the pixel values to the points

import xarray as xr

bgPoints = gp.read_file("~/ndvi_sdm/files/bg_points.gpkg")

x = xr.DataArray(bgPoints["geometry"].x)  # point coordinates from the .gpkg file
y = xr.DataArray(bgPoints["geometry"].y)

data_extract = ds.sel(x=x, y=y, method="nearest", drop=True)
bgData = data_extract.compute()

This is where I am having trouble extracting the data. If the region of interest (bbox) is small, there is no issue. However, if the bbox is large, as in this case, computation time is very high and I get an error saying the client failed to reconnect to the scheduler, after which the client closes.

I am relatively new to Dask and xarray. How can I efficiently extract pixel values for many points distributed over a large area? Is there a way to use a larger bounding box while reducing computing time? My current approach takes more than a month just to get the data for my area of interest, which is not sustainable in the long run, because I am also planning to build a species distribution model to monitor the change in species distribution each year.

This sounds like an interesting problem. I’m a bit worried that all of these libraries (xarray, dask, down to rasterio / whatever is loading the files) won’t be able to do the “right” thing in the ds.sel(x=x, y=y, ...).

If I were exploring this, I would start at the lowest level: reading individual points from individual assets. Do you know roughly how dense your points are in a given scene? Are you likely to be reading many points from a single block, or just one? The smallest window of data that GDAL / rasterio can read out of a COG is a block (which looks like 512x512 pixels for the Planetary Computer's S2 assets). I'd use that to establish a rough baseline for performance: it would read the least amount of data (though it might make more HTTP calls, depending on how many blocks of a scene you're reading from, which in turn depends on how dense your points are). If you want to use xarray at this level, I think one of the .rio methods from rioxarray should do the right thing and read the minimal amount of data.
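
For a concrete starting point, a baseline like this is what I have in mind (a sketch only; `item` is one STAC item from your search, `points_utm` is a list of (x, y) tuples already reprojected into that item's UTM CRS, and the `B04` asset key is just the 10 m red band as an example):

import planetary_computer
import rasterio

# Sketch: read the nearest pixel values for a handful of points
# directly from a single COG asset, one block at a time.
signed_item = planetary_computer.sign(item)
href = signed_item.assets["B04"].href  # 10 m red band

with rasterio.open(href) as src:
    # points_utm: iterable of (x, y) tuples in src.crs
    values = [v[0] for v in src.sample(points_utm)]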

With that baseline in place, I'd think about parallelizing it. There are probably multiple ways to do this. Personally, I would consider loading the STAC items into a geopandas.GeoDataFrame and then doing a spatial join against your bgPoints. That will associate a bunch of STAC items with each point (one per timestamp). Then you could group by time and apply a function that reads the points for each scene.
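
A rough sketch of the join I mean (untested; it assumes bgPoints is in EPSG:4326 like the item footprints):

import geopandas as gpd

# Sketch: item footprints as a GeoDataFrame, joined to the points so that
# each point is paired with every scene (per timestamp) that covers it.
items_gdf = gpd.GeoDataFrame.from_features(
    [item.to_dict() for item in items], crs="epsg:4326"
)
joined = gpd.sjoin(bgPoints, items_gdf, how="inner", predicate="intersects")

# One group per acquisition time; read all of that scene's points together.
for timestamp, group in joined.groupby("datetime"):
    ...  # read pixel values for the points in `group`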

Let me know if you pursue this! I think it'll expose some weak points of this stack. I hope that someday we can make sure the ds.sel(x=x, y=y, method="nearest", drop=True) you wrote works optimally.


Thank you, Tom, for the reply, and excuse my late response.

As I mentioned, I am new to this, so there were lots of things to understand and uncover. 🙂

To answer your question first: the points I am working with are not evenly distributed. However, on average there are ~40 points per Sentinel-2 L2A scene, and I will be reading multiple points from multiple blocks/scenes (in total I have 100,000 points distributed over a large area). I tried to follow your suggestion of loading the STAC items into a GeoDataFrame and then doing the spatial join against my points. Because my points are distributed unevenly, the reading performance with that approach ended up more or less the same as what I had before, I think.

I modified my method of extracting pixel values at the points. First, I created a buffer around each point and used that buffer as my area of interest. Then I read each point and its buffer in a loop to get the data (a rough sketch of the loop is below). At the moment I can read about 20 points per minute on average, which is a huge improvement over my previous approach. However, because this makes multiple calls over many points, I have also been told it might not be the best approach in my situation.
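
Roughly, the loop looks like this (a simplified sketch, not my exact code; the buffer size is a placeholder and I am assuming the point coordinates are in the same CRS as `epsg`):

# Simplified sketch of the per-point loop: buffer each point, load only that
# small window lazily, then pick the nearest pixel and compute it.
results = []
for point in bgPoints.geometry:
    aoi = point.buffer(0.001)  # placeholder buffer around the point

    point_ds = stac_load(
        items,
        bands=["red", "green", "blue", "nir"],
        crs=epsg,
        resolution=10,
        chunks={"x": 256, "y": 256},
        bbox=aoi.bounds,  # (minx, miny, maxx, maxy)
    )

    value = point_ds.sel(x=point.x, y=point.y, method="nearest").compute()
    results.append(value)
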
Still exploring the best approach…

There are a few things that I think are not optimal with this.

One issue is that you've got a 3D array of data (x, y and time) but you're requesting pixels by x and y only. Something to consider is computing a median first, though you really should mask out clouds before you do that. Basically, prepare your Sentinel-2 data before you select from it.
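
For example, something roughly like this (a sketch only; it assumes you also load the SCL band, which your current stac_load call does not include):

# Sketch: mask clouds with the scene classification (SCL) band, then take a
# median over time so the selection below is against a 2D composite.
# SCL classes dropped: 3 cloud shadow, 8/9 cloud medium/high probability, 10 cirrus.
cloudy = ds["SCL"].isin([3, 8, 9, 10])
composite = ds.where(~cloudy).median(dim="time")

data_extract = composite.sel(x=x, y=y, method="nearest", drop=True)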

We're doing similar work for Digital Earth Pacific, but we're using pre-computed annual cloud-free mosaics of Sentinel-2 and Sentinel-1. It's a big, complex notebook: mineral-resource-detection/MRD_S1_S2_elev_RandomForest.ipynb in the digitalearthpacific/mineral-resource-detection repository on GitHub.

But the step you’re after is this one:

from concurrent.futures import ThreadPoolExecutor

from tqdm import tqdm

projected_training_data = tdata.to_crs("EPSG:3832")

# Remove the ID field
# projected_training_data.drop(columns=["id"], inplace=True)

training_array = []


def get_training_data(id_row):
    _, row = id_row
    cls_id = row["lulc_code"]
    # id = row["id"]
    geom = row["geometry"]

    # Get xarray values at the point
    x = merged.sel(x=geom.x, y=geom.y, method="nearest")
    one_point = [cls_id] + [float(v) for v in x.values()]
    return one_point


with ThreadPoolExecutor(max_workers=10) as executor:
    training_array = list(
        tqdm(
            executor.map(get_training_data, projected_training_data.iterrows()),
            total=len(projected_training_data),
        )
    )

print(f"Fetched data for {len(training_array)} training points")

Note that tdata is a set of point locations with classes (training data) and merged is an in-memory (x, y) dataset. Also note that we previously did this lazily loaded, with chunks of 100x100k, which worked OK too, and we should probably go back to that now.

On a bigger point, Digital Earth Africa has Sentinel-2 mosaics, and a STAC API, here: https://explorer.digitalearth.africa/stac/collections/gm_s2_annual. Using that data will dramatically simplify your workflow!
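
A starting point could look roughly like this (untested; the collection id is from the link above, while the band names and the unsigned-S3 configuration are assumptions you would need to check against the DE Africa docs):

import pystac_client
from odc.stac import configure_rio, stac_load

# DE Africa assets live in a public S3 bucket in af-south-1; unsigned access
# usually needs to be configured first.
configure_rio(cloud_defaults=True, aws={"aws_unsigned": True})

de_africa = pystac_client.Client.open("https://explorer.digitalearth.africa/stac")

search = de_africa.search(
    collections=["gm_s2_annual"],
    bbox=bbox.total_bounds.tolist(),
    datetime="2022",
)
items = search.item_collection()

# One annual geomedian mosaic instead of dozens of individual scenes.
annual = stac_load(
    items,
    bands=["red", "green", "blue", "nir"],
    chunks={"x": 2048, "y": 2048},
    bbox=bbox.total_bounds.tolist(),
)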


Hi all, I'm also doing some experiments with extracting a large number of nearest values (point geometries) from a raster. Initially I also tried using da.sel(lon=lons, lat=lats), but I found that this does not scale well: the Dask graphs become very fragmented.

Currently I'm doing it by persisting the (x, y) DataArray and combining that with map_partitions over the point geometries. However, I find that my Dask processing chain is still quite unstable:

  1. workers are spun up only to disappear straight away;
  2. there still appears to be some kind of dependency between the different vector partitions, as they are not processed fully in parallel.
import geopandas as gpd
import pandas as pd
from dask.dataframe.utils import make_meta

# persist some ERA 5 aggregated over time in memory
da = ds.surface_air_pressure.mean("time").persist()
da.coords["lon"] = (da.coords["lon"] + 180) % 360 - 180
da = da.sortby(da.lon)
da = da.rio.write_crs(4326)
da = da.rio.set_spatial_dims(x_dim="lon", y_dim="lat")

# store in memory of all workers
da_scattered = client.scatter(da, broadcast=True)

# function to add variable to geodataframe that contains lon/lat columns 
def add_var(df, da):
    import rioxarray

    da = da.result() # get scattered raster 
    da = da.rio.write_crs(4326) # we need to re-add the rio attrs
    da = da.rio.set_spatial_dims(x_dim="lon", y_dim="lat") # we need to re-add the rio attrs
    
    # not sure if this is better than da.sel(lon=df.lon.values, lat=df.lat.values)
    r = da.rio.clip(df.geometry, df.crs, drop=True)

    # maybe it would be better to vectorize the raster and then do a spatial join?
    r = r.to_dataframe().dropna().drop(columns="spatial_ref").reset_index()
    r = gpd.GeoDataFrame(
        r, geometry=gpd.GeoSeries.from_xy(r.lon, r.lat), crs=4326
    )  # maybe better to vectorize here before the spatial join?

    # add attrs to source df by nearest spatial join. 
    r = gpd.sjoin_nearest(
        df.to_crs(3857), r.to_crs(3857).drop(columns=["lon", "lat"]), max_distance=30000
    ).drop(columns=["index_right"])
    return r


new_col = pd.Series([], dtype="f4").to_frame("surface_air_pressure")

meta = pd.concat([make_meta(transects), new_col])

r = transects.map_partitions(lambda df: add_var(df, da_scattered), meta=meta)
r = r.compute()

Have you optimized the routines yet? Or do you see obvious shortcomings in this approach? If you want to test it, I can share the full notebook, including a SAS token for the transects.