Merge branch 'devel'

This commit is contained in:
HF 2022-09-07 16:54:41 +02:00
commit dc01b994e7
2 changed files with 63 additions and 22 deletions

View File

@ -180,9 +180,19 @@ class User {
}
incrementPixelcount(amount = 1) {
if (this.regUser) {
incrementPixelcount(this.regUser, amount);
if (!this.id) {
return;
}
/*
* incrementation gets queued to be processed
* in batches
*/
if (!this.queuedPxlIncrement) {
this.queuedPxlIncrement = amount;
incrementPixelcount(this);
return;
}
this.queuedPxlIncrement += amount;
}
async getTotalPixels() {

View File

@ -205,44 +205,75 @@ export async function getNamesToIds(ids) {
}
/*
* increment user pixelcount in a batched transaction
* increment user pixelcount in batches sequentially
* Queue directly accesses queuedPxlIncrement in user object
*/
const incrementQueue = [];
let pushLoop = null;
let lastLog = 0;
const incrementLoop = async () => {
if (!incrementQueue.length) {
pushLoop = null;
return;
}
try {
await sequelize.transaction(async (t) => {
while (incrementQueue.length) {
const [model, amount] = incrementQueue.pop();
// eslint-disable-next-line no-await-in-loop
await model.increment(
['totalPixels', 'dailyTotalPixels'],
{ by: amount, transaction: t },
);
}
return true;
incrementQueue.sort((a, b) => {
const aa = a.queuedPxlIncrement;
const ba = b.queuedPxlIncrement;
if (aa < ba) return -1;
if (aa > ba) return 1;
return 0;
});
let cnt = incrementQueue.length;
let user = incrementQueue.pop();
while (cnt) {
cnt -= 1;
const amount = user.queuedPxlIncrement;
user.queuedPxlIncrement = 0;
const ids = [user.id];
while (cnt) {
cnt -= 1;
user = incrementQueue.pop();
if (user.queuedPxlIncrement !== amount) {
break;
}
user.queuedPxlIncrement = 0;
ids.push(user.id);
}
// eslint-disable-next-line no-await-in-loop
await RegUser.increment(['totalPixels', 'dailyTotalPixels'], {
by: amount,
where: {
id: ids,
},
silent: true,
raw: true,
// TODO: remove this after testing
logging: (msg) => {
const now = Date.now();
if (now - 5000 > lastLog) {
lastLog = now;
logger.warn(msg);
}
},
});
}
} catch (err) {
logger.warn(`Error on batched incrementing pixelcounts: ${err.message}`);
logger.warn(`Error on pixel increment: ${err.message}`);
incrementQueue.forEach((q) => {
if (q) q.queuedPxlIncrement = 0;
});
incrementQueue.length = 0;
}
pushLoop = setTimeout(incrementLoop, 50);
pushLoop = setTimeout(incrementLoop, 250);
};
// TODO remove this after testing
setInterval(() => {
// eslint-disable-next-line no-console
console.log('INCREMENTATION QUEUE SIZE', incrementQueue.length, pushLoop);
}, 300000);
export async function incrementPixelcount(model, amount) {
const exists = incrementQueue.find((q) => q[0] === model);
if (exists) {
exists[1] += amount;
} else {
incrementQueue.push([model, amount]);
}
export async function incrementPixelcount(user) {
incrementQueue.push(user);
if (!pushLoop) {
pushLoop = setTimeout(incrementLoop, 0);
}