I was given some hydrologic model output in the form of a 10GB fixed width ASCII file, where the first two columns are year and month, and the rest of the columns are streamflow at specific locations. There are 1512 rows, and 497,000 columns. Nice, eh?
Since this is just an array of numbers, I thought I’d read it with Dask DataFrame, convert it to a Dask array and write it to Zarr. But I can’t figure out how to write to Zarr:
import dask.dataframe as dd
df = dd.read_fwf('/scratch/streamflow.monthly.all', sample_rows=1, sample=256000, header=None)
da = df.to_dask_array()
da
but I can’t just call da.to_zarr() because it complains that the chunk sizes are not known.
I also tried computing the chunk sizes with da = df.to_dask_array(lengths=True), but it ran for over an hour without returning, so that’s clearly not the best solution.
If I could just specify chunks=(1512, 10000), I’d be happy.
Is there a way to just specify the chunk size for my dask array?
I don’t know if it’s officially documented anywhere, but you can set the internal _chunks attribute directly.
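import pandas as pd
import dask.dataframe as dd
df = dd.from_pandas(pd.DataFrame({"A": [1, 2]}), npartitions=1)
a = df.to_dask_array()  # row lengths are unknown here: chunks are ((nan,), (1,))
a._chunks = ((2,), (1,))  # private attribute: (row chunks, column chunks); must match the real data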
That’ll trick dask into working, as long as the chunk size you tell it is actually correct.
But I think the bigger problem might be that you’re getting zero parallelism from dask.dataframe here. Dask DataFrames are only partitioned along the rows, so they don’t handle short and wide datasets well (neither does pandas, for that matter).
How long does a simple pd.read_csv("/scratch/streamflow.monthly.all", nrows=2) take?
Without testing it, my first approach would be to
- Get a sense for how many rows / second pandas can handle (parse CSV, convert to NumPy)
- Try to parallelize the I/O, maybe using pandas’ chunksize, maybe using dask.dataframe with blocksize. See if you’re actually getting any benefit.
- Figure out how to incrementally write the chunks into the Zarr store (chunking by rows and columns)
Edit: that’s assuming you’ve already asked the upstream data provider for a nicer file format and been denied.
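A rough sketch of the last two steps using pandas’ chunksize: parse a fixed-width file a block of rows at a time and copy each block into a preallocated 2D target. The sample text, the column widths (4-character year, 3-character month, 8-character values) and the helper names are all hypothetical; a NumPy array stands in for a Zarr array, which supports the same slice assignment.

```python
import io
import numpy as np
import pandas as pd

def make_sample_fwf(n_rows, n_sites):
    """Hypothetical fixed-width text: year, month, then one 8-char value per site."""
    lines = []
    for i in range(n_rows):
        year, month = 2000 + i // 12, i % 12 + 1
        values = "".join(f"{float(i * n_sites + j):8.1f}" for j in range(n_sites))
        lines.append(f"{year:4d}{month:3d}{values}")
    return "\n".join(lines) + "\n"

def copy_in_row_blocks(source, n_sites, block_rows, target):
    """Parse `block_rows` lines at a time and write each block into `target`."""
    widths = [4, 3] + [8] * n_sites
    start = 0
    with pd.read_fwf(source, widths=widths, header=None, chunksize=block_rows) as reader:
        for block in reader:
            values = block.iloc[:, 2:].to_numpy(dtype="f8")  # drop year and month
            target[start:start + len(values)] = values
            start += len(values)
    return start  # number of rows written

n_rows, n_sites = 5, 3
target = np.empty((n_rows, n_sites))  # a Zarr array would go here
text = make_sample_fwf(n_rows, n_sites)
written = copy_in_row_blocks(io.StringIO(text), n_sites, 2, target)
print(written)  # 5
```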
@TomAugspurger , reading 10 rows with:
kw = dict(parse_dates={'dates': [0, 1]}, header=None, index_col=0)
df = pd.read_fwf(infile, nrows=10, **kw)
takes about a minute. So if I had enough memory I guess I could just plow through the 1512 lines in about 2.5 hours.
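The back-of-envelope numbers behind “2.5 hours” and “enough memory”, assuming (not measured) that parse time scales linearly with the number of rows and that every value ends up as an 8-byte float64:

```python
ROWS, COLS = 1512, 497_000               # shape of the file from the question
SAMPLE_ROWS, SAMPLE_SECONDS = 10, 60.0   # "reading 10 rows ... takes about a minute"

# Linear extrapolation of the read time.
total_hours = SAMPLE_SECONDS / SAMPLE_ROWS * ROWS / 3600

# In-memory size if every column is parsed as float64 (8 bytes per value).
total_gb = ROWS * COLS * 8 / 1e9

print(f"{total_hours:.2f} h, {total_gb:.1f} GB")  # 2.52 h, 6.0 GB
```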
Following up on your idea of using pandas’ chunksize, I guess I could run “split” or something on the original file to get multiple ASCII files, and then use dask delayed to parallelize?
Yeah, splitting the file ahead of time seems reasonable. Since you’ll also be writing at the same time, this sounds like an I/O-bound problem where you’ll be limited by the speed of your disk, but who knows.
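If the shell split utility isn’t convenient, the same pre-splitting can be sketched in Python: stream the file once and write every block of n lines to its own numbered file. The helper name, the naming scheme and the toy input are hypothetical; as with split -l, the last file simply gets whatever lines remain.

```python
import itertools
import os
import tempfile

def split_lines(path, lines_per_file, out_prefix):
    """Write consecutive blocks of `lines_per_file` lines to out_prefix000, 001, ..."""
    out_paths = []
    with open(path) as src:
        for i in itertools.count():
            lines = itertools.islice(src, lines_per_file)
            first = next(lines, None)
            if first is None:  # input exhausted
                break
            out_path = f"{out_prefix}{i:03d}"
            with open(out_path, "w") as dst:
                dst.write(first)
                dst.writelines(lines)  # streams the rest of the block line by line
            out_paths.append(out_path)
    return out_paths

# Toy usage: 7 lines split into files of 3 lines.
tmp = tempfile.mkdtemp()
src_path = os.path.join(tmp, "input.txt")
with open(src_path, "w") as f:
    f.writelines(f"row {k}\n" for k in range(7))

parts = split_lines(src_path, 3, os.path.join(tmp, "part_"))
counts = []
for p in parts:
    with open(p) as f:
        counts.append(len(f.readlines()))
print(counts)  # [3, 3, 1]
```

Because lines are streamed rather than collected into a list, only about one line is held in memory at a time, which matters here: 10 GB over 1512 lines is roughly 6.6 MB per line.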
Why not bypass the dataframe API completely and go directly to a 2D Zarr array with the desired chunks?
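One way to do that, sketched under assumptions: if each piece of the data (say, one pre-split file per block of rows) has a known number of rows and columns, wrap the reader in dask.delayed, give dask.array.from_delayed the shape, and concatenate. The chunks are then known without computing anything, and rechunk can impose the column chunking. Here read_block is a hypothetical stand-in that fabricates numbers instead of parsing a file.

```python
import numpy as np
import dask
import dask.array as da

N_COLS = 10

@dask.delayed
def read_block(block_index, n_rows):
    """Hypothetical reader; a real one would parse one pre-split text file."""
    start = block_index * 100  # arbitrary offset so each block differs
    return np.arange(start, start + n_rows * N_COLS, dtype="f8").reshape(n_rows, N_COLS)

rows_per_block = [4, 4, 2]  # e.g. full blocks plus a shorter last one
blocks = [
    da.from_delayed(read_block(i, n), shape=(n, N_COLS), dtype="f8")
    for i, n in enumerate(rows_per_block)
]
arr = da.concatenate(blocks, axis=0)
print(arr.chunks)  # ((4, 4, 2), (10,)): known up front, no lengths=True needed

arr = arr.rechunk((4, 5))
print(arr.chunks)  # ((4, 4, 2), (5, 5))
```

From there, arr.to_zarr(...) could write the store; Zarr needs regular chunks (only the last one may be smaller), which is why equal-sized row blocks are convenient.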
@rabernat, I was only using the DataFrame API because read_csv is an easy way to read these text files efficiently. But I guess that’s a good point: I certainly should be able to read arrays from text efficiently without using the DataFrame API.
Thanks to the help I received here, I was able to figure out a nice solution!
It turned out that each line of the fixed-width ASCII file contained the model output for a single time step on a CONUS grid, with the non-NaN values written out as one sequence. So after deciding I wanted the chunk size along the time dimension to be 120, I used split to break the ASCII files into files of 120 rows each. This resulted in 12 files for each variable.
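Because each line holds only the non-NaN grid cells, putting a block of time steps back on the 2D grid needs a mask of which cells are valid. A minimal sketch, assuming a boolean mask whose True cells are in the same row-major (C) order as the values on each line; the helper name and the tiny 2×3 mask are made up.

```python
import numpy as np

def to_grid(block, mask):
    """Scatter (n_times, n_valid) values onto an (n_times, ny, nx) grid, NaN elsewhere."""
    out = np.full((block.shape[0],) + mask.shape, np.nan)
    out[:, mask] = block  # True cells are filled in row-major order
    return out

mask = np.array([[True, False, True],
                 [False, True, True]])           # 4 valid cells
block = np.arange(8, dtype="f8").reshape(2, 4)   # 2 time steps x 4 valid cells
grid = to_grid(block, mask)
print(grid.shape)  # (2, 2, 3)
```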
I was then able to use dask delayed to read and process the files in parallel, following the advice of @rabernat to avoid the DataFrame API. Indeed, simply using np.loadtxt() was blazing fast on these fixed-width ASCII files.
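A minimal sketch of that pattern, not the notebook’s actual code: one delayed task per pre-split file, each calling np.loadtxt (which assumes the values are whitespace-separated) and writing its rows into the matching region of a preallocated target. File names and the load_into helper are hypothetical. A NumPy array stands in for the Zarr array, so the threaded scheduler is used; with an on-disk Zarr array whose row chunks match the files, each task writes whole chunks, so concurrent tasks don’t overwrite each other.

```python
import os
import tempfile
import numpy as np
import dask

@dask.delayed
def load_into(path, target, row_start):
    """Parse one pre-split file and write its rows into target[row_start:...]."""
    block = np.loadtxt(path, ndmin=2)[:, 2:]  # drop the year and month columns
    target[row_start:row_start + block.shape[0]] = block
    return block.shape[0]

# Toy input: 5 rows of "year month v0 v1 v2", split into files of 2, 2 and 1 rows.
tmp = tempfile.mkdtemp()
full = np.column_stack([np.full(5, 2000), np.arange(1, 6), np.arange(15).reshape(5, 3)])
paths, starts = [], []
for k, start in enumerate(range(0, 5, 2)):
    path = os.path.join(tmp, f"part_{k:03d}.txt")
    np.savetxt(path, full[start:start + 2], fmt="%g")
    paths.append(path)
    starts.append(start)

target = np.empty((5, 3))  # stand-in for a Zarr array with 2-row chunks
tasks = [load_into(p, target, s) for p, s in zip(paths, starts)]
counts = dask.compute(*tasks, scheduler="threads")
print(counts)  # (2, 2, 1)
```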
I was able to convert the original 100GB of ASCII data (9 variables) to a 15GB Zarr file in about 15 minutes using a 4-processor local cluster.
See the Full Notebook here for details!