Datapipe Best Practices

February 26, 2019


Lessons learned across 15 years of building datapipes.

I often work on software projects that process a lot of data. Not “Big Data”-sized data (at least not recently), but “needs some careful thought to avoid a quagmire”-sized data. Architecturally, the best approach to data of this size is a “data pipeline” or “datapipe.” A datapipe is like an assembly line, where each step takes inputs, performs some useful operation, and produces an output, which is then consumed by the next step in the line. The final output is some useful knowledge – reports, predictions, recommendations, trained models, etc.

[INPUT] ->
  (Operation 1) ->
    (Operation 2) ->
      ... ->
        [OUTPUT]

The largest pipeline I built and ran was back in 2006. It ingested approximately 200GB of advertiser data every day from dozens of servers, hovered around 15TB in total retained data, and powered internal and external business intelligence reports for an internet advertising company that reached hundreds of millions of people daily. (15TB may sound small now, but adjusted for Moore’s law – or more accurately, Kryder’s law – it was huge. The biggest hard drive available at the time stored 750GB and cost a fortune. We used a ~$1M SAN from EMC to handle it. Now it could fit on a single 20TB drive for $800 with room to spare.)

The smallest pipeline I maintain is for my personal finances. I have raw bank and credit card records since 2015 totaling ~14k transactions. I run this through a series of steps to extract and semi-automatically tag transactions, eventually producing Ledger double-entry bookkeeping files for automated reporting and forecasting.

My most public-facing datapipe was part of FiveStreet.com. It trained a machine learning system that could extract prospective customer information from semi-structured emails. When I left the project in October 2015, it processed over half a billion emails per year.

The most high-stakes datapipe of my career is the one I work on now for GlassRoom – it trains neural networks to understand how to think about publicly traded companies, then uses those same neural networks to create a proprietary, bottom-up estimate of a company’s net present value, and then uses those estimates to build a stock portfolio for an investment fund with a meaningful amount of capital behind it.

I share this because it’s fun to brag about my work, but also because this is evidence that datapipes scale well in both directions – upward and downward. The principles I’ve outlined below lead to all the hallmarks of a well-written software project – maintainable code, flexible purpose, stable operations, and minimal technical debt. Simplicity and elegance.

Gathering Data

Typically, the first step in a datapipe involves gathering data from multiple sources. The data sources may be internal (within a company) or external (purchased from 3rd party vendors or scraped).

When starting development of a pipeline, take a shortcut and gather data by hand. Eventually, it is worth the time to automate the data-grabbing process as much as possible, but this typically has low technical risk and can be deferred to one of the last steps in the project. Though the technical risk is low, this code is very important; it doubles as a form of documentation that describes from where – and how – the data is retrieved.

Often, fetching data is an expensive process (in terms of wall clock time) so save the data files in their original raw form, preserving directory structures and filenames to avoid re-downloading the same files in the future.

If the files are relatively chunky and few in number, then storing files directly to disk is fine. For a large number of small files, as is common in web scraping, some kind of local key-value store (e.g. LevelDB) with a custom index can help.

$PROJECT/
  data/
    sources/www.foo.com/bar/page1.html
    sources/www.foo.com/bar/page2.html
    ...
    sources/sales_history/2015-01-01 sales.csv
    sources/sales_history/2015-01-02 sales.csv
    ...
    sources/web_traffic/analytics_2015_01_01.json
    sources/web_traffic/analytics_2015_01_02.json
    ...

Saving raw data is especially important for HTML files. When web scraping, it is tempting to pre-process the data (e.g. cleaning up HTML) when you fetch the file. Don’t do this! Instead, write the raw web pages to a datastore, and then have a later step in the pipeline extract data from the pages. Otherwise, a change in scraping (fixing a bug, extracting more fields, etc.) will require re-fetching the data.
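As a rough illustration of "fetch once, keep the raw bytes," here is a minimal Python sketch. The function names (download, raw_path, fetch_raw) and the choice to mirror the URL's host and path on disk are my assumptions for the example, not a prescribed layout. Note that this simple mapping ignores query strings.

```python
from pathlib import Path
from typing import Callable
from urllib.parse import urlsplit
from urllib.request import urlopen


def download(url: str) -> bytes:
    """Fetch the raw bytes of a URL, with no clean-up or parsing."""
    with urlopen(url) as response:
        return response.read()


def raw_path(url: str, root: Path) -> Path:
    """Map a URL to a file under root, mirroring its host and path.

    Assumption for this sketch: query strings are ignored, and URLs that
    end in "/" are stored as index.html.
    """
    parts = urlsplit(url)
    path = parts.path
    if path == "" or path.endswith("/"):
        path += "index.html"
    return root / parts.netloc / path.lstrip("/")


def fetch_raw(url: str, root: Path, fetch: Callable[[str], bytes] = download) -> Path:
    """Return the local copy of url, downloading only if it is missing."""
    target = raw_path(url, root)
    if not target.exists():
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(fetch(url))  # stored exactly as received
    return target
```

Calling fetch_raw("http://www.foo.com/bar/page1.html", Path("data/sources")) a second time returns the saved file without touching the network. Any HTML clean-up then belongs in a later step that reads from data/sources.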

Even if it is not expensive to fetch data, it is advisable to retain raw data for troubleshooting in case of unexpected results. Often, data vendors or even internal departments will make “minor” tweaks to formats or values that break downstream code. Having raw files from before and after the change helps make sense of why things broke.

Extracting Data

Now that the data is stored locally, the next step is to make it easily consumed by downstream code. The “extraction” step will wrangle the data from each source into shape in the following ways:

Most importantly, fail loudly! Be strict in what the system accepts, apply assertions around incoming file names, data formats, column names, and data types and, if possible, ranges of values. Fail loudly if anything deviates from expectations.
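To make "fail loudly" concrete, here is a small Python sketch of a strict extraction function. The column names, the file name pattern, and the value ranges are hypothetical; the point is only that every expectation is checked and any deviation raises. I raise exceptions rather than using bare assert statements here because asserts are skipped when Python runs with -O, but either style expresses the same idea.

```python
import csv
import re
from datetime import date
from pathlib import Path

# Hypothetical expectations for one source; adjust to the real data.
EXPECTED_COLUMNS = ["order_id", "sku", "quantity", "unit_price"]
FILENAME_PATTERN = re.compile(r"^(\d{4})-(\d{2})-(\d{2}) sales\.csv$")


def extract_sales(path: Path) -> list[dict]:
    """Parse one raw sales file, raising on anything unexpected."""
    match = FILENAME_PATTERN.match(path.name)
    if match is None:
        raise ValueError(f"unexpected file name: {path.name!r}")
    sale_date = date(*map(int, match.groups()))  # raises on impossible dates

    rows = []
    with path.open(newline="") as handle:
        reader = csv.DictReader(handle)
        if reader.fieldnames != EXPECTED_COLUMNS:
            raise ValueError(f"{path.name}: unexpected columns {reader.fieldnames}")
        for line_number, raw in enumerate(reader, start=2):
            where = f"{path.name}:{line_number}"
            if None in raw or None in raw.values():
                raise ValueError(f"{where}: wrong number of fields")
            quantity = int(raw["quantity"])        # raises if not an integer
            unit_price = float(raw["unit_price"])  # raises if not a number
            if not 0 < quantity <= 10_000:
                raise ValueError(f"{where}: quantity {quantity} out of range")
            if unit_price < 0:
                raise ValueError(f"{where}: negative unit_price {unit_price}")
            rows.append({
                "sale_date": sale_date,
                "order_id": raw["order_id"],
                "sku": raw["sku"],
                "quantity": quantity,
                "unit_price": unit_price,
            })
    if not rows:
        raise ValueError(f"{path.name}: no data rows")
    return rows
```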

Combining and Processing Data

The first two steps in any datapipe are fairly obvious – get the data, extract the data into a more easily consumable format. After that, the datapipe will probably have somewhere between 3 and 20 more steps, with each step emitting some intermediate data, ie: a “checkpoint”. (More below.)

Deciding on the interfaces and boundaries for those steps can be an art, and the boundaries can often change over time. As the datapipe evolves, steps may split or merge.

I like to think about each step as having a starting boundary and an ending boundary. Here are a few good ways to find boundaries:

When naming steps (and their corresponding code files), I like to follow an “interactor” approach as much as possible. Think of each step as completing a use case, and name the step according to the business or conceptual value that it provides, rather than something technical. Also, follow a “verb_noun” naming convention. Put those together and you get: predict_best_mailings vs. nearest_neighbors_regression_step.

Finally, avoid circular dependencies. The datapipe should only flow in one direction. If data needs to flow upstream, you’ve done something wrong.
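One cheap way to keep the flow one-directional is to write the step dependencies down and let a topological sort reject cycles. The sketch below uses Python's standard graphlib module (Python 3.9+). The step names are hypothetical examples in the verb_noun style.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical steps: each name maps to the steps whose checkpoints it reads.
STEPS = {
    "extract_sales": set(),
    "extract_web_traffic": set(),
    "combine_customer_activity": {"extract_sales", "extract_web_traffic"},
    "predict_best_mailings": {"combine_customer_activity"},
}


def run_order(steps: dict[str, set[str]]) -> list[str]:
    """Return an order in which every step runs after its inputs.

    Raises graphlib.CycleError if data would have to flow upstream.
    """
    return list(TopologicalSorter(steps).static_order())
```

If someone later makes extract_sales read the output of predict_best_mailings, run_order raises CycleError instead of quietly producing a pipeline that depends on its own output.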

Checkpoints

If a datapipe is a series of steps that modify data, then a checkpoint is the name for what happens between those steps, when the data is at rest. The output of each step is a checkpoint, saved to a different, descriptively named file (or files), overwriting the file(s) produced by the same step in the previous iteration of the datapipe.

Checkpoints are useful because they are a “high water mark” for data processing. When a step fails, you can fix that step and re-run the pipeline starting with the output of the last successful step. Without checkpoints, the datapipe would need to re-process all previous steps.

When thinking through checkpoints, remember that space is cheap. Err on the side of storing more data, not less. Even if the pipeline will handle a large amount of data, buying more hard drives is typically way cheaper than hiring more engineers. As we saw above, one can buy a 20TB drive for ~$800 today.

Data Retention

I divide files into three types, with three different lifespans:

Data Storage

Most software projects benefit from some kind of transactional database or key-value datastore. Datapipes are not like most software projects. Over time I’ve found that the best way to store checkpoint data (the output of a step in the pipeline) is a flat file. This can be plain text files (.csv or .tsv), or something sexier like HDF5, Feather, or Parquet if you need faster read/write access.

To belabor the point, do not use a relational database to store checkpoints. Relational databases are optimized for transactional operations, where any one operation typically touches only a small subset of the data. This is not the access pattern that you need for reading and writing checkpoint data, and you will pay a price in disk space, RAM, bulk read, and bulk write performance for using the wrong tool.

The preferred access pattern of a datapipe is that a given step will read multiple flat files into RAM (or file-backed memory), operate on the data, and then write a new flat file to disk, overwriting the flat file created during the previous iteration of the datapipe.
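A minimal Python sketch of that access pattern, using CSV for simplicity. The step itself (total_sales_by_sku) and its column names are hypothetical. Writing to a temporary file and renaming it into place is my own addition, so that a step killed halfway through never leaves a half-written checkpoint behind.

```python
import csv
import os
from pathlib import Path


def read_rows(path: Path) -> list[dict]:
    """Load an entire flat file into memory."""
    with path.open(newline="") as handle:
        return list(csv.DictReader(handle))


def write_checkpoint(path: Path, fieldnames: list[str], rows: list[dict]) -> None:
    """Write rows to path, replacing any checkpoint from the previous run."""
    temporary = path.with_name(path.name + ".tmp")
    with temporary.open("w", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
    os.replace(temporary, path)  # the old checkpoint stays intact until this line


def total_sales_by_sku(inputs: list[Path], output: Path) -> None:
    """Hypothetical step: sum quantity per SKU across all input files."""
    totals: dict[str, int] = {}
    for path in inputs:
        for row in read_rows(path):
            totals[row["sku"]] = totals.get(row["sku"], 0) + int(row["quantity"])
    rows = [{"sku": sku, "quantity": quantity} for sku, quantity in sorted(totals.items())]
    write_checkpoint(output, ["sku", "quantity"], rows)
```

Running the step twice on the same inputs produces a byte-identical checkpoint, which is the idempotence the rest of this section relies on.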

$PROJECT/
  data/
    sources/...
    checkpoint1.dat
    checkpoint2.dat
    checkpoint3.dat
    ...

This may sound wasteful, because it means we reprocess a large amount of data that we’ve already processed, but it avoids a lot of potential bugs. Treating each step as an idempotent operation makes code changes much less scary. Depending on the size and scope of your datapipe, this approach may be sufficient to reprocess everything for the life of the project. Planning for anything else at the start is premature optimization. If some step in the pipeline starts taking too long to run, prefer to either optimize the code or buy more powerful hardware rather than resorting to incremental processing.

The opposite approach – incremental processing – creates a link between the processed data and the version of code that produced it, which means if you change the code you need to figure out which data is bad. It is hard enough to fix bugs without worrying how to patch up previously processed results.

Avoid incremental processing in all cases, except the following:

First, when the pipeline can cleanly partition processed work by date. In this case, it might make sense to only process some recent time period, ie: year to date or month to date. You should create dated checkpoints, and the pipeline will still reprocess data for the most recent time period, but it can avoid redoing work for previous time periods. Here, you trade off the cognitive complexity of having to worry about reprocessing previous checkpoints for the benefit of a much faster feedback cycle. Then, when there is some kind of code change that requires reprocessing, the datapipe can reprocess in chunks, from most- to least-recent, during what would usually be downtime.
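Here is one way the bookkeeping for dated checkpoints might look in Python. The file naming scheme (sales_YYYY-MM.csv) and the function names are assumptions for the sketch. The rule it implements is the one described above: always reprocess the most recent period, and touch earlier periods only when their checkpoint is missing.

```python
from pathlib import Path


def checkpoint_for(month: str, directory: Path) -> Path:
    """Hypothetical naming scheme: one checkpoint file per 'YYYY-MM' period."""
    return directory / f"sales_{month}.csv"


def months_to_process(months: list[str], directory: Path) -> list[str]:
    """Pick the periods to (re)process, most recent first.

    The latest period is always reprocessed; earlier periods are processed
    only when their dated checkpoint is missing.
    """
    if not months:
        return []
    ordered = sorted(set(months), reverse=True)  # 'YYYY-MM' strings sort chronologically
    latest, earlier = ordered[0], ordered[1:]
    missing = [m for m in earlier if not checkpoint_for(m, directory).exists()]
    return [latest] + missing
```

After a code change that invalidates old results, deleting the dated checkpoints makes those periods eligible again, and they come back in most- to least-recent order.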

The second exception is when the pipeline generates Neural Net / Machine Learning Models. I will touch on this later.

Tools

One of the worst software development experiences of my life was using the visual tools of Microsoft’s SQL Server Analysis Services 2005 to build steps in a data pipeline. The tool made the easiest part of a datapipe easier (ie: connecting the output of one job to the input of another) at the expense of making everything else impossibly hard. All of our custom logic was buried inside of a massive, auto-generated XML file that also tracked layout positions, sizes, and last-accessed timestamps, making version control nearly meaningless. Merge conflicts sometimes took hours to resolve. Debugging was unnecessarily complicated. Eventually, we moved as much code as possible to plain old text files, using the visual tools only when necessary, mainly for orchestration.

Since then, I’ve stuck to building datapipes out of the simplest tools possible, and it has worked. The make utility will get you way farther than you would expect. It can be used to orchestrate steps in a pipeline by treating checkpoint files as intermediate build targets, only re-running the steps that are necessary to run when a “dependency” (input data) has changed. And it’s intuitive. Want something to re-run? Delete the file. Want everything downstream of a file to re-run? touch the file.

# Makefile

.PHONY: help download

help:
    cat docs/help.txt

download:
    # download source 1 to data/sources/$source1/
    # download source 2 to data/sources/$source2/
    ...

data/checkpoint1.h5: data/sources/*/*
    # Run step 1 code
    # Write to data/checkpoint1.h5
    ...

data/checkpoint2.h5: data/checkpoint1.h5
    # Run step 2 code
    # Write to data/checkpoint2.h5
    ...

data/checkpoint3.h5: data/checkpoint2.h5
    # Run step 3 code
    # Write to data/checkpoint3.h5
    ...

As a bonus, a makefile is language-agnostic. The checkpoint data can be produced by any language. Data gathering code can use Ruby or even wget, while other steps can leverage R or Python.

Almost every pipeline I’ve built has started out with some variation of this. Most of them graduated to a more complicated orchestration tool, but simple Makefiles worked for way longer than I would have anticipated.

Quality and Testing

Testing a datapipe is both more and less difficult than testing the same amount of code in other architectures.

Techniques like unit testing don’t apply neatly to datapipe code. Much of the code can’t easily be broken into functions, and the algorithms require so much data (in terms of rows, columns, or both) that preparing test samples feels like a Sisyphean task. Furthermore, the logic tends to be complicated enough that it is difficult to verify a correct answer without re-running the logic itself.

As much as I hate it, datapipe code is often best structured as long unbroken scripts where data is slowly massaged into shape using vectorized functions. Refactoring into subroutines would only add confusion for the next developer and wouldn’t help in terms of reusability – in most cases there would only ever be one caller to the method.

At the same time, unit testing seems less necessary with datapipes. The code tends to have lower cyclomatic complexity than other types of code, so the need for unit tests is reduced. (Curiously, a lot of complexity seems to fade away when your code is not intended for human interaction.) I’ve found that on web apps, I need 95+% coverage to feel confident that I won’t get alerts in the middle of the night. On a datapipe, I can get the same amount of confidence after a single full pass through the pipeline, assuming the output of the pipe passes muster.

The best approach to quality I’ve found is the following:

  1. Testing all new code on actual, real production data, in an environment identical to production.
  2. Liberal assert statements throughout the code, testing value ranges, counts, and the presence or absence of null values. These matter most during the extraction step, where they validate that upstream data sources haven’t started sending something that your system interprets as garbage.
  3. Manually digging into one or two examples in the data, and making sure they look sane.
  4. Automatically generating and saving timestamped internal reports, and manually spot-checking them against previous reports to detect bugs before deploying major changes. Here, images help. Histograms make things clear in a way that numbers on a screen can’t.
  5. Careful code reviews by someone with fresh eyes.
  6. A healthy fear of upgrading 3rd party libraries – even if the library claims to follow semantic versioning, and even if it’s only a dot-dot release. I’ve seen a datapipe grind to a halt after upgrading to a new point release of a very popular library because we used the library in some way that caused an inefficient edge case to get triggered. When working with large amounts of data and pushing hard on the CPU, GPU, RAM, and disk, even the slightest change can have unexpected consequences.
  7. Never deploy on a Friday. If something breaks, you’ll want time to fix it.
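For item 4 in the list above, a timestamped report can be as simple as a JSON file of per-column summaries. The Python sketch below is one possible shape. The function names, the report layout, and the fixed-width binning are my assumptions. It stores histogram counts only; a plotting library could render them as images for the spot check.

```python
import json
from datetime import datetime, timezone
from pathlib import Path


def summarize(values: list, bins: int = 10) -> dict:
    """Count, nulls, range, and a fixed-width histogram for one numeric column."""
    present = [v for v in values if v is not None]
    summary = {"count": len(values), "nulls": len(values) - len(present)}
    if present:
        low, high = min(present), max(present)
        width = (high - low) / bins or 1.0  # avoid zero width when all values are equal
        histogram = [0] * bins
        for v in present:
            index = min(int((v - low) / width), bins - 1)  # the maximum goes in the last bin
            histogram[index] += 1
        summary.update(min=low, max=high, histogram=histogram)
    return summary


def save_report(name: str, columns: dict, directory: Path) -> Path:
    """Write a timestamped JSON report so it can be compared with earlier runs."""
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    directory.mkdir(parents=True, exist_ok=True)
    path = directory / f"{name}_{stamp}.json"
    report = {column: summarize(values) for column, values in columns.items()}
    path.write_text(json.dumps(report, indent=2, sort_keys=True))
    return path
```

Because each run writes a new file, comparing today's report with last week's is a plain diff of two JSON files.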

I’ve explored the idea of generating statistical models for what data should look like in each checkpoint, and then automatically flagging anything that shows up outside of acceptable thresholds. I’ve also toyed with fuzzing inputs to find inputs that could crash the datapipe. In practice, this led to many false positives, much wasted time, and me kicking myself for overengineering.

In short, the final output of your datapipe is like one giant integration test, and it is better than any artificial test you could write. Create visualizations so that it is easy for humans to judge the quality of output, and then make a pre-release checklist that includes actually looking at those visualizations. Remember to save snapshots so that you can compare against previous versions if (when?) things go off the rails.

Operations

Operations is where all of my muttering about idempotent steps and checkpoints starts to pay off. When done correctly, the datapipe can be killed and restarted at any time without worry, and we know exactly which step failed, plus a snapshot of what made the step fail. Furthermore, with just a little bit of additional work, we can log the duration and memory usage of each step, and monitor it through a dashboard and/or alerting system.
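The "little bit of additional work" for timing and memory could look something like this Python sketch. It uses the standard library's tracemalloc, which only sees memory allocated through Python's allocator and adds some overhead, so treat the numbers as indicative rather than exact. The context manager name and the log format are assumptions, and the sketch is not meant to be nested.

```python
import logging
import time
import tracemalloc
from contextlib import contextmanager

logger = logging.getLogger("datapipe")


@contextmanager
def measured(step_name: str, results: dict):
    """Record wall-clock seconds and peak traced memory for one step."""
    tracemalloc.start()
    started = time.perf_counter()
    try:
        yield
    finally:
        seconds = time.perf_counter() - started
        _, peak_bytes = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        results[step_name] = {"seconds": seconds, "peak_bytes": peak_bytes}
        logger.info("step=%s seconds=%.3f peak_bytes=%d", step_name, seconds, peak_bytes)
```

Wrapping each step in `with measured("predict_best_mailings", results):` gives one log line per step, which is enough to feed a dashboard or an alert on steps that suddenly take twice as long.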

The last recommendation, for mission critical systems, is to maintain a hot backup system, in a different geographic location, that runs on the same schedule as the production datapipe.

Growth: More Data

Data inevitably grows. The datapipe will undoubtedly need to handle more and more data as time goes by, eventually saturating its current hardware.

Let’s assume that we’ve already plucked the low-hanging fruit of optimizing code. We’ve looked at which steps take the longest, looked at which sections in those steps take the longest, and then either done some automated profiling, or at the very least pulled it all into a Jupyter notebook, and used “%%time” statements while tweaking code to try to get some quick wins. Let’s assume that any other optimizations will have a negative impact on code clarity or maintainability.

The next step is to think about scaling up the hardware – moving the datapipe to a more powerful box on which to run – when the data hits ~70% of capacity of its current hardware. The threshold of 70% is a reasonable rule of thumb, but the actual number may vary depending on a number of factors – how quickly the data is growing, how quickly data growth is accelerating, and how critical the datapipe is, to start. Be sure to leave enough headroom so you can sleep at night.

If scaling up isn’t an option, then the datapipe will need to scale outward – partitioning the data so that processing can run in parallel. One can partition either horizontally, processing different rows on different pipelines (splitting by user_id modulo number_of_servers, for example), or vertically, where different datapipe servers handle different steps in the process and the data comes back together at the end. It is difficult to say much more about this here. There is no general advice; solutions require careful thought and tailoring to the data, and are beyond the scope of my writing.
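The horizontal split by user_id modulo number_of_servers is small enough to show directly. This Python sketch assumes integer user IDs and rows represented as dictionaries. Both are illustrative choices.

```python
def partition_for(user_id: int, number_of_servers: int) -> int:
    """Index of the server responsible for this user."""
    return user_id % number_of_servers


def partition_rows(rows: list[dict], number_of_servers: int) -> list[list[dict]]:
    """Split rows into one list per server, keeping each user's rows together."""
    partitions: list[list[dict]] = [[] for _ in range(number_of_servers)]
    for row in rows:
        partitions[partition_for(row["user_id"], number_of_servers)].append(row)
    return partitions
```

One caveat with plain modulo: changing number_of_servers moves most users to a different partition, so any per-partition checkpoints need to be rebuilt when the server count changes.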

Well before the data size affects operations, it will affect development. The key to good development is a fast feedback cycle (it’s what makes REPL languages so powerful). As a datapipe step takes longer to run, changing the code becomes more and more difficult. When a step grows much beyond 30 minutes to run, development seems to grind to a halt. (For me… this may vary by developer.)

To preserve a fast feedback cycle, the most useful approach I’ve found is to create a subset of data. Build a development-mode flag into the extraction step that chooses some small percentage of the data in a predictable and repeatable fashion. (Use a hash function and/or a seeded random number generator to get predictable randomness.) This will allow the entire datapipe to be run in a fraction of the time it might run in production, while still exercising all code paths (if the subset is chosen thoughtfully).
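A hash-based subset can be sketched in a few lines of Python. The function name, the salt, and the choice of SHA-256 are assumptions. The important property is that the same key always lands in or out of the subset, on every run and every machine. (Python's built-in hash() is not suitable here, because string hashes are randomized per process by default.)

```python
import hashlib


def in_dev_subset(key: str, percent: float, salt: str = "datapipe-dev") -> bool:
    """Decide, repeatably, whether a record belongs to the development subset.

    percent is a value from 0 to 100, with a resolution of 0.01.
    """
    digest = hashlib.sha256(f"{salt}:{key}".encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % 10_000  # 0..9999
    return bucket < percent * 100
```

In development mode, the extraction step would keep only rows where in_dev_subset(str(user_id), 1.0) is true. Keying on a user or account ID, rather than on a row number, keeps all of one entity's rows together in the subset.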

Growth: More Functionality

Software isn't built, it is grown.

At some point, an interactor or model in the datapipe will need to change in a big way. This is tricky for a number of reasons:

  1. At development time, we’ll want to run both the old and new code side-by-side and compare results.
  2. At production time, we’ll want the ability to easily roll back to the old version.
  3. We want to do this in a way that is clean, consistent, and where different versions of code don’t overwrite each other’s checkpoint data.

To manage this complexity in a clean way, set an environment variable that determines which version of an interactor or model to use. The Makefile that orchestrates the code can use the environment variable to call the correct version of code:

# Makefile

...

data/best_mailings_$(MAILING_VERSION).h5:
    python ./core/interactors/predict_best_mailings_$(MAILING_VERSION).py

...

The interactor can use environment variables to figure out from where (and to where) data should be read (and written). This is conceptually the same as creating a backup copy of the data directory, but by letting a computer do the bookkeeping we reduce development friction and ensure consistency of approach. And we avoid making multiple copies of our source data, which can grow quite large.

Above, we change the checkpoint filename based on the MAILING_VERSION environment variable. Depending on how the steps within the pipeline are organized, we may want to create entirely different directories underneath data/:

$PROJECT/
  data/
    sources/...
    checkpoint1.dat
    version_1/checkpoint2.dat
    version_1/checkpoint3.dat
    ...
    version_2/checkpoint2.dat
    version_2/checkpoint3.dat
    ...
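Inside an interactor, resolving paths from the environment might look like the following Python sketch. MAILING_VERSION comes from the Makefile above and the version_N directory layout follows the listing above. The helper name and the decision to fail when the variable is unset are my assumptions.

```python
import os
from pathlib import Path
from typing import Mapping


def checkpoint_path(name: str, data_dir: Path, environ: Mapping[str, str] = os.environ) -> Path:
    """Resolve a checkpoint file under the directory for the active version."""
    version = environ.get("MAILING_VERSION")
    if not version:
        # Fail loudly rather than silently writing into the wrong version's directory.
        raise RuntimeError("MAILING_VERSION is not set")
    return data_dir / f"version_{version}" / name
```

With MAILING_VERSION=2 in the environment, checkpoint_path("checkpoint2.dat", Path("data")) resolves to data/version_2/checkpoint2.dat, so the old and new versions can run side by side without overwriting each other.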

Trained Models

Often, a pipeline will train (or fit) models that are used downstream in later processing steps. The training process can be long, taking days or even weeks. Furthermore, in the case of neural nets, an expensive GPU may be required for training the model, but not for running the model. In this case, developers may need special hardware to train models, but we can save money by omitting this hardware from the production environment (and the hot backup environment.)

These models aren’t like the data produced by other steps in the pipeline. It is expensive (from a time and resource perspective) to recreate them on every pass through the pipeline, as we try to do with other checkpoint data. Also, training models is a stochastic process, so even if we could retrain models daily, this would cause the datapipe to give inconsistent results, making debugging more difficult and possibly causing datapipe results to “flap” (give opposite answers on back-to-back runs for datapoints that are on decision boundaries.)

But models aren’t code, either. We don’t want to check the models into source control, because they are typically quite large compared to what version control systems were built to handle, and the model files are binary data. (Well, at the very least they are a bunch of weights without human meaning.)

The best approach I’ve found is to track a hash (eg: SHA1) of each model, treating it like the version number of a 3rd party library. The hashes should be tracked in source control. In production, the datapipe should verify that the hashes of the models match the hashes in source control.
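A sketch of that verification step in Python follows. The manifest format (a JSON object mapping model file names to SHA1 hex digests, checked into source control) and the function names are assumptions for illustration.

```python
import hashlib
import json
from pathlib import Path


def sha1_of(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash a (possibly large) model file without loading it all into RAM."""
    digest = hashlib.sha1()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_models(manifest_path: Path, models_dir: Path) -> None:
    """Raise if any model file differs from the hash recorded in the manifest."""
    manifest = json.loads(manifest_path.read_text())  # {"file name": "sha1 hex digest"}
    for file_name, expected in manifest.items():
        actual = sha1_of(models_dir / file_name)
        if actual != expected:
            raise RuntimeError(f"{file_name}: expected {expected}, got {actual}")
```

Calling verify_models at the start of a production run turns "which model is deployed?" into a question that source control can answer, the same way a lock file does for third-party libraries.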

Real-Time Updates

When new raw data is produced, how quickly should it propagate to the final results? Do you need real-time (or nearly real-time) updates? Datapipes are useful for update intervals of a day or longer. Hourly updates are possible, but for anything more real-time than that, an entirely different set of architectural principles applies.

Some anecdotal evidence: in ~2009, Google began testing their new back-end indexing system, code-named “Caffeine”. It allowed Google to continuously update search results. Architecturally, this was a huge change, requiring a move from a staged datapipe (using MapReduce) to a more incremental approach (using Bigtable).

That said, even when real-time updates are a requirement, it is useful to construct a rudimentary pipeline first, for a variety of reasons:


Content © 2006-2019 Rusty Klophaus