I'm currently working on a project where I create a multidimensional array from GeoTIFF files and save it as a Zarr file. For this I'm using Xarray and Dask (Gateway) on a Kubernetes cluster in a cloud environment. Is there a way to control the processing of the chunks of my multidimensional array in Dask more precisely? I hope to get some advice from your expertise on the following points:
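For context, the cluster itself is created with dask-gateway roughly along these lines (a minimal sketch rather than our exact code; the exposed option names like worker_cores and worker_memory depend on the Gateway deployment):

```python
from dask_gateway import Gateway

# Connect to the Dask Gateway deployed in the Kubernetes cluster
gateway = Gateway()

# Request worker resources; the available options depend on the deployment
options = gateway.cluster_options()
options.worker_cores = 2    # threads per worker
options.worker_memory = 8   # GB per worker

cluster = gateway.new_cluster(options)
cluster.scale(4)            # e.g. 4 workers
client = cluster.get_client()
```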
Memory bottlenecks:
When the data size is close to or exceeds the available memory, a bottleneck occurs: the upload fails because some workers require more memory than allocated, instead of the data being distributed across additional workers.
• Why do the memory requirements per worker increase, and which factors influence this, even though the chunk size (200 MB) is small enough that a worker could process multiple chunks in parallel?
Memory spikes:
I observed that Dask does not balance the memory load evenly among the workers at the beginning of the storage process. Some workers briefly require up to 6 GB of memory, while only around 2 GB per worker is used during the rest of the upload. This leads to inefficient resource usage, since each worker has to be allocated more memory than is typically needed on average.
• What could be the cause of these initial “Memory Spikes”?
• Is there a way to optimize this behavior in order to achieve a more balanced worker utilization?
• In our previous tests with up to 500 images, the memory usage per worker did not exceed 6 GB. Should we expect the memory requirements per worker to grow further as the number of images increases?
Here is an example of the distribution with 4 workers using 2 threads and 8 GB memory:
I may not be able to answer all the questions, but I think a lot of the time it has to do with the settings of the cluster itself. You may want to look at how many tasks are being processed per worker before it fails.
I’ve definitely experienced something similar in the past, when a worker becomes ‘greedy’ for some reason, and it starts processing many more tasks than it can handle, running out of memory.
A few questions below that might help clarify the matter:
What data type are your arrays? Downcasting can sometimes help a lot, if it's acceptable (see the sketch after these questions).
What is happening to the data between reading and writing?
Does it work if you use smaller chunks?
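On the data-type point, a generic sketch of what downcasting buys you (not based on your actual data; your arrays may already use a small dtype):

```python
import dask.array as dsa
import xarray as xr

# A lazy array the size of one (band, y, x) image, but stored as float64
da = xr.DataArray(dsa.zeros((4, 10000, 10000), dtype="float64", chunks=(4, 5000, 5000)))
print(da.nbytes / 1e9)       # 3.2 GB at float64

# Downcasting halves (or quarters) the footprint, if the precision loss is acceptable
da32 = da.astype("float32")
print(da32.nbytes / 1e9)     # 1.6 GB at float32
```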
I think this is a tricky one to figure out without seeing the process, and it depends a lot on the task, but in my experience 200 MiB chunks on workers with 8 GB sometimes do that, whereas something closer to 128 MiB works a bit better. This post on the Dask blog has some rules of thumb and loads of useful information.
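For instance, with uint8 data and 4 bands, chunks in this ballpark land near the ~100 MiB mark (the sizes here are illustrative, not a recommendation for your exact setup):

```python
import dask.array as dsa
import xarray as xr

# Hypothetical stack: 50 time steps of (band=4, y=10000, x=10000) uint8 images,
# initially one whole image per chunk (~400 MB each)
data = dsa.zeros((50, 4, 10000, 10000), dtype="uint8", chunks=(1, 4, 10000, 10000))
stack = xr.DataArray(data, dims=("time", "band", "y", "x"))

# One chunk of (1, 4, 5000, 5000) uint8 values is 4 * 5000 * 5000 bytes ≈ 95 MiB
stack = stack.chunk({"time": 1, "band": 4, "y": 5000, "x": 5000})
print(stack.chunks)
```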
FYI, these kinds of frustrations with Dask are what motivated the development of Cubed, which aims to be transparent and predictable about per-worker runtime memory usage.
What is the tiling and compression of the underlying GeoTIFFs? Are they consistent, and do they represent a clean mosaic (or a single tile per time?) in the output, or are they mixed shapes/resolutions and/or mixed CRS?
I gather that each is 6000x10000 and you are just concatenating them in time, so the best performance should come from setting the chunk size as a multiple of the underlying tiling in the TIFFs (if they have any; they could be row-bound with 1*ncol “chunks”). I don’t get why your chunk size is 5000 for x rather than 6000 - that is a lot of “dangle” compared to the partitioning.
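If the internal layout of the TIFFs isn't known, rasterio can report it; a quick sketch ("image.tif" is a placeholder path):

```python
import rasterio

# Inspect the internal block layout and compression of one GeoTIFF
with rasterio.open("image.tif") as src:
    print(src.block_shapes)              # (rows, cols) of the internal blocks, per band
    print(src.profile.get("tiled"))      # True if the file is internally tiled
    print(src.profile.get("compress"))   # compression scheme, if any
```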
Thanks for your feedback! I really appreciate your input.
The GeoTIFFs represent a single tile per time step. Each image has the same shape (bands = 4, x = 10000, y = 10000), resolution, and CRS, and they're all uncompressed.
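For scale, that makes each image about 400 MB uncompressed, so a 500-image stack is roughly 200 GB:

```python
bands, height, width = 4, 10000, 10000
bytes_per_image = bands * height * width * 1   # uint8 = 1 byte per value
print(bytes_per_image / 1e6)                   # ≈ 400 MB per image
print(500 * bytes_per_image / 1e9)             # ≈ 200 GB for 500 images
```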
Here’s a quick overview of what happens between reading and writing:
I’ve noticed that I get similar results with other chunk sizes, like 100 MiB or even down to 25 MiB. However, it works across all chunk sizes when I allocate more memory per worker. One of my goals is to explore how chunk sizes impact the writing of Zarr files, as well as the reading and processing efficiency for spatiotemporal analysis, so I'm aiming for a Dask setup that works smoothly with different chunk sizes.
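To make that concrete, a minimal sketch of this kind of read-concatenate-write pipeline (rioxarray, the file list, and the chunk sizes are assumptions, not my actual code):

```python
import rioxarray
import xarray as xr

# Hypothetical list of GeoTIFF paths, one per time step
paths = ["img_0001.tif", "img_0002.tif"]

# Open each image lazily with explicit chunks instead of one whole-image chunk
arrays = [
    rioxarray.open_rasterio(p, chunks={"band": 4, "y": 5000, "x": 5000})
    for p in paths
]

# Stack along a new time dimension, set the target chunking, and write to Zarr
stack = xr.concat(arrays, dim="time")
stack = stack.chunk({"time": 1, "band": 4, "y": 5000, "x": 5000})
stack.to_dataset(name="image").to_zarr("output.zarr", mode="w")
```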
The native chunk size for a single GeoTIFF looks like this: dask.array<shape=(4, 10000, 10000), dtype=uint8, chunksize=(4, 10000, 10000), chunktype=numpy.ndarray>
Oh, I see - that's weird. Usually when there's no tiling it would be treated as scanlines, 10000x1, but that's effectively the same - thanks! (I will explore this more for my own purposes.)
The implication is that there doesn't seem to be much you can gain from knowing more about the sources (if the chunking were wildly different it could cause a lot of churn, but this shouldn't be bad on that side of things). It might be worth re-"chunking" (aka tiling, blocksize) the TIFFs themselves to match the final chunking you want, so that the actual reads are non-partial (just concatenating multiple time steps and merging the bands into an array). The CLI way is gdal_translate with the tiling creation options, but it can also be done via Python with osgeo.gdal or rasterio. I don't know whether xarray chunk alignment is to the top or bottom or how to control that (GeoTIFF is aligned to the top), but with clean divisors like here it wouldn't matter. And overall, I can't see this having a huge impact with so few tiles.
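For example with rasterio, a retiling pass could look roughly like this (paths and block sizes are placeholders; GeoTIFF block sizes must be multiples of 16):

```python
import rasterio

src_path, dst_path = "image.tif", "image_tiled.tif"

with rasterio.open(src_path) as src:
    profile = src.profile
    data = src.read()  # fine here, since each image is only ~400 MB

# Rewrite with an internal tiling closer to the intended Zarr chunking
# (2000 divides the 10000-pixel image evenly and is a multiple of 16)
profile.update(tiled=True, blockxsize=2000, blockysize=2000)

with rasterio.open(dst_path, "w", **profile) as dst:
    dst.write(data)
```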