Best Practices for automating large-scale Sentinel dataset building and Machine Learning?

Which seems to be OK, but might have unfortunate overhead?

– and I am getting memory errors when trying this sort of thing with a basic clustering model:
```python
import dask.array
import dask.diagnostics

with dask.diagnostics.ProgressBar():
    # pull one slice of the input zarr array into memory and predict on it
    chunk = dask.array.from_array(z3, chunks=z3.chunks)[startvar:endvar, :]
    all_labelsNP = km14cls.predict(chunk.compute(optimize_graph=False))

    # optionally dump the labels to disk as well:
    # pickle.dump(all_labelsNP.astype('uint8'), f)

    # write the predicted labels back into the output zarr array
    zP[startvar:endvar] = all_labelsNP.astype('uint8')
```

A pickle dump was OK though, doing it in parts, but it was taking a lot of memory.

I am considering this because I will have to scale up to deal with bigger areas; this one is only 300,000 km².

Would you use map_blocks for large-scale prediction/inference, @rabernat? e.g. predict returns an array of labels in that case.
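Something like this is what I had in mind, as a rough sketch rather than working code; `X` here is an assumed 2-D dask array of (pixels, features) chunked along the first axis only, and `km14cls` is the fitted clusterer from above:

```python
import numpy as np

# X: dask array of shape (n_pixels, n_features), chunked along the first axis only,
# so every block sees all of the features
labels = X.map_blocks(
    lambda block: km14cls.predict(block).astype(np.uint8),
    drop_axis=1,       # predict collapses the feature axis, leaving one label per pixel
    dtype=np.uint8,
)

# write the labels out chunk by chunk instead of materialising them all at once
# (the path is just a placeholder)
labels.to_zarr('labels.zarr')
```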

That sounds like an interesting thread, @RichardScottOZ!

Hopefully I will work all this out eventually, @Guy_Maskall, although the answer might still be ‘impressive cluster’ even with optimisations and the best approach.

I broke an r5.24xlarge doing lots of the above too, running out of memory etc.

Impressive! :wink:
I get to play on a single Windows VM that has 54 GB RAM and 27 cores. I’ve now got as far as creating a class with some useful methods so I can load multiple geoTIFFs, stack them, add a layer of rasterized labels from a shapefile, then cast that to a dask dataframe and output it to parquet. That takes care of creating a training data table. Similarly, having built a machine learning model on the features, I can load data again and apply model.predict() to it in chunks, saving the output as netCDF. Doing the two main steps in such a way that you can chain methods seems to really help the dask scheduler. One day I’ll get access to some distributed compute and it should all work the same. But faster.
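The rasterized-labels step is roughly this sort of thing, as a sketch rather than my actual code; the function name is illustrative, and it assumes a geopandas GeoDataFrame with a `field_id` column plus a template band opened with rioxarray:

```python
import rioxarray  # noqa: F401  (registers the .rio accessor on xarray objects)
import xarray as xr
from rasterio.features import rasterize

def rasterize_labels(gdf, template):
    """Burn polygon IDs from a GeoDataFrame onto the grid of a template band."""
    shapes = ((geom, int(fid)) for geom, fid in zip(gdf.geometry, gdf['field_id']))
    burned = rasterize(
        shapes,
        out_shape=template.shape[-2:],        # (y, x) of the raster grid
        transform=template.rio.transform(),   # affine transform from rioxarray
        fill=0,                               # background value for unlabelled pixels
        dtype='int32',
    )
    return xr.DataArray(burned, coords={'y': template.y, 'x': template.x}, dims=('y', 'x'))
```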

So that is all in one graph workflow?

Yeah, my current code for extracting the data I want looks something like:

```python
load_data(file_list, keep_attrs=['crs', 'transform'])\
        .add_labels(training, measurements=['field_id', 'rez_id'], keep_notnull=['rez_id'])\
        .assign_coords({'tile': tile})\
        .to_dask_dataframe()\
        .dropna(subset=['rez_id'])\
        .to_parquet(outpath,
                    compute_kwargs={'scheduler': client},
                    schema={'tile': pa.string()}
                   )
```

load_data returns a custom class that has methods add_labels() and assign_coords(). to_dask_dataframe() is a custom method that just calls xarray’s method on the data attribute of the class. Thereafter it’s all dask object methods. So that one chained workflow loads geotiffs, adds the ground truth from a geopandas GeoDataFrame, adds a new coord (here the tile ID for subsequent tracking), before converting to a dask dataframe and dropping pixels not associated with any ground truth. The field_id is just a row index column I added to my input shapefile. This gives each polygon a unique ID so I can use grouped cross-validation later on.
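The grouped cross-validation later on would then be along these lines, as a sketch; the choice of `rez_id` as the target, RandomForestClassifier as the model, and the assumption that the only non-feature columns are `field_id`, `rez_id`, and `tile` are all placeholders, not my actual pipeline:

```python
import dask.dataframe as dd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GroupKFold, cross_val_score

# the training table is small once unlabelled pixels are dropped, so pull it into pandas
df = dd.read_parquet(outpath).compute()

X = df.drop(columns=['field_id', 'rez_id', 'tile'])   # keep only the feature columns
y = df['rez_id']
groups = df['field_id']                               # all pixels of a polygon share a group

scores = cross_val_score(
    RandomForestClassifier(n_estimators=200),
    X, y,
    groups=groups,
    cv=GroupKFold(n_splits=5),
)
```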

Doing this on a tile by tile basis essentially partitions my parquet dataset by tile and keeps the dask scheduler happy. As I understand it, this approach means the dask scheduler doesn’t have to try to calculate a bunch of dependencies itself and join them together into a DAG. It certainly produces the leanest set of tasks in the dask dashboard that I’ve seen in all my fumblings.
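A nice side effect is that reading back a single tile is cheap, e.g. something like this (the tile ID below is made up):

```python
import dask.dataframe as dd

# read back only the rows belonging to one tile from the tile-partitioned parquet
subset = dd.read_parquet(outpath, filters=[('tile', '==', 'T54HXE')])
```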

I’ve got something like 400 small fields spread over three S2 tiles, stacking roughly ~100 bands over time, and all pixels are extracted in about 7 minutes. It’s early days and I haven’t had a chance to see how it scales with more ground truth polygons. But it’s efficient for sparse training data, so I’m happy enough with it for now. My current effort is on adding testing to ensure various things are handled properly. We do also use opendatacube, although my code to date has been dealing with unindexed geoTIFFs. I’m looking forward to adding opendatacube support to the data loading soon!


Interesting, so converting tiffs to dataframes?

Be interesting to try.

Yeah, basically, if you don’t care about preserving spatial relationships and just want to extract a data table with observations (pixels) as rows and features (EO bands over time, along with a target label) as columns.
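As a rough sketch of that flattening with plain rioxarray/xarray, with placeholder file names and chunk sizes:

```python
import rioxarray
import xarray as xr

paths = ['b02.tif', 'b03.tif', 'b04.tif']   # placeholder band files

# one lazily loaded variable per band, squeezing out the singleton band dimension
bands = {
    f'band_{i}': rioxarray.open_rasterio(p, chunks={'x': 2048, 'y': 2048}).squeeze('band', drop=True)
    for i, p in enumerate(paths)
}
ds = xr.Dataset(bands)

# pixels become rows, bands become columns
ddf = ds.to_dask_dataframe()
```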

Anyway, some success last week

Basically: Dask Gateway/Kubernetes via JupyterHub and an ODC grid workflow with the Dask futures API and S3 worker access. The bigger the machines in the cluster, the less you have to break things up. The scheduler seemed to blow up before worker memory did, too, so that is something to look at.
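The futures part is roughly this pattern, sketched with a placeholder `process_tile` function and tile list rather than the real workflow:

```python
from dask.distributed import Client, as_completed

client = Client()   # or the client from a dask-gateway cluster

# process_tile and tiles stand in for whatever the ODC grid workflow provides
futures = [client.submit(process_tile, tile, retries=2) for tile in tiles]

# consume results as workers finish so they don't pile up on the scheduler
for future in as_completed(futures):
    result = future.result()
    # ... write the result out to S3/zarr here, then let it be garbage collected
```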

Merging the ‘lots of tiles’ into one dataset for downstream ML is a slow part. I saw @TomAugspurger has started working on xstac; e.g. maybe turning an xarray set of tiles into a STAC catalogue automatically would then mean you can do fast mosaicing in parallel on an ad hoc dataset, as opposed to a standard rasterio-type merge.
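One lazy alternative to a rasterio-type merge would be letting xarray align the tiles, something like this sketch (assuming per-tile Datasets that already share a grid and do not overlap):

```python
import xarray as xr

# tile_datasets: list of per-tile xarray Datasets opened lazily with real x/y coords
mosaic = xr.combine_by_coords(tile_datasets)
```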

A cluster model with a predict function benefits from breaking things up by making billions of predictions in parallel.