From 2707fb61236c87ae9581e19f2f0f13851c85623a Mon Sep 17 00:00:00 2001 From: HF Date: Thu, 22 Sep 2022 16:25:03 +0200 Subject: [PATCH] return cross-shard requests only to shard that requested make api/banme POST request --- src/data/sql/RegUser.js | 2 +- src/routes/api/banme.js | 28 ++++++++++++---- src/routes/api/index.js | 2 +- src/socket/MessageBroker.js | 64 ++++++++++++++++++++++++++----------- src/socket/SockEvents.js | 6 +++- src/store/actions/fetch.js | 5 +-- 6 files changed, 76 insertions(+), 31 deletions(-) diff --git a/src/data/sql/RegUser.js b/src/data/sql/RegUser.js index a016c1c..388dbcf 100644 --- a/src/data/sql/RegUser.js +++ b/src/data/sql/RegUser.js @@ -56,7 +56,7 @@ const RegUser = sequelize.define('User', { }, discordid: { - type: DataTypes.CHAR(18), + type: DataTypes.CHAR(20), allowNull: true, }, diff --git a/src/routes/api/banme.js b/src/routes/api/banme.js index 348ac0a..01795ab 100644 --- a/src/routes/api/banme.js +++ b/src/routes/api/banme.js @@ -7,11 +7,21 @@ import { banIP } from '../../data/sql/Ban'; import { getIPv6Subnet, getIPFromRequest } from '../../utils/ip'; async function banme(req, res) { - const { code } = req.query; + const { code } = req.body; + + const ip = getIPFromRequest(req); + // eslint-disable-next-line max-len + logger.info(`AUTOBAN ${code} - ${ip} of user ${req.user.id} with ua "${req.headers['user-agent']}"`); + let reason = 'AUTOBAN'; - if (code === '1') { + let expires = 0; + if (code === 1) { reason = 'Userscript Bot'; - } else if (code === '2') { + expires = Date.now() + 1000 * 3600 * 24 * 7; + /* + * ignore it for now to collect data manually + * + } else if (code === 2) { const ua = req.headers['user-agent']; if (ua && (ua.includes('Android') || ua.includes('iPhone'))) { res.json({ @@ -20,14 +30,18 @@ async function banme(req, res) { return; } reason = 'Captcha Solving Script'; + expires = Date.now() + 1000 * 3600 * 24 * 3; + */ + } else { + res.json({ + status: 'nope', + }); + return; } - const ip = getIPFromRequest(req); - // eslint-disable-next-line max-len - logger.info(`AUTOBAN ${code}${ip} of user ${req.user.id} with ua "${req.headers['user-agent']}"`); await banIP( getIPv6Subnet(ip), reason, - 0, + expires, 1, ); res.json({ diff --git a/src/routes/api/index.js b/src/routes/api/index.js index c64cd44..f0b633f 100644 --- a/src/routes/api/index.js +++ b/src/routes/api/index.js @@ -98,7 +98,7 @@ router.get('/chathistory', chatHistory); router.get('/me', me); -router.get('/banme', banme); +router.post('/banme', banme); router.use('/auth', auth); diff --git a/src/socket/MessageBroker.js b/src/socket/MessageBroker.js index 65fcf1b..46a2e83 100644 --- a/src/socket/MessageBroker.js +++ b/src/socket/MessageBroker.js @@ -35,6 +35,17 @@ class MessageBroker extends SocketEvents { super(); this.isCluster = true; this.thisShard = SHARD_NAME; + /* + * currently running cross-shard requests, + * are tracked in order to only send them to receiving + * shard + * [{ + * id: request id, + * shard: requesting shard name, + * ts: timestamp of request, + * },...] + */ + this.csReq = []; /* * all other shards */ @@ -95,22 +106,16 @@ class MessageBroker extends SocketEvents { const key = message.slice(message.indexOf(':') + 1, comma); console.log('CLUSTER: Broadcast', key); const val = JSON.parse(message.slice(comma + 1)); - /* if (key.startsWith('req:')) { - try { - const shard = message.slice(0, message.indexOf(':')); - const chan = val.shift(); - const ret = await super.req(key.slice(4), ...val); - this.publisher.publish( - `${LISTEN_PREFIX}:${shard}`, - `res:${chan},${JSON.stringify([ret])}`, - ); - } catch { - // nothing - } - return; + // cross-shard requests + const shard = message.slice(0, message.indexOf(':')); + const id = val[0]; + this.csReq.push({ + id, + shard, + ts: Date.now(), + }); } - */ super.emit(key, ...val); return; } @@ -145,6 +150,7 @@ class MessageBroker extends SocketEvents { try { const comma = message.indexOf(','); const key = message.slice(0, comma); + console.log(`CLUSTER shard listener got ${key}`); const val = JSON.parse(message.slice(comma + 1)); super.emit(key, ...val); } catch (err) { @@ -188,13 +194,14 @@ class MessageBroker extends SocketEvents { const callback = (retn) => { amountOtherShards -= 1; ret = combineObjects(ret, retn); - // eslint-disable-next-line - console.log(`CLUSTER got res:${type} from shard, ${amountOtherShards} still left`); if (amountOtherShards <= 0) { - console.log(`CLUSTER res:${type} finished`); + console.log(`CLUSTER res:${chan}:${type} finished`); this.off(chankey, callback); clearTimeout(id); resolve(ret); + } else { + // eslint-disable-next-line + console.log(`CLUSTER got res:${chan}:${type} from shard, ${amountOtherShards} still left`); } }; id = setTimeout(() => { @@ -202,7 +209,7 @@ class MessageBroker extends SocketEvents { if (ret) { resolve(ret); } else { - reject(new Error(`Timeout on req ${type}`)); + reject(new Error(`CLUSTER Timeout on wait for res:${chan}:${type}`)); } }, 45000); this.on(chankey, callback); @@ -210,6 +217,22 @@ class MessageBroker extends SocketEvents { }); } + res(chan, ret) { + // only response to requesting shard + const csre = this.csReq.find((r) => r.id === chan); + // eslint-disable-next-line + console.log(`CLUSTER send res:${chan} to shard ${csre && csre.shard}`); + if (csre) { + this.publisher.publish( + `${LISTEN_PREFIX}:${csre.shard}`, + `res:${chan},${JSON.stringify([ret])}`, + ); + this.csReq = this.csReq.filter((r) => r.id !== chan); + } else { + super.emit(`res:${chan}`, ret); + } + } + updateShardOnlineCounter(shard, cnt) { const shardCounter = this.shardOnlineCounters.find( (c) => c[0] === shard, @@ -347,7 +370,7 @@ class MessageBroker extends SocketEvents { checkHealth() { // remove disconnected shards - const threshold = Date.now() - 30000; + let threshold = Date.now() - 30000; const { shards } = this; Object.keys(shards).forEach((shard) => { if (shards[shard] < threshold) { @@ -364,6 +387,9 @@ class MessageBroker extends SocketEvents { }); // send keep alive to others this.publisher.publish(BROADCAST_CHAN, this.thisShard); + // clean up dead shard requests + threshold -= 30000; + this.csReq = this.csReq.filter((r) => r.ts > threshold); } } diff --git a/src/socket/SockEvents.js b/src/socket/SockEvents.js index bc8cfa3..919da47 100644 --- a/src/socket/SockEvents.js +++ b/src/socket/SockEvents.js @@ -73,10 +73,14 @@ class SocketEvents extends EventEmitter { }); } + res(chan, ret) { + this.emit(`res:${chan}`, ret); + } + onReq(type, cb) { this.on(`req:${type}`, async (chan, ...args) => { const ret = await cb(...args); - this.emit(`res:${chan}`, ret); + this.res(chan, ret); }); } diff --git a/src/store/actions/fetch.js b/src/store/actions/fetch.js index 8c3234b..cbe4267 100644 --- a/src/store/actions/fetch.js +++ b/src/store/actions/fetch.js @@ -348,7 +348,8 @@ export function requestIID() { } export function requestBanMe(code) { - return makeAPIGETRequest( - `/api/banme?code=${code}`, + return makeAPIPOSTRequest( + '/api/banme', + { code }, ); }