How to allow dask workers to read from a requester pays s3 bucket using rasterio?

I can read a COG from a requester pays s3 bucket with xarray/rasterio by doing:

os.environ["AWS_REQUEST_PAYER"] = "requester" 
cog = 's3://dev-et-data/compressed/NDVI_filled/2001/2001001.250_m_NDVI.tif'
da = xr.open_rasterio(cog, chunks={'band':1, 'x':4096, 'y':4096})

but if I create a dask client to speed things up, the workers get access denied errors. Has anyone used dask to read from a requester pays bucket with rasterio?

Looking at http://xarray.pydata.org/en/stable/generated/xarray.open_rasterio.html, I don’t see a way to pass arguments to open_rasterio that eventually go through to whatever is interacting with AWS (s3fs?). So I think you’re stuck trying to set environment variables on the workers.
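For what it's worth, AWS_REQUEST_PAYER is ultimately a GDAL configuration option (rasterio reads S3 through GDAL's /vsis3/ rather than s3fs), so on the client you can also scope it with rasterio.Env instead of os.environ. That doesn't help the workers, though, since the chunked reads happen in their processes. A minimal local sketch:

import rasterio

cog = 's3://dev-et-data/compressed/NDVI_filled/2001/2001001.250_m_NDVI.tif'

# Scopes the GDAL config option to this block, in the local process only;
# dask workers in a remote cluster never see it.
with rasterio.Env(AWS_REQUEST_PAYER="requester"):
    with rasterio.open(cog) as src:
        print(src.profile)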

How’s the cluster being created? Setting the environment variable on the workers as they’re created is the most robust way to do this.

Short of that, you can run a function on the workers that sets the variable, after you have a cluster scaled up.

client.run(lambda: os.environ.update(AWS_REQUEST_PAYER="requester"))

But if a worker crashes and is replaced, the new worker won’t have that variable set, and so will fail to read data.
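One way to make the setting survive worker replacement is a worker plugin, whose setup hook runs on every worker that joins the cluster, including ones started after a crash. A minimal sketch, assuming a distributed version that provides WorkerPlugin and Client.register_worker_plugin:

import os
from distributed.diagnostics.plugin import WorkerPlugin

class RequesterPays(WorkerPlugin):
    """Set AWS_REQUEST_PAYER on every worker, including replacements started later."""
    def setup(self, worker):
        os.environ["AWS_REQUEST_PAYER"] = "requester"

# `client` is the dask.distributed Client connected to the cluster, as above
client.register_worker_plugin(RequesterPays())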

I’m running on dask-kubernetes (and hopefully soon on dask-gateway). I tried to figure out how to set environment variables via the env: {} key in ~/.config/dask/kubernetes.yaml:

# kubernetes:
#   name: "dask-{user}-{uuid}"
#   namespace: null
#   count:
#     start: 0
#     max: null
#   host: "0.0.0.0"
#   port: 0
#   env: {}

but could not figure it out.

It turns out that I was setting the worker environment correctly in my ~/.config/dask/kubernetes.yaml file:

kubernetes:
  env: 
    AWS_REQUEST_PAYER: requester

but my KubeCluster was not picking that up because I was creating the cluster in the Dask JupyterLab extension. If I create my cluster inline in the notebook using cluster = KubeCluster(), it works fine.

I spent some time trying to set environment variables when KubeCluster is created in the extension, but failed.

So for now, my workaround will be to just create my KubeCluster from the notebook and pass in the env as a parameter (instead of modifying the kubernetes.yaml file):

cluster = KubeCluster(n_workers=2, env={'AWS_REQUEST_PAYER': 'requester'})

So here’s a complete notebook example that reads COGs from a requester pays bucket in parallel using xarray and dask:

import os
import xarray as xr
from dask.distributed import Client
from dask_kubernetes import KubeCluster

# The client process also needs the variable, since opening the dataset reads metadata locally
os.environ["AWS_REQUEST_PAYER"] = "requester"
# Pass the same variable to the workers, which do the chunked reads
cluster = KubeCluster(n_workers=2, env={'AWS_REQUEST_PAYER': 'requester'})
client = Client(cluster)
cog = 's3://dev-et-data/compressed/NDVI_filled/2001/2001001.250_m_NDVI.tif'
da = xr.open_rasterio(cog, chunks={'band':1, 'x':4096, 'y':4096})
da.load()
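
To double-check that the env parameter actually reached the workers, an optional probe with client.run (using the client and os import from the example above) should report 'requester' for each worker:

# Returns a dict mapping each worker address to its value of the variable
client.run(lambda: os.environ.get("AWS_REQUEST_PAYER"))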

Thanks @TomAugspurger and @jsignell for the help!

A PR to intake-xarray to pass arguments through would be welcomed.