diff --git a/src/core/draw.js b/src/core/draw.js index 98650b0..2f97cea 100644 --- a/src/core/draw.js +++ b/src/core/draw.js @@ -5,7 +5,7 @@ import { using } from 'bluebird'; import type { User } from '../data/models'; import { redlock } from '../data/redis'; import { getChunkOfPixel, getOffsetOfPixel } from './utils'; -import { broadcastPixel } from '../socket/websockets'; +import webSockets from '../socket/websockets'; import logger from './logger'; import RedisCanvas from '../data/models/RedisCanvas'; import { registerPixelChange } from './tileserver'; @@ -29,7 +29,7 @@ export function setPixel( const [i, j] = getChunkOfPixel([x, y], canvasSize); const offset = getOffsetOfPixel(x, y, canvasSize); RedisCanvas.setPixelInChunk(i, j, offset, color, canvasId); - broadcastPixel(canvasId, i, j, offset, color); + webSockets.broadcastPixel(canvasId, i, j, offset, color); } /** diff --git a/src/core/utils.js b/src/core/utils.js index a0de380..4e33215 100644 --- a/src/core/utils.js +++ b/src/core/utils.js @@ -22,6 +22,17 @@ export function sum(values: Array): number { return total; } +/* + * returns random integer + * @param min Minimum of random integer + * @param max Maximum of random integer + * @return random integer between min and max (min <= ret <= max) + */ +export function getRandomInt(min, max) { + const range = max - min + 1; + return min + (Math.floor(Math.random() * range)); +} + export function distMax([x1, y1]: Cell, [x2, y2]: Cell): number { return Math.max(Math.abs(x1 - x2), Math.abs(y1 - y2)); } diff --git a/src/data/models/RedisCanvas.js b/src/data/models/RedisCanvas.js index 4e06cc5..bb56e3c 100644 --- a/src/data/models/RedisCanvas.js +++ b/src/data/models/RedisCanvas.js @@ -72,7 +72,7 @@ class RedisCanvas { RedisCanvas.registerChunkChange(canvasId, [i, j]); } - static async getPixel( + static async getPixelIfExists( x: number, y: number, canvasId: number, @@ -81,7 +81,6 @@ class RedisCanvas { // 3rd bit -> protected or not // rest (5 bits) -> index of color const canvasSize = canvases[canvasId].size; - const canvasAlpha = canvases[canvasId].alpha; const [i, j] = getChunkOfPixel([x, y], canvasSize); const offset = getOffsetOfPixel(x, y, canvasSize); const args = [ @@ -91,9 +90,19 @@ class RedisCanvas { `#${offset}`, ]; const result: ?number = await redis.sendCommandAsync('bitfield', args); - if (!result) return canvasAlpha; + if (!result) return null; const color = result[0]; - return color || canvasAlpha; + return color; + } + + static async getPixel( + x: number, + y: number, + canvasId: number, + ): Promise { + const canvasAlpha = canvases[canvasId].alpha; + const clr = RedisCanvas.getPixelIfExists(x, y, canvasId); + return (clr == null) ? canvasAlpha : clr; } } diff --git a/src/routes/api/auth/mclink.js b/src/routes/api/auth/mclink.js index 9df3ad3..bb6e472 100644 --- a/src/routes/api/auth/mclink.js +++ b/src/routes/api/auth/mclink.js @@ -5,7 +5,7 @@ import type { Request, Response } from 'express'; -import { broadcastMinecraftLink } from '../../../socket/websockets'; +import webSockets from '../../../socket/websockets'; export default async (req: Request, res: Response) => { @@ -40,5 +40,5 @@ export default async (req: Request, res: Response) => { } else { return; } - broadcastMinecraftLink(name, minecraftid, accepted); + webSockets.broadcastMinecraftLink(name, minecraftid, accepted); }; diff --git a/src/routes/api/mctp.js b/src/routes/api/mctp.js index 02747e9..3f18b4e 100644 --- a/src/routes/api/mctp.js +++ b/src/routes/api/mctp.js @@ -10,7 +10,7 @@ import type { Request, Response } from 'express'; import canvases from '../../canvases.json'; -import { broadcastMinecraftTP } from '../../socket/websockets'; +import webSockets from '../../socket/websockets'; const CANVAS_MAX_XY = (canvases[0].size / 2); const CANVAS_MIN_XY = -CANVAS_MAX_XY; @@ -47,7 +47,7 @@ export default async (req: Request, res: Response) => { return; } - await broadcastMinecraftTP(minecraftid, x, y); + webSockets.broadcastMinecraftTP(minecraftid, x, y); res.json({ success: true, diff --git a/src/socket/APISocketServer.js b/src/socket/APISocketServer.js index f5f81fc..28c477a 100644 --- a/src/socket/APISocketServer.js +++ b/src/socket/APISocketServer.js @@ -8,17 +8,15 @@ * @flow */ -import EventEmitter from 'events'; import WebSocket from 'ws'; +import WebSocketEvents from './WebSocketEvents'; +import webSockets from './websockets'; import { getIPFromRequest } from '../utils/ip'; import Minecraft from '../core/minecraft'; import { drawUnsafe, setPixel } from '../core/draw'; import logger from '../core/logger'; -import redis from '../data/redis'; -import ChatHistory from './ChatHistory'; -import { broadcastChatMessage, notifyChangedMe } from './websockets'; import { APISOCKET_KEY } from '../core/config'; function heartbeat() { @@ -39,13 +37,14 @@ async function verifyClient(info, done) { } -class APISocketServer extends EventEmitter { +class APISocketServer extends WebSocketEvents { wss: WebSocket.Server; mc: Minecraft; constructor() { super(); logger.info('Starting API websocket server'); + webSockets.addListener(this); const wss = new WebSocket.Server({ perMessageDeflate: false, @@ -79,7 +78,9 @@ class APISocketServer extends EventEmitter { setInterval(this.ping, 45 * 1000); } - broadcastChatMessage(name, msg, ws = null) { + broadcastChatMessage(name, msg, sendapi, ws = null) { + if (!sendapi) return; + const sendmsg = JSON.stringify(['msg', name, msg]); this.wss.clients.forEach((client) => { if (client !== ws && client.subChat && client.readyState === WebSocket.OPEN) { @@ -127,7 +128,8 @@ class APISocketServer extends EventEmitter { }); } - broadcastPixelBuffer(chunkid, buffer) { + broadcastPixelBuffer(canvasId, chunkid, buffer) { + if (canvasId !== 0) return; const frame = WebSocket.Sender.frame(buffer, { readOnly: true, mask: false, @@ -228,21 +230,21 @@ class APISocketServer extends EventEmitter { const [minecraftname, msg] = packet; const user = this.mc.minecraftname2User(minecraftname); const chatname = (user.id) ? `[MC] ${user.regUser.name}` : `[MC] ${minecraftname}`; - broadcastChatMessage(chatname, msg, false); - this.broadcastChatMessage(chatname, msg, ws); + webSockets.broadcastChatMessage(chatname, msg, false); + this.broadcastChatMessage(chatname, msg, true, ws); return; } if (command == 'chat') { const [name, msg] = packet; - broadcastChatMessage(name, msg, false); - this.broadcastChatMessage(name, msg, ws); + webSockets.broadcastChatMessage(name, msg, false); + this.broadcastChatMessage(name, msg, true, ws); return; } if (command == 'linkacc') { const [minecraftid, minecraftname, name] = packet; const ret = await this.mc.linkacc(minecraftid, minecraftname, name); if (!ret) { - notifyChangedMe(name); + webSockets.notifyChangedMe(name); } ws.send(JSON.stringify([ 'linkret', diff --git a/src/socket/SocketServer.js b/src/socket/SocketServer.js index bb6538a..d9b0f93 100644 --- a/src/socket/SocketServer.js +++ b/src/socket/SocketServer.js @@ -1,7 +1,6 @@ /* @flow */ -import EventEmitter from 'events'; import WebSocket from 'ws'; import logger from '../core/logger'; @@ -16,12 +15,12 @@ import DeRegisterChunk from './packets/DeRegisterChunk'; import DeRegisterMultipleChunks from './packets/DeRegisterMultipleChunks'; import RequestChatHistory from './packets/RequestChatHistory'; import CoolDownPacket from './packets/CoolDownPacket'; -import PixelUpdate from './packets/PixelUpdate'; import ChangedMe from './packets/ChangedMe'; import ChatHistory from './ChatHistory'; import authenticateClient from './verifyClient'; -import { broadcastChatMessage } from './websockets'; +import WebSocketEvents from './WebSocketEvents'; +import webSockets from './websockets'; const ipCounter: Counter = new Counter(); @@ -46,7 +45,7 @@ async function verifyClient(info, done) { } -class SocketServer extends EventEmitter { +class SocketServer extends WebSocketEvents { wss: WebSocket.Server; CHUNK_CLIENTS: Map; @@ -55,6 +54,7 @@ class SocketServer extends EventEmitter { super(); this.CHUNK_CLIENTS = new Map(); logger.info('Starting websocket server'); + webSockets.addListener(this); const wss = new WebSocket.Server({ perMessageDeflate: false, @@ -92,14 +92,20 @@ class SocketServer extends EventEmitter { this.deleteAllChunks(ws); }); ws.on('message', (message) => { - if (typeof message === 'string') { this.onTextMessage(message, ws); } else { this.onBinaryMessage(message, ws); } + if (typeof message === 'string') { + this.onTextMessage(message, ws); + } else { + this.onBinaryMessage(message, ws); + } }); }); + this.onlineCounterBroadcast = this.onlineCounterBroadcast.bind(this); this.ping = this.ping.bind(this); this.killOld = this.killOld.bind(this); setInterval(this.killOld, 10 * 60 * 1000); + setInterval(this.onlineCounterBroadcast, 10 * 1000); // https://github.com/websockets/ws#how-to-detect-and-close-broken-connections setInterval(this.ping, 45 * 1000); } @@ -130,7 +136,12 @@ class SocketServer extends EventEmitter { }); } - broadcastText(text: string) { + broadcastOnlineCounter(buffer: Buffer) { + this.broadcast(buffer); + } + + broadcastChatMessage(name: string, message: string) { + const text = JSON.stringify([name, message]); this.wss.clients.forEach((ws) => { if (ws.readyState == WebSocket.OPEN) { ws.send(text); @@ -171,10 +182,6 @@ class SocketServer extends EventEmitter { }); } - getConnections(): number { - return this.wss.clients.size || 0; - } - killOld() { const now = Date.now(); this.wss.clients.forEach((ws) => { @@ -192,13 +199,18 @@ class SocketServer extends EventEmitter { }); } + onlineCounterBroadcast() { + const online = this.wss.clients.size || 0; + webSockets.broadcastOnlineCounter(online); + } + onTextMessage(message, ws) { if (ws.name && message) { const waitLeft = ws.rateLimiter.tick(); if (waitLeft) { ws.send(JSON.stringify(['info', `You are sending messages too fast, you have to wait ${Math.floor(waitLeft / 1000)}s :(`])); } else { - broadcastChatMessage(ws.name, message); + webSockets.broadcastChatMessage(ws.name, message); } } else { logger.info('Got empty message or message from unidentified ws'); @@ -256,7 +268,7 @@ class SocketServer extends EventEmitter { pushChunk(chunkid, ws) { if (!this.CHUNK_CLIENTS.has(chunkid)) { - this.CHUNK_CLIENTS.set(chunkid, new Array()); + this.CHUNK_CLIENTS.set(chunkid, []); } const clients = this.CHUNK_CLIENTS.get(chunkid); const pos = clients.indexOf(ws); diff --git a/src/socket/WebSocketEvents.js b/src/socket/WebSocketEvents.js new file mode 100644 index 0000000..6db6b33 --- /dev/null +++ b/src/socket/WebSocketEvents.js @@ -0,0 +1,33 @@ +/* @flow + * + * Parent class for socket servers + * + */ + +/* eslint-disable no-unused-vars */ +/* eslint-disable class-methods-use-this */ + +class WebSocketEvents { + broadcast(message: Buffer) { + } + + broadcastPixelBuffer(canvasId: number, chunkid: number, buffer: Buffer) { + } + + broadcastChatMessage(name: string, message: string) { + } + + broadcastMinecraftLink(name: string, minecraftid: string, accepted: boolean) { + } + + notifyChangedMe(name: string) { + } + + broadcastMinecraftTP(minecraftid: string, x: number, y: number) { + } + + broadcastOnlineCounter(data: Buffer) { + } +} + +export default WebSocketEvents; diff --git a/src/socket/packets/OnlineCounter.js b/src/socket/packets/OnlineCounter.js index 02517d7..4e4999f 100644 --- a/src/socket/packets/OnlineCounter.js +++ b/src/socket/packets/OnlineCounter.js @@ -23,5 +23,6 @@ export default { return buffer; } + return 0; }, }; diff --git a/src/socket/packets/PixelUpdate.js b/src/socket/packets/PixelUpdate.js index 974ca9d..98c0fea 100644 --- a/src/socket/packets/PixelUpdate.js +++ b/src/socket/packets/PixelUpdate.js @@ -2,12 +2,6 @@ import type { ColorIndex } from '../../core/Palette'; -import { - getChunkOfPixel, - getOffsetOfPixel, - getPixelFromChunkOffset, -} from '../../core/utils'; - type PixelUpdatePacket = { x: number, @@ -42,5 +36,6 @@ export default { return buffer; } + return null; }, }; diff --git a/src/socket/verifyClient.js b/src/socket/verifyClient.js index 64e0493..c60036b 100644 --- a/src/socket/verifyClient.js +++ b/src/socket/verifyClient.js @@ -4,7 +4,6 @@ import express from 'express'; -import logger from '../core/logger'; import session from '../core/session'; import passport from '../core/passport'; import { User } from '../data/models'; @@ -19,7 +18,6 @@ router.use(session); * (cut IPv6 to subnet to prevent abuse) */ router.use(async (req, res, next) => { - const { session } = req; const ip = await getIPFromRequest(req); const trueIp = ip || '0.0.0.1'; req.trueIp = trueIp; @@ -33,9 +31,9 @@ router.use(passport.initialize()); router.use(passport.session()); -export function authenticateClient(req) { +function authenticateClient(req) { return new Promise( - ((resolve, reject) => { + ((resolve) => { router(req, {}, () => { if (req.user) { resolve(req.user); diff --git a/src/socket/websockets.js b/src/socket/websockets.js index 5293976..1dba69a 100644 --- a/src/socket/websockets.js +++ b/src/socket/websockets.js @@ -5,123 +5,127 @@ * */ -import url from 'url'; - import logger from '../core/logger'; -import { SECOND } from '../core/constants'; -import { getChunkOfPixel } from '../core/utils'; - import ChatHistory from './ChatHistory'; -import SocketServer from './SocketServer'; -import APISocketServer from './APISocketServer'; import OnlineCounter from './packets/OnlineCounter'; import PixelUpdate from './packets/PixelUpdate'; -const usersocket = new SocketServer(); -const apisocket = new APISocketServer(); -/* - * broadcast message via websocket - * @param message Message to send - */ -export async function broadcast(message: Buffer) { - if (usersocket) usersocket.broadcast(message); -} +class WebSockets { + listeners: Array; -/* - * broadcast pixel message via websocket - * @param canvasIdent ident of canvas - * @param i x coordinates of chunk - * @param j y coordinates of chunk - * @param offset offset of pixel within this chunk - * @param color colorindex - */ -export async function broadcastPixel( - canvasId: number, - i: number, - j: number, - offset: number, - color: number, -) { - const chunkid = (i << 8) | j; - const buffer = PixelUpdate.dehydrate(i, j, offset, color); - if (usersocket) usersocket.broadcastPixelBuffer(canvasId, chunkid, buffer); - if (apisocket && canvasId == 0) apisocket.broadcastPixelBuffer(chunkid, buffer); -} + constructor() { + this.listeners = []; + } -/* - * broadcast chat message - * @param name chatname - * @param message Message to send - * @param sendapi If chat message should get boradcasted to api websocket - * (usefull if the api is supposed to not answer to its own messages) - */ -export async function broadcastChatMessage(name: string, message: string, sendapi: boolean = true) { - logger.info(`Received chat message ${message} from ${name}`); - ChatHistory.addMessage(name, message); - if (usersocket) usersocket.broadcastText(JSON.stringify([name, message])); - if (sendapi && apisocket) apisocket.broadcastChatMessage(name, message); -} + addListener(listener) { + this.listeners.push(listener); + } -/* - * broadcast minecraft linking to API - * @param name pixelplanetname - * @param minecraftid minecraftid - * @param accepted If link request got accepted - */ -export async function broadcastMinecraftLink(name: string, minecraftid: string, accepted: boolean) { - if (apisocket) apisocket.broadcastMinecraftLink(name, minecraftid, accepted); -} + /* + * broadcast message via websocket + * @param message Message to send + */ + broadcast(message: Buffer) { + this.listeners.forEach( + (listener) => listener.broadcast(message), + ); + } -/* - * Notify user on websocket that he should rerequest api/message - * Currently just used for getting minecraft link message. - */ -export async function notifyChangedMe(name: string) { - if (usersocket) usersocket.notifyChangedMe(name); -} + /* + * broadcast pixel message via websocket + * @param canvasIdent ident of canvas + * @param i x coordinates of chunk + * @param j y coordinates of chunk + * @param offset offset of pixel within this chunk + * @param color colorindex + */ + broadcastPixel( + canvasId: number, + i: number, + j: number, + offset: number, + color: number, + ) { + const chunkid = (i << 8) | j; + const buffer = PixelUpdate.dehydrate(i, j, offset, color); + this.listeners.forEach( + (listener) => listener.broadcastPixelBuffer(canvasId, chunkid, buffer), + ); + } -/* - * broadcast mc tp request to API - * @param minecraftid minecraftid - * @param x x coords - * @param y y coords - */ -export async function broadcastMinecraftTP(minecraftid, x, y) { - if (apisocket) apisocket.broadcastMinecraftTP(minecraftid, x, y); -} + /* + * broadcast chat message + * @param name chatname + * @param message Message to send + * @param sendapi If chat message should get boradcasted to api websockets + * (usefull if the api is supposed to not answer to its own messages) + */ + broadcastChatMessage( + name: string, + message: string, + sendapi: boolean = true, + ) { + logger.info(`Received chat message ${message} from ${name}`); + ChatHistory.addMessage(name, message); + this.listeners.forEach( + (listener) => listener.broadcastChatMessage(name, message, sendapi), + ); + } -/* - * send websocket package of online counter every x seconds - */ -function startOnlineCounterBroadcast() { - setInterval(() => { - if (usersocket) { - const online = usersocket.getConnections(); - const buffer = OnlineCounter.dehydrate({ online }); - usersocket.broadcast(buffer); - if (apisocket) apisocket.broadcastOnlineCounter(buffer); - } - }, 15 * SECOND); -} -startOnlineCounterBroadcast(); + /* + * broadcast minecraft linking to API + * @param name pixelplanetname + * @param minecraftid minecraftid + * @param accepted If link request got accepted + */ + broadcastMinecraftLink( + name: string, + minecraftid: string, + accepted: boolean, + ) { + this.listeners.forEach( + (listener) => listener.broadcastMinecraftLink( + name, + minecraftid, + accepted, + ), + ); + } -/* - * websocket upgrade / establishing connection - * Get hooked up to httpServer and routes to the right socket - */ -export function wsupgrade(request, socket, head) { - const { pathname } = url.parse(request.url); + /* + * Notify user on websocket that he should rerequest api/message + * Currently just used for getting minecraft link message. + */ + notifyChangedMe(name: string) { + this.listeners.forEach( + (listener) => listener.notifyChangedMe(name), + ); + } - if (pathname === '/ws') { - usersocket.wss.handleUpgrade(request, socket, head, (ws) => { - usersocket.wss.emit('connection', ws, request); - }); - } else if (pathname === '/mcws') { - apisocket.wss.handleUpgrade(request, socket, head, (ws) => { - apisocket.wss.emit('connection', ws, request); - }); - } else { - socket.destroy(); + /* + * broadcast mc tp request to API + * @param minecraftid minecraftid + * @param x x coords + * @param y y coords + */ + broadcastMinecraftTP(minecraftid, x, y) { + this.listeners.forEach( + (listener) => listener.broadcastMinecraftTP(minecraftid, x, y), + ); + } + + /* + * broadcast online counter + * @param online Number of users online + */ + broadcastOnlineCounter(online: number) { + const buffer = OnlineCounter.dehydrate({ online }); + this.listeners.forEach( + (listener) => listener.broadcastOnlineCounter(buffer), + ); } } + +const webSockets = new WebSockets(); +export default webSockets; diff --git a/src/web.js b/src/web.js index b5142a7..650906b 100644 --- a/src/web.js +++ b/src/web.js @@ -1,5 +1,6 @@ /* @flow */ +import url from 'url'; import path from 'path'; import compression from 'compression'; import express from 'express'; @@ -14,6 +15,9 @@ import logger from './core/logger'; import rankings from './core/ranking'; import models from './data/models'; +import SocketServer from './socket/SocketServer'; +import APISocketServer from './socket/APISocketServer'; + import { api, tiles, @@ -28,7 +32,6 @@ import { SECOND, MONTH } from './core/constants'; import { PORT, DISCORD_INVITE } from './core/config'; import { ccToCoords } from './utils/location'; -import { wsupgrade } from './socket/websockets'; import { startAllCanvasLoops } from './core/tileserver'; startAllCanvasLoops(); @@ -41,8 +44,30 @@ app.disable('x-powered-by'); // Call Garbage Collector every 30 seconds setInterval(forceGC, 15 * 60 * SECOND); -// create websocket +// create http server const server = http.createServer(app); + + +// +// websockets +// ----------------------------------------------------------------------------- +const usersocket = new SocketServer(); +const apisocket = new APISocketServer(); +function wsupgrade(request, socket, head) { + const { pathname } = url.parse(request.url); + + if (pathname === '/ws') { + usersocket.wss.handleUpgrade(request, socket, head, (ws) => { + usersocket.wss.emit('connection', ws, request); + }); + } else if (pathname === '/mcws') { + apisocket.wss.handleUpgrade(request, socket, head, (ws) => { + apisocket.wss.emit('connection', ws, request); + }); + } else { + socket.destroy(); + } +} server.on('upgrade', wsupgrade);