HPC Time series processes

Hi all,

I’ve been working for a while on a project that analyses satellite-derived time series to compute a set of indices. Unfortunately, because the algorithm is multispatial and multitemporal, there is no alternative to processing every pixel on its own and writing a netCDF with multiple arrays as output.
In short, the dataset (read as an xarray DataArray) is processed row by row to avoid memory problems: the client builds a list of the valid pixels that need processing, then scatters the data and submits the futures to the workers.
Each worker reads its assigned pixels from the scattered data, analyses the time series and sends the results back as an object containing a set of indices (each stored as a pandas DataFrame).
As soon as results are ready, the master extracts the indices from the object and assigns them to the matching cache DataFrame; once the whole row has been analysed, the cache is flushed to a netCDF file.
To clarify, everything is organised and processed with pandas DataFrames and Series because I need strict control over the time dimension. I rejected faster approaches based on numpy because aligning over time gets quite messy without pandas.
On local machines the approach works well: memory doesn’t grow without bound, output objects are quickly assigned to the cache, which lets me append each row to the output (instead of creating a preallocated output file that doesn’t fit in memory), and the time dimension is handled properly.
The problems start when I try to scale this up on clusters. The time between the end of the calculation and the row being finalised in the netCDF is unacceptably long. All my attempts to find the bottleneck have failed, and analysing the process locally doesn’t reveal anything that helps me on the cluster.
I’ve tried many things on the infrastructure side, from an HPC system to Google Cloud to an in-house deployment that is almost the same as Pangeo, to rule out the infrastructure as the cause.
The bottleneck seems to be how I handle the results and how I assign them to the output.
What I’d like to know is whether anyone out there has had the same needs (pixel-based analysis with multiple outputs) and is using futures to process them. I’d like to share my experience and hopefully get inspired to try other approaches, as mine doesn’t seem to be a winning one.

Hi,

I think multi-temporal pixel-based analysis is pretty common, but I feel I don’t really understand your need and your algorithm. Could you share a notebook or a piece of code that demonstrates what you are doing?

Assume that I’m just doing some simple math along each pixel, with multiple indices/values as results. To run these analyses, I extract each time series from a shared dataset; the extractions are organised with futures.
Each time I get a result, I write it to a NetCDF file that holds multiple indices, so I need to find the right position along every dimension (parameter, time, x, y) and then write it.
I’ll try to provide a minimal example in the next few days to make the idea more precise, but for the moment I’d like an opinion on how this type of case can be handled without using mapping or dask bags.
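
For illustration only, the position lookup described here (parameter, time, x, y) could be sketched as below. This is not the actual code from the project: an in-memory NumPy array stands in for the NetCDF variable, and `params`, `times`, `cube` and `write_pixel` are hypothetical names. The pandas index does the time alignment, so timestamps that are missing from the output axis are simply skipped.

```python
import numpy as np
import pandas as pd

# Hypothetical output axes; in the real case these would come from the NetCDF file.
params = pd.Index(["param_e", "param_f"])
times = pd.date_range("2020-01-01", periods=4, freq="D")
n_y, n_x = 3, 5

# In-memory stand-in for the output variable, laid out as (parameter, time, y, x).
cube = np.full((len(params), len(times), n_y, n_x), np.nan)


def write_pixel(cube, series, param, y, x):
    """Write one pixel's time series into the cube at the matching positions."""
    p = params.get_loc(param)            # position along the parameter axis
    t = times.get_indexer(series.index)  # positions along time, -1 if absent
    valid = t >= 0
    cube[p, t[valid], y, x] = series.to_numpy()[valid]


series = pd.Series(
    [1.0, 2.0, 3.0],
    index=pd.to_datetime(["2020-01-02", "2020-01-04", "2020-02-01"]),
)
write_pixel(cube, series, "param_f", y=1, x=2)
```

The last timestamp falls outside the output time axis, so only two values are written.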

@geynard

Sorry for the late answer but it took some time to identify the bottleneck.

The crux of the problem is that handling the futures’ results invokes a time-consuming, blocking pandas function and, unfortunately, this function can’t be avoided.
Ideally I’d have something that lets me create another process, detached from the for loop, that can ingest the flow of results. Because of other constraints, not present in the example, the output can’t be serialized and sent to the workers, so it must be processed on the master.

Here is a small mockup. Just grab the idea and don’t focus too much on the details of the code.

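import random
import time

import numpy as np
import pandas as pd
from dask.distributed import Client, as_completed


class pxldrl(object):
    # Container for the indices computed for one pixel.
    def __init__(self, df):
        self.table = df


def simulation(list_param):
    # Stand-in for the per-pixel time series analysis (runs on a worker).
    time.sleep(random.random())
    val = sum(list_param) / 4
    if val < 0.5:
        result = {'param_e': val}
    else:
        result = {'param_f': val}
    return pxldrl(result)


def costly_function(result, output):
    # Stand-in for the slow, blocking pandas step (runs on the master).
    time.sleep(1)
    # DataFrame.append was removed in pandas 2.0; pd.concat replaces it here.
    row = pd.DataFrame([result.table])
    output = pd.concat([output, row], sort=False, ignore_index=True)
    return output


def main():
    client = Client(n_workers=4, threads_per_worker=1)

    output = pd.DataFrame(columns=['param_e', 'param_f'])

    # Named "inputs" to avoid shadowing the built-in input().
    inputs = pd.DataFrame(np.random.random(size=(100, 4)),
                          columns=['param_a', 'param_b', 'param_c', 'param_d'])

    for i in range(2):
        futures = client.map(simulation, inputs.values)

        # Results are consumed one by one, so every call blocks the loop.
        for future, result in as_completed(futures, with_results=True):
            output = costly_function(result, output)

    client.close()
    return output


if __name__ == '__main__':
    main()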
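
One way to detach the costly step from the loop, as wished for above, is a consumer thread fed through a queue. The sketch below is only an illustration under assumptions: it uses the standard library's `concurrent.futures` in place of the dask client so that it runs anywhere, a plain list in place of the output DataFrame, and short sleeps in place of the real work. `consumer` and `run` are hypothetical names. Because the consumer lives in the master process, nothing has to be pickled.

```python
import queue
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def simulation(params):
    """Stand-in for the per-pixel computation."""
    time.sleep(random.random() / 100)
    return sum(params) / 4


def consumer(results_queue, output):
    """Drain the queue on its own thread until the None sentinel arrives."""
    while True:
        item = results_queue.get()
        if item is None:
            break
        time.sleep(0.001)  # stand-in for the slow, blocking step
        output.append(item)


def run(n_tasks=20):
    results_queue = queue.Queue()
    output = []
    worker = threading.Thread(target=consumer, args=(results_queue, output))
    worker.start()

    inputs = [[random.random() for _ in range(4)] for _ in range(n_tasks)]
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(simulation, row) for row in inputs]
        for future in as_completed(futures):
            # put() returns immediately, so the loop never waits on the slow step.
            results_queue.put(future.result())

    results_queue.put(None)  # tell the consumer that no more results will come
    worker.join()
    return output


output = run()
```

A thread only helps while the blocking step releases the GIL or waits on I/O; if it is pure Python CPU work, the loop and the consumer would still compete for the interpreter.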

Hi,

Can I ask what in costly_function is time-consuming? Is it the pd.DataFrame.append part, or is there something else? Can’t you do part of this function directly on the workers? Or better, can’t you just write all results into a dask DataFrame, flush everything to disk, and then do some reordering if you need to?
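
As a rough sketch of the "collect first, build once" idea with plain pandas rather than a dask DataFrame: if each result can be reduced to a plain dict (an assumption, since the real result object is more complex), the rows can be gathered in a list and turned into a DataFrame in a single call instead of growing the DataFrame one row at a time. `collect_results` is a hypothetical helper, not part of the mockup.

```python
import pandas as pd


def collect_results(results, columns=("param_e", "param_f")):
    """Build the output table once from an iterable of dict-like results."""
    rows = [dict(r) for r in results]  # cheap list append per result
    return pd.DataFrame(rows, columns=list(columns))


# Example results shaped like the dicts produced by the mockup's simulation().
results = [{"param_e": 0.2}, {"param_f": 0.8}, {"param_e": 0.4}]
table = collect_results(results)
```

Keys that are missing from a result end up as NaN in the corresponding column.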

Hi, sorry for the late answer.
The costly function can’t be moved to the workers as the object isn’t picklable.
The suggestion to convert the output into a Dask DataFrame is worth exploring, and I’ll try it in the next few days. I’m quite sceptical about writing to disk, though, as I/O operations are usually expensive compared to in-memory computation.
Right now I’m testing a daemon approach; it doesn’t gain anything in terms of time, but at least I’ve optimised the use of the cluster.