Pangeo Forge bakeries

Hiya amazing Pangeo folk, I’m working at UKCEH (UK Centre for Ecology and Hydrology, quite a mouthful…!) and a group of us are getting to grips with object/cloud storage and parallel cloud compute of big/huge datasets.

As part of this we’ve been experimenting with getting some of our existing datasets onto AWS object storage and learning (oftentimes the hard way!) what does and doesn’t work with the various tools, techniques, and best practices. We now have a sort-of workflow set up, but it is very disconnected and not straightforward. Ultimately we want to create a straightforward, easy-to-follow and easy-to-adapt workflow and guide to help others in our organisation and further afield get on board with this new way of working and these new tools.
I’ve been lurking on the edge of Pangeo for a while now, slowly reading up on and around things, and today I started looking at Pangeo Forge in earnest. It seems to do essentially what we want: provide a straightforward recipe for converting an existing dataset to ARCO format and making it publicly accessible. The one thing I’m a bit stuck on is how the compute resource works. I can see that conversion jobs can be submitted to bakeries, but I can only see one bakery on Pangeo-Forge Bakeries, and it seems to have jobs that have been pending for months. Is this still operational? Are there other bakeries that could be used, and if not, is it possible/plausible to use some of our own compute resource? (After all, up to this point we have been doing everything locally!)

But that aside, Pangeo and its Forge seem like fantastic resources, and I’m excited to get more into them!


Hi @matbro and welcome!

We are in the midst of rethinking how to operate Pangeo Forge as a service. We may have bitten off a bit more than we could chew in terms of operating a global-scale service that would allow anyone in the world to move and transform massive volumes of data anywhere on the internet. :laughing: We are in the process of scaling back our scope to something that’s more sustainable and achievable.

Our focus now is precisely on enabling folks to run Pangeo Forge effectively in their own cloud account. We just completed a big refactor of Pangeo Forge which will hopefully make this a lot easier. @cisaacstern has been leading this and can perhaps give you an update on the best way to engage with the project right now.


@matbro, thanks so much for your engagement with Pangeo Forge, and for posting this question. As @rabernat mentioned, we are currently migrating to a model that emphasizes users deploying recipes to self-hosted compute. Apologies that this transition is not yet clearly communicated on the website (I’ve just opened this issue to discuss fixing this).

In terms of how to execute Pangeo Forge recipes today, there are a few points I can highlight:

  1. We have recently finished rewriting pangeo-forge-recipes using Apache Beam (more details on that in this blog post). With this change, Pangeo Forge recipes have become “just” Apache Beam Pipelines. As such, they can be run wherever Beam can run. Certainly following those linked Beam docs to deploy recipes is an option.
  2. For a more opinionated and hopefully simplified deployment experience, we are now also offering pangeo-forge-runner, a CLI which is a thin wrapper around Beam’s deployment API (see the config sketch after this list). This tool streamlines the deployment process, with the caveat that the documentation there is appallingly incomplete. Currently my top priority is to improve this documentation, and thereby make it considerably clearer to new users such as yourself how to get started.
  3. We are also developing a GitHub Action which wraps pangeo-forge-runner, for those that want to deploy recipes from GitHub Workflows. Documentation is also lagging here (addressing this is also on my to-do list). FWIW, here’s a working example of using the Action to deploy recipes to GCP Dataflow.
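To make item 2 above a bit more concrete (the sketch referenced there): deploying with pangeo-forge-runner boils down to pointing its `bake` command at a recipe repo plus a small traitlets config file. The sketch below is from memory, so the exact trait and class names may differ from the current code; please treat it as illustrative only and check the repo and the CLI’s `--help` output.

# config.py -- illustrative pangeo-forge-runner config (names may not
# exactly match the current API; double-check against the repo).

# Which bakery (Beam runner wrapper) to deploy to; other implementations
# live under pangeo_forge_runner/bakery/ (e.g. Dataflow on GCP, Flink on AWS).
c.Bake.bakery_class = "pangeo_forge_runner.bakery.local.LocalDirectBakery"

# Where the built Zarr store should land (any fsspec filesystem).
c.TargetStorage.fsspec_class = "s3fs.S3FileSystem"
c.TargetStorage.root_path = "s3://my-bucket/pangeo-forge/output"

# Where to cache downloaded source files.
c.InputCacheStorage.fsspec_class = "s3fs.S3FileSystem"
c.InputCacheStorage.root_path = "s3://my-bucket/pangeo-forge/cache"

You would then invoke something along the lines of `pangeo-forge-runner bake --repo=<recipe-repo-url> -f config.py` (again, the flags here are from memory; the CLI’s `--help` is authoritative).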

Hopefully that gives you some sense of the current deployment landscape, but it may not be exactly actionable. For the actionable side (that is, to help you actually get going in the right direction), it might be helpful to know a bit more about your use case.

What cloud provider(s) do you have access to and aspire to run recipes on? Currently pangeo-forge-runner deploys reliably to GCP Dataflow, and in early prototyping appears to do well with deploying to a Flink K8s cluster on AWS as well. Beam can run on other clouds, but those are the two use cases we’ve developed so far.

In addition to these distributed frameworks, Beam’s single-machine multithreaded runner can also be pretty performant. So another option, depending on the workload, is simply to start a large VM instance on a cloud provider and run multithreaded jobs there.
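For what it’s worth, the knobs for that single-machine mode are just standard Beam pipeline options; a minimal sketch (where `transforms` stands in for whatever recipe transforms you’ve built):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Ask the DirectRunner to use several local workers rather than the default
# single in-memory worker; "multi_threading" and "multi_processing" are the
# two parallel modes it supports.
options = PipelineOptions.from_dictionary({
    "direct_num_workers": 8,
    "direct_running_mode": "multi_threading",
})

with beam.Pipeline(options=options) as p:
    p | transforms  # `transforms` is your recipe's chain of PTransforms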

Thank you both for the detailed responses, really appreciate that, saves me having to figure it out on my own!

What attracted me most to Pangeo Forge was the remote cloud compute option, as that’s one thing we don’t really have access to or set up, but I absolutely get that that’s a big challenge to maintain! I still think there’s merit in us following the recipe paradigm at least; we very much need to simplify our workflow, and it looks like Pangeo Forge would help us develop that. Running it might be another issue though.
Up to now we have been using JASMIN, which is a compute resource/storage space etc. for certain projects in the UK. JASMIN Site
It consists of traditional disk storage, some Linux-based compute servers, AWS object storage and an HPC SLURM cluster. We’ve been using the cluster for most of our compute needs, using rechunker and dask/xarray to compute and then store the data on the object storage.
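For a flavour of that current setup, the rechunker step looks very roughly like this (paths, variable names and chunk sizes here are placeholders, not our real configuration):

import fsspec
import xarray as xr
from rechunker import rechunk

# Open the source data lazily with dask (placeholder path).
ds = xr.open_mfdataset("source_files/*.nc")

# For an xarray source, rechunker takes per-variable chunk specs;
# variable and dimension names are hypothetical.
target_chunks = {
    "precip": {"time": 360, "y": 100, "x": 100},
    "time": None,  # leave coordinate arrays alone
    "y": None,
    "x": None,
}

plan = rechunk(
    ds,
    target_chunks=target_chunks,
    max_mem="4GB",
    target_store=fsspec.get_mapper("s3://our-bucket/dataset.zarr"),
    temp_store=fsspec.get_mapper("s3://our-bucket/tmp.zarr"),
)
plan.execute()  # runs as a dask graph on whatever cluster/scheduler is active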

So I think the pertinent question for us is: is it possible to use Apache Beam on this cluster we have access to? Or do we need to start investigating actual cloud compute options? It’s not something I’ve encountered before, so I have zero knowledge of what’s required for it to run!
We don’t have root/admin access to the cluster, which I’m guessing might be the determining factor in that question… We usually install all the Python packages we need in a local Anaconda environment. If it’s possible to do that with Apache Beam we might have a solution, but my reading of the links you sent suggests it’s a little more complex than that…

Dear @matbro,
Excited to hear about your interest in Pangeo-Forge. Over at LEAP I am working with @cisaacstern on setting up a more local bakery. I’d be happy to share things we have learned.

For context we are using Google Dataflow to run our recipes and use a central repo to hold multiple recipes. Our users can suggest datasets using an issue template. This is very much a WIP and might change substantially in the future, but so far this has worked quite well for us.

Happy to chat more

PS: Also tagging @rsignell here, since they are working on setting up something similar but on AWS

Hi @jbusecke
Many thanks for sharing - I think what you’ve got there is more or less what we’re aiming for in the long run, so will be super helpful as a blueprint as we develop. Please do share what you learnt along the way - that’ll definitely help us get up and running more quickly!

I still think the major hurdle for us is going to be getting Google Dataflow or something equivalent sorted to actually run some recipes on. But from some reading of the Apache Beam docs it looks like it might be possible to use the ‘DirectRunner’ to have it running on our local/JASMIN compute resource, similar to what @cisaacstern suggests above.

This, combined with an approach similar to @jbusecke’s and the ability to append to zarr datasets that I just learnt about, should allow us to run small-to-medium-size dataset recipes (which we could append to make larger zarr datasets). At least until we work out how to get Google Dataflow or the like up and running.
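For the appending part, my understanding is that it boils down to xarray’s `append_dim` option when writing to Zarr; something like this sketch (store paths and dimension name are placeholders):

import xarray as xr

# First recipe run: write the initial Zarr store.
ds_first = xr.open_mfdataset("rcm01/*.nc")   # placeholder input
ds_first.to_zarr("s3://our-bucket/dataset.zarr", mode="w")

# Later runs: append subsequent data along an existing dimension.
ds_next = xr.open_mfdataset("rcm01_next_decade/*.nc")  # placeholder input
ds_next.to_zarr("s3://our-bucket/dataset.zarr", mode="a", append_dim="time")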

To those with more experience: does that sound like a sensible approach?


@matbro yes using the multithreaded runner on a powerful local resource is likely a good starting place. Is the source data you want to process already stored in JASMIN?

In terms of setting up a cloud deployment, does your office currently have accounts with any cloud provider(s)? If so, which one(s)?

Great - I’ll look into getting that set up as a starting point then. Some of the data we’d want to work with is on JASMIN on disk, some is only available locally to us, and some is available from an archive via http, so a good mix… I imagine the JASMIN data will be where we’ll start if we can get the direct runner working.

As for the cloud deployment, I’m asking around at the moment so I’ll get back to you on that…


@matbro sounds great, please let us know of any issues you encounter.

We’re currently working to get our documentation caught up with the latest flurry of software development, and I’ll follow up here with formal docs links as they become available. In the meantime, the content of this PR might be the closest thing we have at the moment to complete documentation of the workflow you’re intending to explore: Add documentation on how to run recipes locally by yuvipanda · Pull Request #89 · pangeo-forge/pangeo-forge-runner · GitHub.

Again, please chime in with questions; while we’re getting the docs up to speed, I’d fully expect new users to run into lots of confusing bits. We really appreciate your interest and engagement!


I will do for sure, I appreciate your help a lot too!

As an FYI I’m away on holiday for the next couple of weeks, so there won’t be any progress then xD


Sounds good.

@matbro when you get back to this, note that over in Blosc decompression error on `StoreDatasetFragments` (local multithreaded runner) · Issue #560 · pangeo-forge/pangeo-forge-recipes · GitHub we are finding some limitations to the suitability of the Direct Runner for production workloads (indeed, Beam does not recommend this). So perhaps this approach is best suited for testing + development after all.

Let me know if I can help sort through options once you get back to this!

Just a note here that we did manage to get the pangeo-forge-runner working on AWS using the Flink approach that @yuvipanda created.

We are going to follow the same approach as @jbusecke with LEAP for the USGS-led HyTEST project.

In addition to pangeo-forge-runner on AWS, we have the local runner working on our on-prem HPC system.


And if it’s relevant to anyone, I did get a bakery set up on Azure at https://github.com/TomAugspurger/pangeo-forge-cloud-federation/tree/azure/terraform/azure. I didn’t quite get to running a pangeo-forge recipe, but I at least got the Apache Flink quickstart running.


I’m curious if anyone has tried Pangeo Forge with a Spark runner. Spark is probably 100x more widely used than Flink. On paper, Beam works well on Spark. Can you just run Pangeo Forge on AWS EMR?


Spark was my first choice too, for the same reason. Unfortunately, at that time I basically found zero documentation on ‘how to actually run a Beam job with the v2 runner (for Python support) on EMR’ - although that may have changed now? The few examples I found were using the pure-Java version of Beam, rather than the one with the v2 runner. So I went with Flink instead.

I spoke to @cisaacstern a while ago about what it would take to get this running on EMR. I think someone more familiar with spark / EMR / beam could probably do that without a lot of work. It would involve adding a SparkBakery or EMRBakery to https://github.com/pangeo-forge/pangeo-forge-runner/tree/main/pangeo_forge_runner/bakery, and the runner architecture is defined decently enough for this to be not too big a deal - we already have 3 bakery implementations that people use (Dataflow on GCP, local for local / HPC, and Flink on AWS).


Hey y’all, good to read about all the updates going on, think I’m starting to get the lay of the land a bit better.

I think I’ll continue with my plan to use the direct runner for now and see how I get on. Once I’ve got a sort-of workflow set up (semi-)locally and have better established our potential access to cloud compute providers, I can start investigating moving the workflow there.

I’ll post back here with updates and questions, though I am only working on this part time at the moment.

Hey y’all, time for an update.

I’ve successfully got a pipeline running on my local-ish HPC. It’s achingly slow, but at least it’s a start.

This is my current use case:
Gridded NetCDF files with 100 years of daily data across the UK, one per ensemble (‘RCM’) member (~10 in total).
That’s 36000 (time) * 1000 (x) * 1000 (y) * 4 (bytes) = 144 GB per file uncompressed (compressed down to 11 GB; there are a lot of zeros in these files…), * 10 (RCMs) = ~1.4 TB total.

Some better chunking clearly needs to happen…
But first, question 1: ncdump -hs reports that the data is chunked in time (one timestep per chunk) on disk, but xarray doesn’t see this when I open the files with xr.open_mfdataset, and opens them as one chunk per file instead. What is going on here? I’m guessing the mapping of operations between the large xarray/dask ‘chunk’ and the smaller ‘disk chunks’ happens at the disk IO level, out of sight/mind?

That aside, I’ve been testing the following pipeline/recipe on just two of the files, rechunking in time and space:

import apache_beam as beam
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.transforms import OpenWithXarray, StoreToZarr

# RCMs (list of ensemble members), make_path (returns the file path for a
# given RCM), td (target_root) and tn (store_name) are defined elsewhere.

target_chunks = {'RCM': 1, 'Time': 360,
                 'Northing': 100,
                 'Easting': 100}

RCM_concat_dim = ConcatDim("RCM", RCMs, nitems_per_file=1)

pattern = FilePattern(make_path, RCM_concat_dim)
pattern_pruned = pattern.prune()  # keep only the first couple of inputs for testing

transforms = (
    beam.Create(pattern_pruned.items())
    | OpenWithXarray(file_type=pattern_pruned.file_type)
    | StoreToZarr(
        target_root=td,
        store_name=tn,
        combine_dims=pattern.combine_dim_keys,
        target_chunks=target_chunks
    )
)

with beam.Pipeline() as p:
    p | transforms

and this takes several hours to run.

I can think of two possible reasons for this…
Firstly, I’m aware the target chunks are not ideal. In earlier tests with a different dataset, this chunking was the ‘least bad’ option when we didn’t know what type of analysis users were likely to do (mostly across time, mostly across space, or both).
Some less intensive rechunking might be an idea to start with here though.

Secondly, I am using the beam DirectRunner as is, on a single process/thread, albeit one with lots of memory. I imagine this means no parallel processing is happening. The amount of memory SLURM is reporting the runner as using is also suspiciously low (only 1.4GB).

As I’m still very much learning how Beam works (I’m much more familiar with dask), I’d value any quick insights here to save me having to run myriad tests!!
I’m slowly working through the Beam docs and walkthroughs; I find this one particularly useful: Google Colab

Hey @matbro. I just ran a pgf pipeline on a local machine and specifying the # of workers + multiprocessing seemed to help speed it up. Although it’s not all roses (How to use multiprocessing / multithreading with direct runner · Issue #618 · pangeo-forge/pangeo-forge-recipes · GitHub).

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Run the DirectRunner with 8 local worker processes instead of the
# default single in-memory worker.
options = PipelineOptions.from_dictionary({
    'direct_num_workers': 8,
    'direct_running_mode': 'multi_processing',
})

with beam.Pipeline(options=options) as p:
    p | transforms

Also, it’s possible that the nitems_per_file in ConcatDim isn’t doing anything in post 1.0 pangeo-forge-recipes. Maybe deprecate `ConcatDim.nitems_per_file` · Issue #633 · pangeo-forge/pangeo-forge-recipes · GitHub

This is xarray’s default behavior: it does not map netCDF4/HDF5 chunks 1:1 to dask chunks. But it is now possible to override this if you want! (See https://github.com/pydata/xarray/pull/7948.)
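For example, to see the on-disk chunking and to ask xarray/dask to respect the engine’s preferred chunks, something like this should work (file path and variable name are placeholders):

import xarray as xr

# Inspect the netCDF4/HDF5 chunking recorded in the file's encoding.
ds = xr.open_dataset("rcm01_file.nc")
print(ds["precip"].encoding.get("chunksizes"))

# chunks={} asks dask to use the engine's preferred (i.e. on-disk) chunks,
# rather than one chunk per file.
ds_lazy = xr.open_mfdataset("rcm01_*.nc", chunks={})
print(ds_lazy["precip"].chunks)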

Given the on-disk chunks, you have a rather ambitious rechunking included here in your recipe. This is likely the cause of the slowness.
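To put rough numbers on that (back-of-the-envelope, using the shapes quoted above): the source files hold one full timestep per chunk, while each target chunk spans 360 timesteps of a 100x100 tile, so every target chunk gathers data from 360 source chunks and every source chunk contributes to 100 target chunks - essentially an all-to-all shuffle of the whole dataset:

# Back-of-the-envelope chunk sizes for float32 data (4 bytes per value).
source_chunk = 1 * 1000 * 1000 * 4     # one timestep, full 1000x1000 grid
target_chunk = 360 * 100 * 100 * 4     # 360 timesteps, one 100x100 tile

print(source_chunk / 1e6, "MB per source chunk")   # 4.0 MB
print(target_chunk / 1e6, "MB per target chunk")   # 14.4 MB
# 360 source chunks feed each target chunk; 100 target tiles draw from each
# source chunk -- hence the heavy data movement on a single-process runner.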

Thanks both, really useful info that definitely gives me stuff to go on. Appreciate it, as always
