Synchronizer for Zarr + Dask on Kubernetes

Hey all,

This is my first post here and I appreciate all the work. Let me know if there is a better place for this. Very cool stuff!

My problem: I have TBs of .tiff files (Landsat) on GCS and want to write them to a distributed xarray datastore on GCS using Zarr. Some writes will likely overlap, so I need a synchronizer or lock on the underlying chunk files. I feel I have exhausted the documentation on how to properly synchronize these writes with xarray's .to_zarr() function.

My First Approach:

  1. I have successfully set up a GKE Kubernetes cluster with an autoscaling node pool.
  2. Installed Dask with Helm.
  3. Successfully port-forwarded the client and the Dask dashboard.
  4. Used gcsfs to glob tiff files from our storage.
  5. Wrote a per-tiff parser to read in each tiff file and write out the tile to a specific group.

I would rather combine and merge these all into one large xarray dataset. So, I have been revisiting the documentation and have found the delayed approach. I would like to do something like this:

import gcsfs
import xarray as xr

fs = gcsfs.GCSFileSystem()
all_files = fs.glob("...")
# inspect the shape of all tiles and merge by reading metadata with rasterio directly from GCS
band_shapes = get_shape(all_files)
all_bands = get_bands(all_files)
# use the band info to build a dummy (template) dataset
ds = xr.Dataset(...)
path = "path/to/directory.zarr"
# write only the metadata; no array data is computed or written yet
ds.to_zarr(path, compute=False)

def parse_and_write(path_to_tiled_tiff):
    # get tile info
    tile = get_tile(path_to_tiled_tiff)
    # read the tile into an xarray Dataset
    other_ds = get_some_xarray_dset(path_to_tiled_tiff)
    lat_slice = get_lat_slice(tile)
    lon_slice = get_lon_slice(tile)
    # .chunk() returns a new object, so the result must be reassigned
    other_ds = other_ds.chunk(some_chunking_scheme)
    # write out to a specific region of the existing store
    other_ds.to_zarr(
        path,
        region={"lon": lon_slice, "lat": lat_slice},
        synchronizer=NOT_SURE,
    )

futures = client.map(parse_and_write, all_files)
client.gather(futures)  # wait for all writes and surface any errors

Questions:

  1. Is this a reasonable approach?
  2. Are there any concerns?
  3. MOST IMPORTANTLY: how do I set up a synchronizer for Dask on Kubernetes to lock the underlying chunks in the event that multiple instances of parse_and_write write to the same underlying chunk? I have used a Redis distributed file lock before, but wanted to hear how others have solved this problem and whether there is already an implementation for it.

Leonard, thanks for the good question!

AFAIK, there is currently no solution for the lock object you need if you go through vanilla Xarray / Zarr.

However, I would strongly recommend you look at Pangeo Forge for this type of workflow. It is designed to help with exactly the sort of transformation you are doing. It has its own lock-management mechanism, which works out precisely which write chunks can conflict.

We would be happy to help you develop a Pangeo Forge recipe for your data transformation.


Thanks for the immediate response @rabernat! I admire your work and appreciate the time.

I will look into Pangeo Forge recipes. I am currently making the case for xarray and Zarr to my team and would prefer to keep things as simple as possible for now; I'd rather not rewrite any xarray/Zarr code in a way that obscures those packages during a demo or in code reviews from people seeing it for the first time. I also just started with Dask, xarray and Zarr a week ago, so it is all pretty fresh. That said, I like the concept of going from a file pattern to a datastore and glanced at the XarrayZarr recipe. I might take y'all up on that at some point!

I have been looking at potentially leveraging stackstac for this, but we have 6 features * 36 years stacked along the band axis in the tiff files, which I think is weird (I just started looking at tiff files a few months ago; I am a machine learning engineer and not a data engineer, but here we are lol).

I might be able to simply implement a redlock synchronizer and pass it directly to the to_zarr() arg. Seems like using a persistent volume claim for the ProcessSynchronizer is a bad idea.
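For what it's worth, zarr-python 2.x only expects a synchronizer to support `synchronizer[key]` returning a lock that can be used in a `with` statement (that is how `zarr.sync.ThreadSynchronizer` and `ProcessSynchronizer` behave). A minimal sketch under that assumption is below; `NamedLockSynchronizer`, `local_lock_factory` and `dask_lock_factory` are hypothetical names, and a Redlock backend would just be another factory that returns a context-manager lock per name.

```python
import threading
from collections import defaultdict


class NamedLockSynchronizer:
    """Hypothetical zarr 2.x synchronizer: one named lock per chunk key.

    zarr 2.x looks up ``synchronizer[key]`` and uses the result in a
    ``with`` block, so any object mapping a key to a lock will do.
    """

    def __init__(self, make_lock, namespace=""):
        self._make_lock = make_lock  # callable: lock name -> context-manager lock
        self._namespace = namespace  # e.g. the store path, so stores don't share locks

    def __getitem__(self, key):
        return self._make_lock(f"{self._namespace}/{key}")


def local_lock_factory():
    """In-process backend (threads only); useful for testing."""
    locks = defaultdict(threading.Lock)
    guard = threading.Lock()

    def make_lock(name):
        with guard:  # make the defaultdict lookup/insert atomic
            return locks[name]

    return make_lock


def dask_lock_factory():
    """Cluster-wide backend: dask.distributed locks are coordinated by the scheduler."""
    from distributed import Lock  # requires dask.distributed on the workers

    def make_lock(name):
        return Lock(name)

    return make_lock
```

On the cluster this would be passed as `synchronizer=NamedLockSynchronizer(dask_lock_factory(), namespace=path)` to `to_zarr`. Every chunk write then costs a round trip to the scheduler, which trades throughput for safety.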

Thanks for any thoughts. I know that this is primarily a zarr topic…

Hi @Leonard_Strnad, I believe you now work with my awesome former colleague Martha; welcome to the Discourse. There are two higher-level questions here that might be worth considering before committing to building a large archive. The first is what the ideal representation model for the underlying data is. Recently @TomAugspurger has begun thinking about alternative, efficient representations for sparse EO data (https://discourse.pangeo.io/t/tables-x-arrays-and-rasters/1945). It is definitely worth reviewing his thread, as there is still significant flux in the community around how to approach sparse datasets.

The second question concerns the underlying data storage mechanism. If your collection of tiffs exists in GCS, STAC can provide a mechanism to reference and access the underlying bytes without the need to ingest them into a Zarr archive. As you noted, there is ongoing work on stackstac (gjoseph92/stackstac on GitHub: "Turn a STAC catalog into a dask-based xarray") to improve the interoperability between STAC and xarray.

The community has not yet embraced a single solution for this question. Decision factors include the spatial and temporal distribution of your data, the storage access patterns of your use case and the mutability of your data collection over time. These are questions that lots of teams are grappling with at the moment so it would be great to keep an open dialogue going around this in this thread (or another if that makes more sense) :]


It may be possible to use a Dask distributed lock, as described here:
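A minimal sketch of one way to apply such a lock per target chunk, assuming zarr 2.x's default chunk-key naming (`var/0.1`) and region slices with explicit integer start/stop. The helper names are hypothetical; the point is that every task acquires its locks in one sorted order, so two overlapping tiles can never deadlock waiting on each other.

```python
import contextlib
import itertools


def chunk_indices(region, chunk_shape):
    """Zarr chunk index tuples touched by a non-empty ``region``.

    region: slice objects with explicit integer start/stop, in the target
    array's dimension order; chunk_shape: the target's Zarr chunk shape.
    """
    ranges = [
        range(s.start // c, (s.stop - 1) // c + 1)
        for s, c in zip(region, chunk_shape)
    ]
    return list(itertools.product(*ranges))


def lock_names(store_path, var_name, indices):
    """One lock name per chunk, mimicking zarr 2.x keys such as 'var/0.1'."""
    return [f"{store_path}/{var_name}/" + ".".join(map(str, idx)) for idx in indices]


@contextlib.contextmanager
def hold_locks(names, make_lock):
    """Acquire every named lock in one fixed (sorted) order; release on exit.

    A global acquisition order means no two tasks can each hold a lock the
    other is waiting for.
    """
    with contextlib.ExitStack() as stack:
        for name in sorted(set(names)):
            stack.enter_context(make_lock(name))
        yield
```

On the cluster, `make_lock` could be `distributed.Lock` (it takes the lock name as its first argument), with the tile's `to_zarr(path, region=...)` call placed inside `with hold_locks(names, Lock):`. Holding all of a tile's locks for the whole write is coarse, but simple.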

@sharkinsspatial thanks for chiming in! I do work with her!

These are all really good points that I have been asking myself about recently. I am a machine learning engineer and less of a data engineer, but have found myself trying to increase the discoverability and availability of our data. So, here we are.

I will take a look at those threads in more detail as I have already run into them at some point in time. Thanks for sharing!

The two goals I am trying to accomplish are building a datastore for raster data and a datastore for labeled datasets. We tend to store large tiff files that cover regions of interest, so the raster datastore is not sparse and can be used for inference, unsupervised training, sampling, building the labeled dataset datastore, etc. Each use case might call for its own copy of the raster datastore, chunked for that access pattern, if justified. The labeled dataset datastore will likely be built from the raster datastore and will probably only have an ordered coordinate such as range(n). It will be much smaller than the raster datastore and will act as a mechanism to decouple the data from the underlying Python class for portability. In terms of mutability, raster datastores should not change, and labeled dataset datastores will be built on request depending on sampling, label selection, etc. This is all a small step forward compared to what we currently have: we directly read only zoom 10 rasters, even for small areas of interest. I will start with a smaller region of interest and build up to larger archives, and I am sure I will run into issues as I hit very large regions.

The biggest advantage of Zarr, imo, is simply the ability to chunk according to a specific access pattern. I struggle to see how STAC over GCS will support slicing when the smallest read is one whole band for tiffs of a specific zoom (I believe). I'm not sure how one can get away from the idea of partitioning, which is native to Zarr. I think the success of stackstac depends on the zooms of your tiffs, which act as shards themselves, but I might be wrong.

My current approach for the raster datastore:

  1. Given a geometry and a GCS path, discover the tiffs in the collection.
  2. Estimate the height and width in pixels of the target xarray/Zarr dataset from the tiffs.
  3. Build the target and write it with compute=False.
  4. Apply a per-tiff function that reads, parses, and writes with to_zarr(region=...), where the region is the index range on the target's lat/lon coordinates determined by the tiff's min/max coordinates.
  5. client.map the function from step 4 over all tiffs, using a Dask distributed lock.
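Step 4 needs the tile's lat/lon extent turned into integer index slices on the target grid, since `region=` takes slices of integer positions. A minimal NumPy sketch, assuming the tile's pixel centres land exactly on the target's coordinate values (real float coordinates may need rounding or a tolerance); `index_slice` is a hypothetical helper:

```python
import numpy as np


def index_slice(coord, lo, hi):
    """Integer slice of ``coord`` covering the values in [lo, hi].

    ``coord`` is a 1-D monotonic array of target pixel centres, either
    ascending or descending (latitude is often descending).
    """
    coord = np.asarray(coord)
    n = coord.size
    if coord[0] > coord[-1]:
        # descending: search the reversed (ascending) view, then map back
        rev = coord[::-1]
        start_rev = np.searchsorted(rev, lo, side="left")
        stop_rev = np.searchsorted(rev, hi, side="right")
        return slice(int(n - stop_rev), int(n - start_rev))
    start = np.searchsorted(coord, lo, side="left")
    stop = np.searchsorted(coord, hi, side="right")
    return slice(int(start), int(stop))
```

Usage would look like `region = {"lat": index_slice(target.lat.values, tile_lat_min, tile_lat_max), "lon": index_slice(target.lon.values, tile_lon_min, tile_lon_max)}`, where the min/max values come from the tile's own coordinates.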

Any thoughts or concerns would be appreciated!

p.s. If you know any data engineers with experience, we are looking.

I am using the dask distributed lock and so far so good I think. Thanks again for sharing. Some questions:

  1. Is there any way to tell when there has been corruption?
  2. Also, what is the advantage of estimating collisions the way you do in xarray_zarr.py in pangeo-forge/pangeo-forge-recipes (master branch, on GitHub)?
  3. Is the lock slow enough that it is only worth locking the chunks that actually conflict?

A distributed question:

  1. Any time I specify resources for client.map, the job won't start/submit or show up on the dashboard. I have to manually iterate over batches to make sure each worker has at most k tasks, or the worker gets killed for using too much memory. Any idea what is up? I am using Helm to install Dask.

(happy to start a new thread or ask somewhere that might be more relevant)

No, not yet, but this is being actively worked on. See zarr-developers/zarr-python issue #392, "checksums for chunks", on GitHub for discussion.

We are perhaps over-optimizing. We started from an assumption that no locking would be required and then gradually started adding locks. We wanted to avoid any locking when it was unnecessary. A simpler approach would have been just to acquire a lock for each chunk. The downside is that you have to query the scheduler for the lock every time you write even if there are no conflicts. I’m not sure if this has a performance impact or not.
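To make the trade-off concrete, here is a sketch of the "only lock what conflicts" idea (an illustration, not pangeo-forge's actual code): count how many write regions touch each target chunk. Chunks touched by more than one region are the only ones that need a lock; everything else can be written lock-free. Regions are assumed to be non-empty `(start, stop)` index pairs per dimension; the helper names are hypothetical.

```python
import itertools
from collections import Counter


def touched_chunks(region, chunk_shape):
    """Chunk index tuples touched by a region given as (start, stop) per dim."""
    ranges = [
        range(start // c, (stop - 1) // c + 1)
        for (start, stop), c in zip(region, chunk_shape)
    ]
    return set(itertools.product(*ranges))


def conflicting_chunks(regions, chunk_shape):
    """Chunks written by more than one region: only these need a lock."""
    counts = Counter()
    for region in regions:
        counts.update(touched_chunks(region, chunk_shape))
    return {chunk for chunk, n in counts.items() if n > 1}
```

If tile boundaries line up with the target's chunk boundaries, this set is empty and no locking is needed at all; the per-chunk-lock alternative pays the scheduler round trip on every write regardless.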

Don’t know.

I think your Dask questions belong elsewhere, perhaps in a Dask issue. My impression is that client.map is probably not the right entry point for using Dask here; you want to use one of the collection APIs such as dask.array or dask.delayed.


As Leonard mentions, this is presumably native Landsat, so not cloud-optimised. To make lots of downstream modelling far more efficient at terabyte scale, you have to take the processing hit at the start.

Soliciting some more advice on to_zarr(region=). Happy to make a new thread if anyone prefers.

I have broken out my function above into multiple delayed tasks:

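import dask

def write_region(dset, store_path, region):
    # write one tile into its region of the existing Zarr store
    dset.to_zarr(store_path, region=region)
    return True

def read_parse_write(paths, store_path):
    results = []
    for path in paths:
        # read the tiff and parse into xarray
        dset = dask.delayed(read_parse)(path)
        # determine the slice of target based on lat and lon
        slice_dict = dask.delayed(get_slice)(dset)
        # returns True, or raises when the task is computed
        result = dask.delayed(write_region)(dset, store_path, slice_dict)
        results.append(result)
    return results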

This helps to see what is taking the most time.

The files I am working with have 37 years of 6-band data stacked along the band dimension (222 bands total), and read_parse builds an xarray dataset with time, lat, and lon coords. They are about 800 MB to 1.5 GB each, at zoom 10.
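The band-to-time step in read_parse is essentially a reshape: 37 years × 6 features = 222 bands. A minimal NumPy sketch, assuming the bands are ordered year-major (all 6 features of year 0, then all 6 of year 1, and so on); if they are feature-major instead, reshape to `(6, 37, y, x)` and swap the first two axes. `unstack_bands` is a hypothetical helper.

```python
import numpy as np

N_YEARS, N_FEATURES = 37, 6  # 37 * 6 == 222 stacked bands


def unstack_bands(stacked):
    """Reshape a (222, y, x) band stack into (time, feature, y, x).

    Assumes year-major band order: band index = year * N_FEATURES + feature.
    """
    n_bands, ny, nx = stacked.shape
    if n_bands != N_YEARS * N_FEATURES:
        raise ValueError(f"expected {N_YEARS * N_FEATURES} bands, got {n_bands}")
    # C-order reshape: stacked[y * 6 + f] becomes out[y, f] (a view, no copy)
    return stacked.reshape(N_YEARS, N_FEATURES, ny, nx)
```

The result can then be wrapped as an xarray DataArray with dims `("time", "feature", "lat", "lon")`.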

I am looking for some tips on speeding up the .to_zarr(region=) portion of the code. When I chunk before calling to_zarr, many tasks show up as the chunk function turns the underlying arrays into dask arrays, but the job seems to hang and the chunks seem to spread across workers. Some questions:

  • If I am passing 100s of these tiff paths, and these tasks are running across all the workers, does it ever make sense to chunk first? In the case where we are working with one large dataset it makes sense, but if every task is working with its own smaller dataset, I think not.
  • Maybe there is a way to chunk thereby turning into a dask array, but enforce that the chunks must stay on the current worker in order to benefit from parallel writes and prevent data shuffling?
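On the second bullet: one option (an untested assumption for this data) is to skip .chunk() inside each task, so the tile is written eagerly from NumPy by the worker that already holds it. If you do chunk, give the tile dask chunks that line up with the target's Zarr chunks, offset by the region start, so each dask chunk writes whole target chunks; xarray's `safe_chunks` check on `to_zarr` exists to catch dask chunks that would share a Zarr chunk. A sketch with a hypothetical helper:

```python
def aligned_chunks(start, stop, zarr_chunk):
    """Dask chunk sizes for the region [start, stop) such that every dask
    chunk boundary falls on a target Zarr chunk boundary."""
    sizes = []
    pos = start
    while pos < stop:
        # next multiple of the Zarr chunk size after pos
        next_boundary = (pos // zarr_chunk + 1) * zarr_chunk
        end = min(next_boundary, stop)
        sizes.append(end - pos)
        pos = end
    return tuple(sizes)
```

For a tile covering rows 250 to 1000 of a target chunked by 256, this gives `(6, 256, 256, 232)`, which could be passed as `tile.chunk({"lat": aligned_chunks(lat_slice.start, lat_slice.stop, 256)})`.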

Any thoughts are greatly appreciated. Cheers!