I am trying to replace an ETL process using NodeJS streams. The Transform stream I am attempting to write takes in a dataset and, based on configuration data, will output one or more records per input record. In other words, if it's reading 100000 records, the transformation can end up writing anywhere from 100000-400000 records. The _transform method only allows its callback to be called once, so I am trying to figure out how to output multiple objects per single input object.
I looked at duplexes, but every example I saw used them as a two-way flow, whereas I definitely want my stream to be one way (or I may just not understand how they work). Does anyone have any suggestions on how to implement this?
2 Answers
The callback can only be called once, but the .push method is what actually emits data, and it can be called as many times as necessary inside _transform. Example:
const { Transform } = require('stream');

class MyTransform extends Transform {
  _transform(chunk, enc, next) {
    // chunk arrives as a Buffer by default, so stringify before splitting
    const arrayFromChunk = chunk.toString().split(',');
    arrayFromChunk.forEach(piece => {
      // this.push is what will emit readable data, can be called as often
      // as needed.
      this.push(piece);
    });
    next(); // next can only be called once per chunk.
  }
}
docs here: https://nodejs.org/docs/latest-v18.x/api/stream.html#stream_implementing_a_transform_stream
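For completeness, here is a minimal usage sketch of the class above; the input file name and the logging sink are hypothetical, just to show the one-to-many output in action:

const fs = require('fs');
const { pipeline, Writable } = require('stream');

pipeline(
  fs.createReadStream('input.csv'), // hypothetical comma-separated source
  new MyTransform(),
  new Writable({
    write(piece, enc, cb) {
      console.log('emitted:', piece.toString()); // each this.push() lands here
      cb();
    }
  }),
  (err) => {
    if (err) console.error('pipeline failed:', err);
  }
);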
NodeJS streams are GREAT for ETL work, but while very powerful they're also pretty complex, and it's easy to get lost when you're starting from scratch--as you have already experienced. I was looking for an easy way to use streams while also allowing for code reuse; I ended up creating gulp-etl, which uses streams under the hood. With gulp-etl, you could use the gulp-etl-handlelines plugin, which calls a callback for each incoming record; if you want a single incoming record to produce multiple records, it would look something like this:
const { src, dest } = require('gulp');
const handleLines = require('gulp-etl-handlelines').handlelines;

const linehandler = (lineObj, context) => {
  let recsToReturn = [];

  // return null to remove this line
  if (!lineObj.record || lineObj.record["TestValue"] == 'illegalValue') {
    return null;
  }

  // pass through the incoming record
  recsToReturn.push(lineObj);

  // logic to create new record
  if (lineObj.record.needsDuplication) {
    // clone newRec from lineObj, shallow-copying the nested record too
    let newRec = {...lineObj, record: {...lineObj.record}};
    // change the new record as needed
    newRec.record.UniqueField = "newValue";
    recsToReturn.push(newRec);
  }

  // return the record(s)
  return recsToReturn;
}
exports.default = function() {
  return src('data/100kRecs.ndjson', { buffer: false /* use streaming mode */ })
    // pipe the files through our handlelines plugin
    .pipe(handlelines({}, { transformCallback: linehandler }))
    .pipe(dest('output/'));
}
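Assuming this is saved as gulpfile.js in a project with gulp, the gulp CLI, and gulp-etl-handlelines installed, running npx gulp executes the default task above and streams the records through.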
We're using Transform streams behind the scenes, but that's all abstracted away; you get the benefits without having to get into the weeds of streams implementation--unless you want to write your own plugins. Plus, you can use lots of existing plugins.