Interfacing models to xarray (map_block + accessor)

Hello,

I’ve recently been taking a deep dive into xarray and map_block this has been both exciting and painful. Exciting because it offers so much daskness painful as errors often happen deep into the computation, usually when I wake to find my job failed while I was dreaming of results… anyway this is what I have been trying to do.

Objective: To run a range dependant acoustic propagation model using temp and salinity from a hydrodynamic model

Why: To see how the model forcing affects the estimation of propagation

Steps:

  1. extract a slice through the water column for a given time, lat, long along a bearing with a given length (curvilinear grid so I use xESMF)
  2. Calculate sound speed (fill the missing bits etc.)
  3. Set the Frequency and source depth
  4. Add an absorbing bottom to the acoustic model that stops energy reflecting off the bottom (frequency dependant)
  5. Run the model (pyRAM)

wait a long time… then plot the output!
I’ve written accessor for the models netcdf files:
shoc is the model accessor
glider is my ocean glider accessor
ram is my acoustic model acessor

#get the times and locations of the glider profiles 
p=dsglider.sel(TI=model_run.time,method='nearest')
#extract the slices from the model (get positions from a glider and headings )     
slices =model_run.shoc.soundslicexr(p.time,p.LON.values,p.LAT.values,
                               np.dstack((p.BHOG,p.FHOG))[0],30000,50,maxdepth=125,depthstep=1,runname= 
                              runname)

slices=slices.ram.setfrequency(np.array([6000.,1000.]),np.array([5.,100.]))
slices=slices.ram.addbottomxr(125,thickness=100,attn=2)
slices = slices.ram.runxr()
tlos = slices.compute()

So it took a lot of pain to get this all working using map_block and the accessor structure, it kind of left me wondering if I had gone down a rabbit hole.

Things that I like:

  • The user interface is fairly clean
  • Delayed operations make for a quick build of the pipeline
  • Dask does a really good job of farming out the work.
  • Learned heaps about xarray dask and python :wink:

Things that I didn’t like:

  • The first attempt I used delayed functions and stacked the output myself (loads of code) Plus heaps of warnings about large objects in the dag
  • Choosing a block size: block size too small easy to write one slice delivered to the acoustic model at a time but then as soon as the number of slices got more than 200ish the number of processes got >100K and dask choked. (rewrite to run multiple slices and concatenate)
  • very hard to allow the user to set arbitrary chunking requires a fair bit of code to work out what dimensions are still there etc.
  • Keeping track of objects that are going to be used at a later date
  • unify_chunks() does not often fix the problem

I guess the question is should I have just used slurm job queue and done it with standard python or was this really worth doing? A job that takes 1 hour plus on a 100 node cluster is that really a good use case for Pangeo?
image

I’d be keen to hear others’ views on map_block + accessor…

My view on the best approach to these sort of calculations has evolved considerably. Chaining together dozens of lines of xarray operations on very large datasets rapidly leads to unmanageably large dask graphs. The workarounds you described (i.e. inner loop over slices) are often slow. Debugging why dask fails / runs out of memory is often practically impossible.

Recently I have become enamored with numba. The structure I recommend for many such problems is to write a numba jit, vectorize, or guvectorize function that operates on pure numpy arrays which does all of the hard low-level algorithmic work. Dask then only does the high-level job of batching this function across many chunks, plus possibly some reductions.

I can’t quite tell from your description whether your calculation is embarassingly parallel in the horizontal dimensions. Does the “Run the model (pyRAM)” step require ray tracing or any other communication between points in the x/y dimension? My impression based on peeking at the pyRAM code is “no”: it a 1D model.

Furthermore, pyRAM appears to already use numba! This is great! However, it is set up only for 1D inputs. If you wanted to apply pyRAM over many vertical profiles at the same time, you would need a guvectorize function, not a jit function that assumes 1D inputs. This would require some internal changes to pyRAM but should be feasible.

This question is only answerable with more information. How big is the dataset? How long does the slurm job take? What does your CPU utilization look like?

@rabernat Thanks yes pyRAM does use numba, unfortunatly it is a marching parabolic equation model so very intensive as the grid depends on the wavelength of the frequency you want to model. So no is correct.

So it is embarrassingly parallel only in the sense that one slice does not depend on the other.

