Uploading CSV to DynamoDB with Node JS

So I wanted to upload CSV to DynamoDB.
Easy, right?
Not so fast.

It turns out, you have to obey your provisioned write capacity.

Unlike S3, the "Simple Storage Service", where you simply upload a file, DynamoDB isn't "Simple". There's no "upload CSV" button; you have to write a program to do it.

So, first take a look at your provisioned write capacity.

As you can see, I have 80 write capacity units. For a large upload you'll want to temporarily increase this, then switch back to a lower capacity once you're in "operational" mode.
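If you'd rather script that capacity bump than click around the console, the same aws-sdk client can do it. Here's a minimal sketch against my gf_cause table; the capacity numbers are just placeholders for whatever you need:

var AWS = require('aws-sdk');
AWS.config.update({region: 'us-west-2'});
var db = new AWS.DynamoDB();

// Raise write capacity before the bulk load; run it again with a lower
// number once you're back in "operational" mode.
db.updateTable({
    TableName: 'gf_cause',
    ProvisionedThroughput: {ReadCapacityUnits: 5, WriteCapacityUnits: 80}
}, function(err, data) {
    if (err) console.log(err);
    else console.log('Updating... wait for the table to go ACTIVE before uploading.');
});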

I wrote a Node program to manage my throughput.

var parse = require('csv-parse');
var AWS = require('aws-sdk');
var fs = require('fs');
var uuid = require('uuid');
var _ = require('lodash');

AWS.config.update({region: 'us-west-2'});

var db = new AWS.DynamoDB();

var parser = parse({delimiter: ',', columns: true}); // columns: true gives one object per row, keyed by the CSV header

var batch = []; // rows from the parser, queued up waiting to be written

function sendBatch() {
    if(batch.length == 0) return readStream.resume(); // nothing queued; keep reading
    console.log(batch.length);
    readStream.pause(); // stop reading while we work this batch off

    // get these off the queue immediately so there are no race conditions.
    var mybatch;
    if(batch.length > 25) {
        mybatch = batch.splice(0,25);
    } else {
        mybatch = batch.slice(0);
        batch.length = 0;
    }

    var items = _(mybatch)
        .map(n=>_.omit(n, ['digest', 'grp', '']))                  // drop columns I don't want in DynamoDB
        .map(n=>_.assign(n, {id: uuid.v4()}))                      // add a uuid for the primary key
        .map(n=>n.LATITUDE == 'NA' ? _.omit(n, ['LATITUDE', 'LONGITUDE']) : n) // drop missing coordinates
        .map(n=>{return {PutRequest:{Item:_.mapValues(n, (v,k)=>/ITUDE$/.test(k)?{N:v}:{S:v})}}}) // *ITUDE columns are numbers, everything else strings
        .value();

    db.batchWriteItem({RequestItems:{gf_cause:items}}) // one batchWriteItem call takes at most 25 items
        .on('success', (response) => {
            console.log("Success!");
        })
        .on('error', (response) => {
            console.log("Error!");
            console.log(response);
        })
        .on('complete', (response) => {
            console.log("Always!");
            if(batch.length < 25) readStream.resume(); // queue has drained below a full batch, so let more rows in
        }).
        send();
}

parser.on('readable', function(){
  var data;
  while((data = parser.read())){
    batch.push(data);
  }
});
parser.on('error', function(err){
  console.log(err.message);
});
parser.on('finish', function(){
  sendBatch();          // flush the last partial batch
  clearInterval(timer); // and stop the throughput timer
});

var readStream = fs.createReadStream('toDynamo.csv');
readStream.on('open', ()=>{readStream.pipe(parser);})

var timer = setInterval(sendBatch, 313);

Notice that I can pause and resume the readStream (at the top of sendBatch, and again in the 'complete' handler) in order to control my queue size.
The actual batch calls happen on the throughput interval, which is your handle for managing throughput. Set it lower to make the requests more frequent.

var timer = setInterval(sendBatch, 313);

Through experimentation, watching the Write throughput graph on the AWS DynamoDB dashboard, you can adjust the timer interval until your write throughput sits just under your provisioned capacity.
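That 313 isn't magic. Each batch writes at most 25 items, and with items under 1 KB each write costs one capacity unit, so 25 items every 313 ms is roughly 80 writes per second, which lines up with the 80 write capacity units above. If you want to derive the interval for your own table, the arithmetic is just:

// Rough sketch: work out a batch interval from your provisioned write capacity.
// Assumes items under 1 KB, so each item costs one write capacity unit.
var writeCapacityUnits = 80; // whatever your table is provisioned at
var batchSize = 25;          // batchWriteItem's limit

var intervalMs = Math.ceil(batchSize / writeCapacityUnits * 1000);
console.log(intervalMs);     // 313 for 80 WCUs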

Of course, DynamoDB's maximum batch write is 25 items, so we always send requests of 25 unless we're at the end of the input, where the parser's 'finish' handler flushes whatever is left.
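One thing this program glosses over: when you do nudge past your capacity, batchWriteItem doesn't necessarily error out. It hands the throttled items back in UnprocessedItems, and the handlers above just log and move on. A more defensive version would resend them after a pause; a rough sketch, using the same db client as above (the function name is mine, not part of the program):

// Sketch only: resend anything DynamoDB throttled. UnprocessedItems has the
// same shape as RequestItems, so it can be fed straight back in.
function writeWithRetry(requestItems) {
    db.batchWriteItem({RequestItems: requestItems}, function(err, data) {
        if (err) return console.log(err);
        if (data.UnprocessedItems && Object.keys(data.UnprocessedItems).length > 0) {
            setTimeout(function(){ writeWithRetry(data.UnprocessedItems); }, 1000); // back off, then retry
        }
    });
}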

The chain of maps inside sendBatch turns each CSV row into a PutRequest.

All of my values are strings except for LONGITUDE and LATITUDE, so the last map in the chain checks the column name and marks those two as numbers.
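To make that concrete, here's roughly what one row turns into. Only LATITUDE and LONGITUDE are real column names from my file; CAUSE is a made-up stand-in for the rest:

// A hypothetical input row from the parser (columns: true gives this shape):
var row = {CAUSE: 'lightning', LATITUDE: '40.1672', LONGITUDE: '-105.1019', digest: 'abc', grp: '7'};

// After the map chain it becomes something like this: digest and grp are
// dropped, an id is added, and the *ITUDE columns are typed as numbers.
var request = {
    PutRequest: {
        Item: {
            id:        {S: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'}, // from uuid.v4()
            CAUSE:     {S: 'lightning'},
            LATITUDE:  {N: '40.1672'},
            LONGITUDE: {N: '-105.1019'}
        }
    }
};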

I hope this helps you out a little. It would have helped me if I'd found an example like this, instead of having to figure it out the hard way.

This program's going to run for a long time. Make sure you have caffeine or something to keep your machine awake.