From b84c6e9b6012f72c8bac479f75cd29613143ac6f Mon Sep 17 00:00:00 2001 From: HF Date: Sat, 17 Sep 2022 00:29:12 +0200 Subject: [PATCH] make admin watch tools go over socket events (breaks it for shards) --- src/core/adminfunctions.js | 23 +++++++------- src/core/parsePixelLog.js | 14 +++++++++ src/core/utils.js | 25 ++++++++++++++++ src/socket/MessageBroker.js | 60 +++++++++++++++++++++++++++++++++++-- src/socket/SockEvents.js | 31 +++++++++++++++++++ 5 files changed, 140 insertions(+), 13 deletions(-) diff --git a/src/core/adminfunctions.js b/src/core/adminfunctions.js index 2ce590f..b50858b 100644 --- a/src/core/adminfunctions.js +++ b/src/core/adminfunctions.js @@ -11,6 +11,7 @@ import Sequelize from 'sequelize'; import isIPAllowed from './isAllowed'; import { validateCoorRange } from '../utils/validation'; import CanvasCleaner from './CanvasCleaner'; +import socketEvents from '../socket/socketEvents'; import { RegUser } from '../data/sql'; import { cleanCacheForIP, @@ -36,12 +37,6 @@ import { imageABGR2Canvas, protectCanvasArea, } from './Image'; -import { - getIIDSummary, - getIIDPixels, - getSummaryFromArea, - getPixelsFromArea, -} from './parsePixelLog'; import rollbackCanvasArea from './rollback'; /* @@ -299,13 +294,17 @@ export async function executeWatchAction( let ret; if (!ulcoor && !brcoor && iid) { if (action === 'summary') { - ret = await getIIDSummary( + ret = await socketEvents.req( + 'watch', + 'getIIDSummary', iid, time, ); } if (action === 'all') { - ret = await getIIDPixels( + ret = await socketEvents.req( + 'watch', + 'getIIDPixels', iid, time, ); @@ -332,7 +331,9 @@ export async function executeWatchAction( } if (action === 'summary') { - ret = await getSummaryFromArea( + ret = await socketEvents.req( + 'watch', + 'getSummaryFromArea', canvasid, x, y, u, v, time, @@ -340,7 +341,9 @@ export async function executeWatchAction( ); } if (action === 'all') { - ret = await getPixelsFromArea( + ret = await socketEvents.req( + 'watch', + 'getPixelsFromArea', canvasid, x, y, u, v, time, diff --git a/src/core/parsePixelLog.js b/src/core/parsePixelLog.js index 2dc749c..2373543 100644 --- a/src/core/parsePixelLog.js +++ b/src/core/parsePixelLog.js @@ -2,6 +2,7 @@ import fs from 'fs'; import readline from 'readline'; import { PIXELLOGGER_PREFIX } from './logger'; +import socketEvents from '../socket/socketEvents'; import { getNamesToIds } from '../data/sql/RegUser'; import { getIdsToIps, @@ -392,3 +393,16 @@ export async function getPixelsFromArea( rows, }; } + +socketEvents.onReq('watch', (action, ...args) => { + if (action === 'getIIDSummary') { + return getIIDSummary(...args); + } if (action === 'getIIDPixels') { + return getIIDPixels(...args); + } if (action === 'getSummaryFromArea') { + return getSummaryFromArea(...args); + } if (action === 'getPixelsFromArea') { + return getPixelsFromArea(...args); + } + return null; +}); diff --git a/src/core/utils.js b/src/core/utils.js index facf945..f89b916 100644 --- a/src/core/utils.js +++ b/src/core/utils.js @@ -496,3 +496,28 @@ export function parseInterval(interval) { } return (num * factor); } + +/* + * conbine two similar objects + */ +export function combineObjects(a, b) { + if (!b) { + return a; + } + if (!a) { + return b; + } + if (Array.isArray(a)) { + return a.concat(b); + } + if (typeof a === 'object') { + let keys = Object.keys(a); + keys.forEach((key) => { + const u = a[key]; + const v = b[key]; + a[key] = combineObjects(u, v); + }); + return a; + } + return a + b; +} diff --git a/src/socket/MessageBroker.js b/src/socket/MessageBroker.js index 00e8aa3..5669143 100644 --- a/src/socket/MessageBroker.js +++ b/src/socket/MessageBroker.js @@ -1,6 +1,7 @@ /* * sends messages to other ppfun instances * to work as cluster + * If methods have no descriptions, they can be checked in ./SockEvents.js */ /* eslint-disable no-console */ @@ -56,6 +57,9 @@ class MessageBroker extends SocketEvents { console.log('CLUSTER: Initialized message broker'); } + /* + * messages on shared broadcast channels that every shard is listening to + */ async onShardBCMessage(message) { try { /* @@ -117,6 +121,48 @@ class MessageBroker extends SocketEvents { || this.shardOnlineCounters[0][0] === this.thisShard; } + /* + * requests that go over all shards and wait for responses from all + */ + reqOnShards(type, ...args) { + return new Promise((resolve, reject) => { + const chan = Math.floor(Math.random() * 100000).toString() + + Math.floor(Math.random() * 100000).toString(); + const chankey = `res:${chan}`; + + let amountOtherShards = this.shardOnlineCounters.length - 1; + let id; + let ret = null; + await this.subscriber.subscribe(chankey, (message) => { + amountOtherShards -= 1; + const retn = JSON.parse(message); + combineObjects(ret, retn); + if (!amountOtherShards) { + this.subscriber.unsubscribe(chankey); + clearTimeout(id); + resolve(ret); + } + }); + id = setTimeout(() => { + this.subscriber.unsubscribe(chankey); + reject(new Error(`Timeout on req ${type}`)); + }, 45000); + this.emit(`req:${type}`, chan, ...args); + }); + } + async reqOnShards(...args) { + const ret = Promise.all([ + this.reqShard(...args), + super.req(...args), + ]); + } + onReq(type, cb) { + this.on(`req:${type}`, async (chan, ...args) => { + const ret = await cb(...args); + this.emit(`res:${chan}`, ret); + }); + } + updateShardOnlineCounter(shard, cnt) { const shardCounter = this.shardOnlineCounters.find( (c) => c[0] === shard, @@ -130,6 +176,9 @@ class MessageBroker extends SocketEvents { this.sumOnlineCounters(); } + /* + * messages on binary shard channels, where specific shards send from + */ onShardBinaryMessage(buffer, shard) { try { const opcode = buffer[0]; @@ -186,9 +235,6 @@ class MessageBroker extends SocketEvents { */ emit(key, ...args) { super.emit(key, ...args); - if (key === 'recvChatMessage') { - return; - } const msg = `${this.thisShard}:${key},${JSON.stringify(args)}`; this.publisher.publish('bc', msg); } @@ -223,6 +269,14 @@ class MessageBroker extends SocketEvents { } } + recvChatMessage( + user, + message, + channelId, + ) { + super.emit('recvChatMessage', user, message, channelId); + } + broadcastChunkUpdate( canvasId, chunk, diff --git a/src/socket/SockEvents.js b/src/socket/SockEvents.js index 0c3ba13..01c39b9 100644 --- a/src/socket/SockEvents.js +++ b/src/socket/SockEvents.js @@ -48,6 +48,37 @@ class SocketEvents extends EventEmitter { }); } + /* + * requests that expect a response + * req(type, args) can be awaited + * it will return a response from whatever listenes on onReq(type, cb(args)) + */ + req(type, ...args) { + return new Promise((resolve, reject) => { + const chan = Math.floor(Math.random() * 100000).toString() + + Math.floor(Math.random() * 100000).toString(); + const chankey = `res:${chan}`; + let id; + const callback = (ret) => { + clearTimeout(id); + resolve(ret); + }; + id = setTimeout(() => { + this.off(chankey, callback); + reject(new Error(`Timeout on req ${type}`)); + }, 45000); + this.once(chankey, callback); + this.emit(`req:${type}`, chan, ...args); + }); + } + + onReq(type, cb) { + this.on(`req:${type}`, async (chan, ...args) => { + const ret = await cb(...args); + this.emit(`res:${chan}`, ret); + }); + } + /* * broadcast pixel message via websocket * @param canvasId number ident of canvas