javascript - Node.js: Streaming from MongoDB to file - Stack Overflow

I am fairly new to javascript/node.js and am trying to get a very basic scenario working: connect to Mo

I am fairly new to javascript/node.js and am trying to get a very basic scenario working: connect to MongoDB, transform the JSON response into CSV, write it to a file. I have tried as below with

fs = require('fs');
var MongoClient = require('mongodb').MongoClient;
var Db = require('mongodb').Db;
var Server = require('mongodb').Server;
var Json2csvStream = require('json2csv-stream');
var Stream = require('stream');
var JSONStream = require('JSONStream');
var es = require('event-stream');
var csv = require('csv');

var fields = ['execAmendTime', 'execTime', 'execType', 'lastMkt', 'manualExecFlag', 'orderId', 'riskTrade', 'rootOrdId', 'salesCommissionRate', 'salesCommissionType', 'theoPov20Px',
'theoPov20BL', 'tradeFlags', 'tradeNotes', 'transactTime', 'version', 'book.bookName', 'businessUnit', 'missionRate', 'missionSource', 'missionType', 'counterBook.bookName', 
'counterParty.name', 'createTime', 'currency', 'direction', 'execQuantity', 'fxRate','orderQuantity', 'positionTrader.name', 'price', 'primaryTrader.name','rootSystem', 'source',
'sectorGicsLevel1', 'salesTrader.name', 'tradedPrice', 'isCRB', 'clientCategory', 'tradeId', 'tradeDate', 'instrument.instrumentRic', 'notionalUSD','missionUSD', 'region'];

// Connect to the db
// Flow: connect -> call db.open on the connected handle -> select "test_db" ->
// look up the 'test' collection -> stream the documents matching a fixed
// tradeDate through Json2csvStream into out.csv. Every error path just prints
// the error with console.dir and returns.
MongoClient.connect("mongodb://*****", function (err, db) {
    if (err) { return console.dir(err); }

    // Redundant guard: the early return above means err is always falsy here.
    if (!err) {
        console.log("We are connected");
    }

    // The callback parameters shadow the outer err/db for the rest of the nesting.
    db.open(function (err, db) {
        if (err) { return console.dir(err); }
        var newDb = db.db("test_db");

        // The outer `collection` variable holds whatever newDb.collection() returns and
        // is not read again in this snippet; the code below uses the callback's
        // `collection` parameter instead.
        var collection = newDb.collection('test', function (err, collection) {
            if (err) { return console.dir(err); }
            var parser = new Json2csvStream();
            var writer = fs.createWriteStream('out.csv');
            var stream = collection.find({ tradeDate: new Date('2015-12-29T00:00:00.000Z') }).stream();

            // NOTE(review): the reported "TypeError: Invalid non-string/buffer chunk" is
            // raised from a Writable.write reached via Readable.push in mongodb's
            // cursor.js, which points at this pipe: the cursor stream appears to hand the
            // parser something other than a string/Buffer. Confirm against
            // json2csv-stream's expected input.
            stream.pipe(parser).pipe(writer);

            // Additional consumer on the same stream: prints each emitted item.
            stream.on("data", function (item) {
                console.log(item);
            });

            // Two separate 'end' listeners are registered: this one only logs...
            stream.on('end', function () {
                console.log("ended");
            });

            // ...and this one closes both database handles once the cursor stream ends.
            // NOTE(review): 'end' is the cursor stream's event, not the file writer's;
            // whether out.csv is fully flushed by then is not established here.
            stream.on("end", function () {
                newDb.close();
                db.close();
            });
        });
    });
});

I am getting errors as below.

I tried to add transformations with JSON.stringify etc., but none of my attempts worked. It seems I need to wait until the query stream to Mongo has completed before starting to feed it into the json2csv transformer?

Any ideas? Am I doing something fundamentally wrong here?

Many thanks!

Output:

