Pangeo Batch workflows

This has come up a few times, most recently on the Pangeo ML working group and cloud devops calls.

Motivation

Pangeo users would like a way to submit long-running tasks that aren’t necessarily tied to
the lifetime of a JupyterLab session. Examples include

  • Training a machine learning model
  • Performing a large ETL job

Demo

The introduction of Dask-Gateway gets us partway there. It (optionally)
decouples the lifetime of the Dask cluster from the lifetime of the notebook
kernel, so we can piece together a batch-style workflow with some effort.

My prototype is at https://github.com/TomAugspurger/pangeo-batch-demo. The
summary:

  1. Connect to the Gateway from outside the hub.
    import os

    from dask_gateway import Gateway
    from dask_gateway.auth import JupyterHubAuth

    # Authenticate with a JupyterHub API token.
    auth = JupyterHubAuth(os.environ["PANGEO_TOKEN"])
    # Proxy address will be made easier to find.
    gateway = Gateway(
        address="https://staging.us-central1-b.gcp.pangeo.io/services/dask-gateway/",
        proxy_address="tls://104.197.142.28:8786",
        auth=auth
    )

This requires a JupyterHub API token (read here from the PANGEO_TOKEN environment variable), which you can generate from the hub’s token page.

  2. Start a cluster that stays alive
    cluster = gateway.new_cluster(shutdown_on_close=False)

This is the key to the batch-style workflow: the client (a Python process running on my laptop) can go away, but the computation continues.
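You’ll typically also want some workers before submitting anything. A minimal sketch (the worker counts here are arbitrary):

    cluster.scale(8)   # request 8 workers up front
    # or let the cluster size track the workload:
    # cluster.adapt(minimum=1, maximum=20)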

  3. Submit your job
    from dask.distributed import fire_and_forget
    client = cluster.get_client()
    fut = client.submit(main)   # main is the function that runs the whole job
    fire_and_forget(fut)

We’re building on Dask here, so our job is just a Python function that we submit with the Dask client. We also call fire_and_forget on the result to ensure that the computation completes even if all the clients go away.

In my example, the job does a grid search using joblib’s Dask backend. The
training happens on the cluster.
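For concreteness, here’s a minimal sketch of what main could look like (hypothetical: it assumes scikit-learn and joblib are installed on the workers, and the dataset and parameter grid are placeholders):

    import joblib
    from sklearn.datasets import make_classification
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import GridSearchCV

    def main():
        # Placeholder dataset; a real job would load data from storage.
        X, y = make_classification(n_samples=10_000, n_features=20, random_state=0)
        search = GridSearchCV(
            RandomForestClassifier(random_state=0),
            param_grid={"n_estimators": [50, 100], "max_depth": [5, 10]},
            cv=3,
        )
        # joblib's "dask" backend farms the individual fits out to the
        # cluster's workers, so the training happens on the cluster.
        with joblib.parallel_backend("dask"):
            search.fit(X, y)
        return search.best_params_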

Issues

There are plenty of issues with this approach.

  1. Users don’t have great visibility into the status of jobs or what’s been submitted.
    The Dask Dashboard is available, but that’s about it.
  2. Clusters aren’t shut down. The shutdown_on_close argument lets us disconnect
    the client, but there’s no easy way to say “shut this down when my main job
    completes”. You’d need to manually reconnect to the gateway and shut things down
    (see the sketch after this list), or wait for Pangeo to shut it down for you after
    60 minutes of inactivity.
  3. The main job is running on a Dask worker, which can cause strange things
    (e.g. trying to close the cluster from within a worker is… strange).
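For reference, the manual cleanup looks something like this sketch (it reuses the gateway address and auth from above, and stops every cluster you own, so use with care):

    # Reconnect to the gateway and stop any clusters left running.
    gateway = Gateway(
        address="https://staging.us-central1-b.gcp.pangeo.io/services/dask-gateway/",
        proxy_address="tls://104.197.142.28:8786",
        auth=auth,
    )
    for report in gateway.list_clusters():
        gateway.stop_cluster(report.name)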

Issues 2 and 3 point to the need for something to manage the computation other
than the cluster itself. I think the possible options are either JupyterHub or
perhaps Dask Gateway. Even once that’s implemented, we’d still want some basic
reporting on what jobs users have submitted and their output.

Hopefully this helps guide future development a bit. If anyone has additional
use cases it’d be great to collect them here. If even this hacky setup is helpful
then let me know and I can make things a bit easier to use (like getting a stable
IP for the proxy address).


Tom, I just saw this post now and wish I had looked at it before opening

Thanks for sharing your thoughts.

I wrote up this blog post, inspired by this and other content: https://words.yuvi.in/post/kbatch/. It talks about a small tool that can solve a very specific problem and that we can build on. Would love comments :slight_smile: If nobody tries to implement it, I’ll give it a shot.


Awesome!

On Friday I was experimenting with adding MLFlow as a JupyterHub service. I think it and a ksubmit-type thing would work well together. MLFlow lets you define the command that is run as part of a project, so you run something like

mlflow run myproject

You can control what command is actually run. We’d want something like ksubmit (I had been experimenting with going through JupyterHub’s REST API & named servers).
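As a rough sketch of the named-servers angle (hypothetical: the server name is made up, PANGEO_TOKEN is the JupyterHub API token from earlier, and this only starts a named server; a real ksubmit-style tool would do more):

    import os

    import requests

    hub_api = "https://staging.us-central1-b.gcp.pangeo.io/hub/api"
    user = os.environ["JUPYTERHUB_USER"]
    token = os.environ["PANGEO_TOKEN"]  # a JupyterHub API token

    # Ask JupyterHub to start a named server, separate from the default
    # singleuser server, which could host the batch job.
    resp = requests.post(
        f"{hub_api}/users/{user}/servers/batch-job-1",
        headers={"Authorization": f"token {token}"},
    )
    resp.raise_for_status()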

So while all the extras you list would be great to have, they might not be needed if you’re willing to buy into MLFlow’s project structure and some of its runtime features, like logging artifacts.


Oooh, that’s quite nice, @TomAugspurger. This makes me realize that we might need to pass env vars and whatnot from the currently running notebook to ksubmit as well. This should be pretty doable.


I think this is an incredibly promising direction for us to be going in. Thank you both for your efforts on this.

I’ve added MLFlow as a JupyterHub service to the GCP hub (staging only for now). https://github.com/pangeo-data/pangeo-cloud-federation/pull/815.

The short version is that on staging, something like this works:

$ pip install --user mlflow
$ git clone https://github.com/mlflow/mlflow-example.git
$ MLFLOW_TRACKING_URI=http://mlflow ~/.local/bin/mlflow run --no-conda mlflow-example/ -P alpha=5.0

and the results can be viewed at https://staging.us-central1-b.gcp.pangeo.io/services/mlflow/#/

This setup offers

  1. A UI for visualizing MLFlow “experiments” at $JUPYTERHUB_URL/services/mlflow/. e.g. https://staging.us-central1-b.gcp.pangeo.io/services/mlflow/#/.
  2. A CLI for batch jobs, through MLFlow projects: https://mlflow.org/docs/latest/projects.html
  3. (eventually) A model / artifact repository.

Right now, I believe the “batch job” is tied to the lifetime of a singleuser session, so it’s not much of a batch system by default (I think mlflow run ... runs as a subprocess? I need to understand this better). But I think we could combine it with something like ksubmit so that mlflow run my-project could actually execute in a pod that’s independent of the singleuser session. And then we can start thinking about submitting these from outside the cluster, but one step at a time.

There are some downsides:

  1. This requires some buy-in to MLFlow. For example, using their project structure, and potentially calling mlflow functions at runtime (to log a parameter, say; see the sketch after this list). Yet another tool to learn.
  2. I haven’t looked much, but AFAICT there’s no segmentation of users in the MLFlow UI. I believe RBAC is a paid feature of Databricks’ managed MLFlow. Combined with everyone being named jovyan, this may make finding your run challenging. We could maybe adopt a convention of including your JupyterHub username in the logs / experiment runs.
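For the runtime-logging piece and the username convention, a minimal sketch (the tracking URI matches the service above; the metric value is a placeholder, and it assumes the hub sets JUPYTERHUB_USER in the singleuser environment):

    import os

    import mlflow

    mlflow.set_tracking_uri("http://mlflow")
    with mlflow.start_run():
        # Tag the run with the JupyterHub username so it's easier to find in the UI.
        mlflow.set_tag("jupyterhub_user", os.environ.get("JUPYTERHUB_USER", "unknown"))
        mlflow.log_param("alpha", 5.0)
        mlflow.log_metric("rmse", 0.78)  # placeholder value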

Some remaining tasks

  1. Install MLFlow into the GCP image (though it has quite a few dependencies)
  2. Set the relevant environment variables (MLFLOW_TRACKING_URI=http://mlflow/)
  3. Add docs to pangeo.io/cloud
  4. See if you can access the MLFlow REST API from outside the hub. Right now, there’s a conflict with the UI.
  5. Set up storage of artifacts to GCS.
  6. Set up persistent storage for the MLFlow service (SQLite database on a PVC).

Made a bit more progress on this, this time incorporating papermill. There’s an example at https://github.com/pangeo-data/pangeo/pull/800.
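The papermill piece is essentially parameterized notebook execution. A minimal sketch (the notebook paths and parameter are hypothetical), which is the kind of command an mlflow run entry point could invoke:

    import papermill as pm

    pm.execute_notebook(
        "train.ipynb",                  # input notebook with a "parameters" cell
        "output/train-executed.ipynb",  # executed copy, saved for inspection
        parameters={"alpha": 5.0},
    )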

All of this is still premised on executing these from within the singleuser pod. @yuvipanda I think now would be a great time to experiment with a ksubmit if you have some free time. In my dream scenario, we’d be able to execute these mlflow run commands inside a Kubernetes Job (independent of the lifetime of the singleuser pod) that is managed by JupyterHub.


Hi,

I came across this thread and thought it has a lot in common with Ploomber, an open-source project I’m working on and presented at JupyterCon last month. Ploomber has been in development for over a year and has a stable API.

The motivation for Ploomber is to ease the development of multi-stage workflows, where each stage can be a (Python/R) notebook, a (Python/R/SQL) script, or a Python function. Users describe a workflow in YAML (or Python), specifying what to run and where to save the output; Ploomber then infers the execution order from task dependencies declared in the source code itself. Here’s an example.

I’m currently working on adding support for running workflows in Kubernetes. The idea is to keep the API the same so users think in terms of notebooks/scripts/functions and not in terms of pods/images/volumes. I have a working prototype that exports Ploomber workflows to Argo, which allows one-off execution, but also supports scheduling and monitoring (using CLI or GUI).

This combination creates a nice dev/production parity where users can edit their workflows using Jupyter notebooks, test them on their own Jupyter session and submit them to the cluster once they’re ready. Since Argo manages execution, workflows are not tied to a user’s session lifetime.

I’d be very happy to collaborate with you to arrive at a solution that helps your use case.


I just read this blog post, and the system they describe seems really cool!