diff --git a/src/core/utils.js b/src/core/utils.js index f89b916..a70ac06 100644 --- a/src/core/utils.js +++ b/src/core/utils.js @@ -511,7 +511,7 @@ export function combineObjects(a, b) { return a.concat(b); } if (typeof a === 'object') { - let keys = Object.keys(a); + const keys = Object.keys(a); keys.forEach((key) => { const u = a[key]; const v = b[key]; diff --git a/src/socket/MessageBroker.js b/src/socket/MessageBroker.js index 5669143..f18b834 100644 --- a/src/socket/MessageBroker.js +++ b/src/socket/MessageBroker.js @@ -13,6 +13,21 @@ import PixelUpdate from './packets/PixelUpdateServer'; import PixelUpdateMB from './packets/PixelUpdateMB'; import ChunkUpdate from './packets/ChunkUpdate'; import { pubsub } from '../data/redis/client'; +import { combineObjects } from '../core/utils'; + +/* + * channel that all shards share and listen to + */ +const BROADCAST_CHAN = 'bc'; +/* + * prefix of channel that a specific shard listens to, + * for receiving targeted messages + */ +const LISTEN_PREFIX = 'l'; +/* + * channel where only one shard sends to is the name + * of the shard and has no prefix + */ class MessageBroker extends SocketEvents { @@ -39,17 +54,20 @@ class MessageBroker extends SocketEvents { setInterval(this.checkHealth, 10000); } - // TODO imprement shared storage that is run by main shard - async initialize() { - /* - * broadcast channel for staus messages between shards - */ this.publisher = pubsub.publisher; this.subscriber = pubsub.subscriber; - await this.subscriber.subscribe('bc', (...args) => { + // broadcast chan + await this.subscriber.subscribe(BROADCAST_CHAN, (...args) => { this.onShardBCMessage(...args); }); + // shard specific listener + await this.subscriber.subscribe( + `${LISTEN_PREFIX}:${this.thisShard}`, + (...args) => { + this.onShardListenMessage(...args); + }, + ); // give other shards 30s to announce themselves await new Promise((resolve) => { setTimeout(resolve, 25000); @@ -70,16 +88,38 @@ class MessageBroker extends SocketEvents { } const comma = message.indexOf(','); /* - * any other package in the form of 'shard:type,JSONArrayData' - * straight sent over websocket + * any message in the form of 'shard:type,JSONArrayData' + * straight emitted as socket event */ if (~comma) { const key = message.slice(message.indexOf(':') + 1, comma); console.log('CLUSTER: Broadcast', key); const val = JSON.parse(message.slice(comma + 1)); + if (key.startsWith('req:')) { + /* + * if its a request send the response to shard + * specific channel to avoid broadcast spam + */ + try { + const shard = message.slice(0, message.indexOf(':')); + const chan = val.shift(); + const ret = await super.req(key.slice(4), ...val); + this.publisher.publish( + `${LISTEN_PREFIX}:${shard}`, + `res:${chan},${JSON.stringify([ret])}`, + ); + } catch { + // nothing + } + return; + } super.emit(key, ...val); return; } + /* + * other messages are shard names that announce the existence + * of a shard + */ if (!this.shards[message]) { console.log(`CLUSTER: Shard ${message} connected`); this.shards[message] = Date.now(); @@ -89,16 +129,31 @@ class MessageBroker extends SocketEvents { true, ); // immediately give new shards informations - this.publisher.publish('bc', this.thisShard); + this.publisher.publish(BROADCAST_CHAN, this.thisShard); return; } this.shards[message] = Date.now(); - return; } catch (err) { console.error(`CLUSTER: Error on broadcast message: ${err.message}`); } } + /* + * messages on shard specific listener channel + * messages in form `type,JSONArrayData` + * straight emitted as socket event + */ + async onShardListenMessage(message) { + try { + const comma = message.indexOf(','); + const key = message.slice(0, comma); + const val = JSON.parse(message.slice(comma + 1)); + super.emit(key, ...val); + } catch (err) { + console.error(`CLUSTER: Error on listener message: ${err.message}`); + } + } + getLowestActiveShard() { let lowest = 0; let lShard = null; @@ -122,46 +177,37 @@ class MessageBroker extends SocketEvents { } /* - * requests that go over all shards and wait for responses from all + * requests that go over all shards and combine responses from all */ - reqOnShards(type, ...args) { + req(type, ...args) { return new Promise((resolve, reject) => { const chan = Math.floor(Math.random() * 100000).toString() + Math.floor(Math.random() * 100000).toString(); const chankey = `res:${chan}`; - - let amountOtherShards = this.shardOnlineCounters.length - 1; let id; + let amountOtherShards = this.shardOnlineCounters.length; let ret = null; - await this.subscriber.subscribe(chankey, (message) => { + const callback = (retn) => { amountOtherShards -= 1; - const retn = JSON.parse(message); - combineObjects(ret, retn); + ret = combineObjects(ret, retn); if (!amountOtherShards) { - this.subscriber.unsubscribe(chankey); + this.off(chankey, callback); clearTimeout(id); resolve(ret); } - }); + }; id = setTimeout(() => { - this.subscriber.unsubscribe(chankey); - reject(new Error(`Timeout on req ${type}`)); + this.off(chankey, callback); + if (ret) { + resolve(ret); + } else { + reject(new Error(`Timeout on req ${type}`)); + } }, 45000); + this.on(chankey, callback); this.emit(`req:${type}`, chan, ...args); }); } - async reqOnShards(...args) { - const ret = Promise.all([ - this.reqShard(...args), - super.req(...args), - ]); - } - onReq(type, cb) { - this.on(`req:${type}`, async (chan, ...args) => { - const ret = await cb(...args); - this.emit(`res:${chan}`, ret); - }); - } updateShardOnlineCounter(shard, cnt) { const shardCounter = this.shardOnlineCounters.find( @@ -235,8 +281,13 @@ class MessageBroker extends SocketEvents { */ emit(key, ...args) { super.emit(key, ...args); + if (key.startsWith('res:')) { + // responses of requests are handled specifically + // and not broadcasted + return; + } const msg = `${this.thisShard}:${key},${JSON.stringify(args)}`; - this.publisher.publish('bc', msg); + this.publisher.publish(BROADCAST_CHAN, msg); } /* @@ -316,7 +367,7 @@ class MessageBroker extends SocketEvents { } }); // send keep alive to others - this.publisher.publish('bc', this.thisShard); + this.publisher.publish(BROADCAST_CHAN, this.thisShard); } } diff --git a/src/socket/SockEvents.js b/src/socket/SockEvents.js index 01c39b9..bc8cfa3 100644 --- a/src/socket/SockEvents.js +++ b/src/socket/SockEvents.js @@ -52,6 +52,7 @@ class SocketEvents extends EventEmitter { * requests that expect a response * req(type, args) can be awaited * it will return a response from whatever listenes on onReq(type, cb(args)) + * Keep the arguments serializable for shard support */ req(type, ...args) { return new Promise((resolve, reject) => {