We are connected
D:\WebTrial\MongoProject\node_modules\mongodb\lib\utils.js:98
    process.nextTick(function() { throw err; });
                                ^

TypeError: Invalid non-string/buffer chunk
    at validChunk (_stream_writable.js:178:14)
    at Writable.write (_stream_writable.js:205:12)
    at ondata (_stream_readable.js:525:20)
    at emitOne (events.js:82:20)
    at emit (events.js:169:7)
    at readableAddChunk (_stream_readable.js:146:16)
    at Readable.push (_stream_readable.js:110:10)
    at D:\WebTrial\MongoProject\node_modules\mongodb\lib\cursor.js:1102:10
    at handleCallback (D:\WebTrial\MongoProject\node_modules\mongodb\lib\utils.j
s:96:12)
    at D:\WebTrial\MongoProject\node_modules\mongodb\lib\cursor.js:673:5

I am fairly new to javascript/node.js and am trying to get a very basic scenario working: connect to MongoDB, transform the JSON response into CSV, write it to a file. I have tried as below with

fs = require('fs');
var MongoClient = require('mongodb').MongoClient;
var Db = require('mongodb').Db;
var Server = require('mongodb').Server;
var Json2csvStream = require('json2csv-stream');
var Stream = require('stream');
var JSONStream = require('JSONStream');
var es = require('event-stream');
var csv = require('csv');

var fields = ['execAmendTime', 'execTime', 'execType', 'lastMkt', 'manualExecFlag', 'orderId', 'riskTrade', 'rootOrdId', 'salesCommissionRate', 'salesCommissionType', 'theoPov20Px',
'theoPov20BL', 'tradeFlags', 'tradeNotes', 'transactTime', 'version', 'book.bookName', 'businessUnit', 'missionRate', 'missionSource', 'missionType', 'counterBook.bookName', 
'counterParty.name', 'createTime', 'currency', 'direction', 'execQuantity', 'fxRate','orderQuantity', 'positionTrader.name', 'price', 'primaryTrader.name','rootSystem', 'source',
'sectorGicsLevel1', 'salesTrader.name', 'tradedPrice', 'isCRB', 'clientCategory', 'tradeId', 'tradeDate', 'instrument.instrumentRic', 'notionalUSD','missionUSD', 'region'];

// Connect to the db
// Flow: connect -> call db.open on the connected handle -> select "test_db" ->
// look up the 'test' collection -> stream the documents matching a fixed
// tradeDate through Json2csvStream into out.csv. Every error path just prints
// the error with console.dir and returns.
MongoClient.connect("mongodb://*****", function (err, db) {
    if (err) { return console.dir(err); }

    // Redundant guard: the early return above means err is always falsy here.
    if (!err) {
        console.log("We are connected");
    }

    // The callback parameters shadow the outer err/db for the rest of the nesting.
    db.open(function (err, db) {
        if (err) { return console.dir(err); }
        var newDb = db.db("test_db");

        // The outer `collection` variable holds whatever newDb.collection() returns and
        // is not read again in this snippet; the code below uses the callback's
        // `collection` parameter instead.
        var collection = newDb.collection('test', function (err, collection) {
            if (err) { return console.dir(err); }
            var parser = new Json2csvStream();
            var writer = fs.createWriteStream('out.csv');
            var stream = collection.find({ tradeDate: new Date('2015-12-29T00:00:00.000Z') }).stream();

            // NOTE(review): the reported "TypeError: Invalid non-string/buffer chunk" is
            // raised from a Writable.write reached via Readable.push in mongodb's
            // cursor.js, which points at this pipe: the cursor stream appears to hand the
            // parser something other than a string/Buffer. Confirm against
            // json2csv-stream's expected input.
            stream.pipe(parser).pipe(writer);

            // Additional consumer on the same stream: prints each emitted item.
            stream.on("data", function (item) {
                console.log(item);
            });

            // Two separate 'end' listeners are registered: this one only logs...
            stream.on('end', function () {
                console.log("ended");
            });

            // ...and this one closes both database handles once the cursor stream ends.
            // NOTE(review): 'end' is the cursor stream's event, not the file writer's;
            // whether out.csv is fully flushed by then is not established here.
            stream.on("end", function () {
                newDb.close();
                db.close();
            });
        });
    });
});

