How to fork a nodejs stream into many streams

Edit 7 Jan 2020

After posting the article on Reddit, the user chocoreader correctly pointed out that this is not a fork of the stream. It is the classic enterprise integration pattern Content-Based Router, which routes each message to the correct recipient based on the message content.
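For context, a minimal sketch of that pattern outside of the streams API could look like this (the message shape and the recipients map are hypothetical, purely to illustrate the idea):

'use strict';

// Illustrative only: a Content-Based Router inspects each message and
// forwards it to the recipient chosen by the message's own content.
function route(message, recipients) {
  const recipient = recipients[message.Country]; // the routing key lives in the message itself
  if (recipient) recipient(message);
}

// Hypothetical usage: one handler per country.
route({ Country: 'Iceland', Sales: 42 }, {
  Iceland: row => console.log('iceland handler', row),
});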

Intro

In the following tutorial, we will process a relatively big CSV file using the Node.js streams API and the fast-csv module. The CSV file contains economic data for all the countries on the globe.

Our goal is to create a separate CSV file with the data for each country. The results will be stored in a folder called countries.
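One small prerequisite: the write streams below target paths like countries/<country>.csv, so the countries folder has to exist before the script runs. Creating it up front, either from the shell or with a one-liner such as this, avoids an ENOENT error:

const fs = require('fs');

// Make sure the output folder exists before any write stream is opened.
fs.mkdirSync('countries', { recursive: true });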

Code solution

'use strict';

const fs = require('fs');
const csv = require('fast-csv');
const { Transform, PassThrough, pipeline } = require('stream');

const readable = fs.createReadStream('1500000-sales-records.csv');

// Create a map of streams for each country, at first empty
const streams = {};

const splitStream = new Transform({
  objectMode: true,
  transform(object, _enc, next) {
    // if there is no stream created for the country, then we create one
    // our csv contains the Country header which is used as discriminator map here
    if (!streams.hasOwnProperty(object.Country)) {
      const through = new PassThrough({ objectMode: true });
      streams[object.Country] = through; // attaching the passthrough to the streams object

      pipeline(
        through,
        csv.format(),
        fs.createWriteStream(
          'countries/' + object.Country.toLowerCase() + '.csv'
        ),
        err => {
          // pipeline expects a callback; this is where per-country write errors surface
          if (err) console.error(err);
        }
      );
    }

    streams[object.Country].push(object); // push the parsed row into that country's stream
    next(null);
  },
  flush(next) {
    // Once the stream has finished processing we need to notify all active
    // streams that they are finished by sending null
    Object.keys(streams).forEach(country => {
      streams[country].push(null);
    });

    next(null);
  }
});

pipeline(readable, csv.parse({ headers: true }), splitStream, err => {
  if (err) return console.error(err);

  console.log('everything went very fine');
});
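If you prefer promises over callbacks, the same top-level flow can be written with the promise-based pipeline. A minimal sketch, assuming Node.js 15+ where require('stream/promises') is available:

const { pipeline } = require('stream/promises');

async function run() {
  // Same flow as above, but awaitable instead of callback-based.
  await pipeline(
    fs.createReadStream('1500000-sales-records.csv'),
    csv.parse({ headers: true }),
    splitStream
  );
  console.log('everything went very fine');
}

run().catch(console.error);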

Conclusion

The idea is straightforward: we create the stream flow for a country the moment we first see its data, then push every row into its corresponding stream, so the memory footprint stays linear.
A critical detail is the flush() function, which pushes null into all the opened streams. This ends them and lets the downstream pipelines finish and close the files.
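An equivalent way to close the per-country streams is to end the writable side of each PassThrough instead of pushing null; a small sketch, assuming the same streams map as above:

  flush(next) {
    // Ending the writable side flushes each PassThrough and finishes
    // the downstream csv.format() and file write streams.
    Object.values(streams).forEach(stream => stream.end());
    next(null);
  }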

If you find this article useful, please share it, follow me on Twitter @alxolr, or subscribe to get an email notification when a new article is released.
