Running Monte Carlo sampling module in parallel using MPI and Dask in Pangeo

Hi everyone,

I am a glaciologist and new to Python and Pangeo, so I apologise in advance if my question seems naive. I also hope the “Cloud” forum is the right place to share this issue.

My work focuses on modelling the evolution of temperature within an ice column, based on the energy balance equation and a number of changing boundary conditions such as surface temperature and geothermal heat flux. I have working code that successfully runs forward simulations. The next step is to run an inverse model, which I have implemented in Jupyter on my laptop using the Markov chain Monte Carlo (MCMC) sampler emcee. However, the sampler takes too long to run on my laptop, even in parallel with multiprocessing (following this guide: https://emcee.readthedocs.io/en/stable/tutorials/parallel/), so I would like to use the cloud to distribute the task across more cores.

The issue arises when I implement the code the same way in the Pangeo environment on us-central1-b.gcp.pangeo.io: it takes approximately as long as it does on my laptop. I suspect that the task is not being sent to the cloud at all, although I tried to follow the emcee tutorial on parallelisation (the MPI route) and, in turn, the documentation on using MPI with Dask (https://mpi.dask.org/en/latest/index.html).

I simplified everything to just run the example from the emcee tutorial (the first link above) using both Dask and MPI, and it still does not seem to work. Before attempting this, I installed the required packages (emcee, dask-mpi, etc.) into my Pangeo environment from a terminal in Jupyter.

My specific question is whether I missed something obvious while trying to run emcee in parallel in the cloud. More broadly, could somebody suggest an efficient way of parallelising MCMC sampling in the cloud? Sorry in advance if the question is too convoluted. Cheers, Sasha.

PS: Below is the simplified code that attempts to combine emcee, MPI and Dask, following the MPI section of the emcee tutorial (https://emcee.readthedocs.io/en/stable/tutorials/parallel/) and the “Dask-MPI with Batch Jobs” section of the Dask-MPI documentation (https://mpi.dask.org/en/latest/batch.html).

with open("script.py", "w") as f:
    f.write("""
import sys
import time
import emcee
import numpy as np
from schwimmbad import MPIPool

from dask_gateway import Gateway
from dask.distributed import Client

gateway = Gateway()
cluster = gateway.new_cluster()
cluster.adapt(minimum=5, maximum=50)
client = Client(cluster)


def log_prob(theta):
    t = time.time() + np.random.uniform(0.005, 0.008)
    while True:
        if time.time() >= t:
            break
    return -0.5*np.sum(theta**2)

with MPIPool() as pool:
    if not pool.is_master():
        pool.wait()
        sys.exit(0)
        
    np.random.seed(42)
    initial = np.random.randn(32, 5)
    nwalkers, ndim = initial.shape
    nsteps = 100

    sampler = emcee.EnsembleSampler(nwalkers, ndim, log_prob, pool=pool)
    start = time.time()
    sampler.run_mcmc(initial, nsteps)
    end = time.time()
    print(end - start)
""")

mpi_time = !mpiexec -n 8 python script.py
mpi_time

Hi @sashamontelli, welcome here!

The crux of your problem is that you cannot use MPI within Dask; it’s the other way around: Dask can leverage MPI to form its own cluster. MPI is not available on a Pangeo cloud platform; it is usually only installed on HPC clusters with specific hardware.

So you won’t be able to distribute emcee in a Pangeo cloud platform using MPI. In the cloud, Pangeo scalability is entirely based on Dask clusters launched through Kubernetes.

What you need is to ask the emcee folks whether they could provide parallelization through Dask, or help provide it yourself. This shouldn’t be too hard, since emcee can already be distributed through multiprocessing pools and MPI. Referring to the emcee docs:

In general, a pool is any Python object with a map method that can be used to apply a function to a list of numpy arrays.

Maybe just passing a Dask client to the pool keyword of EnsembleSampler is enough (but I doubt it, this would be too simple :smile:), as client has a map method.
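
To make the idea concrete, here is a minimal, untested-on-Pangeo sketch. Passing the client directly is probably not enough on its own, because Client.map returns futures rather than values, so a thin adapter that gathers the results seems to be needed. The names DaskPool and run_example are hypothetical (they are not part of emcee or Dask), and the example assumes a local Client; on a Pangeo deployment you would presumably build the client from your Dask Gateway cluster instead.

```python
class DaskPool:
    """Hypothetical adapter giving a Dask client the blocking ``map`` emcee expects."""

    def __init__(self, client):
        # Assumed to be a dask.distributed.Client (anything with map/gather works).
        self.client = client

    def map(self, func, iterable):
        # Client.map returns futures immediately; gather blocks until all
        # results are ready and returns them in submission order.
        futures = self.client.map(func, list(iterable))
        return self.client.gather(futures)


def log_prob(theta):
    # Same toy Gaussian log-probability as in the script above, minus the delay.
    return -0.5 * sum(x * x for x in theta)


def run_example(nsteps=100):
    # Imports are local so the adapter itself has no hard dependencies.
    import emcee
    import numpy as np
    from dask.distributed import Client

    client = Client()  # assumption: local cluster, swap in your Gateway cluster
    initial = np.random.default_rng(42).standard_normal((32, 5))
    nwalkers, ndim = initial.shape
    sampler = emcee.EnsembleSampler(
        nwalkers, ndim, log_prob, pool=DaskPool(client)
    )
    sampler.run_mcmc(initial, nsteps)
    client.close()
    return sampler.get_chain()
```

Calling run_example() should return the chain as an array. Note that each MCMC step sends only nwalkers small tasks to the cluster, so with a cheap log_prob the scheduling overhead may well outweigh any speed-up; this approach only pays off when a single likelihood evaluation is expensive.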
