Pangeo Batch workflows

This has come up a few times, most recently on the Pangeo ML working group and cloud devops calls.

Motivation

Pangeo users would like a way to submit long-running tasks that aren’t necessarily tied to
the lifetime of a JupyterLab session. Examples include

  • Training a machine learning model
  • Performing a large ETL job

Demo

The introduction of Dask-Gateway does get us partway there. It (optionally)
decouples the lifetime of the Dask Cluster from the lifetime of the notebook
kernel. So we can piece together a batch-style workflow with some effort.

My prototype is at https://github.com/TomAugspurger/pangeo-batch-demo. The
summary:

  1. Connect to the Gateway from outside the hub.
    import os

    from dask_gateway import Gateway
    from dask_gateway.auth import JupyterHubAuth

    # Authenticate with a JupyterHub API token.
    auth = JupyterHubAuth(os.environ["PANGEO_TOKEN"])
    # Proxy address will be made easier to find.
    gateway = Gateway(
        address="https://staging.us-central1-b.gcp.pangeo.io/services/dask-gateway/",
        proxy_address="tls://104.197.142.28:8786",
        auth=auth
    )

This requires a JupyterHub API token (here read from the PANGEO_TOKEN environment variable).

  2. Start a cluster that stays alive
    cluster = gateway.new_cluster(shutdown_on_close=False)

This is what enables the batch-style workflow: the client (a Python process running on my laptop) can go away but the computation continues.

  3. Submit your job
    from dask.distributed import fire_and_forget
    client = cluster.get_client()  # Dask client for the cluster created above
    fut = client.submit(main)
    fire_and_forget(fut)

We’re building on Dask here, so our job submission is just a Python function (main) that we submit with the Dask client. We also call fire_and_forget on the result to ensure that the computation completes even if all the clients go away.

In my example, the job does a grid search using joblib’s Dask backend. The
training happens on the cluster.
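
For reference, main in my example is roughly a function like this (a minimal sketch, not the exact code in the repo; the dataset, estimator, and parameter grid here are just placeholders):

    import joblib
    from sklearn.datasets import load_digits
    from sklearn.model_selection import GridSearchCV
    from sklearn.svm import SVC

    def main():
        X, y = load_digits(return_X_y=True)
        search = GridSearchCV(SVC(), {"C": [0.1, 1, 10], "gamma": [0.001, 0.01]}, cv=3)
        # joblib's "dask" backend runs the individual fits on the Dask cluster's
        # workers instead of in the local process.
        with joblib.parallel_backend("dask"):
            search.fit(X, y)
        return search.best_score_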

Issues

There are plenty of issues with this approach.

  1. Users don’t have great visibility into what’s been submitted or the status of
    their jobs. The Dask Dashboard is available, but that’s about it.
  2. Clusters aren’t shut down. The shutdown_on_close argument lets us disconnect
    the client, but there’s no easy way to say “shut this down when my main job
    completes”. You’d need to manually reconnect to the gateway and shut things down
    yourself (see the sketch after this list), or wait for Pangeo to shut it down for
    you after 60 minutes of inactivity.
  3. The main job is running on a Dask worker, which can cause strange things
    (e.g. trying to close the cluster from within a worker is… strange).
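
As a stopgap for issue 2, the manual cleanup looks roughly like this sketch, reusing the gateway connection from the demo above (list_clusters and stop_cluster are existing dask-gateway client methods):

    # Reconnect with the same address / proxy_address / auth as before,
    # then stop whatever is still running.
    for report in gateway.list_clusters():
        print(report.name, report.status)
        gateway.stop_cluster(report.name)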

Issues 2 and 3 point to the need for something to manage the computation other
than the cluster itself. I think the possible options are either JupyterHub or
perhaps Dask Gateway. Even once that’s implemented, we’d still want some basic
reporting on what jobs users have submitted and their output.

Hopefully this helps guide future development a bit. If anyone has additional
use cases it’d be great to collect them here. If even this hacky setup is helpful
then let me know and I can make things a bit easier to use (like getting a stable
IP for the proxy address).


Tom, I just saw this post now and wish I had looked at it before opening

Thanks for sharing your thoughts.

I wrote up a blog post inspired by this and other content. It talks about a small tool that can solve a very specific problem that we can build on: https://words.yuvi.in/post/kbatch/. Would love comments 🙂 If nobody tries to implement it, I’ll give it a shot.


Awesome!

On Friday I was experimenting with adding MLFlow as a JupyterHub service. I think it and a ksubmit-type thing would work well together. MLFlow lets you define the command that is run as part of a project, so you do something like

mlflow run myproject

You can control what command is actually run; we’d want something like ksubmit to actually execute it (I had been experimenting with going through JupyterHub’s REST API & named servers).

So while all the extras you list would be great to have, they might not be needed if you’re willing to buy into MLFlow’s project structure, and some of their runtime things, like logging artifacts.


oooh, that’s quite nice, @TomAugspurger. This makes me realize that we might need to pass env vars and whatnot from the currently running notebook to ksubmit as well. This should be pretty doable.


I think this is an incredibly promising direction for us to be going in. Thank you both for your efforts on this.

I’ve added MLFlow as a JupyterHub service to the GCP hub (staging only for now). https://github.com/pangeo-data/pangeo-cloud-federation/pull/815.

The short version is that on staging something like this works:

$ pip install --user mlflow
$ git clone https://github.com/mlflow/mlflow-example.git
$ MLFLOW_TRACKING_URI=http://mlflow ~/.local/bin/mlflow run --no-conda mlflow-example/ -P alpha=5.0

and the results can be viewed at https://staging.us-central1-b.gcp.pangeo.io/services/mlflow/#/

This setup offers

  1. A UI for visualizing MLFlow “experiments” at $JUPYTERHUB_URL/services/mlflow/. e.g. https://staging.us-central1-b.gcp.pangeo.io/services/mlflow/#/.
  2. A CLI for batch jobs, through MLFlow projects: https://mlflow.org/docs/latest/projects.html
  3. (eventually) A model / artifact repository.

Right now, I believe the “batch job” is tied to the lifetime of a singleuser session, so it’s not much of a batch system by default (I think mlflow run ... runs as a subprocess? Need to better understand this). But I think we could combine it with something like ksubmit so that mlflow run my-project could actually execute in a pod that’s independent of the singleuser session. And then we can start thinking about submitting these from outside of the cluster, but one step at a time.

There are some downsides:

  1. This requires some buy-in to MLFlow. For example, using their project structure, and potentially calling mlflow functions at runtime (to log a parameter, say). Yet another tool to learn.
  2. I haven’t looked much, but AFAICT there’s no segmentation of users in the MLFlow UI. I believe RBAC is a paid feature of Databricks’ managed MLFlow. Combined with everyone being named jovyan, this may make finding your run challenging. We can maybe have a convention of including your JUPYTERHUB_USERNAME in the logs / experiment runs (a sketch follows this list).
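
A sketch of that convention (assuming the hub username is available as an environment variable in the singleuser pod, e.g. JUPYTERHUB_USER, and that MLFLOW_TRACKING_URI is set as above):

    import os

    import mlflow

    with mlflow.start_run():
        # Tag each run with the hub username so it can be found in the shared UI.
        mlflow.set_tag("jupyterhub_user", os.environ.get("JUPYTERHUB_USER", "unknown"))
        mlflow.log_param("alpha", 5.0)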

Some remaining tasks

  1. Install MLFlow into the GCP image (though it has quite a few dependencies)
  2. Set the relevant environment variables (MLFLOW_TRACKING_URI=http://mlflow/)
  3. Add docs to pangeo.io/cloud
  4. See if you can access the MLFlow REST API from outside the hub. Right now, there’s a conflict with the UI
  5. Set up storage of artifacts to GCS.
  6. Set up persistent storage for the MLFlow service (SQLite database on a PVC)

Made a bit more progress on this, this time incorporating papermill. There’s an example at https://github.com/pangeo-data/pangeo/pull/800.

All of this is still premised on executing these from within the singleuser pod. @yuvipanda I think now would be a great time to experiment with a ksubmit if you have some free time. In my dream scenario, we’d be able to execute these mlflow run commands inside a Kubernetes Job (independent of the lifetime of the singleuser pod) that is managed by JupyterHub.


Hi,

I came across this thread and thought it has a lot in common with Ploomber, an open-source project I’m working on and presented at JupyterCon last month. Ploomber has been in development for over a year and has a stable API.

The motivation for Ploomber is to ease the development of multi-stage workflows, where each stage can be a (Python/R) notebook, (Python/R/SQL) script, or Python function. Users describe a workflow using YAML (or Python), specifying what to run and where to save the output; Ploomber then infers the execution order based on task dependencies declared in the source code itself. Here’s an example.

I’m currently working on adding support for running workflows in Kubernetes. The idea is to keep the API the same so users think in terms of notebooks/scripts/functions and not in terms of pods/images/volumes. I have a working prototype that exports Ploomber workflows to Argo, which allows one-off execution, but also supports scheduling and monitoring (using CLI or GUI).

This combination creates a nice dev/production parity where users can edit their workflows using Jupyter notebooks, test them on their own Jupyter session and submit them to the cluster once they’re ready. Since Argo manages execution, workflows are not tied to a user’s session lifetime.

I’d be very happy to collaborate with you to arrive at a solution that helps your use case.


I just read this blog post, and the system they describe seems really cool!

I’m taking another crack at this in https://github.com/TomAugspurger/kbatch. Copying some design goals:

  • Simplicity of implementation: Yuvi Panda’s post “kbatch: sbatch, but for kubernetes” (https://words.yuvi.in/post/kbatch/) captures this well.
  • Simplicity of use: ideally users don’t need to adapt their script / notebook / unit of work to the job system.
  • Integration with JupyterHub: runs as a JupyterHub service, uses JupyterHub for auth, etc.
  • Runs on Kubernetes: mainly for the simplicity of implementation, and also because that’s my primary use case.
  • Users do not have access to the Kubernetes API: partly because if users need to know about Kubernetes then we’ve failed, and partly for security.

I’m at the point where I’d appreciate some feedback on the kinds of workflows people have, especially ones that feel awkward on the (interactive-focused) JupyterHub. Right now, my hypothetical use cases are

  • Run some script that consumes a (large) input dataset from blob storage, transforms it, and writes the derived dataset back to blob storage (e.g. batch inference from an ML model, generating a cloud-optimized dataset, etc.).
  • Training some ML model
  • Executing a notebook (perhaps with small outputs like a figure or summary statistics, or perhaps one that writes large results to blob storage) that takes a long time to run

If people have additional workflows, thoughts on the design goals, or would like to work on this then please do share.


THIS IS AWESOME!!! I’m super excited for this, @TomAugspurger! I’ll check it out this week.

Can you talk a bit more about the tradeoffs in having the server vs just talking to the k8s API directly? When I wrote up the blog post (I’m so glad you loved it!), I was thinking it would just talk to the k8s API for simplicity. That would also mean you can submit from anywhere you have k8s API access, without any direct interface with JupyterHub.

Anyway, I’m super excited about this!


I think the main (only?) issue is around granting users access to the Kubernetes API (the README also mentions kubernetes complexity, but I think that can be mostly hidden behind a nice CLI / API). I’m not at all familiar with how the Kubernetes API works, so I don’t have a good sense of how identity / auth would work, and what the security implications are of granting users access to the Kubernetes API.

The tradeoff here is the complexity of developing, maintaining, and running yet another service.

I’d be really curious to hear if you have thoughts on this, since I’m guessing you have thought about it much more than I have 🙂

One perhaps important point: with a bit of effort, I think that kbatch/backend.py in the TomAugspurger/kbatch repo could be made agnostic to the server (I happen to be using django-rest-framework) and usable by anyone with access to the Kubernetes API. So we could perhaps satisfy both use cases?

The big advantages of not having a separate service are that we don’t have to store state about the jobs (running, stopped, etc.), forward logs when needed, deal with server restarts (and k8s objects changing state during a restart), provide HA for the server, etc. The tricky part would be figuring out auth / authz, but I think if we can do that, the benefits are well worth it.

Now the question is how to do RBAC, and that gets tricky. The ‘right’ way to do this in Kubernetes is to give each user their own namespace, and I think that’s actually quite doable in a wide variety of use cases. For cases where that is not possible, we could use conventions where we put usernames in labels, or use a server that does additional levels of auth before passing a subset of requests straight through to the k8s API. IMO, longer term this will be simpler than us maintaining another server - which really is just another layer of indirection here. So either you kinda trust each other and live in the same namespace with soft restrictions, or you don’t and use proper k8s-level isolation…
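
To make that concrete, here’s a rough sketch of the direct-to-API flavor: the client creates a Kubernetes Job in the user’s own namespace, with the username in a label so it’s easy to find later. Everything here (namespace scheme, label key, image, command) is hypothetical, not an existing kbatch convention:

    from kubernetes import client, config

    config.load_kube_config()  # or load_incluster_config() from inside the cluster
    username = "alice"         # hypothetical user

    job = client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=client.V1ObjectMeta(
            name="alice-train-model",
            labels={"username": username},  # hypothetical labeling convention
        ),
        spec=client.V1JobSpec(
            template=client.V1PodTemplateSpec(
                spec=client.V1PodSpec(
                    restart_policy="Never",
                    containers=[
                        client.V1Container(
                            name="job",
                            image="pangeo/pangeo-notebook:latest",
                            command=["python", "train.py"],
                        )
                    ],
                )
            ),
        ),
    )
    client.BatchV1Api().create_namespaced_job(namespace=f"user-{username}", body=job)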

Thanks, all of those upsides sound very appealing :slight_smile:

One question around job handling: the Kubernetes documentation on Jobs mentions that

Finished Jobs are usually no longer needed in the system. Keeping them around in the system will put pressure on the API server.

Do you have thoughts on how a service-less system would handle retaining jobs (and logs) longer-term? My current plans with kbatch (https://github.com/TomAugspurger/kbatch) are to add a Kubernetes hook that uploads the logs of the completed Job pod to the kbatch-server service and updates the status with the outcome. Perhaps losing access to the logs / status long-term isn’t the worst thing.
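
For the hook, I’m imagining something along these lines (a sketch only; the namespace and job name are hypothetical). Kubernetes Jobs label the pods they create with job-name, so the logs can be pulled before the Job is cleaned up:

    from kubernetes import client, config

    config.load_incluster_config()  # the hook would run inside the cluster
    core = client.CoreV1Api()

    pods = core.list_namespaced_pod(
        namespace="kbatch",                       # hypothetical namespace
        label_selector="job-name=my-kbatch-job",  # pods created by this Job
    )
    for pod in pods.items:
        text = core.read_namespaced_pod_log(name=pod.metadata.name, namespace="kbatch")
        # ...upload `text` to the kbatch-server (or object storage) here...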

I’ll prioritize docs and dev setup next so that it’s easy for others to try out.

I think this is ok. If not, we can provide hooks to any of the log-aggregation services that exist around Kubernetes for this. I don’t think we should try to solve this problem ourselves, but just provide pass-throughs to solutions people can deploy.

Tom - another possibility is lots of jobs that don’t take that long individually - e.g. doing stuff to a ‘tile’ - but where there are way too many to complete in the time a JupyterHub session is up, even with lots of machines.

Well, all of that sounds very reasonable! I think it’s at least worth exploring a bit. I’ve been wary of granting users direct access to the Kubernetes API just because doing so securely sounds so challenging. But it sounds like it’s perhaps feasible, at least if users are granted permissions only in their own namespace?

I’ll look into authentication now. I think we’d either want to rely on JupyterHub API tokens, or perhaps hook directly into whatever auth system the hub is using.

I’ve been thinking of individual jobs (which map directly to Kubernetes Jobs) as something that would take longer than that. There’s still some overhead in starting a job, so I would expect the user-submitted code to handle parallelizing some operation across many tiles.
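
For the many-tiles case, I’d picture a single job whose code fans out over the tiles itself, something like this sketch (process_tile and the tile list are placeholders):

    from concurrent.futures import ProcessPoolExecutor

    def process_tile(tile):
        ...  # placeholder: read one tile from blob storage, transform it, write the result

    def main(tiles):
        # One submitted job handles all of the tiles, instead of one job per tile.
        with ProcessPoolExecutor() as pool:
            list(pool.map(process_tile, tiles))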

I opened an issue to talk about the details of using the Kubernetes API directly: https://github.com/TomAugspurger/kbatch/issues/1 (“Investigate using the Kubernetes API directly”). Given that this Discourse is focused on geoscience, debating the finer points of Kubernetes’ webhook token authentication might be of less interest to most people here 🙂

For those who are interested in following along, I’m happy to hear your thoughts on that issue. I’ll make sure to post an update here when appropriate.

So 12 hour+ type things as jobs?