Resolve circular dependencies

This commit is contained in:
HF 2020-01-14 17:42:18 +01:00
parent 2820805f35
commit 23bfebbfe5
13 changed files with 240 additions and 150 deletions

View File

@ -5,7 +5,7 @@ import { using } from 'bluebird';
import type { User } from '../data/models';
import { redlock } from '../data/redis';
import { getChunkOfPixel, getOffsetOfPixel } from './utils';
import { broadcastPixel } from '../socket/websockets';
import webSockets from '../socket/websockets';
import logger from './logger';
import RedisCanvas from '../data/models/RedisCanvas';
import { registerPixelChange } from './tileserver';
@ -29,7 +29,7 @@ export function setPixel(
const [i, j] = getChunkOfPixel([x, y], canvasSize);
const offset = getOffsetOfPixel(x, y, canvasSize);
RedisCanvas.setPixelInChunk(i, j, offset, color, canvasId);
broadcastPixel(canvasId, i, j, offset, color);
webSockets.broadcastPixel(canvasId, i, j, offset, color);
}
/**

View File

@ -22,6 +22,17 @@ export function sum(values: Array<number>): number {
return total;
}
/*
* returns random integer
* @param min Minimum of random integer
* @param max Maximum of random integer
* @return random integer between min and max (min <= ret <= max)
*/
export function getRandomInt(min, max) {
const range = max - min + 1;
return min + (Math.floor(Math.random() * range));
}
export function distMax([x1, y1]: Cell, [x2, y2]: Cell): number {
return Math.max(Math.abs(x1 - x2), Math.abs(y1 - y2));
}

View File