It is a great problem for CPU utilization! very little IO and heaps of solving matrices once the model phase kicks in it get 100% utilization. The only problem is that if I batch 6 model runs per worker then I need to have 12 runs to use 2 workers etc. really I would have to allow the user to make changes to the chunking to enable logical end numbers of model runs per worker.

I spent a while looking at gunvectorize and ufunc what I like about map_block was I could use xarray objects right to the point I pass data to pyRAM.

I’ve never written slurm job queue (I’m new to HPC) and Dask just seems to offer so much to me as a python programmer that I tried to push this to see how far I could go.

So thinking about your approach I guess numba would let me use multiple threads on a worker to run each model, at the moment I’ve been setting my cluster up with a single thread on each worker.

I’ve been stopping at 2000 odd runs of the model in a batch. If I was to run this on all the glider data but if you wanted to get a 3d field then you would have to run this model say every degree an that would be very expensive in CPU.

I will look more deeply into Numba!

Here’s an example on combining apply_ufunc and numba’s guvectorize: https://xarray.pydata.org/en/stable/examples/apply_ufunc_vectorize_1d.html#High-performance-vectorization:-gufuncs,-numba-&-guvectorize

@dcherian Thanks for you help both here and on github, would be great to catch up next time I’m in Boulder.

I did read this, my problem was that pyRAM took so many variables some of which are singular e.g. frequency, source depth and some are 2d arrays. Anyway here’s the code that I came up with for the function passed to map_block

def _runramxr(self,ds):
    if sum(ds.sizes.values()) == 0:
        return ds
    results=[]
    for runname,azimuth,frequency,
                 source,time in  itertools.product(ds.runname,ds.azimuth,ds.frequency,ds.source,ds.time):
         aslice =ds.sel(runname=runname,time=time,
                                azimuth=azimuth,frequency=frequency,source=source)
         rp_ss =aslice.range.values
         rp_sb =aslice.range.values
         zs = aslice.zs.values
         dz = (1500/frequency.values)/4
         dr = 1500/frequency.values
         z_sb = aslice.z_sb.values.copy()
         attn = aslice.attn.values.copy()
         cb = aslice.cb.values
         rhob = aslice.rhob.values.squeeze()
         z_ss = aslice.depth.values.astype(int)
         cw = aslice.soundspeed.values
         rbzb= np.dstack((aslice.range.values,aslice.botz))[0]
         ram = PyRAM(freq=frequency.values,zs=zs,
                        zr=zs,z_ss=z_ss,rp_ss=rp_ss,cw=cw,
                            z_sb=z_sb,rp_sb=rp_sb,cb=cb,rhob=rhob,attn=attn,
                            rbzb=rbzb,dz=dz,dr=dr)
         output =ram.run()  
         result =xr.DataArray(output['TL Grid'],name='tloss',
                                    dims=['depth','range'],
                                    coords={'depth':output['Depths'],'range':output['Ranges']},
                                    attrs='units':'dB','long_name': 'Trasmision loss','valid_range':np.array([0,1000]) }) 
            # use the current depths and center bins
               depths = np.hstack((0,ds.depth[0:-1].values + ds.depth.diff(dim='depth').values/2,ds.depth.max()))
        ranges = np.hstack((0,ds.range[0:-1].values + 
                                                            ds.range.diff(dim='range').values/2,ds.range.max()))
        result = np.power(10,result/10)
        result = result.groupby_bins('depth',depths).mean()
        result = result.groupby_bins('range',ranges).mean()
        result = 10*np.log10(result)
        result =result.expand_dims({'runname':[runname.values],'time':[time.values],'azimuth':[azimuth],'frequency':[frequency],'source':[source]})
            results.append(result)
    ds['tlos'] = (('runname', 'time', 'azimuth', 'frequency', 'source', 'depth','range'),(xr.combine_by_coords(results).rename({'depth_bins':'depth', 'range_bins':'range'}).transpose('runname', 'time', 'azimuth', 'frequency', 'source', 'depth','range').tloss))
    return ds

The code is ugly mainly because I didn’t want to change pyRAM and my lack of python skill. The actual computation done by pyRAM takes a long time so the looping aspect of this code is not the bottleneck. The problem is that each loop block takes place on one worker so unless there is more than one time step there’s no speedup, at the moment I have lots of timesteps, but I want to write stuff that works regardless of the chunking the user puts in.