Parallel processing when using numpy

Just wondering what the best library is to parallelize Python code that relies heavily on numpy functionality. I tried to use dask and, according to the dask documentation, had to use the ‘threads’ scheduler for any computations that operate on numpy objects rather than Python-type objects. Using the ‘threads’ scheduler proved to be much slower on my laptop with 4 CPUs than sequential processing (30 seconds sequentially vs. 4 minutes with dask). I did compile some of the functionality with numba, which showed significant improvement for some but not other functionality. So I'm just wondering what approaches people take to parallelize numpy functionality. Many thanks!


Dask is pretty commonly used here. You might want to check out the Dask array best practices (https://docs.dask.org/en/stable/array-best-practices.html) and the general Best Practices page in the Dask documentation. What computation are you trying to parallelize? The fact that it’s slower with the threaded scheduler suggests that your computation is holding the GIL.


Thank you for the suggestions @TomAugspurger ! Since we are processing spatial data one point at a time, I attempted to use dask with the “threads” scheduler to parallelize processing of 100 points at a time on 4 CPUs. We begin with a Zarr store of the data and xarray types, but after some benchmarking I found that using xarray was much slower than using the same functionality with numpy. So we convert all the data to numpy objects before any computations, and there are no problems with chunking of the data.

I am not holding the GIL explicitly, just calling numpy functions. Each task processes its own data[:, i, j] time series, which is passed to the task. Would reducing the number of tasks run in parallel with dask from 100 to fewer make it more efficient?
I did try setting these environment variables before starting my Python code:

export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1

but to no effect.

@mliukis so if I understand correctly - you have something akin to 3D data with dimensions (time, x, y), and you want to process each separate timeseries for a given (x, y) combination?

This is probably a good candidate for something like xarray’s apply_ufunc() machinery, which lets you broadcast a particular function across your data in this way. See the documentation there for motivating examples that might match your use case. If this works, then in many cases you can use a tool like numba to JIT-compile the function you have that runs on a single timeseries. A lot of times, for moderate-complexity functions, this should be more than performant enough.
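For instance, a minimal sketch of the apply_ufunc approach (the DataArray name da, the dimension names, and the placeholder function body are illustrative assumptions, not taken from this thread) might look like:

import numpy as np
import xarray as xr

def my_timeseries_func(ts: np.ndarray) -> float:
    """Consume one 1-D timeseries and return a scalar result."""
    return float(np.nanmean(ts))  # placeholder computation

result = xr.apply_ufunc(
    my_timeseries_func,
    da,                           # DataArray with dims ("time", "y", "x")
    input_core_dims=[["time"]],   # the function consumes the "time" dimension
    vectorize=True,               # loop the function over every remaining (y, x) point
    dask="parallelized",          # only relevant if da is dask-backed
    output_dtypes=[float],
)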

If it’s not, your best bet might actually be to consider vectorizing your function. Is there a way you can write it so that you operate on multiple timeseries simultaneously? Sometimes re-writing your timeseries code as a marching algorithm which iterates over your data timestep-by-timestep will allow you to process vectors of data simultaneously, and ultimately get a huge speed-up.
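As a toy illustration of that marching idea (the running maximum below is just a stand-in for the real per-timeseries computation):

import numpy as np

def per_point_running_max(ts):
    """Scalar version: consumes one timeseries at a time."""
    out = ts[0]
    for v in ts[1:]:
        out = max(out, v)
    return out

def vectorized_running_max(data):
    """Vectorized version: data has shape (nt, ny, nx); march over time,
    updating every grid point at once with a single numpy call per step."""
    out = data[0].copy()
    for t in range(1, data.shape[0]):
        np.maximum(out, data[t], out=out)
    return out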

The final thing you can consider is just dropping to joblib or a similar tool to process each timeseries separately. Forget that they’re packaged as an array - just consider it a list of separate timeseries that you pluck out one by one. Then the standard joblib idiom applies:

import numpy as np
from joblib import Parallel, delayed
from typing import Sequence

my_data_as_arr = ... # data; shape = (n_timesteps, n_x, n_y)
nt, nx, ny = my_data_as_arr.shape

def my_timeseries_func(arr: Sequence[float]) -> float:
    """Consumes a timeseries of data and outputs a scalar result."""
    pass

n_cpu = ...  # number of parallel workers, e.g. os.cpu_count()

result = Parallel(n_jobs=n_cpu)(
    delayed(my_timeseries_func)(my_data_as_arr[:, i, j])
    for i, j in np.ndindex(nx, ny)
)

(Unpacking *idx directly inside the subscript only works on newer Python versions, so the example above just indexes with explicit i and j instead.)

@darothen Thank you for the suggestions!
Yes, this is exactly what I am trying to do - I have embarrassingly parallel code processing each time series separately.

I will give all your suggestions a try.

Regarding joblib vs. dask: in the past I found that code I had parallelized with dask, which showed very slow performance, had exactly the same slow runtime when I parallelized it with joblib. It must be the GIL problem in the underlying Python code. I will give it a try in this particular case though, just to make sure.

In any case, just getting rid of the xarray datatypes and using numba significantly improved the performance, but we could definitely benefit from an even better runtime 🙂

It’s hard to confirm without an MWE, but if that’s the case then you might just be running into excessive I/O with your original code. To apply this workflow you’d want to make sure that any given timeseries is contained in one single, contiguous chunk of your original data. That’s almost certainly what happens when you eagerly process the data with numpy. Your performance constraints could really just be an issue with massively duplicating your I/O (re-reading your data) because of a mismatch between the chunking and your analysis strategy.
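For example, if the data were opened lazily from the Zarr store, something like the following (dimension names, chunk sizes, and the store path are illustrative assumptions, not taken from this thread) keeps each timeseries in a single chunk:

import xarray as xr

ds = xr.open_zarr("s3://my-bucket/my-store")   # hypothetical store location
ds = ds.chunk({"time": -1, "y": 25, "x": 25})  # -1 keeps the full time axis in every chunk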

There is no I/O involved, as I am reading the chunk of data into memory beforehand and then processing each (:, x, y) series separately with numpy.
That’s why I was very surprised that making that code parallel caused such a huge performance hit.

@darothen @TomAugspurger Doing some more investigation, I think we ran into exactly the situation that the numpy documentation page “NumPy packages & accelerated linear algebra libraries” warns about:

Both MKL and OpenBLAS will use multi-threading for function calls like np.dot, with the number of threads being determined by both a build-time option and an environment variable. Often all CPU cores will be used. This is sometimes unexpected for users; NumPy itself doesn’t auto-parallelize any function calls. It typically yields better performance, but can also be harmful - for example when using another level of parallelization with Dask, scikit-learn or multiprocessing.

We use the conda distribution of numpy, and the defaults and conda-forge channel builds of numpy ship with MKL and OpenBLAS respectively, so we are getting a performance hit when adding another level of parallelization with dask or multiprocessing.
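A runtime alternative to the environment variables (a suggestion, not something used in the thread) is the threadpoolctl package, which can both report which BLAS is loaded and cap its thread pool from inside the process:

from threadpoolctl import threadpool_info, threadpool_limits

print(threadpool_info())  # shows the loaded BLAS (MKL/OpenBLAS) and its current thread count

with threadpool_limits(limits=1, user_api="blas"):
    # run the dask/joblib-parallelized computation here, with every worker
    # making single-threaded BLAS calls
    ...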


That’s a good catch. I think that the issue referred to here really depends on what you’re doing to each timeseries that you’re processing. If it’s something that’s very linear algebra heavy then I can definitely see how this would crop up. If your analysis is amenable to it, you could try compiling the timeseries processing function with numba (assuming it isn’t heavily leaning on linear algebra) as an alternative.

Our computation is very linear algebra heavy. I am already compiling some of the code blocks with numba and it definitely helped.
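As an aside: since the threaded scheduler only helps when the work releases the GIL, one thing worth checking (assuming the per-timeseries function is numba-compatible) is compiling it with nogil=True. A minimal sketch, with a placeholder computation:

import numpy as np
from numba import njit

@njit(nogil=True)
def my_timeseries_func(ts):
    """Placeholder per-timeseries computation; nogil=True releases the GIL
    so threaded dask/joblib workers can run it truly in parallel."""
    s = 0.0
    for v in ts:
        s += v * v
    return s / ts.size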

Hi here,

Just wanted to underline, as @darothen said, that it’s hard to confirm anything without an MWE.

There was this post from Matthew Rocklin we often referred to:
https://matthewrocklin.com/blog/work/2018/02/28/minimal-bug-reports

Here we can only make wild guesses about the problem, since we know nearly nothing about it. I’m positive we can build examples of using Dask (or Xarray with Dask) to parallelize Numpy code by time series that show a 4x improvement using 4 cores.

There must be something really wrong with how things were done using Dask if it runs 8 times slower with 4 CPUs. But we need more inputs to help.

Some other wild guesses/questions:

  • I don’t think it is related to MKL/OpenBLAS. If this were really the case, you would see no improvement, but probably not such a slowdown.
  • Did you try using a multiprocessing scheduler, or even a LocalCluster with processes? Did you look at the Dask Dashboard to catch what was going wrong?
  • Can you say more about your data dimensions, chunking (if any), and the number of tasks you launch (100 apparently)? How long does one task take sequentially?

@geynard Thank you for your input. I will come up with an MWE using the numpy functionality we rely on, to reproduce the problem I am observing.


Hi,
If your array shape is indeed (nt, ny, nx), then it’s clearly not the best arrangement for parallelization per grid point, viz. each process/thread handling one full time series. A very simple fix is

flip_data = np.transpose(data, [1, 2, 0])

to get a shape of (ny, nx, nt). Now the time series are contiguous in memory. Each process/thread will pull a chunk of data that is contiguous in memory, meaning the processes won’t compete for the same memory locations.

Hope it helps,

Guillaume


@groullet Thank you for the tip!
Though I am passing each of the time series data[:, j, i] directly to each of the threads instead of passing the whole data array and then accessing each of the time series from within each of the threads. I thought that would be better practice than passing the whole data array to the thread.

Also, when you say it’s not related to MKL/OpenBLAS, do you mean to still use multithreaded numpy in the Dask parallelization?

To answer some of your previous questions:

Didn’t look at the Dask Dashboard, but I did use “processes” as the Dask scheduler, which brings processing to its knees: it takes 4 minutes to process only 10 spatial points in parallel vs. 1 minute to process all 10000 points with the “threads” scheduler. I followed the Dask best practices (Processes and Threads), which recommend using “threads” for all NumPy/SciPy/etc. calculations and “processes” when working with Python collections like lists and dicts.

I am loading a [:, 100, 100] chunk of data into memory from a Zarr store in an S3 bucket, so there is no I/O or chunking involved during processing. I’m spawning 100 Dask tasks at a time to process all 100x100=10000 spatial points of the chunk.
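For what it’s worth, one way to amortize the per-task overhead over that many points is to hand each dask task a whole slab of the in-memory chunk instead of one timeseries at a time. A sketch, where data stands for the in-memory (nt, 100, 100) chunk and my_timeseries_func and the block count are illustrative:

import numpy as np
import dask

def process_block(block):
    """block has shape (nt, ny_block, nx); apply the per-point function to
    every timeseries in the slab and return a (ny_block, nx) result."""
    nt, ny_b, nx = block.shape
    out = np.empty((ny_b, nx))
    for j in range(ny_b):
        for i in range(nx):
            out[j, i] = my_timeseries_func(block[:, j, i])
    return out

n_blocks = 4  # e.g. one slab per CPU
tasks = [dask.delayed(process_block)(slab)
         for slab in np.array_split(data, n_blocks, axis=1)]  # split along y
result = np.concatenate(dask.compute(*tasks, scheduler="threads"), axis=0)

Whether this helps still depends on the per-point function releasing the GIL (heavy numpy calls or numba with nogil=True).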

A few more pieces of information:

  • it is a bit faster (71 seconds) to process all the points sequentially using multi-threaded NumPy vs. 99 seconds to process the same points using dask with the “threads” scheduler.
  • it takes double the time if I use multithreaded NumPy with Dask (143 seconds) vs. single-threaded NumPy with Dask (71 seconds) to process the same 10K spatial points

OK, I’d better come up with an MWE to demonstrate the problem.

The transpose idea is a good one but there is a subtlety I slipped over. When you do

flip_data = np.transpose(data, [1, 2, 0])

numpy doesn’t duplicate the content of the memory, contrary to what one might think. Numpy simply creates a new mapping between the index space and the memory locations; the memory layout is unchanged. flip_data is not the owner of the memory, as you can see with

flip_data.flags

vs

data.flags
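To illustrate with made-up shapes, the comparison typically looks like this:

>>> data = np.zeros((1000, 100, 100))          # illustrative (nt, ny, nx) shape
>>> flip_data = np.transpose(data, [1, 2, 0])
>>> flip_data.flags['C_CONTIGUOUS'], flip_data.flags['OWNDATA']
(False, False)
>>> data.flags['C_CONTIGUOUS'], data.flags['OWNDATA']
(True, True)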

To really change the layout of the data in memory you need to allocate first, then copy

flip_data = np.zeros((ny, nx, nt))
flip_data.flat = np.transpose(data, [1, 2, 0])

now you should have the speed-up.
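An equivalent one-liner, for what it’s worth, is np.ascontiguousarray, which also allocates fresh memory and copies the transposed data into it:

flip_data = np.ascontiguousarray(np.transpose(data, (1, 2, 0)))
assert flip_data.flags['C_CONTIGUOUS'] and flip_data.flags['OWNDATA']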

Guillaume

@groullet Thank you so much for the suggestion! Indeed I see a speed-up in runtime when changing the dimension order of the data to process. However, I had to introduce extra transposes in the calculations, which seem to add more processing time, so I end up with the same runtime.

Hi Masha,

If the transpose doesn’t bring any speedup, you may try to add .copy() to the array you pass to Parallel, namely

result = Parallel(n_jobs=n_cpu)(
    delayed(my_timeseries_func)(my_data_as_arr[:, i, j].copy())
    for i, j in np.ndindex(nx, ny)
)

the idea is to send the threads data that are both contiguous in memory and completely independent of each other, so that the threads do not compete when accessing the memory.

Also, watch out for my_timeseries_func: if it uses shared arrays in addition to the data you send, then the threads will compete and you will hurt the parallelization.

Lastly, don’t use too many threads. A good number is as many as the number of CPUs per node.

Don’t give up, you should/will have the speedup. Cheers

Guillaume


Thanks for your suggestions in this thread @groullet. Just wanted to link to a short geoscience-focused blog post on the topic of numpy memory layout that I found helpful in the past.
