make admin watch tools go over socket events (breaks it for shards)

This commit is contained in:
HF 2022-09-17 00:29:12 +02:00
parent f7352a49c7
commit b84c6e9b60
5 changed files with 140 additions and 13 deletions

View File

@ -11,6 +11,7 @@ import Sequelize from 'sequelize';
import isIPAllowed from './isAllowed';
import { validateCoorRange } from '../utils/validation';
import CanvasCleaner from './CanvasCleaner';
import socketEvents from '../socket/socketEvents';
import { RegUser } from '../data/sql';
import {
cleanCacheForIP,
@ -36,12 +37,6 @@ import {
imageABGR2Canvas,
protectCanvasArea,
} from './Image';
import {
getIIDSummary,
getIIDPixels,
getSummaryFromArea,
getPixelsFromArea,
} from './parsePixelLog';
import rollbackCanvasArea from './rollback';
/*
@ -299,13 +294,17 @@ export async function executeWatchAction(
let ret;
if (!ulcoor && !brcoor && iid) {
if (action === 'summary') {
ret = await getIIDSummary(
ret = await socketEvents.req(
'watch',
'getIIDSummary',
iid,
time,
);
}
if (action === 'all') {
ret = await getIIDPixels(
ret = await socketEvents.req(
'watch',
'getIIDPixels',
iid,
time,
);
@ -332,7 +331,9 @@ export async function executeWatchAction(
}
if (action === 'summary') {
ret = await getSummaryFromArea(
ret = await socketEvents.req(
'watch',
'getSummaryFromArea',
canvasid,
x, y, u, v,
time,
@ -340,7 +341,9 @@ export async function executeWatchAction(
);
}
if (action === 'all') {
ret = await getPixelsFromArea(
ret = await socketEvents.req(
'watch',
'getPixelsFromArea',
canvasid,
x, y, u, v,
time,

View File

@ -2,6 +2,7 @@ import fs from 'fs';
import readline from 'readline';
import { PIXELLOGGER_PREFIX } from './logger';
import socketEvents from '../socket/socketEvents';
import { getNamesToIds } from '../data/sql/RegUser';
import {
getIdsToIps,
@ -392,3 +393,16 @@ export async function getPixelsFromArea(
rows,
};
}
socketEvents.onReq('watch', (action, ...args) => {
if (action === 'getIIDSummary') {
return getIIDSummary(...args);
} if (action === 'getIIDPixels') {
return getIIDPixels(...args);
} if (action === 'getSummaryFromArea') {
return getSummaryFromArea(...args);
} if (action === 'getPixelsFromArea') {
return getPixelsFromArea(...args);
}
return null;
});

View File

@ -496,3 +496,28 @@ export function parseInterval(interval) {
}
return (num * factor);
}
/*
* conbine two similar objects
*/
export function combineObjects(a, b) {
if (!b) {
return a;
}
if (!a) {
return b;
}
if (Array.isArray(a)) {
return a.concat(b);
}
if (typeof a === 'object') {
let keys = Object.keys(a);
keys.forEach((key) => {
const u = a[key];
const v = b[key];
a[key] = combineObjects(u, v);
});
return a;
}
return a + b;
}

View File

@ -1,6 +1,7 @@
/*
* sends messages to other ppfun instances
* to work as cluster
* If methods have no descriptions, they can be checked in ./SockEvents.js
*/
/* eslint-disable no-console */
@ -56,6 +57,9 @@ class MessageBroker extends SocketEvents {
console.log('CLUSTER: Initialized message broker');
}
/*
* messages on shared broadcast channels that every shard is listening to
*/
async onShardBCMessage(message) {
try {
/*
@ -117,6 +121,48 @@ class MessageBroker extends SocketEvents {
|| this.shardOnlineCounters[0][0] === this.thisShard;
}
/*
* requests that go over all shards and wait for responses from all
*/
reqOnShards(type, ...args) {
return new Promise((resolve, reject) => {
const chan = Math.floor(Math.random() * 100000).toString()
+ Math.floor(Math.random() * 100000).toString();
const chankey = `res:${chan}`;
let amountOtherShards = this.shardOnlineCounters.length - 1;
let id;
let ret = null;
await this.subscriber.subscribe(chankey, (message) => {
amountOtherShards -= 1;
const retn = JSON.parse(message);
combineObjects(ret, retn);
if (!amountOtherShards) {
this.subscriber.unsubscribe(chankey);
clearTimeout(id);
resolve(ret);
}
});
id = setTimeout(() => {
this.subscriber.unsubscribe(chankey);
reject(new Error(`Timeout on req ${type}`));
}, 45000);
this.emit(`req:${type}`, chan, ...args);
});
}
async reqOnShards(...args) {
const ret = Promise.all([
this.reqShard(...args),
super.req(...args),
]);
}
onReq(type, cb) {
this.on(`req:${type}`, async (chan, ...args) => {
const ret = await cb(...args);
this.emit(`res:${chan}`, ret);
});
}
updateShardOnlineCounter(shard, cnt) {
const shardCounter = this.shardOnlineCounters.find(
(c) => c[0] === shard,
@ -130,6 +176,9 @@ class MessageBroker extends SocketEvents {
this.sumOnlineCounters();
}
/*
* messages on binary shard channels, where specific shards send from
*/
onShardBinaryMessage(buffer, shard) {
try {
const opcode = buffer[0];
@ -186,9 +235,6 @@ class MessageBroker extends SocketEvents {
*/
emit(key, ...args) {
super.emit(key, ...args);
if (key === 'recvChatMessage') {
return;
}
const msg = `${this.thisShard}:${key},${JSON.stringify(args)}`;
this.publisher.publish('bc', msg);
}
@ -223,6 +269,14 @@ class MessageBroker extends SocketEvents {
}
}
recvChatMessage(
user,
message,
channelId,
) {
super.emit('recvChatMessage', user, message, channelId);
}
broadcastChunkUpdate(
canvasId,
chunk,

View File

@ -48,6 +48,37 @@ class SocketEvents extends EventEmitter {
});
}
/*
* requests that expect a response
* req(type, args) can be awaited
* it will return a response from whatever listenes on onReq(type, cb(args))
*/
req(type, ...args) {
return new Promise((resolve, reject) => {
const chan = Math.floor(Math.random() * 100000).toString()
+ Math.floor(Math.random() * 100000).toString();
const chankey = `res:${chan}`;
let id;
const callback = (ret) => {
clearTimeout(id);
resolve(ret);
};
id = setTimeout(() => {
this.off(chankey, callback);
reject(new Error(`Timeout on req ${type}`));
}, 45000);
this.once(chankey, callback);
this.emit(`req:${type}`, chan, ...args);
});
}
onReq(type, cb) {
this.on(`req:${type}`, async (chan, ...args) => {
const ret = await cb(...args);
this.emit(`res:${chan}`, ret);
});
}
/*
* broadcast pixel message via websocket
* @param canvasId number ident of canvas