rewrite counter inrease queue

This commit is contained in:
HF 2022-09-12 13:55:01 +02:00
parent 747bbced92
commit 922e5ecb5b
2 changed files with 31 additions and 48 deletions

View File

@ -180,19 +180,10 @@ class User {
}
incrementPixelcount(amount = 1) {
if (!this.id) {
return;
const { id } = this;
if (id) {
incrementPixelcount(id, amount);
}
/*
* incrementation gets queued to be processed
* in batches
*/
if (!this.queuedPxlIncrement) {
this.queuedPxlIncrement = amount;
incrementPixelcount(this);
return;
}
this.queuedPxlIncrement += amount;
}
async getTotalPixels() {

View File

@ -208,58 +208,50 @@ export async function getNamesToIds(ids) {
* increment user pixelcount in batches sequentially
* Queue directly accesses queuedPxlIncrement in user object
*/
const incrementQueue = [];
let incrementQueue = {};
let pushLoop = null;
const incrementLoop = async () => {
if (!incrementQueue.length) {
const idKeys = Object.keys(incrementQueue);
if (!idKeys.length) {
pushLoop = null;
return;
}
try {
incrementQueue.sort((a, b) => {
const aa = a.queuedPxlIncrement;
const ba = b.queuedPxlIncrement;
if (aa < ba) return -1;
if (aa > ba) return 1;
return 0;
});
let cnt = incrementQueue.length;
let user = incrementQueue.pop();
while (cnt) {
cnt -= 1;
const amount = user.queuedPxlIncrement;
user.queuedPxlIncrement = 0;
const ids = [user.id];
while (cnt) {
cnt -= 1;
user = incrementQueue.pop();
if (user.queuedPxlIncrement !== amount) {
break;
}
user.queuedPxlIncrement = 0;
ids.push(user.id);
}
const queue = incrementQueue;
incrementQueue = {};
const orderedCnts = {};
idKeys.forEach((id) => {
const cnt = queue[id];
const cntArr = orderedCnts[cnt];
if (cntArr) {
cntArr.push(id);
} else {
orderedCnts[cnt] = [id];
}
});
Object.keys(orderedCnts).forEach(async (cnt) => {
const ids = orderedCnts[cnt];
try {
// eslint-disable-next-line no-await-in-loop
await RegUser.increment(['totalPixels', 'dailyTotalPixels'], {
by: amount,
by: cnt,
where: {
id: ids,
},
silent: true,
raw: true,
});
} catch (err) {
logger.warn(`Error on pixel increment: ${err.message}`);
}
} catch (err) {
logger.warn(`Error on pixel increment: ${err.message}`);
incrementQueue.forEach((q) => {
if (q) q.queuedPxlIncrement = 0;
});
incrementQueue.length = 0;
}
});
pushLoop = setTimeout(incrementLoop, 250);
};
export async function incrementPixelcount(user) {
incrementQueue.push(user);
export async function incrementPixelcount(id, by) {
if (incrementQueue[id]) {
incrementQueue[id] += by;
} else {
incrementQueue[id] = by;
}
if (!pushLoop) {
pushLoop = setTimeout(incrementLoop, 0);
}