Hey y’all, I’m having a go at running a pangeo-forge-recipes pipeline using the DaskRunner for Beam.
At my org we’re keen to use pangeo-forge-recipes as an easy way to convert datasets to Zarr, as a starting point for encouraging more science analysis of big datasets to be done in the cloud. Using the DaskRunner makes deploying recipes easier, since it’s relatively easy to deploy or get access to Dask compared with Flink or Spark, which the other Beam runners need.
I’m aware the DaskRunner is relatively new, so I’m not entirely sure how sensible this is to attempt, or whether it’s worth pursuing in the face of some of the problems I’m encountering. I’d love to get a status report on the DaskRunner work and opinions on what I’m attempting!
The latest development appears to be “Windowing Support for the Dask Runner” (Pull Request #32941, apache/beam, by alxmrs). An older PR, “Add DaskBakery” (Pull Request #109, pangeo-forge/pangeo-forge-runner, by cisaacstern), gives some idea of a next step that might be needed before I can actually run a pangeo-forge recipe using the DaskRunner.
Here’s what I’ve got up to so far:
This is the pipeline I’m attempting to run, ultimately converting chunk sizes of
time: 66, y: 114, x: 64 to
time: 8760, y: 100, x: 100
import os

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.dask.dask_runner import DaskRunner
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.transforms import (
    ConsolidateDimensionCoordinates,
    ConsolidateMetadata,
    OpenWithXarray,
    StoreToZarr,
)

# config and dask_worker_setup_cmd are defined elsewhere (not shown).


def make_path(time):
    # Build the input file path for one "YYYYMM" key.
    filename = config.prefix + time + config.suffix
    print(f"FILENAME: {filename}")
    return os.path.join(config.input_dir, filename)


# One "YYYYMM" key per input file, concatenated along time.
years = list(range(config.start_year, config.end_year + 1))
months = list(range(config.start_month, config.end_month + 1))
ymonths = [f"{year}{month:02d}" for year in years for month in months]
time_concat_dim = ConcatDim("time", ymonths)
pattern = FilePattern(make_path, time_concat_dim)

recipe = (
    beam.Create(pattern.items())
    | OpenWithXarray(file_type=pattern.file_type)
    | DataVarToCoordVar()
    | StoreToZarr(
        target_root=config.target_root,
        store_name=config.store_name,
        combine_dims=pattern.combine_dim_keys,
        target_chunks=dict(config.target_chunks),
    )
    | ConsolidateDimensionCoordinates()
    | ConsolidateMetadata()
)

beam_options = PipelineOptions(
    [
        "--dask_gateway", "https://dask-gateway.jasmin.ac.uk",
        "--dask_worker_cores", "4",
        "--dask_worker_memory", "20.0",
        "--dask_worker_setup", dask_worker_setup_cmd,
        "--dask_workers", str(config.num_workers),
    ],
)

with beam.Pipeline(runner=DaskRunner(), options=beam_options) as p:
    p | recipe
where
DataVarToCoordVar() is a workaround for this issue: “ValueError: Region (...) does not align with Zarr chunks ().” (Issue #644, pangeo-forge/pangeo-forge-recipes):
import apache_beam as beam
from pangeo_forge_recipes.transforms import Indexed, T


class DataVarToCoordVar(beam.PTransform):
    @staticmethod
    def _datavar_to_coordvar(item: Indexed[T]) -> Indexed[T]:
        index, ds = item
        # Promote the bounds and CRS variables from data variables to coordinates.
        ds = ds.set_coords(['x_bnds', 'y_bnds', 'time_bnds', 'crs'])
        return index, ds

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        return pcoll | beam.Map(self._datavar_to_coordvar)
And because I’m using a Dask Gateway, I’ve had to modify the dask_runner file in the Beam package a little so that the Dask Client picks up all the options I need to give it to run correctly via this gateway.
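For context on the gateway side, here is a minimal sketch of how a client is normally obtained with the dask-gateway package. It is not the change made to dask_runner. The function name connect_via_gateway is hypothetical, and the accepted cluster option names (for example worker_cores or worker_memory) are defined by each gateway deployment, so treat them as assumptions.

```python
def connect_via_gateway(address, n_workers, **cluster_options):
    """Start a cluster on a Dask Gateway and return (cluster, client).

    ASSUMPTION: the keyword names accepted in cluster_options are whatever
    the gateway server exposes; they are not fixed by dask-gateway itself.
    """
    # Imported lazily so this module can be loaded without dask-gateway installed.
    from dask_gateway import Gateway

    gateway = Gateway(address)
    # Extra keyword arguments are forwarded as cluster options.
    cluster = gateway.new_cluster(**cluster_options)
    cluster.scale(n_workers)
    # get_client() returns a distributed Client already configured with the
    # cluster's TLS credentials, which a bare scheduler address would lack.
    client = cluster.get_client()
    return cluster, client
```

Calling it would look something like connect_via_gateway("https://dask-gateway.jasmin.ac.uk", n_workers=4), plus whichever cluster options the server exposes.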
The good news is that Beam does run the pipeline using the DaskRunner.
The bad news is that it errors out fairly quickly with memory problems, something that didn’t happen when I was using the DirectRunner in parallel:
KilledWorker: Attempted to run task ('get_windowed_value-apply_dofn_to_bundle-23e5eb837af24a3ea8c23e7415f39183', 0) on 4 different workers, but all those workers died while running it.
Watching the Dask dashboard while this is happening shows the worker’s unmanaged memory increasing steadily while it processes this one task, until the worker gets killed. Once the same thing has happened on all the workers that attempt this task, Dask gives up. It’s difficult to know what the actual task is, as it’s wrapped up in a more generic task name.
I’ve tried the “throw more memory at the problem” approach, but I can only go up to 20GB per worker, which didn’t seem to be enough.
When I run the pipeline with the DirectRunner in parallel it doesn’t use anywhere near this much memory per worker, approx 1GB.
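For a sense of scale, the chunk sizes quoted earlier can be turned into rough byte counts. This is only a back-of-the-envelope sketch: it assumes 4-byte (float32) values, which the post does not state, and the helper names are made up for illustration.

```python
import math

# Chunk shapes quoted in the post, ordered as (time, y, x).
SOURCE_CHUNK = (66, 114, 64)
TARGET_CHUNK = (8760, 100, 100)

# ASSUMPTION: 4 bytes per value (float32). The post does not state the dtype.
BYTES_PER_VALUE = 4


def chunk_nbytes(shape, itemsize=BYTES_PER_VALUE):
    """Uncompressed size in bytes of a single chunk of the given shape."""
    return math.prod(shape) * itemsize


def min_source_chunks_per_target(source, target):
    """Lower bound on how many source chunks overlap one target chunk.

    Along each dimension a target chunk spans at least ceil(target / source)
    source chunks; misaligned chunk edges can add one more per dimension.
    """
    return math.prod(math.ceil(t / s) for s, t in zip(source, target))


source_bytes = chunk_nbytes(SOURCE_CHUNK)
target_bytes = chunk_nbytes(TARGET_CHUNK)
n_source = min_source_chunks_per_target(SOURCE_CHUNK, TARGET_CHUNK)

print(f"source chunk: {source_bytes / 2**20:.2f} MiB")
print(f"target chunk: {target_bytes / 2**20:.2f} MiB")
print(f"source chunks touching one target chunk: at least {n_source}")
```

Under the float32 assumption a single target chunk comes to roughly a third of a gigabyte, so one chunk on its own would not account for a 20GB worker filling up.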
Previously I have very much enjoyed my ability to essentially throw the kitchen sink (in terms of sizes of datasets!) at Beam and not need to worry about memory management, and I think this is what I’m looking for when using the DaskRunner too. Am I being unrealistic?
Before I go any further in trying to debug this I’m wondering if this is just a result of some features, such as “Combiners”, needed for running a recipe not being implemented in DaskRunner yet, or whether there’s something deeper going on. Very, very happy to provide more information/examples etc. if useful!
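To make the “Combiners” term concrete: a Beam CombineFn builds its result through four methods (create_accumulator, add_input, merge_accumulators, extract_output), which lets a runner reduce each bundle to a small accumulator before merging. The sketch below imitates that protocol in plain Python without importing Beam. MeanCombiner and combine_in_bundles are hypothetical names, and it says nothing about how the DaskRunner currently handles combiners.

```python
class MeanCombiner:
    """Plain-Python stand-in that mirrors the method names of beam.CombineFn."""

    def create_accumulator(self):
        # (running sum, count)
        return (0.0, 0)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return (total + value, count + 1)

    def merge_accumulators(self, accumulators):
        accumulators = list(accumulators)
        total = sum(acc[0] for acc in accumulators)
        count = sum(acc[1] for acc in accumulators)
        return (total, count)

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float("nan")


def combine_in_bundles(combiner, bundles):
    """Reduce each bundle to its own accumulator, then merge the accumulators.

    Only the small per-bundle accumulators are gathered in one place,
    never the raw inputs themselves.
    """
    partials = []
    for bundle in bundles:
        acc = combiner.create_accumulator()
        for value in bundle:
            acc = combiner.add_input(acc, value)
        partials.append(acc)
    merged = combiner.merge_accumulators(partials)
    return combiner.extract_output(merged)


result = combine_in_bundles(MeanCombiner(), [[1.0, 2.0], [3.0], [4.0, 5.0, 6.0]])
print(result)
```

A runner that cannot merge partial accumulators like this has to gather every input in one place first, which is one plausible way memory could balloon; whether that is what happens here is exactly the open question.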
I am using the latest available Beam direct from GitHub, so that I can use the PR “Windowing Support for the Dask Runner” (Pull Request #32941, apache/beam, by alxmrs), which isn’t in the latest Beam release yet.
Other relevant package versions in my environment:
apache-beam 2.63.0.dev0 pypi_0 pypi
arrow 1.3.0 pyhd8ed1ab_1 conda-forge
dask 2024.8.0 pyhd8ed1ab_0 conda-forge
dask-core 2024.8.0 pyhd8ed1ab_0 conda-forge
dask-expr 1.1.10 pyhd8ed1ab_0 conda-forge
dask-gateway 2024.1.0 pyh8af1aa0_0 conda-forge
distributed 2024.8.0 pyhd8ed1ab_0 conda-forge
pangeo-forge-recipes 0.10.8 pyhff2d567_0 conda-forge