Parallelize moving polygon sample from ITS_LIVE datacube

Hello,

I am currently writing a Python script to take the mean ice velocity within a polygon for thousands of polygons from NASA’s ITS_LIVE Zarr datacube. Currently, the code uses rioxarray to clip a single time slice of the velocity datacube to the corresponding polygon. We then use Python’s multiprocessing library to split the dates into smaller date lists and run those lists in parallel. I imagine there is a more Pangeo-esque way of parallelizing this.

Below is the function we are parallelizing:

import pandas as pd
import rioxarray  # noqa: F401 -- registers the .rio accessor on xarray objects

def rio_clip(dataset, gdf, date_list, crs):
    """Clip each time slice to the polygon for that date and reduce over space."""
    velocity = []
    area = []
    for date in date_list:
        # Polygon geometry whose date matches this time slice
        geom_values = gdf[gdf['date'] == pd.to_datetime(date.date())].geometry.values
        dataset_sample_clip = dataset.v.sel(mid_date=date).rio.clip(geom_values, crs, drop=True, invert=False)
        mean_arr = dataset_sample_clip.mean(dim=["x", "y"], skipna=True)
        area_count = dataset_sample_clip.count(dim=["x", "y"])  # count() skips NaNs by default

        velocity.append(mean_arr)
        area.append(area_count)

    return area, velocity

Then we use multiprocessing’s Pool to distribute this amongst the workers:

from multiprocessing import Pool, cpu_count

with Pool(processes=cpu_count() - 4) as pool:
    result = pool.starmap(rio_clip, args)  # args: one (dataset, gdf, date_list, crs) tuple per date split

My questions are: 1) how can I apply this function with Dask to parallelize it across several workers? 2) Are there typical strategies for parallelizing per time slice? For example, in the off-cloud scenario, clipping each raster to a polygon and storing the mean value.

The code in question is also here

Thanks!!


I’m not a Dask expert so I can’t help with that, but you should note that the datacubes are optimized for time-series access. The Zarr array is chunked 10x10x20000 in x, y, and time, respectively. This means that when you request data from the datacube, at a minimum it will return 20,000 slices in time (a single chunk). A more optimal approach for working with the datacubes is to read in the full time series and then subset. I’m not sure what your use case is, but if you can accommodate this access pattern you’ll see a massive improvement in speed.
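To make that access pattern concrete, here is a minimal sketch. The cube URL and the bounding box are placeholders (anything not in the thread is hypothetical); the idea is just to subset space once and pull the whole time series for that window:

import xarray as xr

# Placeholder path to one ITS_LIVE Zarr datacube -- substitute the real URL.
datacube_url = "s3://.../ITS_LIVE_datacube.zarr"

# chunks={} keeps the on-disk chunking (10 x 10 x ~20000), so nothing is read yet.
ds = xr.open_dataset(datacube_url, engine="zarr", chunks={})

# Hypothetical polygon bounding box in the cube's projected CRS.
xmin, xmax = 400_000.0, 410_000.0
ymin, ymax = 6_650_000.0, 6_660_000.0

# Subset space first, then pull the *entire* time series for that window in one
# read per spatial chunk, instead of re-reading the same chunks once per date.
v_window = ds.v.sel(x=slice(xmin, xmax), y=slice(ymax, ymin)).load()

# Per-date reductions are now cheap in-memory operations.
mean_ts = v_window.mean(dim=["x", "y"], skipna=True)
valid_count = v_window.count(dim=["x", "y"])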


I think there are a couple of approaches I would take:

  1. Simply decorate the function with @dask.delayed and call dask.compute(scheduler=client) on the results, where client = distributed.Client() (see the sketch below)
  2. Or use ds.map_blocks

Here’s an example with plotting
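And here is a minimal sketch of option 1, reusing rio_clip and the args list of (dataset, gdf, date_list, crs) tuples from the original post:

import dask
from dask import delayed
from distributed import Client

client = Client()  # local Dask cluster; this replaces the multiprocessing Pool

# Each call to rio_clip becomes a lazy task instead of a pool job.
lazy_results = [delayed(rio_clip)(*a) for a in args]

# Run all tasks on the cluster; results is a tuple of (area, velocity) pairs.
results = dask.compute(*lazy_results, scheduler=client)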


@shahinmg I think what Alex mentioned is relevant: these datacubes are chunked for time series, and each chunk covers a small spatial footprint. If you’re trying to get aggregated speeds over a large area, it could be more efficient to work with the annual regional mosaics (/velocity_mosaic/v2/annual/). If your question is more about parallelizing access, I think @ahuang11 is right: one way to do it is to wrap your function with Dask’s delayed decorator and then compute the results.
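If the mosaic route fits your use case, a rough sketch could look like this. The file name, the variable name v, and the polygon file are placeholders, not the actual products; check the catalog under /velocity_mosaic/v2/annual/ for the real file names:

import geopandas as gpd
import rioxarray  # noqa: F401 -- registers the .rio accessor
import xarray as xr

# Placeholder: one annual mosaic downloaded (or opened via fsspec) from the path above.
mosaic = xr.open_dataset("ITS_LIVE_annual_mosaic.nc")

gdf = gpd.read_file("polygons.gpkg")   # hypothetical polygon file
gdf = gdf.to_crs(mosaic.rio.crs)       # use mosaic.rio.write_crs(...) first if the CRS is not auto-detected

# One clip + mean per polygon over a single 2-D raster instead of 20,000 time slices.
mean_speed = [
    float(mosaic.v.rio.clip([geom], gdf.crs, drop=True).mean(skipna=True))
    for geom in gdf.geometry
]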


For a different approach, you could consider rasterizing your polygons into a giant array of integer labels and using a groupby with flox. I think this is appropriate since you aren’t doing any weighting when averaging within a polygon.
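A minimal sketch of that idea for the static-polygon case, reusing ds (the datacube) and a gdf with one geometry per polygon in the cube's CRS; for the moving-polygon case you would rasterize one label layer per date instead of a single 2-D one:

import flox.xarray
import numpy as np
import rioxarray  # noqa: F401 -- for ds.rio.transform()
import xarray as xr
from rasterio import features

# Burn each polygon's index into an integer label grid aligned with the datacube
# (-1 marks pixels outside every polygon).
labels = features.rasterize(
    ((geom, i) for i, geom in enumerate(gdf.geometry)),
    out_shape=(ds.sizes["y"], ds.sizes["x"]),
    transform=ds.rio.transform(),
    fill=-1,
    dtype="int32",
)
labels = xr.DataArray(
    labels, dims=("y", "x"), coords={"y": ds.y, "x": ds.x}, name="polygon_id"
)

# One grouped reduction over the whole cube: mean speed per polygon per date.
mean_by_polygon = flox.xarray.xarray_reduce(
    ds.v,
    labels,
    func="mean",
    expected_groups=np.arange(-1, len(gdf)),
    skipna=True,
)
mean_by_polygon = mean_by_polygon.sel(polygon_id=slice(0, None))  # drop the outside-all-polygons group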
