This is my first post to this community. I’m working on a pilot project with JASMIN, a data analysis facility for the NERC community in the UK. It’s a hybrid of a traditional batch system and an OpenStack cloud. We’ve recently set up a system so that groups can make their own instance of Pangeo in OpenStack projects on our system. We are also experimenting with storing CMIP6 data on an S3-compatible object store that is also part of JASMIN.
I’ve looked at https://pangeo.io/data.html to see how you’ve gone about uploading data to Google Cloud object storage with xarray and zarr. We would like a workflow where we transfer CMIP6 data from our POSIX file system to our object store. Looking at the guide, it looks like for step 2) we could skip a step and write directly to our object store. I think we can do that with the to_zarr call described, but would appreciate some tips. I’m not sure how we give to_zarr a handle to tell it the location of the object store rather than a POSIX directory path.
We’d also appreciate any pointers with using Dask to scale this out into a larger workflow.
Perhaps you also have some general experience you could share from the work you’ve done uploading CMIP6 data into the public cloud?
Rich Signell’s gist here (hope you don’t mind the share, Rich) is a workflow for creating Zarrs on object stores from many individual files on POSIX. We found it needs a tweak here and there for your own use case, but it’s a good paradigm.
The main gotchas with scaling out that I’ve found:
You can only append to Zarrs, so make sure you have a process by which you start at index 0.
Choosing good chunking options is always hard. If you have many small files it can be tempting to write each as a chunk, but you might get significantly better write performance if you first combine many files into an xarray object and then write out in bigger chunks (but not too big). Even if you don’t want bigger chunks, because of the way dask/zarr parallelise this operation you’re going to do better combining more data into an xarray object and doing one big-ish append rather than many small ones.
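To make the two gotchas above concrete, here is a small sketch of the bookkeeping involved (plain Python, no xarray needed; the function names and the ~100 MB target are my own illustrative assumptions, not from this thread): choosing the write mode so the very first batch creates the store at index 0, and sizing chunks so each object is neither tiny nor huge.

```python
def zarr_write_mode(append_index):
    """First write must create the store; later writes append.

    With xarray this corresponds roughly to ds.to_zarr(store, mode='w')
    for the first batch and ds.to_zarr(store, append_dim='time') after.
    """
    return "w" if append_index == 0 else "a"


def chunk_size_mb(chunk_shape, itemsize=8):
    """Uncompressed size in MB of one chunk (itemsize=8 for float64)."""
    n = 1
    for dim in chunk_shape:
        n *= dim
    return n * itemsize / 1e6


def times_per_chunk(lat, lon, itemsize=8, target_mb=100):
    """How many time steps to pack into one (t, lat, lon) chunk so the
    chunk lands near target_mb -- bigger than one-file-per-chunk, but
    not so big that workers run out of memory."""
    bytes_per_step = lat * lon * itemsize
    return max(1, int(target_mb * 1e6 // bytes_per_step))
```

For example, a 360 x 720 float64 grid is about 2 MB per time step, so roughly 48 time steps fit in a ~100 MB chunk.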
On “I’m not sure how we give to_zarr a handle to tell it the location of the object store…”: the to_zarr method can take a store argument. This is an object that implements a dictionary-like API; xarray will tell it the key and the binary data to write to that key, and the store is responsible for doing the write. There are lots of stores in zarr, but you might need to implement your own or extend/override the S3 one. If my memory serves, the S3 store can take a “filesystem” object (typically from s3fs) which is responsible for writing to S3, so you might be able to implement an object like this. Happy to talk more on this subject.
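To illustrate the store interface described above: anything with a dict-like get/set of bytes can act as a store, which is why wrapping an object store is feasible. A toy sketch (pure Python; the class name and in-memory dict are my own, and a real implementation would PUT/GET objects on S3, which is essentially what s3fs’s get_mapper gives you):

```python
from collections.abc import MutableMapping


class ToyObjectStore(MutableMapping):
    """Dict-like store: zarr/xarray hand it a key (e.g. 'temp/0.0.0')
    and a blob of bytes, and it is responsible for persisting them."""

    def __init__(self):
        self._objects = {}

    def __setitem__(self, key, value):
        # a real store would upload to s3://bucket/prefix/<key> here
        self._objects[key] = bytes(value)

    def __getitem__(self, key):
        # a real store would download the object here
        return self._objects[key]

    def __delitem__(self, key):
        del self._objects[key]

    def __iter__(self):
        return iter(self._objects)

    def __len__(self):
        return len(self._objects)


store = ToyObjectStore()
store["temp/.zarray"] = b'{"shape": [10]}'  # zarr writes metadata keys...
store["temp/0"] = b"\x00" * 80              # ...and chunk keys like this
```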
My general experience is that this is non-trivial. It seems simple enough, but somehow the reality of the data always throws up interesting aspects/artefacts. It’s simpler to create single-phenomenon zarr objects (by creating single-phenomenon arrays), but this may or may not be your ideal access pattern. Exposing whatever you do through intake allows you to ‘tidy up’ if necessary by organising things into catalogues and nested catalogues, etc.
sorry I’m only allowed two links so you might need to search the web a bit more…
This is definitely possible. You have two choices:
write the zarr to POSIX disk and then copy it to the object store with the s3 command line utility
stream the data directly to the object store
Dask can parallelize both operations.
You will need to create an fsspec-style mapper object for your object store. I recently did something like this with wasabi cloud storage.
import s3fs
import xarray as xr

# client_kwargs/endpoint_url points s3fs at a non-AWS store (e.g. Wasabi);
# fill in your own credentials and endpoint
fs = s3fs.S3FileSystem(key=s3_access_key, secret=s3_secret,
                       client_kwargs={'endpoint_url': s3_endpoint_url})
mapper = fs.get_mapper('bucket_name/whatever/path/you/want')

# create xarray dataset somehow, i.e.
ds = xr.open_mfdataset('/all_files/*.nc')

# write to object store
# (if you're connected to a dask distributed cluster, this will run in parallel)
# (if you pass a regular path instead of `mapper`, you would write to disk instead)
ds.to_zarr(mapper, consolidated=True)

# to read back the data after writing, you need to restart your kernel for some reason
# I think this has to do with s3fs caching
ds_s3 = xr.open_zarr(mapper, consolidated=True)
We need to refresh our docs with some updated info.
Thanks @Theo_McCaie, @rabernat! This help is much appreciated. There will almost certainly be more questions to follow ;). We definitely prefer streaming directly to our object store. Our colleague Matt tried s3fs earlier in the week and did an initial test write to our store. We’ll need to get Dask working in this setup so that we can scale it up.
We see a lot of potential in object storage for our on-premise infrastructure so this kind of pilot is important for us.
As the person working on this project with Phil, I thought it might be good to ask others on here for some suggestions. I understand the purpose of Dask and that xarray operations use Dask, but I can’t seem to get Dask, xarray and our LSF cluster (JASMIN, as described in Phil’s first post) all working together.
I’ve used a colleague’s example code to get Dask and LSF working together: it takes a list of numbers, creates a scheduler, fires up a Dask worker for each number, squares each number, and calculates the total. I can see child jobs being created for each worker, and it works as expected.
However, when I create a scheduler for the xarray code, xarray doesn’t seem to create any workers. I confirmed this by connecting to the scheduler (in a different terminal, while the xarray code was running), specifying its address and port, and printing scheduler_info(): the list of workers was empty. I had assumed Dask would automatically create workers.
Without LSF, I can perform the actions required (opening multiple netCDF files, sending xarray datasets to the object store); however, it’s currently a bit too brittle, and the code needs to be more flexible to suit a range of data, but that is a different problem.
Does anyone have any suggestions on how to use an LSF cluster with xarray - any code snippets would be much appreciated!
The standard workflow for Pangeo looks something like this:
from dask_jobqueue import LSFCluster
# queue name, cores and memory are site-specific; adjust for your system
cluster = LSFCluster(queue='general', cores=4, memory='8GB')
cluster.scale(4)  # ask LSF for 4 workers

from dask.distributed import Client
# after this line, the cluster scheduler will be used for all dask operations
client = Client(cluster)
# displaying `client` will show a link to the dask dashboard,
# which can optionally be proxied via Jupyter
client

# test some computation
import dask.array as dsa
ones = dsa.ones(100000, chunks=(1000,))
ones.sum().compute()
Once you have something like this working, you can move on to more complicated scenarios involving xarray, data access, etc.
Rereading your post more carefully, I see that you do seem to have some basic dask distributed cluster working. However, I still think it’s worth verifying that you can use my example code above. After that, xarray + dask should just work.
For more specific advice, it would be important to see precisely how you are creating your xarray datasets and triggering computations.
I submit this code as a job to the same LSF cluster that is used in the code. The code does eventually create objects on our object store; however, it doesn’t seem to use the worker I created. The logs from the job linked with the worker show only a few lines of Dask logging, while the xarray and botocore logs are all in the logs for the job I used to submit the Python code.
I create this worker because xarray doesn’t appear to create them on its own (and only having one makes it easier to look through logs for development purposes!). My issues could well be JASMIN-related; however, I wanted to know if I’m missing something in my code that I should be doing.
This is not a reliable way to verify whether your cluster is working. The dask workers will not log every operation, only errors. The best way to monitor the cluster activity is to use the dask dashboard. Do you have access to the dashboard?
My guess based on your code above and the lack of errors is that everything is working totally normally.
This is absolutely correct behavior. Xarray will never create dask clusters. Xarray creates dask arrays (accessible via dataset.varname.data). When computation is triggered on these arrays, they will use whatever dask client currently exists for their computations, or else the default scheduler (see the dask docs).
In general, I would encourage you to move in more incremental steps if you want to understand how the pieces fit together. You’ve jumped directly to a pretty complex workflow: streaming the transformation of netcdf files to object storage. This involves not only dask but also xarray, netcdf, s3fs, etc.
Again, the dask dashboard is an invaluable tool for debugging these computations, and I strongly encourage you to figure out how to access it.
Apologies for not getting back to you sooner! After some port forwarding, I got access to the Dask dashboard and confirmed that the dask workers were serving their purpose and dividing the work among them. I have since written some code that sets up a Dask cluster with a specified number of workers for multiple hours on Lotus (maximum wall time of the Lotus job currently) so I can use the same cluster over multiple workflows of xarray. Separating the cluster away from the workflows has a number of debugging benefits but also means that eventually, multiple datasets can be created from a single cluster, allowing better use of the workers in the cluster.
I’m currently working on scaling up the xarray workflow. Currently 10 netCDF files can be transferred to our object store (~800MB on the object store) in just under 2 minutes 20 seconds (using 5 single core, 1GB memory workers), so I’m slowly scaling these numbers up and dealing with issues as I see them. It seems I need to move to multi-core queues but that’s to be expected.
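As a rough baseline while scaling up, the numbers above work out to about 5.7 MB/s aggregate. A tiny helper (purely illustrative, my own, not from the thread) for tracking how throughput changes as worker counts grow:

```python
def throughput_mb_s(total_mb, minutes, seconds=0):
    """Aggregate write throughput in MB/s for a transfer run."""
    return total_mb / (minutes * 60 + seconds)


# the run described above: ~800 MB in 2 min 20 s using 5 workers
rate = throughput_mb_s(800, 2, 20)  # aggregate MB/s
per_worker = rate / 5               # per single-core worker
```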
I’ll be sure to ask any more questions when I come across bigger issues, many thanks for all your help so far!
Has anyone dealt with distributed.client - WARNING - Couldn't gather 1 keys, rescheduling warning messages when using Dask? They seem to occur during the open_mfdataset() part of the workflow. These warnings appear a number of times (where a task has failed/errored, see: https://stackoverflow.com/questions/46691675/what-do-killedworker-exceptions-mean-in-dask), and once the allowed-failures count has been exceeded, a KilledWorker exception is thrown. Any experiences with these would be great to read about!
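For what it’s worth, the allowed-failures threshold mentioned above is configurable in distributed’s config (sketch below, assuming the default dask config file location). Note that raising it only postpones the KilledWorker; the underlying cause is usually workers dying, often from memory pressure.

```yaml
# ~/.config/dask/distributed.yaml (sketch; the default allowed-failures is 3)
distributed:
  scheduler:
    allowed-failures: 10  # tolerate more lost tasks before raising KilledWorker
```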
We are picking up the work that @MRichards was doing regarding writing CMIP6 into object store (Caringo). We have decided that we can go down the route of parallelising the conversion across nodes on our cluster rather than using a Dask cluster.
When you gave a talk to ESGF recently, you discussed your own workflow for converting CMIP6 into Zarr. Do you have a repository that we might be able to collaborate on? We will have the same issues in terms of data checking/cleaning before we can write the data.