javascript - @aws-sdk/lib-storage to Stream JSON from MongoDB to S3 with JSONStream.stringify() - Stack Overflow

I'm trying to stream JSON from MongoDB to S3 with the new version of @aws-sdk/lib-storage:

"@aws-sdk/client-s3": "^3.17.0"
"@aws-sdk/lib-storage": "^3.34.0"
"JSONStream": "^1.3.5",

Try #1: It seems that I'm not using JSONStream.stringify() correctly:

import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import JSONStream from 'JSONStream';
import { env } from '../../../env';

const s3Client = new S3Client({ region: env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    readStream.pipe(JSONStream.stringify());
 
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: readStream,
      },
    });
    
    await upload.done(); 
  }
  catch (err) {
    log.error(err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }

};

Error #1:

TypeError [ERR_INVALID_ARG_TYPE]: The first argument must be one of type string, Buffer, ArrayBuffer, Array, or Array-like Object. Received type object
    at Function.from (buffer.js:305:9)
    at getDataReadable (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getDataReadable.ts:6:18)
    at processTicksAndRejections (internal/process/task_queues.js:94:5)
    at Object.getChunkStream (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getChunkStream.ts:17:20)
    at Upload.__doConcurrentUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:121:22)
    at async Promise.all (index 0)
    at Upload.__doMultipartUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:196:5)
    at Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:12)

Try #2: using the variable jsonStream:

    const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    const jsonStream = readStream.pipe(JSONStream.stringify());
 
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: jsonStream,
      },
    });

Error #2:

ReferenceError: ReadableStream is not defined
    at Object.getChunk (/.../node_modules/@aws-sdk/lib-storage/src/chunker.ts:22:30)
    at Upload.__doMultipartUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:187:24)
    at Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:37)

Try #3: use stream.PassThrough:

    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    readStream.pipe(JSONStream.stringify()).pipe(uploadStreamFile('benda_mongo.json'));

...

const stream = require('stream');
export const uploadStreamFile = async(fileName) => {
  try{

    const pass = new stream.PassThrough();
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: pass,
      },
    });
    const res = await upload.done();
    
    log.info('finished uploading file', fileName);
    return res;
  }
  catch(err){
    return;
  }
};

Error #3:

dest.on is not a function
    at Stream.pipe (internal/streams/legacy.js:30:8)

Try #4: mongodb.stream({transform: doc => JSON.stringify...}) instead of JSONStream:

import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import { env } from '../../../env';

const s3Client = new S3Client({ region: env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName)
      .find('{}')
      .limit(5)
      .stream({ transform: doc => JSON.stringify(doc) + '\n' });
  
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: readStream,
      },
    });
  
    await upload.done(); 
  }
  catch (err) {
    log.error('waaaaa', err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }
};

Error #4:

TypeError [ERR_INVALID_ARG_TYPE]: The first argument must be one of type string, Buffer, ArrayBuffer, Array, or Array-like Object. Received type object
    at Function.from (buffer.js:305:9)
    at getDataReadable (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getDataReadable.ts:6:18)
    at processTicksAndRejections (internal/process/task_queues.js:94:5)
    at Object.getChunkStream (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getChunkStream.ts:17:20)
    at Upload.__doConcurrentUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:121:22)
    at async Promise.all (index 0)
    at Upload.__doMultipartUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:196:5)
    at Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:12)

Try #5: using stream.PassThrough() and returning pass to pipe:

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('{}').limit(5).stream({ transform: doc => JSON.stringify(doc) + '\n' });
    readStream.pipe(uploadStreamFile());
  }
  catch (err) {
    log.error('waaaaa', err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }
};


const stream = require('stream');

export const uploadStreamFile = async() => {
  try{
    const pass = new stream.PassThrough();
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: pass,
      },
    });
    await upload.done();
    return pass;
  }
  catch(err){
    log.error('pawoooooo', err);
    return;
  }
};

Error #5:

TypeError: dest.on is not a function
    at Cursor.pipe (_stream_readable.js:680:8)

asked Oct 3, 2021 at 11:15 by Oron Bendavid; edited Oct 7, 2021 at 6:01
  • Thanks, but I got the same error. I succeeded in piping to a fs.createWriteStream and writing to a file; later I can use fs.createReadStream and pipe it to my 'uploadStreamFile', and that works. But I don't like this solution, as it causes my server to write to a temp file instead of streaming the MongoDB result to S3 directly. – Oron Bendavid Commented Oct 6, 2021 at 7:44
  • Thanks, the full stack trace is: 'dest.on is not a function at Stream.pipe (internal/streams/legacy.js:30:8)' – Oron Bendavid Commented Oct 6, 2021 at 18:38
  • I've updated the question with all relevant error messages – Oron Bendavid Commented Oct 6, 2021 at 19:02
  • I removed the comments as they are already included in the answer. I hope that one of the proposed alternatives works for you. – jccampanero Commented Oct 6, 2021 at 22:16

2 Answers

After reviewing your error stack traces, the problem probably has to do with the fact that the MongoDB driver provides a cursor in object mode, whereas the Body parameter of Upload requires a traditional stream whose chunks can be handled as Buffers.

Taking your original code as a reference, you can try providing a Transform stream to bridge the two requirements.

Please consider, for instance, the following code:

import { Transform } from 'stream';
import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
const s3Client = new S3Client({ region: env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    // We are creating here a Transform to adapt both sides
    const toJSONTransform = new Transform({
      writableObjectMode: true,
      transform(chunk, encoding, callback) {
        this.push(JSON.stringify(chunk) + '\n');
        callback();  
      }  
    });

    readStream.pipe(toJSONTransform);
 
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: toJSONTransform,
      },
    });
    
    await upload.done(); 
  }
  catch (err) {
    log.error(err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }

};

In the code, toJSONTransform defines the writable side of the stream in object mode; in contrast, its readable side emits plain strings, which should be suitable for being read by the S3 Upload... at least, I hope so.
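
One caveat with .pipe(): it does not forward errors, so if the MongoDB cursor fails mid-read the upload may never settle. Below is a minimal sketch of the same idea wired with stream.pipeline, which does propagate errors from every stage. It assumes Node 15+ for the 'stream/promises' promise API (on older versions the callback form of stream.pipeline can be used instead), keeps the placeholder bucket and key from the question, and reads the region from process.env rather than the question's env module:

import { pipeline } from 'stream/promises'; // assumes Node 15+; otherwise use stream.pipeline with a callback
import { PassThrough, Transform } from 'stream';
import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';

const s3Client = new S3Client({ region: process.env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  const client = await MongoClient.connect(connectionString);
  try {
    const readStream = client.db().collection(collectionName).find({}).limit(5).stream();

    // Same adapter as above: object-mode writable side, newline-delimited JSON strings on the readable side.
    const toJSONTransform = new Transform({
      writableObjectMode: true,
      transform(doc, _encoding, callback) {
        callback(null, JSON.stringify(doc) + '\n');
      },
    });

    const pass = new PassThrough();
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: pass,
      },
    });

    // Run the pipeline and the upload concurrently; if any stage errors, Promise.all rejects.
    await Promise.all([
      pipeline(readStream, toJSONTransform, pass),
      upload.done(),
    ]);
  }
  finally {
    await client.close();
  }
};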

Regarding the second error you reported, the one related to dest.on: I initially thought, and wrote to you about the possibility, that the error occurred because uploadStreamFile returns a Promise, not a stream, and you are passing that Promise to the pipe method, which requires a stream; basically, that you returned the wrong variable. But I hadn't realized that you are also passing the PassThrough stream as a param to the Upload method: please be aware that this stream doesn't contain any information, because you never write anything to it; the contents of the readable stream obtained from the MongoDB query are never passed to the PassThrough, nor to the Upload itself.
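
To make that concrete, here is a minimal sketch of how the helper could be reshaped so that .pipe() receives an actual stream: the helper returns the PassThrough synchronously, together with the pending upload promise, instead of returning a Promise. It assumes the same s3Client and Upload import as in the snippets above; the helper name startUploadStream and its return shape are illustrative only, not part of the question's code:

const { PassThrough } = require('stream');

// Illustrative helper (hypothetical name): hands back the writable stream and the upload promise,
// instead of returning a Promise to .pipe() (which is what caused "dest.on is not a function").
const startUploadStream = (fileName) => {
  const pass = new PassThrough();
  const upload = new Upload({
    client: s3Client,
    params: {
      Bucket: 'test-bucket',
      Key: `extracted-data/${fileName}`,
      Body: pass,
    },
  });
  return { pass, done: upload.done() };
};

// Usage inside the existing async try block, with readStream built as in "Try #5":
const { pass, done } = startUploadStream('benda_mongo.json');
readStream.pipe(pass); // .pipe() now receives a real stream, so the PassThrough actually gets data
await done;            // wait for the multipart upload to complete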

I found an additional solution using stream.PassThrough. Using JSONStream will stream an array of objects instead of one document after the other:

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const passThroughStream = new stream.PassThrough();
    const readStream = db.collection(collectionName)
      .find('{}')
      .stream();

    readStream.on('end', () => passThroughStream.end());

    readStream.pipe(JSONStream.stringify()).pipe(passThroughStream);
    await uploadStreamFile('benda_mongo.json', passThroughStream);
  }
  catch (err) {
    log.error(err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }
};


export const uploadStreamFile = async(fileName, stream) => {
  try{
    log.info('start uploading file', fileName);
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: `${fileName}`,
        Body: stream,
      },
    });

    const res = await upload.done();
    log.info('finished uploading file', fileName);
    return res;
  }
  catch(err){
    log.error(err);
    return;
  }
};
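
For completeness, a hypothetical caller for the uploadMongoStreamToS3 function above; the connection string and collection name below are placeholders, not values from the question:

// Hypothetical usage; replace the connection string and collection name with real values.
const run = async () => {
  try {
    await uploadMongoStreamToS3('mongodb://localhost:27017/mydb', 'my_collection');
    console.log('finished streaming collection to S3');
  }
  catch (errName) {
    // uploadMongoStreamToS3 re-throws err.name (a string), as written above
    console.error('export failed:', errName);
  }
};

run();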
