Grouping/Quantile computing on sparse mask and large dataset

Hello community,

I hope I’m right here!

My goal is to compute quantiles of satellite datasets depending on a mask encoding certain plant species (those can occur anywhere on the whole dataset). This mask is also quite sparse: roughly 80 % of it is no data at all. The dataset itself (satellite data) is a few TB in size.

I already tried multiple approaches:

Using xarray's groupby (with and without flox)

This failed because the mask must not contain NaN anywhere. If I replace the NaN mask values with an int it runs, but all results are NaN (I assume this happens because the only implemented method is 'blockwise'), but I'm not sure. My code is something like this (for flox):

import xarray as xr
import numpy as np
from flox.xarray import xarray_reduce

def compute_species_quantiles(
    ds,
    speciesvar_name="speciesmap",
    target_var_name="band_data",
    dim=("y", "x"),
    expected_groups=None,
    quantiles=None,
    method="blockwise",
):
    if expected_groups is None:
        expected_groups = np.arange(1, 7)

    if quantiles is None:
        quantiles = np.arange(0.01, 1.0, 0.01)

    return xarray_reduce(
        ds[target_var_name],
        ds[speciesvar_name].rename("species"),
        dim=dim,
        expected_groups=(expected_groups,),
        method=method,
        func="nanquantile",
        q=quantiles,
    )
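
To illustrate the encoding idea on a tiny toy example: since flox needs integer labels without NaN, the NaN pixels can be mapped to an explicit sentinel label (0) that is simply never requested as a group. A minimal plain-NumPy sketch (hypothetical toy values) showing what the grouped quantiles should come out to:

```python
import numpy as np

# Toy stand-ins: a float species mask with NaN for "no data", and one band.
mask = np.array([[1.0, np.nan],
                 [3.0, 1.0]])
band = np.array([[10.0, 20.0],
                 [30.0, 40.0]])

# Encode NaN as an explicit integer sentinel (0) instead of grouping by a
# float array that contains NaN; group 0 is simply never requested.
labels = np.where(np.isnan(mask), 0, mask).astype("uint8")

def group_quantiles(values, labels, groups, q):
    # Reference implementation: quantile of `values` per integer label.
    return {
        g: (np.quantile(values[labels == g], q) if (labels == g).any() else np.nan)
        for g in groups
    }

result = group_quantiles(band, labels, groups=[1, 2, 3], q=0.5)
# group 1 holds 10.0 and 40.0, group 3 holds 30.0, group 2 is empty
```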

Dumping to parquet

My next approach was dumping the data as a parquet file. I assumed since most of the data is NaN anyway (and will be dropped), I can then easily compute the quantiles using pandas. But this just explodes in RAM and all my HPC jobs get killed.

My code:

import xarray as xr

ddf = ds.to_dask_dataframe()

ddf.dropna().to_parquet('result.parquet', write_index=False, engine='pyarrow')

I also tried writing one file per block using pandas & map_blocks, but this currently fails since the result does not align with my template (maybe this will work after some tinkering?):

import uuid
from pathlib import Path

output_dir = Path('/some/path')

def filter_and_save(block):
    df = block.to_dataframe().dropna()

    # Write to disk if not empty
    if len(df) > 0:
        unique_id = uuid.uuid4()
        filename = output_dir / f'part_{unique_id}.parquet'
        df.to_parquet(filename, engine='pyarrow', index=False)
        print(f"Wrote {filename}")
    return block.isel(x=slice(0, 0), y=slice(0, 0))

template = ds.isel(x=slice(0, 0), y=slice(0, 0))

result = ds.map_blocks(filter_and_save, template=template)
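
A variant that sidesteps the template-alignment problem entirely would be to go through dask.delayed and write one file per chunk as a side effect, with no template at all. A minimal sketch with tiny hypothetical arrays (CSV instead of parquet here only to keep the example dependency-free; swap in to_parquet in practice):

```python
import uuid
from pathlib import Path

import dask
import dask.array as da
import numpy as np
import pandas as pd

output_dir = Path("filtered_parts")  # hypothetical output location
output_dir.mkdir(exist_ok=True)

@dask.delayed
def write_block(values, mask_values):
    # Flatten one chunk into rows, drop NaN rows, write to its own file.
    df = pd.DataFrame({"value": values.ravel(),
                       "species": mask_values.ravel()}).dropna()
    if len(df) == 0:
        return None
    path = output_dir / f"part_{uuid.uuid4()}.csv"
    df.to_csv(path, index=False)  # stand-in for df.to_parquet(...)
    return str(path)

# Tiny deterministic stand-ins for one band of satellite data and the mask.
data = da.from_array(np.arange(16.0).reshape(4, 4), chunks=(2, 2))
mask_np = np.full((4, 4), np.nan)
mask_np[0, 0] = 1.0
mask_np[3, 3] = 2.0
mask = da.from_array(mask_np, chunks=(2, 2))

# One delayed write per chunk; no template and no global shuffle.
tasks = [write_block(d, m)
         for d, m in zip(data.to_delayed().ravel(), mask.to_delayed().ravel())]
paths = [p for p in dask.compute(*tasks) if p is not None]
```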

So my questions are: Am I doing something fundamentally wrong? Or is xarray + dask just not fit for this task? (My datasets are a few TB in size, if that matters.)

My next approaches will probably go further in the direction of dumping the data as parquet, dropping the NaN rows, and then calculating the quantiles in memory.

I’m thankful for any kind of hint/help! Just write if you need any further information!

Interesting. Can you create a minimal reproducible example please? It’s hard to help otherwise. The example should be representative of data size and chunking.

Hey thanks for replying!

Here is a minimal script to provide the structure of my dataset:

import xarray as xr
import dask.array as da
import numpy as np
from dask.distributed import Client

client = Client(
    n_workers=2,
    threads_per_worker=1,
    memory_limit='10GiB',
    processes=True
)


xaxis = np.arange(39000)
yaxis = np.arange(46000)
bands = np.arange(5)
doy = np.arange(1, 366, 4)

data_da = da.random.random(
    size=(yaxis.size, xaxis.size, bands.size, doy.size),
    chunks=(200, 200, -1, -1),
).astype('float32')


mock_ds = xr.DataArray(
    data_da,
    dims=("y", "x", "band", "doy"),
    coords={
        "x": xaxis,
        "y": yaxis,
        "band": bands,
        "doy": doy
    },
    name="satellite_mock"
)


mock_mask_data = da.random.random(
    size=(yaxis.size, xaxis.size),
    chunks=(200, 200),
).astype('float32')

mock_mask = xr.DataArray(
    mock_mask_data,
    dims=("y", "x"),
    coords={
        "x": xaxis,
        "y": yaxis,
    },
    name="mock_mask"
)

mock_mask2 = mock_mask

# set ~ 80 % nan randomly
mock_mask = mock_mask.where(mock_mask > 0.8)

mock_ds["mask"] = mock_mask

# add a second mask
mock_ds["mask2"] = mock_mask2.where(mock_mask2 > 0.3)


# this step fails! It seems the dataset is too large to compute the stacked index or something 
# stacking 
# ddf = mock_ds.stack(pixel=('y', 'x'), create_index=False).to_dask_dataframe().dropna(subset=["mask", "mask2"])

ddf = mock_ds.to_dask_dataframe().dropna(subset=["mask", "mask2"])


ddf.to_parquet("pangeo_example.parquet", write_index=False, engine='pyarrow')

I thought about it further, and dumping the relevant (= non-NaN) parts of my data to parquet would be best, since that data would also be very valuable for other tasks. I figured that using dask dataframes would be the way to go here.

What I wanted (or thought how it works):

  1. convert each chunk to a small dataframe
  2. drop all NaN rows
  3. Append the data to parquet (order doesn't matter here; I just need one row for each observation (doy, band, mask values))

I thought this would be rather simple, but it already fails before the computation even starts (I guess because of the indices?). I tried it with smaller sizes of y/x and it works! But the data I use is large, and it seems this approach doesn't scale well.

UPDATE:

I also tried dropping the indexes (with mock_ds.drop_indexes(["y", "x", "band", "doy"])), which prevented the endless loading of the dataset, but it is still very slow.
For me a reproducible index of all my data is not really necessary; I just want the data reduced and dumped. Is there a way of achieving that efficiently?

I would be very grateful for any advice if:

  1. My approach is wrong/ doesn’t work that way
  2. Are there other methods of dumping and filtering xarray data?
  3. What could be better alternatives to achieve my goal?

I am having trouble squaring this example with the groupby approach in the first problem.

  1. These masks appear to be float32 with NaNs in the [0, 1] range. But expected_groups=np.arange(1, 7) in the first example suggests you are grouping by a categorical array with values [1, 2, 3, 4, 5, 6].
  2. quantiles=np.arange(0.01, 1.0, 0.01): this is basically a histogram whose bin edges you don't know. Is that what you want?
  3. You are reducing along x and y using method="blockwise", but there are multiple chunks along those axes. This is almost certainly not what you want.

Can you clarify what you are trying to do? What are the inputs and what is the expected output?

Hi, I'm sorry for being confusing, I totally see where you are coming from :grimacing:

Here is an updated example, with the mask being in the range 1-6 (with about 80 % of it being NaN), so it's closer to my actual data.

import xarray as xr
import dask.array as da
import numpy as np
from dask.distributed import Client
from flox.xarray import xarray_reduce

def compute_species_quantiles(
    ds,
    speciesvar_name="speciesmap",
    target_var_name="band_data",
    dim=("y", "x"),
    expected_groups=None,
    quantiles=None,
    method="blockwise",
):
    if expected_groups is None:
        expected_groups = np.arange(1, 7)

    if quantiles is None:
        quantiles = np.arange(0.01, 1.0, 0.01)

    return xarray_reduce(
        ds[target_var_name],
        ds[speciesvar_name].rename("species"),
        dim=dim,
        expected_groups=(expected_groups,),
        method=method,
        func="nanquantile",
        q=quantiles,
    )

client = Client(
    n_workers=2,
    threads_per_worker=1,
    memory_limit='10GiB',
    processes=True
)


xaxis = np.arange(39000)
yaxis = np.arange(46000)
bands = np.arange(5)
doy = np.arange(1, 366, 4)

data_da = da.random.random(
    size=(yaxis.size, xaxis.size, bands.size, doy.size),
    chunks=(200, 200, -1, -1),
).astype('float32')


mock_ds = xr.DataArray(
    data_da,
    dims=("y", "x", "band", "doy"),
    coords={
        "x": xaxis,
        "y": yaxis,
        "band": bands,
        "doy": doy
    },
    name="satellite_mock"
)

# due to cloud masking and disturbances, parts of the satellite data are NaN
mock_ds = mock_ds.where(mock_ds > 0.05)

# we create a mock mask, representing the tree species map (categorical)
mock_mask_data = da.random.randint(
    low=1,
    high=30,
    size=(yaxis.size, xaxis.size),
    chunks=(200, 200),
)

mock_mask = xr.DataArray(
    mock_mask_data,
    dims=("y", "x"),
    coords={
        "x": xaxis,
        "y": yaxis,
    },
    name="mock_mask"
)

# set ~ 80 % nan randomly
mock_mask = mock_mask.where(mock_mask < 7).astype('float16')

# to make it actually work somehow, we need to fill the NaN data (group 0 is not needed)
mock_ds["speciesmap"] = mock_mask.fillna(0).astype('uint8')

goal = compute_species_quantiles(
    ds=mock_ds,
    speciesvar_name="speciesmap",
    target_var_name="satellite_mock",
    expected_groups=np.arange(0, 7),
    quantiles=np.arange(0.01, 1.0, 0.01),
    method='blockwise'
)

My goal was to calculate quantiles of each band and doy, grouped by the values of the mask. (The goal dataset is simply the one you can see in the goal variable).

The quantiles need to be very granular, so I think this is pretty much equivalent to calculating the histogram without knowing the bin edges (if you want more information: I basically want to do something similar to what is done in this paper: Redirecting, which also shows why I'm actually trying to do this).
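
If a fixed value range is acceptable (satellite reflectances are often scaled to a known interval), one route is to compute per-group histogram counts with fixed bins (a plain map-reduce, which flox does support) and then derive the granular quantiles from the cumulative counts. A sketch of only the histogram-to-quantiles step, assuming values in [0, 1] and 1000 bins (both of which are assumptions):

```python
import numpy as np

rng = np.random.default_rng(0)
values = rng.random(100_000)  # toy stand-in for one group's observations

edges = np.linspace(0.0, 1.0, 1001)  # 1000 fixed bins over an assumed range
counts, _ = np.histogram(values, bins=edges)

def quantiles_from_histogram(counts, edges, q):
    # Find the bin where the cumulative fraction crosses each requested
    # quantile; its right edge is accurate to within one bin width.
    cdf = np.cumsum(counts) / counts.sum()
    idx = np.searchsorted(cdf, np.asarray(q))
    return edges[idx + 1]

q = np.arange(0.01, 1.0, 0.01)
approx = quantiles_from_histogram(counts, edges, q)
exact = np.quantile(values, q)
# approx agrees with exact to within about one bin width (0.001)
```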

I hope this makes my problem and goal more clear!

As I wrote in my first post, I figured this won't work (since nanquantile is only implemented for method='blockwise'):

NotImplementedError: Aggregation 'nanquantile' is only implemented for dask arrays when method='blockwise'.Received method='map-reduce'

And that's where I figured the flox groupby approach is not optimal for this task, and I may need another strategy.

At this point I thought about converting it to a dask dataframe and dumping the data to parquet, since I could remove the rows where the satellite observation or my speciesmap is NaN. This greatly reduces the amount of data and makes it far easier to split the problem into multiple independent tasks (e.g. running it once per year and concatenating the results for the final statistics).
The problem can then be tackled using polars or pandas, and additionally I can compute nice stats of forest observations.

And this is where I’m standing right now. The “blowing up of RAM” in my first post was actually the computation of the stacked indices (I think), which seems to be solved by just dropping them:

# make the mask have NaNs again

mock_ds["speciesmap"] = mock_mask

dropped_indices = mock_ds.drop_indexes(["y", "x", "band", "doy"])

ddf = dropped_indices.chunk(x=600, y=600).to_dask_dataframe(['y', 'x', 'band', 'doy']).dropna()
ddf.to_parquet("pangeo_example.parquet", write_index=False, engine='pyarrow')

This actually runs now, but it is not as performant as I hoped. It seems to shuffle the data globally according to a newly created index (I think), which I don’t need. I just need each observation with the necessary data (band, doy, satellite_mock and speciesmap) to end up in one row, no matter the order in which the rows appear in the resulting parquet file (that’s why I thought about the map_blocks approach mentioned above, but that feels quite hacky).

So maybe you have some hint or idea how this can be made more performant, but I think I can manage to run it that way, even though it takes quite some time.

PS: Thanks a lot for contributing to xarray and dask and all these other tools! It made working with this kind of data a blast, and I’m very motivated to learn more!