Pangeo Forge bakeries

Hey y’all, I’m making good progress here.
The direct runner seems to run fine in multi_processing mode on our local HPC, and whilst not super speedy, it's definitely much faster than our old clunky way of doing things.
That was all for disk → disk pipelines; now I want to write directly to AWS S3, but after sorting out a few dependency issues I've hit a problem I don't know how to solve.
From what I can tell from the error messages, each worker eventually hits an HTTP 409 'Conflict' error when writing to the object store and doesn't retry or resume afterwards, so the whole process eventually grinds to a halt. My guess is that the lack of retry logic is related to what @rabernat discovered in pangeo-forge/pangeo-forge-recipes#560 ("Blosc decompression error on `StoreDatasetFragments` (local multithreaded runner)"), but I'm less sure what's behind the 409s themselves. I'd guess something tricky and obscure like stale file handles and/or connection timeouts, or possibly some still-outdated Python packages.
Any tips on what to try would be very welcome! If there's no easy fix, it might be time to investigate the other runners more deeply, at least to get around the missing retries.
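For concreteness, the kind of retry logic that seems to be missing would look roughly like this. It's a generic backoff sketch with made-up names (`retry_on_conflict` is hypothetical, not anything from pangeo-forge-recipes or Beam):

```python
import random
import time


def retry_on_conflict(fn, max_attempts=5, base_delay=1.0):
    """Call fn() and retry on OSError (which is how the 409 surfaces
    through s3fs here), backing off exponentially with a little jitter."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except OSError:
            if attempt == max_attempts - 1:
                raise  # out of attempts, re-raise the last error
            time.sleep(base_delay * 2 ** attempt + random.uniform(0, base_delay))
```

Something along these lines wrapped around the chunk write is what I was half expecting to find in the stack already.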

These are the Python versions of the key libraries I'm using, in case that's of use:

aiobotocore 2.5.4
aiohttp 3.8.5
apache-beam 2.42.0
boto 2.49.0
botocore 1.31.17
fsspec 2023.9.2
pangeo-forge-recipes 0.10.0
python 3.9.15
s3fs 2023.9.2
zarr 2.16.1

and the full ugly error message:

```
Traceback (most recent call last):
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
    response = task()
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
    return getattr(self, request_type)(
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 837, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 983, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1877, in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/pangeo_forge_recipes/writers.py", line 89, in store_dataset_fragment
    _store_data(vname, da.variable, index, zgroup)
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/pangeo_forge_recipes/writers.py", line 51, in _store_data
    zarr_array[region] = data
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/zarr/core.py", line 1497, in __setitem__
    self.set_basic_selection(pure_selection, value, fields=fields)
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/zarr/core.py", line 1593, in set_basic_selection
    return self._set_basic_selection_nd(selection, value, fields=fields)
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/zarr/core.py", line 1983, in _set_basic_selection_nd
    self._set_selection(indexer, value, fields=fields)
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/zarr/core.py", line 2058, in _set_selection
    self._chunk_setitems(lchunk_coords, lchunk_selection, chunk_values, fields=fields)
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/zarr/core.py", line 2261, in _chunk_setitems
    self.chunk_store.setitems(to_store)
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/zarr/storage.py", line 1441, in setitems
    self.map.setitems(values)
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/fsspec/mapping.py", line 124, in setitems
    self.fs.pipe(values)
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/fsspec/asyn.py", line 118, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/fsspec/asyn.py", line 103, in sync
    raise return_result
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/fsspec/asyn.py", line 56, in _runner
    result[0] = await coro
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/fsspec/asyn.py", line 399, in _pipe
    return await _run_coros_in_chunks(
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/fsspec/asyn.py", line 254, in _run_coros_in_chunks
    await asyncio.gather(*chunk, return_exceptions=return_exceptions),
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/asyncio/tasks.py", line 442, in wait_for
    return await fut
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/s3fs/core.py", line 1109, in _pipe_file
    return await self._call_s3(
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/s3fs/core.py", line 348, in _call_s3
    return await _error_wrapper(
  File "/home/users/mattjbr/miniconda3/envs/apache/lib/python3.9/site-packages/s3fs/core.py", line 140, in _error_wrapper
    raise err
RuntimeError: OSError: [Errno 5] An error occurred (SwarmError) when calling the PutObject operation: <html><body><h2>CAStor Error</h2><br>Replication peer request failed (409 HTTP/1.1 409 Conflict
Castor-System-Error-Token: WriterOutOfDate
Castor-System-Error-Text: Cannot POST out-of-date version.
Castor-System-Error-Code: 409
Castor-System-Cluster: objectstore4.jc.rl.ac.uk
Content-Length: 85
Content-Type: text/html
Date: Mon, 09 Oct 2023 13:23:28 GMT
Server: CAStor Cluster/14.1.2
Allow: POST, GET, HEAD, PUT, DELETE, COPY, APPEND, PATCH, GEN, SEND
Keep-Alive: timeout=14400
) (request:140508600282032 connection:140508600201024 label:A308D6BF01500CCB) [while running '[9]: Create|OpenWithXarray|StoreToZarr/StoreToZarr/StoreDatasetFragments/Map(store_dataset_fragment)']
```

Now with code too…

```python
import apache_beam as beam
import s3fs
from apache_beam.options.pipeline_options import PipelineOptions
from pangeo_forge_recipes.storage import FSSpecTarget
from pangeo_forge_recipes.transforms import OpenWithXarray, StoreToZarr

# pattern / pattern_pruned are my FilePattern and its pruned version,
# defined earlier (unchanged from the disk -> disk pipeline).

fs = s3fs.S3FileSystem(
    anon=False,
    key="bla",
    secret="bla",
    client_kwargs={"endpoint_url": "https://chess-scape-o.s3-ext.jc.rl.ac.uk"},
)

target_root = FSSpecTarget(fs=fs, root_path="s3://g2g-test")
tn = "test.zarr"

target_chunks = {"RCM": 1, "Time": 360, "Northing": 100, "Easting": 100}

transforms = (
    beam.Create(pattern_pruned.items())
    | OpenWithXarray(file_type=pattern_pruned.file_type)
    | StoreToZarr(
        target_root=target_root,
        store_name=tn,
        combine_dims=pattern.combine_dim_keys,
        target_chunks=target_chunks,
    )
)

beam_options = PipelineOptions(direct_num_workers=8, direct_running_mode="multi_processing")
with beam.Pipeline(options=beam_options) as p:
    p | transforms
```
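One cheap thing I mean to try first is a more aggressive botocore retry policy passed through s3fs's `config_kwargs`, which as far as I understand is forwarded to `botocore.client.Config`. Whether botocore's retry machinery even covers this particular 409 is an unverified assumption on my part:

```python
# Assumption: s3fs forwards config_kwargs to botocore.client.Config, so
# botocore's standard/adaptive retry handling applies to each S3 call.
retry_config = {"retries": {"max_attempts": 10, "mode": "adaptive"}}

# fs = s3fs.S3FileSystem(anon=False, key="bla", secret="bla",
#                        client_kwargs={"endpoint_url": "https://chess-scape-o.s3-ext.jc.rl.ac.uk"},
#                        config_kwargs=retry_config)
```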

@risgnell is this or similar a problem you ever ran into with your on-prem HPC setup?