Optimizing calculation of multiple projections

Hello

I’m pretty new to Dask and Xarray and want to use them to work with an 80 GB dataset. I have a machine with a single AMD Threadripper CPU with 32 cores and 128 GB of RAM. The dataset contains measurements of n Hermite-Gauss (HG) impulses that I want to project onto the first two Hermite-Gauss eigenfunctions.
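
Concretely, each projection is a discretized inner product of an impulse with an eigenfunction; writing $s_i$ for the $i$-th impulse and $\psi_j$ for the $j$-th eigenfunction (my own notation, not names from the code):

$$
p_{ij} = \int s_i^*(t)\,\psi_j(t)\,dt \;\approx\; \Delta t \sum_k s_i^*(t_k)\,\psi_j(t_k)
$$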

While the dataset itself fits into memory, I need to calculate multiple projections of it: onto 2 eigenfunctions, each with 11 displacements in time and 11 displacements in phase. Because of that, the chunk size grows by a factor of 2 × 11 × 11 = 242 and Dask runs out of memory. Here is the metadata of both DataArrays:

And the code I use for computation:

def cache_projections(cache_path=projections_cache_path):
    # manual complex product conj(ds) * ef, kept as separate real and
    # imaginary components (signs inverted due to conjugation)
    products = xr.concat(
        [
            ds.sel(ReIm="real") * ef.sel(ReIm="real")
            + ds.sel(ReIm="imag") * ef.sel(ReIm="imag"),
            ds.sel(ReIm="real") * ef.sel(ReIm="imag")
            - ds.sel(ReIm="imag") * ef.sel(ReIm="real"),
        ],
        dim=COMPLEX_DIM,
    )

    # simple integration
    projections = products.sum(dim="time") * DT
    # rechunk n as I need to bootstrap the impulses later
    projections = projections.chunk(
        {**{dim: "auto" for dim in projections.dims}, "n": -1}
    )
    print("Calculating projections")
    projections.to_dataset(name="sig").to_zarr(cache_path, mode="w")

The calculations are really slow and don’t fully utilize the CPU. I need advice on how to speed this up, as I believe the slowdown may be caused by poor chunking. To avoid running out of memory, I rechunked my original Zarr array to chunks of 1 MB instead of 128 MB. I also tried increasing the number of workers in my LocalCluster to 2, while rechunking ef along the eigenfunction dimension. That gave a minor speedup, but the computation still takes over 20 hours to finish.
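
In case it’s useful, the rechunking step looks roughly like this (the paths and chunk sizes here are placeholders, not my actual values):

import xarray as xr

# open the original store lazily and pick smaller chunks
src = xr.open_zarr("impulses.zarr")
src = src.chunk({"time": 1_000, "n": 100})

# drop the chunk sizes remembered from the old store, otherwise
# to_zarr can complain that they don't match the new dask chunks
for var in src.variables.values():
    var.encoding.pop("chunks", None)

src.to_zarr("impulses_rechunked.zarr", mode="w")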

Is there something I’m doing wrong or that can be improved?


Thanks for sharing this question. It’s a rather complicated one.

Would it be possible to construct a minimal reproducible example for this using synthetic data?

Otherwise, people just have to guess what might be wrong by looking at your code / screenshots. This is hard to do.

Today I managed to fix it. There were several issues, so I’ll list them along with how I solved them:

  • My data was stored as int16, with a separate dimension holding the real and imaginary parts of the complex signal. I did that to save storage space, since 2 × int16 takes 4 times less space than complex128, but it seems that Dask really struggled with the manual complex multiplication. It also didn’t save any space in the actual calculations, because everything was upcast to complex128 anyway, which is the dtype of ef. This caused a huge increase in required memory that I hadn’t factored in when estimating chunk sizes (see the sketch right after this list for how I now build the complex array up front).
  • Another issue was that I was using the default settings for the threaded LocalCluster. Because I was running in interactive mode in VSCode, I couldn’t get multiprocessing to work. The default was to create one worker with 64 threads (on my machine), which wasn’t very efficient. However, when I manually specified 8 workers with 8 threads each, Dask reported 80GB of used memory and kept pausing workers, while the Task Manager showed only ~16GB in use, as if the memory assigned to each worker was actually shared between them in some way?
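
To make the first point concrete, here is roughly what the dtype fix looks like (ds, ef, DT and the dims are from my original code; the intermediate names are made up, and since ef already behaves as complex128 in my case, building ef_c may be unnecessary):

import numpy as np

# build genuine complex arrays once, instead of dragging separate
# int16 real/imag components through the whole computation
ds_c = (
    ds.sel(ReIm="real").astype("float64")
    + 1j * ds.sel(ReIm="imag").astype("float64")
)
ef_c = ef.sel(ReIm="real") + 1j * ef.sel(ReIm="imag")

# the projection is then a single conjugated product integrated over time
projections = (np.conj(ds_c) * ef_c).sum(dim="time") * DT

This is the same quantity as the xr.concat version in my first post (real part Re·Re + Im·Im, imaginary part Re·Im − Im·Re), just held in a native complex dtype, so Dask schedules one complex multiplication instead of four real ones.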

I therefore rewrote my code as a script and ran it with a process-based cluster from the terminal, and now the computations run stably and much faster than before. Of course, I still had to adjust the chunk size, but now I could do it in a predictable way and everything runs smoothly.
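
For completeness, this is the kind of setup the script uses now (the worker count, thread count and memory limit are just what happened to work on my 32-core / 128 GB machine, not a recommendation):

from dask.distributed import Client, LocalCluster

if __name__ == "__main__":
    # process-based workers, each with its own memory budget, so the
    # dashboard and the OS finally agree on how much memory is in use
    cluster = LocalCluster(
        n_workers=8,
        threads_per_worker=4,
        processes=True,
        memory_limit="14GB",
    )
    client = Client(cluster)

    # cache_projections() is the function from my first post,
    # defined earlier in the same script
    cache_projections()

    client.close()
    cluster.close()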