@ -72,7 +72,7 @@ class RedisCanvas {
RedisCanvas.registerChunkChange(canvasId, [i, j]);
}
static async getPixel(
static async getPixelIfExists(
x: number,
y: number,
canvasId: number,
@ -81,7 +81,6 @@ class RedisCanvas {
// 3rd bit -> protected or not
// rest (5 bits) -> index of color
const canvasSize = canvases[canvasId].size;
const canvasAlpha = canvases[canvasId].alpha;
const [i, j] = getChunkOfPixel([x, y], canvasSize);
const offset = getOffsetOfPixel(x, y, canvasSize);
const args = [
@ -91,9 +90,19 @@ class RedisCanvas {
`#${offset}`,
];
const result: ?number = await redis.sendCommandAsync('bitfield', args);
if (!result) return canvasAlpha;
if (!result) return null;
const color = result[0];
return color || canvasAlpha;
return color;
}
static async getPixel(
x: number,
y: number,
canvasId: number,
): Promise<number> {
const canvasAlpha = canvases[canvasId].alpha;
const clr = RedisCanvas.getPixelIfExists(x, y, canvasId);
return (clr == null) ? canvasAlpha : clr;
}
}

View File

@ -5,7 +5,7 @@
import type { Request, Response } from 'express';
import { broadcastMinecraftLink } from '../../../socket/websockets';
import webSockets from '../../../socket/websockets';
export default async (req: Request, res: Response) => {
@ -40,5 +40,5 @@ export default async (req: Request, res: Response) => {
} else {
return;
}
broadcastMinecraftLink(name, minecraftid, accepted);
webSockets.broadcastMinecraftLink(name, minecraftid, accepted);
};

View File

@ -10,7 +10,7 @@
import type { Request, Response } from 'express';
import canvases from '../../canvases.json';
import { broadcastMinecraftTP } from '../../socket/websockets';
import webSockets from '../../socket/websockets';
const CANVAS_MAX_XY = (canvases[0].size / 2);
const CANVAS_MIN_XY = -CANVAS_MAX_XY;
@ -47,7 +47,7 @@ export default async (req: Request, res: Response) => {
return;
}
await broadcastMinecraftTP(minecraftid, x, y);
webSockets.broadcastMinecraftTP(minecraftid, x, y);
res.json({
success: true,

View File

@ -8,17 +8,15 @@
* @flow */
import EventEmitter from 'events';
import WebSocket from 'ws';
import WebSocketEvents from './WebSocketEvents';
import webSockets from './websockets';
import { getIPFromRequest } from '../utils/ip';
import Minecraft from '../core/minecraft';
import { drawUnsafe, setPixel } from '../core/draw';
import logger from '../core/logger';
import redis from '../data/redis';
import ChatHistory from './ChatHistory';
import { broadcastChatMessage, notifyChangedMe } from './websockets';
import { APISOCKET_KEY } from '../core/config';
function heartbeat() {
@ -39,13 +37,14 @@ async function verifyClient(info, done) {
}
class APISocketServer extends EventEmitter {
class APISocketServer extends WebSocketEvents {
wss: WebSocket.Server;
mc: Minecraft;
constructor() {
super();
logger.info('Starting API websocket server');
webSockets.addListener(this);
const wss = new WebSocket.Server({
perMessageDeflate: false,
@ -79,7 +78,9 @@ class APISocketServer extends EventEmitter {
setInterval(this.ping, 45 * 1000);
}
broadcastChatMessage(name, msg, ws = null) {
broadcastChatMessage(name, msg, sendapi, ws = null) {
if (!sendapi) return;
const sendmsg = JSON.stringify(['msg', name, msg]);
this.wss.clients.forEach((client) => {
if (client !== ws && client.subChat && client.readyState === WebSocket.OPEN) {
@ -127,7 +128,8 @@ class APISocketServer extends EventEmitter {
});
}
broadcastPixelBuffer(chunkid, buffer) {
broadcastPixelBuffer(canvasId, chunkid, buffer) {
if (canvasId !== 0) return;
const frame = WebSocket.Sender.frame(buffer, {
readOnly: true,
mask: false,
@ -228,21 +230,21 @@ class APISocketServer extends EventEmitter {
const [minecraftname, msg] = packet;
const user = this.mc.minecraftname2User(minecraftname);
const chatname = (user.id) ? `[MC] ${user.regUser.name}` : `[MC] ${minecraftname}`;
broadcastChatMessage(chatname, msg, false);
this.broadcastChatMessage(chatname, msg, ws);
webSockets.broadcastChatMessage(chatname, msg, false);
this.broadcastChatMessage(chatname, msg, true, ws);
return;
}
if (command == 'chat') {
const [name, msg] = packet;
broadcastChatMessage(name, msg, false);
this.broadcastChatMessage(name, msg, ws);
webSockets.broadcastChatMessage(name, msg, false);
this.broadcastChatMessage(name, msg, true, ws);
return;
}
if (command == 'linkacc') {
const [minecraftid, minecraftname, name] = packet;
const ret = await this.mc.linkacc(minecraftid, minecraftname, name);
if (!ret) {
notifyChangedMe(name);
webSockets.notifyChangedMe(name);
}
ws.send(JSON.stringify([
'linkret',

View File

@ -1,7 +1,6 @@
/* @flow */
import EventEmitter from 'events';
import WebSocket from 'ws';
import logger from '../core/logger';
@ -16,12 +15,12 @@ import DeRegisterChunk from './packets/DeRegisterChunk';
import DeRegisterMultipleChunks from './packets/DeRegisterMultipleChunks';
import RequestChatHistory from './packets/RequestChatHistory';
import CoolDownPacket from './packets/CoolDownPacket';
import PixelUpdate from './packets/PixelUpdate';
import ChangedMe from './packets/ChangedMe';
import ChatHistory from './ChatHistory';
import authenticateClient from './verifyClient';
import { broadcastChatMessage } from './websockets';
import WebSocketEvents from './WebSocketEvents';
import webSockets from './websockets';
const ipCounter: Counter<string> = new Counter();
@ -46,7 +45,7 @@ async function verifyClient(info, done) {
}
class SocketServer extends EventEmitter {
class SocketServer extends WebSocketEvents {
wss: WebSocket.Server;
CHUNK_CLIENTS: Map<number, Array>;
@ -55,6 +54,7 @@ class SocketServer extends EventEmitter {
super();
this.CHUNK_CLIENTS = new Map();
logger.info('Starting websocket server');
webSockets.addListener(this);
const wss = new WebSocket.Server({
perMessageDeflate: false,
@ -92,14 +92,20 @@ class SocketServer extends EventEmitter {
this.deleteAllChunks(ws);
});
ws.on('message', (message) => {
if (typeof message === 'string') { this.onTextMessage(message, ws); } else { this.onBinaryMessage(message, ws); }
if (typeof message === 'string') {
this.onTextMessage(message, ws);
} else {
this.onBinaryMessage(message, ws);
}
});
});
this.onlineCounterBroadcast = this.onlineCounterBroadcast.bind(this);
this.ping = this.ping.bind(this);
this.killOld = this.killOld.bind(this);
setInterval(this.killOld, 10 * 60 * 1000);
setInterval(this.onlineCounterBroadcast, 10 * 1000);
// https://github.com/websockets/ws#how-to-detect-and-close-broken-connections
setInterval(this.ping, 45 * 1000);
}
@ -130,7 +136,12 @@ class SocketServer extends EventEmitter {
});
}
broadcastText(text: string) {
broadcastOnlineCounter(buffer: Buffer) {
this.broadcast(buffer);
}
broadcastChatMessage(name: string, message: string) {
const text = JSON.stringify([name, message]);
this.wss.clients.forEach((ws) => {
if (ws.readyState == WebSocket.OPEN) {
ws.send(text);
@ -171,10 +182,6 @@ class SocketServer extends EventEmitter {
});
}
getConnections(): number {
return this.wss.clients.size || 0;
}
killOld() {
const now = Date.now();
this.wss.clients.forEach((ws) => {
@ -192,13 +199,18 @@ class SocketServer extends EventEmitter {
});
}
onlineCounterBroadcast() {
const online = this.wss.clients.size || 0;
webSockets.broadcastOnlineCounter(online);
}
onTextMessage(message, ws) {
if (ws.name && message) {
const waitLeft = ws.rateLimiter.tick();
if (waitLeft) {
ws.send(JSON.stringify(['info', `You are sending messages too fast, you have to wait ${Math.floor(waitLeft / 1000)}s :(`]));
} else {
broadcastChatMessage(ws.name, message);
webSockets.broadcastChatMessage(ws.name, message);
}
} else {
logger.info('Got empty message or message from unidentified ws');
@ -256,7 +268,7 @@ class SocketServer extends EventEmitter {
pushChunk(chunkid, ws) {
if (!this.CHUNK_CLIENTS.has(chunkid)) {
this.CHUNK_CLIENTS.set(chunkid, new Array());
this.CHUNK_CLIENTS.set(chunkid, []);
}
const clients = this.CHUNK_CLIENTS.get(chunkid);
const pos = clients.indexOf(ws);

View File

@ -0,0 +1,33 @@
/* @flow
*
* Parent class for socket servers
*
*/
/* eslint-disable no-unused-vars */
/* eslint-disable class-methods-use-this */
class WebSocketEvents {
broadcast(message: Buffer) {
}
broadcastPixelBuffer(canvasId: number, chunkid: number, buffer: Buffer) {
}
broadcastChatMessage(name: string, message: string) {
}
broadcastMinecraftLink(name: string, minecraftid: string, accepted: boolean) {
}
notifyChangedMe(name: string) {
}
broadcastMinecraftTP(minecraftid: string, x: number, y: number) {
}
broadcastOnlineCounter(data: Buffer) {
}
}
export default WebSocketEvents;

View File

@ -23,5 +23,6 @@ export default {
return buffer;
}
return 0;
},
};

View File

@ -2,12 +2,6 @@
import type { ColorIndex } from '../../core/Palette';
import {
getChunkOfPixel,
getOffsetOfPixel,
getPixelFromChunkOffset,
} from '../../core/utils';
type PixelUpdatePacket = {
x: number,
@ -42,5 +36,6 @@ export default {
return buffer;
}
return null;
},
};

View File

@ -4,7 +4,6 @@
import express from 'express';
import logger from '../core/logger';
import session from '../core/session';
import passport from '../core/passport';
import { User } from '../data/models';
@ -19,7 +18,6 @@ router.use(session);
* (cut IPv6 to subnet to prevent abuse)
*/
router.use(async (req, res, next) => {
const { session } = req;
const ip = await getIPFromRequest(req);
const trueIp = ip || '0.0.0.1';
req.trueIp = trueIp;
@ -33,9 +31,9 @@ router.use(passport.initialize());
router.use(passport.session());
export function authenticateClient(req) {
function authenticateClient(req) {
return new Promise(
((resolve, reject) => {
((resolve) => {
router(req, {}, () => {
if (req.user) {
resolve(req.user);

View File

@ -5,123 +5,127 @@
*
*/
import url from 'url';
import logger from '../core/logger';
import { SECOND } from '../core/constants';
import { getChunkOfPixel } from '../core/utils';
import ChatHistory from './ChatHistory';
import SocketServer from './SocketServer';
import APISocketServer from './APISocketServer';
import OnlineCounter from './packets/OnlineCounter';
import PixelUpdate from './packets/PixelUpdate';
const usersocket = new SocketServer();
const apisocket = new APISocketServer();
/*
* broadcast message via websocket
* @param message Message to send
*/
export async function broadcast(message: Buffer) {
if (usersocket) usersocket.broadcast(message);
}
class WebSockets {
listeners: Array<Object>;
/*
* broadcast pixel message via websocket
* @param canvasIdent ident of canvas
* @param i x coordinates of chunk
* @param j y coordinates of chunk
* @param offset offset of pixel within this chunk
* @param color colorindex
*/
export async function broadcastPixel(
canvasId: number,
i: number,
j: number,
offset: number,
color: number,
) {
const chunkid = (i << 8) | j;
const buffer = PixelUpdate.dehydrate(i, j, offset, color);
if (usersocket) usersocket.broadcastPixelBuffer(canvasId, chunkid, buffer);
if (apisocket && canvasId == 0) apisocket.broadcastPixelBuffer(chunkid, buffer);
}
constructor() {
this.listeners = [];
}
/*
* broadcast chat message
* @param name chatname
* @param message Message to send
* @param sendapi If chat message should get boradcasted to api websocket
* (usefull if the api is supposed to not answer to its own messages)
*/
export async function broadcastChatMessage(name: string, message: string, sendapi: boolean = true) {
logger.info(`Received chat message ${message} from ${name}`);
ChatHistory.addMessage(name, message);
if (usersocket) usersocket.broadcastText(JSON.stringify([name, message]));
if (sendapi && apisocket) apisocket.broadcastChatMessage(name, message);
}
addListener(listener) {
this.listeners.push(listener);
}
/*
* broadcast minecraft linking to API
* @param name pixelplanetname
* @param minecraftid minecraftid
* @param accepted If link request got accepted
*/
export async function broadcastMinecraftLink(name: string, minecraftid: string, accepted: boolean) {
if (apisocket) apisocket.broadcastMinecraftLink(name, minecraftid, accepted);
}
/*
* broadcast message via websocket
* @param message Message to send
*/
broadcast(message: Buffer) {
this.listeners.forEach(
(listener) => listener.broadcast(message),
);
}
/*
* Notify user on websocket that he should rerequest api/message
* Currently just used for getting minecraft link message.
*/
export async function notifyChangedMe(name: string) {
if (usersocket) usersocket.notifyChangedMe(name);
}
/*
* broadcast pixel message via websocket
* @param canvasIdent ident of canvas
* @param i x coordinates of chunk
* @param j y coordinates of chunk
* @param offset offset of pixel within this chunk
* @param color colorindex
*/
broadcastPixel(
canvasId: number,
i: number,
j: number,
offset: number,
color: number,
) {
const chunkid = (i << 8) | j;
const buffer = PixelUpdate.dehydrate(i, j, offset, color);
this.listeners.forEach(
(listener) => listener.broadcastPixelBuffer(canvasId, chunkid, buffer),
);
}
/*
* broadcast mc tp request to API
* @param minecraftid minecraftid
* @param x x coords
* @param y y coords
*/
export async function broadcastMinecraftTP(minecraftid, x, y) {
if (apisocket) apisocket.broadcastMinecraftTP(minecraftid, x, y);
}
/*
* broadcast chat message
* @param name chatname
* @param message Message to send
* @param sendapi If chat message should get boradcasted to api websockets
* (usefull if the api is supposed to not answer to its own messages)
*/
broadcastChatMessage(
name: string,
message: string,
sendapi: boolean = true,
) {
logger.info(`Received chat message ${message} from ${name}`);
ChatHistory.addMessage(name, message);
this.listeners.forEach(
(listener) => listener.broadcastChatMessage(name, message, sendapi),
);
}
/*
* send websocket package of online counter every x seconds
*/
function startOnlineCounterBroadcast() {
setInterval(() => {
if (usersocket) {
const online = usersocket.getConnections();
const buffer = OnlineCounter.dehydrate({ online });
usersocket.broadcast(buffer);
if (apisocket) apisocket.broadcastOnlineCounter(buffer);
}
}, 15 * SECOND);
}
startOnlineCounterBroadcast();
/*
* broadcast minecraft linking to API
* @param name pixelplanetname
* @param minecraftid minecraftid
* @param accepted If link request got accepted
*/
broadcastMinecraftLink(
name: string,
minecraftid: string,
accepted: boolean,
) {
this.listeners.forEach(
(listener) => listener.broadcastMinecraftLink(
name,
minecraftid,
accepted,
),
);
}
/*
* websocket upgrade / establishing connection
* Get hooked up to httpServer and routes to the right socket
*/
export function wsupgrade(request, socket, head) {
const { pathname } = url.parse(request.url);
/*
* Notify user on websocket that he should rerequest api/message
* Currently just used for getting minecraft link message.
*/
notifyChangedMe(name: string) {
this.listeners.forEach(
(listener) => listener.notifyChangedMe(name),
);
}
if (pathname === '/ws') {
usersocket.wss.handleUpgrade(request, socket, head, (ws) => {
usersocket.wss.emit('connection', ws, request);
});
} else if (pathname === '/mcws') {
apisocket.wss.handleUpgrade(request, socket, head, (ws) => {
apisocket.wss.emit('connection', ws, request);
});
} else {
socket.destroy();
/*
* broadcast mc tp request to API
* @param minecraftid minecraftid
* @param x x coords
* @param y y coords
*/
broadcastMinecraftTP(minecraftid, x, y) {
this.listeners.forEach(
(listener) => listener.broadcastMinecraftTP(minecraftid, x, y),
);
}
/*
* broadcast online counter
* @param online Number of users online
*/
broadcastOnlineCounter(online: number) {
const buffer = OnlineCounter.dehydrate({ online });
this.listeners.forEach(
(listener) => listener.broadcastOnlineCounter(buffer),
);
}
}
const webSockets = new WebSockets();
export default webSockets;

View File

@ -1,5 +1,6 @@
/* @flow */
import url from 'url';
import path from 'path';
import compression from 'compression';
import express from 'express';
@ -14,6 +15,9 @@ import logger from './core/logger';
import rankings from './core/ranking';
import models from './data/models';
import SocketServer from './socket/SocketServer';
import APISocketServer from './socket/APISocketServer';
import {
api,
tiles,
@ -28,7 +32,6 @@ import { SECOND, MONTH } from './core/constants';
import { PORT, DISCORD_INVITE } from './core/config';
import { ccToCoords } from './utils/location';
import { wsupgrade } from './socket/websockets';
import { startAllCanvasLoops } from './core/tileserver';
startAllCanvasLoops();
@ -41,8 +44,30 @@ app.disable('x-powered-by');
// Call Garbage Collector every 30 seconds
setInterval(forceGC, 15 * 60 * SECOND);
// create websocket
// create http server
const server = http.createServer(app);
//
// websockets
// -----------------------------------------------------------------------------
const usersocket = new SocketServer();
const apisocket = new APISocketServer();
function wsupgrade(request, socket, head) {
const { pathname } = url.parse(request.url);
if (pathname === '/ws') {
usersocket.wss.handleUpgrade(request, socket, head, (ws) => {
usersocket.wss.emit('connection', ws, request);
});
} else if (pathname === '/mcws') {
apisocket.wss.handleUpgrade(request, socket, head, (ws) => {
apisocket.wss.emit('connection', ws, request);
});
} else {
socket.destroy();
}
}
server.on('upgrade', wsupgrade);