Numba guvectorize and distributed

Hi,

I’m trying to compute a @guvectorize-decorated function over a cluster created with dask distributed, specifically a dask-cloudprovider cluster.

I’ve seen that there are some open issues about this problem (possibly among others):

numba/numba#4314
xgcm/fastjmd95#1
dask/distributed#3450

none of which paints an optimistic picture of a working solution or fix arriving soon.

I was just wondering if anyone here has more experience with this, or knows of a workaround that hasn’t been documented in the issues.

Thanks!

Val


Hi Val,

I don’t believe that anyone is investigating this. IIRC, we loosely determined that we’d need changes to NumPy’s ufunc machinery to enable this.

Just to make sure: did you see the comment that using array.map_blocks(gufunc), rather than gufunc(array), should work?
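For concreteness, here’s a minimal sketch of the difference, with a toy gufunc standing in for yours (my own illustration, untested on a cluster):

import dask.array as da
from numba import guvectorize

@guvectorize(["(uint8[:], uint8[:])"], "(n)->()", nopython=True)
def row_sum(data, out):
    # Toy example: reduce the core "(n)" dimension to a scalar.
    out[0] = data.sum()

arr = da.ones((10, 100), chunks=(5, 100), dtype="uint8")

# row_sum(arr) would dispatch through NumPy's ufunc machinery, which is
# where things currently break. Mapping over blocks avoids it; each
# (5, 100) block becomes a (5,) result, so axis 1 is dropped.
result = arr.map_blocks(row_sum, drop_axis=1, dtype="uint8").compute()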

Hi Tom,

thanks for your answer. I’d already gotten that sense from reading the issues.

Ideally, I’d like to use xarray.apply_ufunc, which works just fine on the local cluster.

Actually, on the local cluster all of the “fixes” work; it’s only in the production setup on Fargate that it fails, and I can’t figure out a way around it.

To boil things down, I’ve tried this:

import dask.array as da
from dask_cloudprovider import FargateCluster
from distributed import LocalCluster, Client
from numba import guvectorize
...

@guvectorize("(uint8[:], uint8[:])", "(n)->()", nopython=True)
def gf_test(data, out):
    ...

def main():
    # cluster/client setup elided; see the two variants below
    arr = da.random.randint(0, 2, (10, 10, 100), dtype="uint8")
    task = arr.map_blocks(gf_test)

    result = client.compute(task).result()

    ...

if __name__ == "__main__":
    main()

This works fine when I test it using a local cluster like

cluster = LocalCluster(ip='0.0.0.0', n_workers=2)
client = Client(cluster)

but fails in production using a FargateCluster from dask-cloudprovider like

cluster = FargateCluster(...)
client = Client(cluster)

The same happens with external imports, etc.

Edit:

Clearly the difference is the cluster, but that goes way beyond my understanding of dask and/or distributed.

The error msg I get is

AttributeError: module '__mp_main__' has no attribute 'gf_test'

compared to the one you got in the issue, which was

AttributeError: module '__main__' has no attribute 'test_numba'

maybe that helps?

Sorry for the delayed reply.

I suspect your issue can be solved by placing the definition of gf_test in a standalone module. I believe the compiled gufunc gets pickled by reference to the module it was defined in; when that module is the script itself (__main__, or __mp_main__ in spawned worker processes), the workers can’t resolve the reference, hence the AttributeError.

# file: mymodule.py

from numba import guvectorize

@guvectorize("(uint8[:], uint8[:])", "(n)->()", nopython=True)
def gf_test(data, out):
    pass  # actual implementation goes here

Then in your notebook / environment you’d import mymodule:

import dask.array as da
import mymodule

arr = da.random.randint(0, 2, (10, 10, 100), dtype="uint8")
task = arr.map_blocks(mymodule.gf_test)

It’s important that your workers also have mymodule available. If you’re unable to include it in the image used on your workers, you can provide it with client.upload_file("mymodule.py"). There’s a downside: workers that crash and are replaced will not have the module. Worker plugins might help here: https://distributed.dask.org/en/latest/plugins.html#worker-plugins
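As a rough sketch of that plugin idea (this class is my own illustration, not an established recipe):

from distributed.diagnostics.plugin import WorkerPlugin


class UploadModule(WorkerPlugin):
    # Re-writes a module's source whenever a worker starts, so workers
    # that crash and are replaced still get it.

    def __init__(self, path):
        # Read the source once, on the client side; the string travels
        # to each worker inside the pickled plugin.
        with open(path) as f:
            self.source = f.read()
        self.filename = path

    def setup(self, worker):
        import os
        import sys

        # Write the source into the worker's working directory and make
        # sure it is importable.
        target = os.path.join(worker.local_directory, self.filename)
        with open(target, "w") as f:
            f.write(self.source)
        if worker.local_directory not in sys.path:
            sys.path.insert(0, worker.local_directory)


# client.register_worker_plugin(UploadModule("mymodule.py"))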

Hope that helps.


Thanks Tom! And also thanks for following up in the respective issues on GitHub.

Your solution provided the fix I needed (which makes my life a ton easier) :partying_face:

I thought I’d tried the “external module” fix before, but either I mixed something up and hadn’t, or I missed a small detail, like the fact that the workers need the module accessible as well. I think that detail was crucial for me.

In the end, this worked for me:

  • put the guvectorize function in a separate module
  • include the module in both the supervisor image and the worker image
  • pass the function as module.function when building the task

Actually, this way I can use xarray.apply_ufunc, which is the optimal solution for me!
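For anyone who finds this later, here’s roughly what that looks like (a minimal sketch; the dimension names are made up, and the core dimension must not be split across chunks):

import dask.array as da
import numpy as np
import xarray as xr

import mymodule  # the standalone module holding the guvectorized gf_test

arr = da.random.randint(0, 2, (10, 10, 100), dtype="uint8")
data = xr.DataArray(arr, dims=("x", "y", "n"))

result = xr.apply_ufunc(
    mymodule.gf_test,
    data,
    input_core_dims=[["n"]],  # consumed by the "(n)->()" signature
    dask="parallelized",      # applied block-wise over the dask array
    output_dtypes=[np.uint8],
)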

Thanks again :slight_smile:
