Am I thinking about this data processing/chunking workflow correctly?

I’m working on processing NASA’s downscaled CMIP6 projections, which are currently NetCDF files stored in S3. I want to create a Zarr archive in our own S3 bucket.

Our use case is individual-location analysis, so we’re really interested in the full time series (31,411 time steps), across all models (22) and scenarios (4), for at most a few grid cells. I’ve noticed from reading that @josephyang has a similar use case.

We plan on processing in groups covering the full time series of each variable, so our write chunks are dict(model=1, scenario=1, time=31411, lat=20, lon=20), which is a reasonable size for S3. Once all of that is done, we’d use rechunker to get our “operational” chunks of dict(model=22, scenario=4, time=31411, lat=2, lon=2).
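For what it’s worth, here’s a minimal sketch of how I’m imagining the first stage, assuming the NetCDFs for one model/scenario get opened with xarray (the file pattern, bucket, and group names below are placeholders, not our real paths):

import xarray as xr

# Stage 1: one model/scenario at a time, full time series per chunk
write_chunks = dict(time=31411, lat=20, lon=20)  # model/scenario are singletons within each group

ds = xr.open_mfdataset("ACCESS-CM2_ssp245_tas_*.nc")  # hypothetical file pattern
ds.chunk(write_chunks).to_zarr(
    "s3://our-bucket/nex-gddp.zarr",  # placeholder bucket/store
    group="ACCESS-CM2/ssp245",        # one group per model/scenario
    mode="w",
)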

Is this the right way to approach the rechunking part of the problem? I’m guessing this whole process will take at least a week, so I want to make sure I’m not making some error, as I haven’t used rechunker yet.


Hi @chase - in my experience, the chunking part is usually not the bottleneck in terms of time (especially with tools like Rechunker - it’s super quick), so I think you’ll have a chance to experiment with different chunking specs.

I’m not clear whether the concept of chunking can be applied across multiple datasets, though. If I were to create one Zarr store combining all these projections, I think I would organize it to treat each model as a separate variable - e.g. temperature_model1, temperature_model2, etc. When you fetch them, the access happens in parallel anyhow.
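To illustrate what I mean (just a sketch; the model list, the tas variable name, and the paths are made up):

import xarray as xr

models = ["ACCESS-CM2", "CanESM5"]  # ...one entry per model
per_model = [
    xr.open_dataset(f"{m}_tas.nc").rename({"tas": f"temperature_{m}"})
    for m in models
]
# Each model becomes its own variable (temperature_ACCESS-CM2, ...) in one store
xr.merge(per_model).to_zarr("s3://our-bucket/combined.zarr", mode="w")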


This sounds reasonable. But I would recommend prototyping with a small subset of the data (like a single model / scenario / variable) and verifying that the queries are performing the way you want before doing a massive data processing job.
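For example, a throwaway prototype on one model/scenario/variable can confirm that the point-query pattern is fast before committing to the full job (the store path, variable name, and coordinates below are illustrative only):

import xarray as xr

# Open the small prototype store and time a representative single-location query
ds = xr.open_zarr("s3://our-bucket/prototype.zarr")
point = ds["tas"].sel(lat=40.0, lon=255.0, method="nearest").load()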

Thanks, this is what I’ve ended up doing. Are there any guidelines on compute resources to speed up the rechunking? I’m using AWS/EC2, so I have flexibility in what gets used. I’m happy to spin up an absurdly large instance if it means reducing the wall clock time.

The way to speed up the rechunking is to increase the parallelism. Are you using some sort of distributed cluster (dask, prefect, beam, etc)?

Thanks, I’m using a local Dask cluster on an EC2 instance with 48 vCPUs and 96 GB of memory. I think the next step is to either:

  • Increase the size of the instance and still use a local cluster
  • Use a more distributed setup

I’m testing wall clock time with the former and will likely end up using a local cluster. I will slowly try to learn how to do the latter, as it’s the better long-term solution as I start doing more of these workflows. I don’t see any obvious bottlenecks with the local cluster, however; the bandwidth is very high (tens of gigabits per second) since the data is all in the same AWS region, and the source and target chunk sizes are roughly 100 MB.

If you have any recommendations on the minimum GB per worker, I’d appreciate it. I’m assuming more workers are preferred over more memory per worker. I’m using the following to set the per-worker memory that I pass to rechunker. I can go all the way up to 192 vCPUs with over a TB of memory, but if there are any heuristics on what generally works well, that’d be great.

import multiprocessing
import psutil
from dask.distributed import LocalCluster

# One worker per physical core, two threads each
n_workers = int(psutil.cpu_count(logical=False))
threads_per_worker = 2
cluster = LocalCluster(
    n_workers=n_workers, threads_per_worker=threads_per_worker, processes=True
)

# Get system memory information
memory_info = psutil.virtual_memory()
# Get total system memory (in bytes)
total_memory = memory_info.total
# Get the number of logical CPUs (note: on hyperthreaded instances this is larger
# than n_workers, so the per-worker budget below is conservative)
num_virtual_workers = multiprocessing.cpu_count()
# Calculate memory available per virtual worker (in bytes)
memory_per_worker = total_memory / num_virtual_workers
# Using 0.75 of the available memory suppresses warnings from Dask about memory usage
memory_per_worker_mb = memory_per_worker / (1024 * 1024) * 0.75
max_mem = f"{int(memory_per_worker_mb)}MB"
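For context, that max_mem string is the per-worker budget rechunker expects; a minimal sketch of how it gets passed through (the store paths are placeholders and ds stands for the dataset being rechunked):

from rechunker import rechunk

plan = rechunk(
    ds,
    target_chunks=dict(model=22, scenario=4, time=31411, lat=2, lon=2),
    target_store="s3://our-bucket/operational.zarr",
    temp_store="s3://our-bucket/rechunk-tmp.zarr",
    max_mem=max_mem,  # per-worker memory budget computed above
    executor="dask",
)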

Thanks, yes, my groups are like store.zarr/{model}/{variable}, which seems to be working well and makes selecting data easy with xr.open_mfdataset. Hopefully I don’t figure out down the road that this was a mistake!
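In case it helps anyone following along, a sketch of reading those groups back (the paths follow the store.zarr/{model}/{variable} pattern but are otherwise placeholders):

import xarray as xr

paths = [
    "s3://our-bucket/store.zarr/ACCESS-CM2/tas",
    "s3://our-bucket/store.zarr/CanESM5/tas",
]
# Concatenate the per-model groups along a new "model" dimension
ds = xr.open_mfdataset(paths, engine="zarr", combine="nested", concat_dim="model")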


So I’ve had some success after some initial troubles with rechunking. I ran into an issue where workers were exceeding their maximum memory usage, but only on the copy_intermediate_to_write task. There is a similar issue on GitHub, but the fix in that issue thread does not work for me.

What I have gotten to work is to only allow rechunker 2 GB of the 8 GB available to each worker. Watching the Dask dashboard, I can see memory usage climbing to 5-6 GB per worker, even though my max memory for rechunker is set to 2 GB, as shown below.

Is this likely a problem with Dask, or just an oddity of how my source data is structured? It seems like a possible fix would be to find a way to increase the number of copy_intermediate_to_write tasks, since that’s the memory-intensive operation, but I didn’t see any way to do that.

from dask.distributed import Client
from dask_cloudprovider.aws import FargateCluster
from rechunker import rechunk

# time_string, get_aws_credentials(), ds, rechunker_target, and rechunker_int
# are defined earlier in my script (not shown)
cluster = FargateCluster(
    n_workers=24,
    worker_cpu=1024,  # 1 vCPU
    worker_mem=8192,  # 8 GB
    image="pangeo/pangeo-notebook",
    cloudwatch_logs_group=f"dask-climo-{time_string}",
    environment=get_aws_credentials(),
)

client = Client(cluster)
rechunked = rechunk(
    ds,
    target_chunks=dict(lat=10, lon=10, model=1, scenario=-1, time=39411),
    target_store=rechunker_target,
    temp_store=rechunker_int,
    max_mem=f"{8192 // 4}MB",  # 2 GB of the 8 GB per worker
    executor="dask",
)
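One note in case it helps anyone else: rechunk() above only builds the plan; nothing is copied until it is executed on the cluster.

# Execute the rechunking plan on the connected Dask cluster (blocking call)
result = rechunked.execute()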

Certainly the worker memory limit is a “best effort” with LocalCluster and can be exceeded. So I’d guess the same would be true with FargateCluster.
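If it’s useful, the standard dask.distributed memory thresholds can also be tuned so workers spill and pause earlier relative to their limit. These are real config keys, but the values below are only illustrative, and with FargateCluster they would need to reach the workers (e.g. via environment variables) to take effect.

import dask

dask.config.set({
    "distributed.worker.memory.target": 0.5,     # start spilling to disk at 50% of the limit
    "distributed.worker.memory.spill": 0.6,      # spill more aggressively at 60%
    "distributed.worker.memory.pause": 0.75,     # pause accepting new tasks at 75%
    "distributed.worker.memory.terminate": 0.95, # restart the worker at 95%
})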