I am getting errors as below.

I tried to add transformations with JSON.stringify etc., but none of my attempts worked. It seems I need to wait until the query stream to Mongo has completed before starting to feed it into the json2csv transformer?

Any ideas? Am I doing something fundamentally wrong here?

Many thanks!

Output:

We are connected
D:\WebTrial\MongoProject\node_modules\mongodb\lib\utils.js:98
    process.nextTick(function() { throw err; });
                                ^

TypeError: Invalid non-string/buffer chunk
    at validChunk (_stream_writable.js:178:14)
    at Writable.write (_stream_writable.js:205:12)
    at ondata (_stream_readable.js:525:20)
    at emitOne (events.js:82:20)
    at emit (events.js:169:7)
    at readableAddChunk (_stream_readable.js:146:16)
    at Readable.push (_stream_readable.js:110:10)
    at D:\WebTrial\MongoProject\node_modules\mongodb\lib\cursor.js:1102:10
    at handleCallback (D:\WebTrial\MongoProject\node_modules\mongodb\lib\utils.j
s:96:12)
    at D:\WebTrial\MongoProject\node_modules\mongodb\lib\cursor.js:673:5
Share Improve this question edited Dec 31, 2015 at 19:38 APC 146k20 gold badges177 silver badges286 bronze badges asked Dec 31, 2015 at 14:15 Jennifer TenzerJennifer Tenzer 111 silver badge2 bronze badges 1
  • Can you point out which lines of your source the last three lines of the stacktrace are referring to? – Nicky McCurdy Commented Dec 31, 2015 at 19:26
Add a comment  |

2 Answers 2

Reset to default 6

It's because find().stream() streams objects while Json2csvStream expects strings. event-stream can help you stringify the objects. I also simplified your code, since it contained some unnecessary parts:

var fs = require('fs');
var MongoClient = require('mongodb').MongoClient;
var es = require('event-stream');
var Json2csvStream = require('json2csv-stream');

// var Db = require('mongodb').Db;
// var Server = require('mongodb').Server;
// var Stream = require('stream');
// var JSONStream = require('JSONStream');
// var csv = require('csv');

var fields = ['execAmendTime', 'execTime', 'missionUSD', 'region'];

// Connect to the db
// you can put the db name in the url
//
// Exports every document of the 'test' collection whose tradeDate matches the
// fixed date below to out.csv:
//   cursor stream -> JSON.stringify -> json2csv -> file writer.
// The connection is closed exactly once, whether the export succeeds or fails.
MongoClient.connect("mongodb://localhost:27017/test_db", function (err, db) {
    if (err) {
        return console.dir(err);
    }
    console.log("We are connected");

    // without strict: true, err is always null
    // in strict mode, there is an err if the collection doesn't exist
    db.collection('test', { strict: true }, function (err, collection) {
        if (err) {
            // Release the connection before bailing out; otherwise it stays open.
            db.close();
            return console.dir(err);
        }

        var json2csv = new Json2csvStream();
        var writer = fs.createWriteStream('out.csv');

        var mongoStream = collection.find(
            { tradeDate: new Date('2015-12-29T00:00:00.000Z') }
        ).stream();

        // find().stream() emits document objects while json2csv consumes text,
        // so each document is serialized before it reaches the converter.
        var stringify = es.map(function (doc, next) {
            next(null, JSON.stringify(doc));
        });

        // Runs once, on whichever happens first: the output file closing or a
        // stage reporting an error.
        var finished = false;
        function finish(streamErr) {
            if (finished) { return; }
            finished = true;
            if (streamErr) {
                console.dir(streamErr);
            } else {
                console.log('done...');
            }
            db.close();
        }

        // pipe() does not forward 'error' events to the next stage, so every
        // stage gets its own listener.
        [mongoStream, stringify, json2csv, writer].forEach(function (stage) {
            stage.on('error', finish);
        });
        writer.on('close', function () {
            finish();
        });

        mongoStream.pipe(stringify).pipe(json2csv).pipe(writer);
    });
});

