I want to run one hundred HTTP requests in configurable chunks, with a configurable timeout between chunk requests. The requests are based on the data provided in some .csv file.
It doesn't work because I am getting a TypeError, but when I remove () after f, it doesn't work either. I would be very grateful for a little help. Probably the biggest problem is that I don't really understand how exactly promises work, but I tried multiple solutions and wasn't able to achieve what I want.
The timeout feature will probably give me even more headaches, so I would appreciate any tips for that too.
Can you please help me understand why it doesn't work?
Here is the snippet:
const rp = require('request-promise');
const fs = require('fs');
const { chunk } = require('lodash');
const BATCH_SIZE = 2;
const QUERY_PARAMS = ['clientId', 'time', 'changeTime', 'newValue'];
async function update(id, time, query) {
const options = {
method: 'POST',
uri: `https://requesturl/${id}?query=${query}`,
body: {
"prop": {
"time": time
}
},
headers: {
"Content-Type": "application/json"
},
json: true
}
return async () => { return await rp(options) };
}
async function batchRequestRunner(data) {
const promises = [];
for (row of data) {
row = row.split(',');
promises.push(update(row[0], row[1], QUERY_PARAMS.join(',')));
}
const batches = chunk(promises, BATCH_SIZE);
for (let batch of batches) {
try {
Promise.all(
batch.map(async f => { return await f();})
).then((resp) => console.log(resp));
} catch (e) {
console.log(e);
}
}
}
async function main() {
const input = fs.readFileSync('./input.test.csv').toString().split("\n");
const requestData = input.slice(1);
await batchRequestRunner(requestData);
}
main();
Clarification for the first comment:
I have a csv file which looks like this:
clientId,startTime
123,13:40:00
321,13:50:00
The file is ~100k rows. It contains information on how to update the time for a particular clientId in the database. I don't have access to the database, but I do have access to an API which allows updating entries in the database. I cannot make 100k calls at once because my network is limited (I work remotely because of coronavirus), it consumes a lot of memory, and the API can also be limited and can crash if I make all the requests at once.
What I want to achieve:
- Load the csv into memory and convert it to an array
- Handle the API requests in chunks: for example, take the first two rows from the array, make the API calls based on those two rows, wait 1000 ms, take another two rows, and continue processing until the end of the array (csv file). The flow is sketched roughly below.
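In pseudocode, inside an async function, the flow I have in mind looks something like this (just a sketch of the intent, not working code: chunk is lodash's chunk, and sendRequest and sleep are placeholders for the request and delay logic I haven't managed to get working):
// sketch only: send BATCH_SIZE requests, pause, then move on to the next rows
for (const batch of chunk(rows, BATCH_SIZE)) {
  await Promise.all(batch.map(sendRequest)); // fire one chunk of requests
  await sleep(1000); // configurable pause before the next chunk
}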
- Can you back up a few levels and describe (in words) what you're trying to accomplish (like a mini spec for what you want to do)? This looks overly complicated to me so I suspect we can suggest an easier path if we understand the overall objective. – jfriend00 Commented Mar 20, 2020 at 23:43
- hey, I updated the objective below the code snippet – user3652705 Commented Mar 21, 2020 at 0:10
- Why are you using the same rowValues[0] in both of these: const newRowValues = [rowValues[0], rowValues[0]].join(','); ? – jfriend00 Commented Mar 21, 2020 at 0:17
- Why does your function update(id, time, query) accept three arguments, but you only pass it two here: promises.push(update(row[0], row[1])); ? I'm working on a new solution, but I keep finding these types of coding errors in your example. – jfriend00 Commented Mar 21, 2020 at 0:23
- There is a lot more processing and preparation before I put it into the request. I tried to remove unnecessary code before sending it here, but there are still some leftovers. I am sorry for the confusion; I updated the code above, right now it just removes the header line from the csv. – user3652705 Commented Mar 21, 2020 at 0:24
2 Answers
Well, it seems like this is a somewhat classic case where you want to process an array of values with some asynchronous operation and, to avoid consuming too many resources or overwhelming the target server, you want no more than N requests in-flight at the same time. This is a common problem for which there are pre-built solutions. My go-to solution is a small piece of code called mapConcurrent(). It's analogous to array.map(), but it assumes a promise-returning asynchronous callback and you pass it the max number of items that should ever be in-flight at the same time. It then returns a promise that resolves to an array of results.
Here's mapConcurrent():
// takes an array of items and a function that returns a promise
// returns a promise that resolves to an array of results
function mapConcurrent(items, maxConcurrent, fn) {
let index = 0;
let inFlightCntr = 0;
let doneCntr = 0;
let results = new Array(items.length);
let stop = false;
return new Promise(function(resolve, reject) {
function runNext() {
let i = index;
++inFlightCntr;
fn(items[index], index++).then(function(val) {
++doneCntr;
--inFlightCntr;
results[i] = val;
run();
}, function(err) {
// set flag so we don't launch any more requests
stop = true;
reject(err);
});
}
function run() {
// launch as many as we're allowed to
while (!stop && inFlightCntr < maxConcurrent && index < items.length) {
runNext();
}
// if all are done, then resolve parent promise with results
if (doneCntr === items.length) {
resolve(results);
}
}
run();
});
}
Your code can then be structured to use it like this:
function update(id, time, query) {
const options = {
method: 'POST',
uri: `https://requesturl/${id}?query=${query}`,
body: {
"prop": {
"time": time
}
},
headers: {
"Content-Type": "application/json"
},
json: true
}
return rp(options);
}
function processRow(row) {
let rowData = row.split(",");
return update(rowData[0], rowData[1], rowData[2]);
}
function main() {
const input = fs.readFileSync('./input.test.csv').toString().split("\n");
const requestData = input.slice(1);
// process this entire array with up to 5 requests "in-flight" at the same time
mapConcurrent(requestData, 5, processRow).then(results => {
console.log(results);
}).catch(err => {
console.log(err);
});
}
You can obviously adjust the number of concurrent requests to whatever number you want. I set it to 5 here in this example.
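The question also asks for a configurable pause between chunks. mapConcurrent() keeps a steady number of requests in flight rather than working in fixed batches, so if a pause between batches is a hard requirement, a rougher sketch along the lines of the original snippet could be used instead (this assumes lodash's chunk() and a small setTimeout-based sleep() helper; sendRow is whatever per-row request function you use, e.g. processRow above):
const { chunk } = require('lodash');

// simple promise-based delay helper
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// sketch only: process the rows in fixed-size batches with a pause in between
async function runInBatches(rows, batchSize, delayMs, sendRow) {
  const results = [];
  for (const batch of chunk(rows, batchSize)) {
    // wait for every request in this batch to finish before pausing
    const settled = await Promise.all(batch.map(sendRow));
    results.push(...settled);
    await sleep(delayMs); // configurable pause between batches
  }
  return results;
}
For example, runInBatches(requestData, 2, 1000, processRow) would send two requests at a time with a one-second pause between batches.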
import { chunk } from "lodash";
interface ConcurrentResult<T> {
success: T[];
errors: {
error: Error;
item: string;
}[];
total: number;
completed: number;
}
export async function mapConcurrent<T>(
items: string[],
concurrency: number,
fn: (item: string) => Promise<T>,
onProgress?: (progress: number) => void
): Promise<ConcurrentResult<T>> {
const result: ConcurrentResult<T> = {
success: [],
errors: [],
total: items.length,
completed: 0,
};
const chunks = chunk(items, concurrency);
for (const chunk of chunks) {
const promises = chunk.map(async (item) => {
try {
const response = await fn(item);
result.success.push(response);
} catch (error) {
result.errors.push({ error: error as Error, item });
} finally {
result.completed++;
onProgress?.(Math.floor((result.completed / result.total) * 100));
}
});
await Promise.allSettled(promises);
}
return result;
}
This one is handy if you want to handle it in TypeScript.
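No usage example is shown above, so here is a hypothetical one (assuming the snippet above is saved as mapConcurrent.ts; processRow here mirrors the helper from the first answer):
import { mapConcurrent } from "./mapConcurrent";

// hypothetical per-row request, analogous to processRow in the first answer
async function processRow(row: string): Promise<string> {
  const [clientId, startTime] = row.split(",");
  // ... call the update API for clientId/startTime here ...
  return `${clientId} -> ${startTime}`;
}

async function run(requestData: string[]) {
  // up to 5 requests per chunk, with a simple progress log
  const result = await mapConcurrent(requestData, 5, processRow, (pct) =>
    console.log(`${pct}% done`)
  );
  console.log(
    `completed ${result.completed}/${result.total}, ${result.errors.length} failed`
  );
}
Note that, unlike mapConcurrent() in the first answer, this version never rejects: per-row failures are collected in result.errors instead, which is convenient for a long CSV run where one bad row shouldn't abort the rest.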