javascript - Run HTTP requests in chunks - Stack Overflow

I want to run a large number of HTTP requests in configurable chunks, with a configurable timeout between chunk requests. The requests are based on data provided in a .csv file.

It doesn't work because I am getting a TypeError, but when I remove the () after f, it doesn't work either. I would be very grateful for a little help. Probably the biggest problem is that I don't really understand exactly how promises work, but I tried multiple solutions and wasn't able to achieve what I want.

The timeout feature will probably give me even more of a headache, so I would appreciate any tips on that too.

Can you please help me to understand why it doesn't work?

Here is the snippet:

const rp = require('request-promise');
const fs = require('fs');
const { chunk } = require('lodash');

const BATCH_SIZE = 2;
const QUERY_PARAMS = ['clientId', 'time', 'changeTime', 'newValue'];

async function update(id, time, query) {
    const options = {
        method: 'POST',
        uri: `https://requesturl/${id}?query=${query}`,
        body: {
            "prop": {
                "time": time
            }
        },
        headers: {
            "Content-Type": "application/json"
        },
        json: true
    }

    return async () => { return await rp(options) };
}

async function batchRequestRunner(data) {
    const promises = [];
    for (row of data) {
        row = row.split(',');
        promises.push(update(row[0], row[1], QUERY_PARAMS.join(',')));
    }
    const batches = chunk(promises, BATCH_SIZE);

    for (let batch of batches) {
        try {
            Promise.all(
                batch.map(async f => { return await f();})
            ).then((resp) => console.log(resp));
        } catch (e) {
            console.log(e);
        }
    }
}

async function main() {
    const input = fs.readFileSync('./input.test.csv').toString().split("\n");
    const requestData = input.slice(1);

    await batchRequestRunner(requestData);
}

main();

Clarification for the first comment:

I have a csv file which looks like below:

clientId,startTime
123,13:40:00
321,13:50:00

The file is ~100k rows and contains information on how to update the time for a particular clientId in the database. I don't have access to the database, but I do have access to an API which allows updating entries in the database. I cannot make 100k calls at once, because: my network is limited (I work remotely because of coronavirus), it consumes a lot of memory, and the API may also be limited and could crash if I make all the requests at once.

What I want to achieve:

  • Load the csv into memory and convert it to an array

  • Handle the API requests in chunks: for example, take the first two rows from the array, make API calls based on them, wait 1000ms, take the next two rows, and continue until the end of the array (csv file), roughly as sketched below
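In other words, something like this minimal sketch is the behavior I'm after (processRow here is an assumed helper that makes a single API call and returns a promise):

const { chunk } = require('lodash');

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function processInChunks(rows, batchSize, delayMs) {
    for (const batch of chunk(rows, batchSize)) {
        // fire one chunk of requests and wait for all of them to finish
        await Promise.all(batch.map(processRow));
        // then pause before starting the next chunk
        await sleep(delayMs);
    }
}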


Asked Mar 20, 2020 at 23:18 by user3652705, edited Mar 21, 2020 at 0:28 · 6 comments
  • Can you back up a few levels and describe (in words) what you're trying to accomplish (like a mini spec for what you want to do)? This looks overly complicated to me, so I suspect we can suggest an easier path if we understand the overall objective. – jfriend00 Commented Mar 20, 2020 at 23:43
  • Hey, I updated the objective below the code snippet. – user3652705 Commented Mar 21, 2020 at 0:10
  • Why are you using the same rowValues[0] in both slots of const newRowValues = [rowValues[0], rowValues[0]].join(',');? – jfriend00 Commented Mar 21, 2020 at 0:17
  • Why does your function update(id, time, query) accept three arguments, but you only pass it two here promises.push(update(row[0], row[1]));? I'm working on a new solution, but I keep finding these types of coding errors in your example. – jfriend00 Commented Mar 21, 2020 at 0:23
  • There is a lot more processing preparation before I put it into the request. I tried to remove unnecessary code before posting it here, but there are still some leftovers. I am sorry for the confusion; I updated the code above, right now it just removes the header line from the csv. – user3652705 Commented Mar 21, 2020 at 0:24

2 Answers

Well, this is a somewhat classic case where you want to process an array of values with some asynchronous operation and, to avoid consuming too many resources or overwhelming the target server, you want no more than N requests in flight at the same time. This is a common problem for which there are pre-built solutions. My go-to solution is a small piece of code called mapConcurrent(). It's analogous to array.map(), but it assumes a promise-returning asynchronous callback, and you pass it the maximum number of items that should ever be in flight at the same time. It then returns a promise that resolves to an array of results.

Here's mapConcurrent():

// takes an array of items and a function that returns a promise
// returns a promise that resolves to an array of results
function mapConcurrent(items, maxConcurrent, fn) {
    let index = 0;
    let inFlightCntr = 0;
    let doneCntr = 0;
    let results = new Array(items.length);
    let stop = false;

    return new Promise(function(resolve, reject) {

        function runNext() {
            let i = index;       // capture this item's position for the results array
            ++inFlightCntr;
            fn(items[index], index++).then(function(val) {
                ++doneCntr;
                --inFlightCntr;
                results[i] = val;
                run();
            }, function(err) {
                // set flag so we don't launch any more requests
                stop = true;
                reject(err);
            });
        }

        function run() {
            // launch as many as we're allowed to
            while (!stop && inFlightCntr < maxConcurrent && index < items.length) {
                runNext();
            }
            // if all are done, then resolve parent promise with results
            if (doneCntr === items.length) {
                resolve(results);
            }
        }

        run();
    });
}

Your code can then be structured to use it like this:

const rp = require('request-promise');
const fs = require('fs');

function update(id, time, query) {
    const options = {
        method: 'POST',
        uri: `https://requesturl/${id}?query=${query}`,
        body: {
            "prop": {
                "time": time
            }
        },
        headers: {
            "Content-Type": "application/json"
        },
        json: true
    }
    return rp(options);
}

function processRow(row) {
    let rowData = row.split(",");
    // note: the sample CSV only has two columns, so rowData[2] will be
    // undefined here; pass your query string explicitly if you need one
    return update(rowData[0], rowData[1], rowData[2]);
}


function main() {
    const input = fs.readFileSync('./input.test.csv').toString().split("\n");
    const requestData = input.slice(1);    

    // process this entire array with up to 5 requests "in-flight" at the same time
    mapConcurrent(requestData, 5, processRow).then(results => {
        console.log(results);
    }).catch(err => {
        console.log(err);
    });
}

You can obviously adjust the number of concurrent requests to whatever number you want. I set it to 5 here in this example.
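If you also need the configurable pause between requests from the original question, mapConcurrent() doesn't provide one itself, but one simple option (a sketch; sleep is an assumed helper, and the delay value is a placeholder) is to pace each in-flight slot inside the callback:

function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

// paces each of the 5 concurrent "slots": a slot only frees up once its
// request has finished AND the delay has elapsed
mapConcurrent(requestData, 5, async (row) => {
    const result = await processRow(row);
    await sleep(1000);      // configurable delay per slot
    return result;
});

Note this spaces out individual requests per slot rather than pausing between whole batches; for a strict batch-then-wait pattern, the chunked loop from the question's clarification works just as well.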

import { chunk } from "lodash";

interface ConcurrentResult<T> {
  success: T[];
  errors: {
    error: Error;
    item: string;
  }[];
  total: number;
  completed: number;
}

export async function mapConcurrent<T>(
  items: string[],
  concurrency: number,
  fn: (item: string) => Promise<T>,
  onProgress?: (progress: number) => void
): Promise<ConcurrentResult<T>> {
  const result: ConcurrentResult<T> = {
    success: [],
    errors: [],
    total: items.length,
    completed: 0,
  };

  const batches = chunk(items, concurrency);

  for (const batch of batches) {
    const promises = batch.map(async (item) => {
      try {
        const response = await fn(item);
        result.success.push(response);
      } catch (error) {
        result.errors.push({ error: error as Error, item });
      } finally {
        result.completed++;
        onProgress?.(Math.floor((result.completed / result.total) * 100));
      }
    });

    await Promise.allSettled(promises);
  }

  return result;
}

This version is handy if you want to handle it in TypeScript.
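For completeness, a hypothetical call site could look like the sketch below; update and QUERY_PARAMS are assumed to be defined as in the question, and the import path for the helper is a placeholder:

import fs from "fs";
import { mapConcurrent } from "./mapConcurrent"; // wherever the helper above lives

const rows = fs.readFileSync("./input.test.csv").toString().split("\n").slice(1);

mapConcurrent(
  rows,
  5,
  (row) => {
    const [id, time] = row.split(",");
    return update(id, time, QUERY_PARAMS.join(","));
  },
  (progress) => console.log(`${progress}% completed`)
).then(({ success, errors }) => {
  console.log(`done: ${success.length} succeeded, ${errors.length} failed`);
});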
