use EventEmitter for websockets

This commit is contained in:
HF 2021-07-10 15:51:15 +02:00
parent 21c99f7447
commit 566f91c690
13 changed files with 214 additions and 258 deletions

View File

@ -3,12 +3,12 @@ import { Op } from 'sequelize';
import logger from './logger';
import redis from '../data/redis';
import User from '../data/models/User';
import webSockets from '../socket/websockets';
import RateLimiter from '../utils/RateLimiter';
import {
Channel, RegUser, UserChannel, Message,
} from '../data/models';
import ChatMessageBuffer from './ChatMessageBuffer';
import socketEvents from '../socket/SocketEvents';
import { cheapDetector } from './isProxy';
import { DailyCron } from '../utils/cron';
import ttags from './ttag';
@ -51,6 +51,18 @@ export class ChatProvider {
this.mutedCountries = [];
this.chatMessageBuffer = new ChatMessageBuffer();
this.clearOldMessages = this.clearOldMessages.bind(this);
socketEvents.on('recvChatMessage', (user, message, channelId) => {
const errorMsg = this.sendMessage(user, message, channelId);
if (errorMsg) {
this.broadcastChatMessage(
'info',
errorMsg,
channelId,
this.infoUserId,
);
}
});
}
async clearOldMessages() {
@ -174,7 +186,7 @@ export class ChatProvider {
});
if (created) {
webSockets.broadcastAddChatChannel(
socketEvents.broadcastAddChatChannel(
userId,
channelId,
channelArray,
@ -430,7 +442,7 @@ export class ChatProvider {
id,
country,
);
webSockets.broadcastChatMessage(
socketEvents.broadcastChatMessage(
name,
message,
channelId,
@ -497,5 +509,4 @@ export class ChatProvider {
}
}
const chatProvider = new ChatProvider();
export default chatProvider;
export default new ChatProvider();

View File

@ -4,7 +4,7 @@
* @flow
*/
import webSockets from '../socket/websockets';
import socketEvents from '../socket/SocketEvents';
class PixelCache {
PXL_CACHE: Map<number, Buffer>;
@ -56,7 +56,7 @@ class PixelCache {
cache.forEach((pxls, chunkCanvasId) => {
const canvasId = (chunkCanvasId & 0xFF0000) >> 16;
const chunkId = chunkCanvasId & 0x00FFFF;
webSockets.broadcastPixels(canvasId, chunkId, pxls);
socketEvents.broadcastPixels(canvasId, chunkId, pxls);
});
}
}

View File

@ -6,8 +6,7 @@
*
* @flow
*/
import webSockets from '../socket/websockets';
import WebSocketEvents from '../socket/WebSocketEvents';
import socketEvents from '../socket/SocketEvents';
import PixelUpdate from '../socket/packets/PixelUpdateServer';
import { setPixelByOffset } from './setPixel';
import { TILE_SIZE } from './constants';
@ -19,7 +18,7 @@ const TARGET_RADIUS = 62;
const EVENT_DURATION_MIN = 10;
// const EVENT_DURATION_MIN = 1;
class Void extends WebSocketEvents {
class Void {
i: number;
j: number;
maxClr: number;
@ -33,7 +32,6 @@ class Void extends WebSocketEvents {
ended: boolean;
constructor(centerCell) {
super();
// chunk coordinates
const [i, j] = centerCell;
this.i = i;
@ -41,7 +39,7 @@ class Void extends WebSocketEvents {
this.ended = false;
this.maxClr = canvases[CANVAS_ID].colors.length;
const area = TARGET_RADIUS ** 2 * Math.PI;
const online = webSockets.onlineCounter;
const online = socketEvents.onlineCounter;
// require an average of 0.25 px / min / user
const requiredSpeed = Math.floor(online / 1.8);
const ppm = Math.ceil(area / EVENT_DURATION_MIN + requiredSpeed);
@ -60,7 +58,7 @@ class Void extends WebSocketEvents {
this.cancel = this.cancel.bind(this);
this.checkStatus = this.checkStatus.bind(this);
this.broadcastPixelBuffer = this.broadcastPixelBuffer.bind(this);
webSockets.addListener(this);
socketEvents.addListener('pixelUpdate', this.broadcastPixelBuffer);
this.voidLoop();
}
@ -162,13 +160,13 @@ class Void extends WebSocketEvents {
}
cancel() {
webSockets.remListener(this);
socketEvents.removeListener('pixelUpdate', this.broadcastPixelBuffer);
this.ended = true;
}
checkStatus() {
if (this.ended) {
webSockets.remListener(this);
socketEvents.removeListener('pixelUpdate', this.broadcastPixelBuffer);
return 100;
}
return Math.floor(this.curRadius * 100 / TARGET_RADIUS);

View File

@ -6,7 +6,7 @@
import type { Request, Response } from 'express';
import webSockets from '../../../socket/websockets';
import socketEvents from '../../../socket/SocketEvents';
import { RegUser } from '../../../data/models';
import { validateName } from '../../../utils/validation';
@ -46,7 +46,7 @@ export default async (req: Request, res: Response) => {
await user.regUser.update({ name });
webSockets.reloadUser(oldname);
socketEvents.reloadUser(oldname);
res.json({
success: true,

View File

@ -5,7 +5,7 @@
import type { Request, Response } from 'express';
import webSockets from '../../../socket/websockets';
import socketEvents from '../../../socket/SocketEvents';
import getHtml from '../../../ssr-components/RedirectionPage';
import { getHostFromRequest } from '../../../utils/ip';
import mailProvider from '../../../core/mail';
@ -19,7 +19,7 @@ export default async (req: Request, res: Response) => {
if (name) {
// notify websoecket to reconnect user
// thats a bit counter productive because it directly links to the websocket
webSockets.reloadUser(name);
socketEvents.reloadUser(name);
// ---
const index = getHtml(
t`Mail verification`,

View File

@ -8,7 +8,7 @@
import type { Request, Response } from 'express';
import logger from '../../core/logger';
import webSockets from '../../socket/websockets';
import socketEvents from '../../socket/SocketEvents';
import { RegUser, UserBlock, Channel } from '../../data/models';
async function block(req: Request, res: Response) {
@ -108,8 +108,8 @@ async function block(req: Request, res: Response) {
if (channel) {
const channelId = channel.id;
channel.destroy();
webSockets.broadcastRemoveChatChannel(user.id, channelId);
webSockets.broadcastRemoveChatChannel(userId, channelId);
socketEvents.broadcastRemoveChatChannel(user.id, channelId);
socketEvents.broadcastRemoveChatChannel(userId, channelId);
}
if (ret) {

View File

@ -8,7 +8,7 @@
import type { Request, Response } from 'express';
import logger from '../../core/logger';
import webSockets from '../../socket/websockets';
import socketEvents from '../../socket/SocketEvents';
async function blockdm(req: Request, res: Response) {
const { block } = req.body;
@ -47,8 +47,8 @@ async function blockdm(req: Request, res: Response) {
const channelId = channel.id;
const { dmu1id, dmu2id } = channel;
channel.destroy();
webSockets.broadcastRemoveChatChannel(dmu1id, channelId);
webSockets.broadcastRemoveChatChannel(dmu2id, channelId);
socketEvents.broadcastRemoveChatChannel(dmu1id, channelId);
socketEvents.broadcastRemoveChatChannel(dmu2id, channelId);
}
}

View File

@ -8,7 +8,7 @@
import type { Request, Response } from 'express';
import logger from '../../core/logger';
import webSockets from '../../socket/websockets';
import socketEvents from '../../socket/SocketEvents';
async function leaveChan(req: Request, res: Response) {
const channelId = parseInt(req.body.channelId, 10);
@ -65,7 +65,7 @@ async function leaveChan(req: Request, res: Response) {
user.regUser.removeChannel(channel);
webSockets.broadcastRemoveChatChannel(user.id, channelId);
socketEvents.broadcastRemoveChatChannel(user.id, channelId);
res.json({
status: 'ok',

View File

@ -10,14 +10,11 @@
import WebSocket from 'ws';
import WebSocketEvents from './WebSocketEvents';
import webSockets from './websockets';
import socketEvents from './SocketEvents';
import { getIPFromRequest } from '../utils/ip';
import { setPixelByCoords } from '../core/setPixel';
import logger from '../core/logger';
import { APISOCKET_KEY } from '../core/config';
import chatProvider from '../core/ChatProvider';
function heartbeat() {
this.isAlive = true;
@ -38,13 +35,11 @@ async function verifyClient(info, done) {
}
class APISocketServer extends WebSocketEvents {
class APISocketServer {
wss: WebSocket.Server;
constructor() {
super();
logger.info('Starting API websocket server');
webSockets.addListener(this);
const wss = new WebSocket.Server({
perMessageDeflate: false,
@ -75,7 +70,15 @@ class APISocketServer extends WebSocketEvents {
});
});
this.broadcast = this.broadcast.bind(this);
this.broadcastPixelBuffer = this.broadcastPixelBuffer.bind(this);
this.ping = this.ping.bind(this);
this.broadcastChatMessage = this.broadcastChatMessage.bind(this);
socketEvents.on('broadcast', this.broadcast);
socketEvents.on('pixelUpdate', this.broadcastPixelBuffer);
socketEvents.on('chatMessage', this.broadcastChatMessage);
setInterval(this.ping, 45 * 1000);
}
@ -100,26 +103,34 @@ class APISocketServer extends WebSocketEvents {
});
}
broadcastOnlineCounter(buffer) {
const frame = WebSocket.Sender.frame(buffer, {
readOnly: true,
mask: false,
rsv1: false,
opcode: 2,
fin: true,
});
this.wss.clients.forEach((client) => {
if (client.subOnline && client.readyState === WebSocket.OPEN) {
frame.forEach((data) => {
try {
// eslint-disable-next-line no-underscore-dangle
client._socket.write(data);
} catch (error) {
logger.error('(!) Catched error on write apisocket:', error);
}
});
}
});
broadcast(data) {
if (typeof data === 'string') {
this.wss.clients.forEach((ws) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(data);
}
});
} else {
const frame = WebSocket.Sender.frame(data, {
readOnly: true,
mask: false,
rsv1: false,
opcode: 2,
fin: true,
});
this.wss.clients.forEach((ws) => {
if (ws.readyState === WebSocket.OPEN) {
frame.forEach((buffer) => {
try {
// eslint-disable-next-line no-underscore-dangle
ws._socket.write(buffer);
} catch (error) {
logger.error(`WebSocket broadcast error: ${error.message}`);
}
});
}
});
}
}
broadcastPixelBuffer(canvasId, chunkid, buffer) {
@ -181,11 +192,17 @@ class APISocketServer extends WebSocketEvents {
logger.info(`APISocket message ${message}`);
if (command === 'chat') {
const [name, msg, country, channelId] = packet;
chatProvider.broadcastChatMessage(
/*
* do not send message back up ws that sent it
* TODO: user id should not be hardcoded,
* consider it whenever this actually gets used and
* becomes an issue.
*/
socketEvents.broadcastChatMessage(
name,
msg,
channelId,
chatProvider.infoUserId,
1,
country,
false,
);
@ -193,7 +210,7 @@ class APISocketServer extends WebSocketEvents {
name,
msg,
channelId,
chatProvider.infoUserId,
1,
country,
true,
ws,

View File

@ -1,42 +1,25 @@
/* @flow
*
* Serverside communication with websockets.
* In general all values that get broadcasted here have to be sanitized already.
*
* Events for WebSockets
*/
import EventEmitter from 'events';
import OnlineCounter from './packets/OnlineCounter';
import PixelUpdate from './packets/PixelUpdateServer';
class WebSockets {
listeners: Array<Object>;
onlineCounter: number;
class SocketEvents extends EventEmitter {
constructor() {
this.listeners = [];
super();
this.onlineCounter = 0;
}
addListener(listener) {
this.listeners.push(listener);
}
remListener(listener) {
const index = this.listeners.indexOf(listener);
if (index > -1) {
this.listeners.splice(index, 1);
}
}
/*
* broadcast message via websocket
* @param message Message to send
*/
broadcast(message: Buffer) {
this.listeners.forEach(
(listener) => listener.broadcast(message),
);
this.emit('broadcast', message);
}
/*
@ -51,9 +34,21 @@ class WebSockets {
pixels: Buffer,
) {
const buffer = PixelUpdate.dehydrate(chunkId, pixels);
this.listeners.forEach(
(listener) => listener.broadcastPixelBuffer(canvasId, chunkId, buffer),
);
this.emit('pixelUpdate', canvasId, chunkId, buffer);
}
/*
* received Chat message on own websocket
* @param user User Instance that sent the message
* @param message text message
* @param channelId numerical channel id
*/
recvChatMessage(
user: Object,
message: string,
channelId: number,
) {
this.emit('recvChatMessage', user, message, channelId);
}
/*
@ -71,16 +66,14 @@ class WebSockets {
country: string = 'xx',
sendapi: boolean = true,
) {
country = country || 'xx';
this.listeners.forEach(
(listener) => listener.broadcastChatMessage(
name,
message,
channelId,
id,
country,
sendapi,
),
this.emit(
'chatMessage',
name,
message,
channelId,
id,
country || 'xx',
sendapi,
);
}
@ -95,12 +88,11 @@ class WebSockets {
channelId: number,
channelArray: Array,
) {
this.listeners.forEach(
(listener) => listener.broadcastAddChatChannel(
userId,
channelId,
channelArray,
),
this.emit(
'addChatChannel',
userId,
channelId,
channelArray,
);
}
@ -114,21 +106,14 @@ class WebSockets {
userId: number,
channelId: number,
) {
this.listeners.forEach(
(listener) => listener.broadcastRemoveChatChannel(
userId,
channelId,
),
);
this.emit('remChatChannel', userId, channelId);
}
/*
* reload user on websocket to get changes
*/
reloadUser(name: string) {
this.listeners.forEach(
(listener) => listener.reloadUser(name),
);
this.emit('reloadUser', name);
}
/*
@ -138,11 +123,8 @@ class WebSockets {
broadcastOnlineCounter(online: number) {
this.onlineCounter = online;
const buffer = OnlineCounter.dehydrate({ online });
this.listeners.forEach(
(listener) => listener.broadcastOnlineCounter(buffer),
);
this.emit('broadcast', buffer);
}
}
const webSockets = new WebSockets();
export default webSockets;
export default new SocketEvents();

View File

@ -18,10 +18,9 @@ import DeRegisterMultipleChunks from './packets/DeRegisterMultipleChunks';
import ChangedMe from './packets/ChangedMe';
import OnlineCounter from './packets/OnlineCounter';
import socketEvents from './SocketEvents';
import chatProvider, { ChatProvider } from '../core/ChatProvider';
import authenticateClient from './verifyClient';
import WebSocketEvents from './WebSocketEvents';
import webSockets from './websockets';
import { drawSafeByOffsets } from '../core/draw';
import { needCaptcha } from '../utils/captcha';
import { cheapDetector } from '../core/isProxy';
@ -50,16 +49,14 @@ async function verifyClient(info, done) {
}
class SocketServer extends WebSocketEvents {
class SocketServer {
wss: WebSocket.Server;
CHUNK_CLIENTS: Map<number, Array>;
// constructor(server: http.Server) {
constructor() {
super();
this.CHUNK_CLIENTS = new Map();
logger.info('Starting websocket server');
webSockets.addListener(this);
const wss = new WebSocket.Server({
perMessageDeflate: false,
@ -80,7 +77,6 @@ class SocketServer extends WebSocketEvents {
ws.isAlive = true;
ws.canvasId = null;
ws.startDate = Date.now();
ws.on('pong', heartbeat);
const user = await authenticateClient(req);
ws.user = user;
ws.name = user.getName();
@ -91,15 +87,20 @@ class SocketServer extends WebSocketEvents {
}));
const ip = getIPFromRequest(req);
ws.on('error', (e) => {
logger.error(`WebSocket Client Error for ${ws.name}: ${e.message}`);
});
ws.on('pong', heartbeat);
ws.on('close', () => {
// is close called on terminate?
// possible memory leak?
ipCounter.delete(ip);
this.deleteAllChunks(ws);
});
ws.on('message', (message) => {
if (typeof message === 'string') {
this.onTextMessage(message, ws);
@ -109,18 +110,53 @@ class SocketServer extends WebSocketEvents {
});
});
this.broadcast = this.broadcast.bind(this);
this.broadcastPixelBuffer = this.broadcastPixelBuffer.bind(this);
this.reloadUser = this.reloadUser.bind(this);
this.ping = this.ping.bind(this);
/*
* i don't tink that we really need that, it just stresses the server
* with lots of reconnects at once, the overhead of having a few idle
* connections isn't too bad in comparison
*/
// this.killOld = this.killOld.bind(this);
// setInterval(this.killOld, 10 * 60 * 1000);
socketEvents.on('broadcast', this.broadcast);
socketEvents.on('pixelUpdate', this.broadcastPixelBuffer);
socketEvents.on('reloadUser', this.reloadUser);
socketEvents.on('chatMessage', (
name,
message,
channelId,
id,
country,
) => {
const text = JSON.stringify([name, message, country, channelId, id]);
this.wss.clients.forEach((ws) => {
if (ws.readyState === WebSocket.OPEN) {
if (chatProvider.userHasChannelAccess(ws.user, channelId)) {
ws.send(text);
}
}
});
});
socketEvents.on('addChatChannel', (userId, channelId, channelArray) => {
this.findAllWsByUerId(userId).forEach((ws) => {
ws.user.addChannel(channelId, channelArray);
const text = JSON.stringify([
'addch', {
[channelId]: channelArray,
},
]);
ws.send(text);
});
});
socketEvents.on('remChatChannel', (userId, channelId) => {
this.findAllWsByUerId(userId).forEach((ws) => {
ws.user.removeChannel(channelId);
const text = JSON.stringify(['remch', channelId]);
ws.send(text);
});
});
setInterval(SocketServer.onlineCounterBroadcast, 10 * 1000);
// https://github.com/websockets/ws#how-to-detect-and-close-broken-connections
setInterval(this.ping, 45 * 1000);
}
@ -129,47 +165,34 @@ class SocketServer extends WebSocketEvents {
* https://github.com/websockets/ws/issues/617
* @param data
*/
broadcast(data: Buffer) {
const frame = WebSocket.Sender.frame(data, {
readOnly: true,
mask: false,
rsv1: false,
opcode: 2,
fin: true,
});
this.wss.clients.forEach((ws) => {
if (ws.readyState === WebSocket.OPEN) {
frame.forEach((buffer) => {
try {
// eslint-disable-next-line no-underscore-dangle
ws._socket.write(buffer);
} catch (error) {
logger.error(`WebSocket broadcast error: ${error.message}`);
}
});
}
});
}
broadcastOnlineCounter(buffer: Buffer) {
this.broadcast(buffer);
}
broadcastChatMessage(
name: string,
message: string,
channelId: number,
id: number,
country: string,
) {
const text = JSON.stringify([name, message, country, channelId, id]);
this.wss.clients.forEach((ws) => {
if (ws.readyState === WebSocket.OPEN) {
if (chatProvider.userHasChannelAccess(ws.user, channelId)) {
ws.send(text);
broadcast(data) {
if (typeof data === 'string') {
this.wss.clients.forEach((ws) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(data);
}
}
});
});
} else {
const frame = WebSocket.Sender.frame(data, {
readOnly: true,
mask: false,
rsv1: false,
opcode: 2,
fin: true,
});
this.wss.clients.forEach((ws) => {
if (ws.readyState === WebSocket.OPEN) {
frame.forEach((buffer) => {
try {
// eslint-disable-next-line no-underscore-dangle
ws._socket.write(buffer);
} catch (error) {
logger.error(`WebSocket broadcast error: ${error.message}`);
}
});
}
});
}
}
/*
@ -189,35 +212,18 @@ class SocketServer extends WebSocketEvents {
return null;
}
broadcastAddChatChannel(
userId: number,
channelId: number,
channelArray: Array,
) {
this.wss.clients.forEach((ws) => {
findAllWsByUerId(userId) {
const clients = [];
const it = this.wss.clients.keys();
let client = it.next();
while (!client.done) {
const ws = client.value;
if (ws.user.id === userId && ws.readyState === WebSocket.OPEN) {
ws.user.addChannel(channelId, channelArray);
const text = JSON.stringify([
'addch', {
[channelId]: channelArray,
},
]);
ws.send(text);
clients.push(ws);
}
});
}
broadcastRemoveChatChannel(
userId: number,
channelId: number,
) {
this.wss.clients.forEach((ws) => {
if (ws.user.id === userId && ws.readyState === WebSocket.OPEN) {
ws.user.removeChannel(channelId);
const text = JSON.stringify(['remch', channelId]);
ws.send(text);
}
});
client = it.next();
}
return clients;
}
broadcastPixelBuffer(canvasId: number, chunkid, data: Buffer) {
@ -263,14 +269,6 @@ class SocketServer extends WebSocketEvents {
});
}
killOld() {
const now = Date.now();
this.wss.clients.forEach((ws) => {
const lifetime = now - ws.startDate;
if (lifetime > 30 * 60 * 1000 && Math.random() < 0.3) ws.terminate();
});
}
ping() {
this.wss.clients.forEach((ws) => {
if (!ws.isAlive) {
@ -284,7 +282,7 @@ class SocketServer extends WebSocketEvents {
static onlineCounterBroadcast() {
const online = ipCounter.amount() || 0;
webSockets.broadcastOnlineCounter(online);
socketEvents.broadcastOnlineCounter(online);
}
async onTextMessage(text, ws) {
@ -319,6 +317,9 @@ class SocketServer extends WebSocketEvents {
* if DM channel, make sure that other user has DM open
* (needed because we allow user to leave one-sided
* and auto-join on message)
* TODO: if we scale and have multiple websocket servers at some point
* this might be an issue. We would hve to make a shared list of online
* users and act based on that on 'chatMessage' event
*/
const dmUserId = chatProvider.checkIfDm(user, channelId);
if (dmUserId) {
@ -337,20 +338,11 @@ class SocketServer extends WebSocketEvents {
/*
* send chat message
*/
const errorMsg = await chatProvider.sendMessage(
socketEvents.recvChatMessage(
user,
message,
channelId,
);
if (errorMsg) {
ws.send(JSON.stringify([
'info',
errorMsg,
'il',
channelId,
chatProvider.infoUserId,
]));
}
} else {
logger.info('Got empty message or message from unidentified ws');
}

View File

@ -1,44 +0,0 @@
/* @flow
*
* Parent class for socket servers
*
*/
/* eslint-disable no-unused-vars */
/* eslint-disable class-methods-use-this */
class WebSocketEvents {
broadcast(message: Buffer) {
}
broadcastPixelBuffer(canvasId: number, chunkid: number, buffer: Buffer) {
}
broadcastChatMessage(
name: string,
message: string,
channelId: number,
userId: number,
) {}
broadcastAddChatChannel(
userId: number,
channelId: number,
channelArray: Array,
) {
}
broadcastRemoveChatChannel(
userId: number,
channelId: number,
) {
}
reloadUser(name: string) {
}
broadcastOnlineCounter(data: Buffer) {
}
}
export default WebSocketEvents;

View File

@ -194,9 +194,9 @@ const promise = models.sync({ alter: { drop: false } })
// const promise = models.sync()
.catch((err) => logger.error(err.stack));
promise.then(() => {
rankings.updateRanking();
chatProvider.initialize();
server.listen(PORT, HOST, () => {
rankings.updateRanking();
chatProvider.initialize();
const address = server.address();
logger.log(
'info',