Friday, February 19, 2021

ML at Scale Part 3: distributed compute

In part 2 I focused on ML when your data won't fit in memory. This post will move on to slow, or compute bound ML instead. I'll continue to use Dask and explore how it can help us.

Dask leverages multiple CPU cores to enable efficient parallel computation on a single machine. It can also run on a thousand-machine cluster, breaking up computations and routing them efficiently. The Comparison to Spark documentation is a great reference for understanding Dask in the context of an older tool and that older post. Perhaps the most interesting difference is Spark just being an extension of the MapReduce paradigm and Dask being able to implement more sophisticated algorithms by being generic task scheduling-based.

Sticking with the scikit-learn examples, here is replacing a parallel algorithm's Joblib backend with Dask to (potentially) spread work out across a cluster.

The Dask documentation is quick to point out in their best practices that not everyone needs distributed ML as it has some overhead. Compiling code with Numba or Cython could help, as could intelligently sampling some of your data. In this post I got huge speedups by vectorizing some code that was looping through large matrices.

Across this three part series we've now seen how to speed up reading large datasets, work with datasets that don't fit entirely in memory, and distribute processing across multiple machines. There's obviously a lot more to this, but I wanted to develop a better intuition for how to approach these types of issues and at least know where to start. Hopefully you learned something too.

UPDATE:

A specific tool isn't supposed to be the focus of this post. Dask was used here to illustrate the idea and show how simple it can be, but there are other options in this space. Here are a couple of other examples that I've come across:

UPDATE 2:

Both Pandas 2.0 and Polars now use Apache Arrow as a memory model. Polars, a relatively new entrant to this space, is implemented in Rust and exposes a Python API. It is created specifically for fast data processing, not ML, but overlaps enough with this series that it is worth checking out.

Monday, February 15, 2021

ML at Scale Part 2: memory

When data we want to train a machine learning model on becomes too big to fit in memory, we need to find a way to work on subsets of the data. I hinted at this in part 1. We can use Pandas to read chunks of a file, but that is fairly primitive and slow.

Libraries like Vaex and Dask attempt to abstract this away. 

Vaex provides lazy, out-of-core (not all in memory at once) DataFrames via memory-mapping. Pre-processing and feature engineering are more efficient, and memory is freed up for model training. It also has a vaex.ml package which provides a scikit-learn wrapper.

Dask provides large, parallel DataFrames composed of smaller Pandas DataFrames. This helps with data too big to fit in memory because the individual Pandas DataFrames can be stored on disk. DaskML provides estimators designed to work with Dask DataFrames.

The accuracies both came out to 96%. Similar ideas, different implementation. In fact, these DataFrames remind me a little bit of the persistent data structures covered in my Exploring Immutability post.

Even with these fancy DataFrames, many machine learning algorithms are designed train on all the data at once. If our data is too big to fit in memory then that's going to be a problem. As in the code above, we need to use online learning or incremental algorithms to solve this problem. The Incremental and IncrementalPredictor classes handle "streaming" the data (in batches) and we specify upfront the possible classes.

DaskML adds several generalized linear model implementations. scikit-multiflow is designed for actual streaming data and adds several other online learning algorithms. Neural networks are also trained in this manner, often being fed "mini-batches", so they are good candidates for datasets that don't fit in memory as well.

Stay tuned for part 3 where I'll get into being compute-constrained instead of memory-constrained.

UPDATE:

There's a lot of I/O and memory-related work coming out of the TensorFlow community as well. Checkout Better performance with the tf.data API as it overlaps nicely with Parts 1 and 2 of this series.

Sunday, February 7, 2021

ML at Scale Part 1: I/O

I recently came across a somewhat large dataset from a Kaggle competition where the data was provided as an approximately 6 GB CSV file. A frequent comment in the discussion forum was how long it took just to read this file. A few GBs is large enough where this starts to become noticeable, but it's not really that big. If a laptop can have a 1+ TB drive and 32 GB memory then this isn't even in the realm of "big data". That's good, though, because it means there are some simple tricks we can use to cut down on that read time.

The pandas read_csv() method takes about 63 seconds for me. That's our baseline.

First we try reducing the precision. This gets us to 59 seconds. Not great.

Next we try reading the files in chunks. This is actually slower, but the technique could help us if the file didn't fit entirely in memory. More on that in future posts.

Then we try Dask which will spread the work across multiple processers. 30 seconds. Better, but I still don't want to wait that long. More on Dask in future posts as well.

Finally we convert the CSV file to a different format. I tried Apache Parquet but there are others. It's a binary columnar format (remember Column-oriented Database Basics?). Stored in this manner the data is 2.5 GB. And this gets us to just 3 seconds for reading the whole file!

Converting our data to the binary file format and possibly reducing the precision or using Dask as well would really shorten our feedback loop while training a ML model. It would seem that any cleaning of the data or preprocessing that we can do ahead of time would make sense to do once, before converting the file format, when the data is this size.

UPDATE:

Apache Arrow is another project to checkout in this space along with memory-mapped files. Reading this data in the Feather file format is even faster than Parquet.