Tips for creating large mosaics

I’m trying to find sensible approaches to a task that seems common in the pangeo world: taking a number of raster images, processing them in some way, and producing a mosaic.

My experiments work well when the spatial extent is small, but I run into trouble as the bounds of the input increase.

My current task, for example, involves taking around 200 Sentinel-2 tiles and creating a COG. I’m using odc-stac for this, and my load and process looks something like:

  ds_odc = stac.load(
      items,
      crs=crs,
      nodata=0,
      bands=('height',),
      chunks={'time': 1, 'x': 2048, 'y': 2048},
  )

  mos = ds_odc['height'].max("time").astype('uint8')

  fut = odc.geo.cog.save_cog_with_dask(
      mos,
      out_cogname,
      compression="LZW",
  )
  fut.compute()

I run this on a PBS cluster via dask_jobqueue’s PBSCluster. I would typically have access to, say, 20 workers, and could request around 16 GB of memory for each one.

When the number of items is small, this whips through nicely. When the number of items increases, it takes a long time before processing starts, and I get the common warning about the size of the task graph.

One massive COG at the end is probably not such a good idea anyway, so I have experimented a little with creating smaller, non-overlapping geoboxes and then looping through each geobox to create a number of individual COGs.

This works ok, but seems suboptimal to loop serially through a list of geoboxes. There is significant wasted time between the creation of one cog and the start of computation for the next one.

I was wondering if anyone had any tips on how best to manage large workflows where the result is either a number of cogs, or even a single large cog, without running into trouble with large graphs.

Would love to hear some tips. Perhaps the results could be collated as well, since I’m sure this is a pretty common situation (hence why we use dask). Pointers to example notebooks would also be helpful; I know people create large cloud-free mosaics, for example, but I struggled to find example code for this.

Not sure what your (@rdenham) data is like, but 2 tiles of Sentinel-2 is not a huge image at 10 m resolution; you should be able to call mos.compute() without running out of memory on one of your 16 GB workers. Especially if you swap the order of max and astype, or better yet use dtype="uint8" at load time and avoid graph duplication from a separate .astype call.

Things I would experiment with to reduce the input Dask graph (I assume that’s where the majority of Dask tasks are in your case):

  1. Use groupby="solar_day" when calling odc.stac.load; this will reduce the number of temporal slices from the number of items to the number of passes over the area, which is typically at least 2 times fewer.
  2. Specify the required dtype at load time (is the data actually in the range [0, 255]?)
  3. Increase the input chunk size, although you probably have relatively few spatial chunks as it is. You can specify the output COG chunk size separately; make sure the input chunk size is a multiple of the output chunk size.
  4. Pass mos.persist() into save_cog_with_dask instead of just mos; you have enough cluster memory, and this will help the scheduler a bunch.
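
Putting those together, the load/save from your first post might become something like this (a sketch only; the chunk sizes are illustrative, and items, crs, and out_cogname are as in your original snippet):

```python
ds_odc = stac.load(
    items,
    crs=crs,
    nodata=0,
    dtype="uint8",                  # tip 2: cast at load time
    groupby="solar_day",            # tip 1: one slice per pass, not per item
    bands=('height',),
    chunks={'x': 4096, 'y': 4096},  # tip 3: fewer, larger input chunks
)

mos = ds_odc['height'].max("time")  # no separate .astype needed now

fut = odc.geo.cog.save_cog_with_dask(
    mos.persist(),                  # tip 4: persist before writing
    out_cogname,
    compression="deflate",
)
fut.compute()
```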

Separate note: why LZW? Does it actually compress your data better than more modern compression schemes? Even ancient DEFLATE usually does better, and I’m yet to interact with software that supports LZW but not DEFLATE (I’m not young, BTW).

This works ok, but seems suboptimal to loop serially through a list of geoboxes. There is significant wasted time between the creation of one cog and the start of computation for the next one.

This is simple: don’t call .compute in a loop; collect all the futures into a list, then call client.compute(all_the_cogs). But like I said, your mosaic is already not that huge. What are the dimensions of the final COG you get?
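
The pattern looks like this, with a hypothetical make_cog standing in for save_cog_with_dask (which likewise returns a delayed object):

```python
import dask

@dask.delayed
def make_cog(i, j):
    # stand-in for save_cog_with_dask(...); just returns the output path
    return f"vis_{i}_{j}.tif"

# build ALL the delayed COGs first...
delayed_cogs = [make_cog(i, j) for i in range(2) for j in range(3)]

# ...then submit the whole batch in one go; on a cluster you would call
# client.compute(delayed_cogs) and gather the resulting futures
paths = dask.compute(*delayed_cogs)
```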

Thanks @kirill.kzb

Yes, 2 tiles is small, but my AOI needs more than 200 tiles (covering Queensland). The final output would be single band, something like 233,835 by 156,295 (rows by columns).

I did try collecting a list of delayed items, since save_cog_with_dask returns a dask delayed object. This didn’t solve my issue, presumably because you still end up sending a large graph to the workers. I took the serial approach mostly to try to reduce the graph size, as the dask documentation suggests:

  • Breaking up your computation: For very large workloads you may also want to try sending smaller chunks to Dask at a time. For example if you’re processing a petabyte of data but find that Dask is only happy with 100 TB, maybe you can break up your computation into ten pieces and submit them one after the other.

But to be fair, my computation is considerably smaller than 1 PB, so probably I need to work on the other issues first.

On the compression note, I’m not too worried about the choice here. I’m just experimenting with workflow. I think we have used LZW in the past just because it was better known by us, but we also routinely use DEFLATE. I’m not sure if it makes a lot of difference in this case though.

I’ll use ‘groupby’ to reduce the slices, I think this will actually make a big difference.

When splitting into multiple independent COGs, you can call persist inside the loop, then wait on the list of futures you have started. Submitting them all will still take a similar amount of time, but computation happens in parallel with task submission, so your cluster will start work sooner and should stay busy without pauses. Plus, you can add a progress bar, which should make the wait less anxiety-inducing for the user.

OK, with a bit of experimenting using suggestions from @kirill.kzb, I have no trouble quickly creating a large COG. On my system at least, I need chunks large enough to keep the graph small, but not so large that workers spill to disk. In this case 4096 works well.

My issue now is that the COG block size will by default be 4096, which is a bit too big. If I specify a smaller block size in save_cog_with_dask, the graph grows again: block size 2048 is slow, and block size 1024 I think creates too many chunks and has trouble propagating to the workers.

I’m thinking a compromise might be to create the COG with large blocks, then post-process to get the block size I want.


So this is my current approach.

Load the data from all my STAC items, and use the resulting geobox to work out a sensible tiling. Then use GeoboxTiles to create the subsets.

crs = "epsg:3577"
chunksize = 1024
ds_odc = stac.load(
    stac_items,
    crs=crs,
    nodata=255,
    groupby="solar_day",
    bands=['visual.1', 'visual.2', 'visual.3'],
    chunks={'time': 1, 'x': chunksize, 'y': chunksize},
)

fgb = ds_odc.odc.geobox
# little function to get an approximate shape so that each
# subset doesn't have too many chunks
tile_shape = get_tile_shape(fgb, total_chunks)
gt = GeoboxTiles(fgb, tile_shape)
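
For reference, get_tile_shape is just a little helper of mine; something along these lines (sketched here to take the geobox’s pixel shape directly, rather than the geobox itself):

```python
import math

def get_tile_shape(shape, chunksize=1024, total_chunks=64):
    """Pick a (ny, nx) tile size in pixels so that each tile holds at
    most `total_chunks` dask chunks of `chunksize` x `chunksize` pixels."""
    ny, nx = shape
    per_side = max(1, math.isqrt(total_chunks))  # chunks along each axis
    side = per_side * chunksize                  # tile edge length in pixels
    return (min(side, ny), min(side, nx))
```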

Then, iterate over the tiles, like

with Client(cluster) as _client:
    print(_client.dashboard_link)
    cluster.scale(jobs=16)
    for _i in range(gt.shape.y):
        for _j in range(gt.shape.x):
            subset = ds_odc.odc.crop(gt[_i, _j].extent)
            mosaic = subset.to_array(dim="variable").mean("time").astype('uint8')
            delayed_cog = save_cog_with_dask(
                mosaic.persist(),
                f"/scratch/rsc7/denhamr/code/projects/dask_mosaic/vis_{_i}_{_j}.tif",
                compression="DEFLATE",
            ).persist()
            delayed_cog.compute()

At least this way, as @kirill.kzb points out, you can monitor the progress and see action in the dask client dashboard.

I think it would be substantially quicker in a single pass, but not if that requires too many chunks.

If there were a quick way to take a COG with large blocks and re-block it, then post-processing one large COG might be feasible. I couldn’t see how, though.
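
One avenue I haven’t tested: gdal_translate with GDAL’s COG driver (GDAL >= 3.1) can rewrite an existing file with a different internal block size, so the re-blocking could happen outside dask entirely:

```shell
# untested sketch: rewrite a big-block COG with 1024-pixel internal tiles
gdal_translate -of COG \
    -co BLOCKSIZE=1024 \
    -co COMPRESS=DEFLATE \
    mosaic_bigblocks.tif mosaic_1024.tif
```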

If anyone would like to point out other ways to optimise this, I’d love to hear them.