
Promise Closure Within Loop

I am receiving rows of data every second from Kafka. For each batch of data, I am inserting into my database. My app keeps reading the last message and id of each batch. The issue

Solution 1:

It might be helpful to abstract the ideas a little bit, and represent them explicitly in data (rather than in data retained implicitly in the promises). Start with a queue:

let queue = [];

Add items to the queue with queue.push(element), and get and remove them in order of arrival with element = queue.shift().
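As a quick illustration of that first-in, first-out behaviour, here is a minimal sketch. The batch labels are hypothetical placeholders, not data from the question:

```javascript
// Hypothetical batch labels, used only to show FIFO ordering.
const queue = [];
queue.push('batch-1');
queue.push('batch-2');
queue.push('batch-3');

// shift() removes and returns the oldest element.
const first = queue.shift();
console.log(first);        // batch-1
console.log(queue.length); // 2
```

Note that shift() on a plain array is O(n); for the batch sizes described here that is unlikely to matter.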

Our goal is to process whatever's on the queue in order, saving the results in order. The processing itself is async, and we want to finish one queue item before starting the next, so we need a chain of promises (called promises) to process the queue:

let results = [];
let promises = Promise.resolve();

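let scheduled = 0; // how many queue elements are already chained onto promises

function processQueue() {
    // Chain only elements not yet scheduled, so a second call made before the
    // queue drains does not process anything twice.
    queue.slice(scheduled).forEach(element => {
        scheduled++;
        promises = promises
            .then(() => processElement(element)) // pass a function, not a promise
            .then(elementResult => {
                results.push(elementResult);
                queue.shift();
                scheduled--;
            });
    });
}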

We can convince ourselves that this is right without even thinking about what processElement() does, so long as it returns a promise. (In the OP case, that promise is a promise to deal with an array of "rows".) processElement() will do its thing, and the result (an array of results in the OP case) will get pushed to results.
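To see the ordering guarantee in isolation, here is a small sketch that swaps the database work for a timer. fakeProcessElement and the element shapes are hypothetical stand-ins; the only point is that a slow element queued first still finishes first, because then() is handed a function that runs only when the chain reaches it:

```javascript
// Hypothetical stand-in for real async work: resolves after element.ms milliseconds.
function fakeProcessElement(element) {
    return new Promise(resolve => {
        setTimeout(() => resolve(`${element.name} done`), element.ms);
    });
}

const order = [];
let chain = Promise.resolve();

// The slowest element is queued first; serial chaining still completes it first.
[{ name: 'a', ms: 30 }, { name: 'b', ms: 10 }, { name: 'c', ms: 0 }].forEach(element => {
    chain = chain
        .then(() => fakeProcessElement(element)) // starts only after the previous link settles
        .then(result => { order.push(result); });
});

chain.then(() => console.log(order)); // [ 'a done', 'b done', 'c done' ]
```

If then() were given the promise itself instead of a function, all three timers would start immediately and the chain would not wait for any of them.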

Confident that the ordering of operations makes sense, when a new batch arrives, add it to the queue, and then process whatever's on the queue:

batchOfRows.on('message', function (data) {
    queue.push(batchOfRows.rows);
    processQueue();
});

We just need to define processElement(). Use @YuryTarabanko's helpful suggestions for that here (and leave his answer marked correct, IMO)

function processElement(data) {
    const id = data.date + data.location;
    return DB.execute('select * from table1 where id = ?', id)
              .then(result => insertIntoDB(data, id).then(() => result));
}

function insertIntoDB(message, id) {
    // INSERT takes a column list and VALUES, not a WHERE clause
    const query = "insert into table2 (id, messageBody) values (?, ?)";
    return DB.execute(query, [id, JSON.stringify(message)]);
}

One nice side-effect of this is that you can measure progress. If the inputs are arriving too fast then the expression:

queue.length - results.length

... will grow over time.

EDIT Looking at the newer code, I am puzzled by why a query is done for each row (each element in batchOfRows.rows). Since the result of that query is ignored, don't do it...

function processElement(data) {
    const id = data.date + data.location;
    // we know everything we need to know to call insert (data and id)
    // just call it and return what it returns :-)
    return insertIntoDB(data, id);
}

I understand now that this will be a long-running task, and it shouldn't accumulate results (even linearly). The cleaner fix for that is to remove every reference to the results array that I suggested. The minimal version of insert just inserts and returns the result of the insertion...

function insertIntoDB(message, id) {
    // INSERT takes a column list and VALUES, not a WHERE clause
    const query = "insert into table2 (id, messageBody) values (?, ?)";
    return DB.execute(query, [id, JSON.stringify(message)]);
}

I think you added some code to log results. A better test that it worked would be to check the database via some outside process, but if you want to log, just remember to pass the result value through after logging:

anyPromise.then(result => {
    console.log(result);
    return result;  // IMPORTANT
})
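A small sketch of why that return matters, using an already-resolved promise with a placeholder value (42 is arbitrary): a then() handler that returns nothing hands undefined to whatever comes next in the chain.

```javascript
// With the return: the logged value is passed through to the next handler.
const withReturn = Promise.resolve(42)
    .then(result => {
        console.log('logged:', result);
        return result;
    });

// Without the return: the next handler receives undefined.
const withoutReturn = Promise.resolve(42)
    .then(result => {
        console.log('logged:', result);
    });

withReturn.then(value => console.log(value));    // 42
withoutReturn.then(value => console.log(value)); // undefined
```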

Solution 2:

You have various antipatterns in your code. First, you don't need to manually create a promise; you will likely never need to call new Promise. Second, you are breaking promise chains by not returning the nested promise from within the onFulfill handler. And finally, you are polluting the global scope by assigning to an undeclared variable: id = message.date + message.location
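To illustrate the second point, here is a minimal sketch of a broken chain next to a fixed one. The step helper is hypothetical and uses new Promise only to wrap setTimeout for the demo; it stands in for any call that already returns a promise.

```javascript
// Hypothetical async step: resolves with `value` after 10 ms.
function step(value) {
    return new Promise(resolve => setTimeout(() => resolve(value), 10));
}

const brokenEvents = [];
const fixedEvents = [];

// Broken: the nested promise is not returned, so the outer chain moves on immediately.
Promise.resolve()
    .then(() => { step('inner').then(v => brokenEvents.push(v)); })
    .then(() => brokenEvents.push('outer'));

// Fixed: returning the nested promise makes the next handler wait for it.
Promise.resolve()
    .then(() => step('inner').then(v => fixedEvents.push(v)))
    .then(() => fixedEvents.push('outer'));

// Give both chains time to settle before inspecting the order.
setTimeout(() => {
    console.log(brokenEvents); // [ 'outer', 'inner' ]
    console.log(fixedEvents);  // [ 'inner', 'outer' ]
}, 50);
```

In the broken version, anything chained after the outer handler runs before the nested work has finished, which is how inserts from one batch can overlap with the next.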

// This is live data, coming in concurrently, forever. Promises from the previous batch must be resolved before the next batch is processed.
let pending = Promise.resolve([]); // previous batch, starting with a resolved promise
batchOfRows.on('message', function (data) {
    // not sure where batchRows was coming from in your code
    const nextBatch = () => Promise.all(
      data.batchOfRows.rows.map(validate)
    );

    // reassign pending to a new promise
    // whatever happened to the previous promise, we keep running
    pending = pending
      .then(nextBatch)
      .catch(e => console.error(e));
});

// For each row received, give it an ID and then insert into the DB
function validate(data) {
    const id = data.date + data.location;
    return DB.execute('select * from table1 where id = ?', id)
              .then(result => insertIntoDB(data, id).then(() => result));
}

// Inserting into DB
function insertIntoDB(message, id) {
    // INSERT takes a column list and VALUES, not a WHERE clause
    const query = "insert into table2 (id, messageBody) values (?, ?)";
    return DB.execute(query, [id, JSON.stringify(message)]);
}
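The trailing catch on pending is what lets the chain survive a failed batch. Here is a self-contained sketch of that behaviour, with no database involved; handleBatch, enqueue, and the batch names are hypothetical, and the 'bad' batch is made to reject on purpose:

```javascript
let pendingDemo = Promise.resolve();
const log = [];

// Hypothetical batch handler: rejects for the batch named 'bad'.
function handleBatch(name) {
    return name === 'bad'
        ? Promise.reject(new Error(`batch ${name} failed`))
        : Promise.resolve(name);
}

function enqueue(name) {
    pendingDemo = pendingDemo
        .then(() => handleBatch(name))
        .then(result => { log.push(`ok:${result}`); })
        // catch returns a fulfilled promise, so the next batch still runs
        .catch(e => { log.push(`error:${e.message}`); });
}

['first', 'bad', 'third'].forEach(enqueue);

pendingDemo.then(() => console.log(log));
// [ 'ok:first', 'error:batch bad failed', 'ok:third' ]
```

Without the catch, the first rejection would leave the chain rejected and every later batch would be skipped.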