@Shanoor's answer helped me a lot. Here is my modified (generic) script to write the MongoDB cursor/stream output to a gz csv file.

const fs = require("fs");
const csvStringify = require("csv-stringify");
const { Transform } = require('stream');
const zlib = require("zlib");

const { MongoClient } = require('mongodb');
// Connection settings. The client is only constructed here; the connection
// itself is opened later via client.connect().
const url = 'mongodb://localhost:27017';
const client = new MongoClient(url);
const dbName = 'myProject'; // your DB name

// Output file path, taken from the first command-line argument
// (undefined when the script is run without one).
const gzFilePath = process.argv[2];

/**
 * Entry point: streams the query result through `handleStream`, converts it to
 * CSV, gzips it and writes it to the path given on the command line.
 *
 * Any failure (connection, query, transform, compression, file write) is
 * reported once, the process exit code is set to 1, and the MongoDB client is
 * always closed so the process can exit.
 */
(async () => {
  // Fail fast with a usage hint instead of letting fs.createWriteStream throw
  // on an undefined path.
  if (!gzFilePath) {
    console.error('Usage: node <script> <output-file.csv.gz>');
    process.exitCode = 1;
    return;
  }

  // Loaded locally so this block does not depend on the file's require list.
  // Unlike chained .pipe() calls, pipeline() forwards errors from every stage
  // and destroys all streams when one of them fails.
  const { pipeline } = require('stream/promises');

  try {
    // Use connect method to connect to the server.
    // NOTE: `handleStream` is declared further down the file, so it must only
    // be referenced after this first `await`.
    await client.connect();
    console.log('Connected successfully to server');
    const db = client.db(dbName);

    // execute the query (find() returns a cursor synchronously, no await needed)
    const cursor = db.collection(/*collection name*/'user')
      .find({/* your query*/});

    // write stream to input gz file name
    const writeStream = fs.createWriteStream(gzFilePath);
    const stringifier = csvStringify.stringify({
      header: true
    });
    const gz = zlib.createGzip();

    // process the data with stream; resolves once the file has been fully written
    await pipeline(
      cursor.stream(),
      handleStream,
      stringifier,
      gz,
      writeStream
    );
    console.log('All filtered records written.');
  } catch (err) {
    console.error('Export failed:', err);
    process.exitCode = 1;
  } finally {
    // Without this the open connection keeps the process alive.
    await client.close();
  }
})().catch((err) => {
  // Only reachable if closing the client itself fails.
  console.error(err);
  process.exitCode = 1;
});

/**
 * Object-mode transform applied to each document coming from the cursor.
 *
 * Documents whose `age` is greater than 18 are forwarded, reduced to the
 * `_id`, `name` and `age` fields; every other document is dropped.
 */
const handleStream = new Transform({
  writableObjectMode: true,
  readableObjectMode: true,
  transform(doc, _encoding, done) {
    // process or filter the document here
    const { _id, name, age } = doc;
    if (age > 18) {
      // Passing a value to the callback is equivalent to push() + callback().
      done(null, { _id, name, age });
      return;
    }
    done();
  },
});

发布者:admin,转转请注明出处:http://www.yc00.com/questions/1745514304a4630913.html

相关推荐

  • javascript - Node.js: Streaming from MongoDB to file - Stack Overflow

    I am fairly new to javascript/node.js and am trying to get a very basic scenario working: connect to Mo

    4小时前
    20

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信