finish req on shards

This commit is contained in:
HF 2022-09-17 22:26:12 +02:00
parent f35a3ac9f2
commit 0ba7063d9e
3 changed files with 88 additions and 36 deletions

View File

@ -511,7 +511,7 @@ export function combineObjects(a, b) {
return a.concat(b); return a.concat(b);
} }
if (typeof a === 'object') { if (typeof a === 'object') {
let keys = Object.keys(a); const keys = Object.keys(a);
keys.forEach((key) => { keys.forEach((key) => {
const u = a[key]; const u = a[key];
const v = b[key]; const v = b[key];

View File

@ -13,6 +13,21 @@ import PixelUpdate from './packets/PixelUpdateServer';
import PixelUpdateMB from './packets/PixelUpdateMB'; import PixelUpdateMB from './packets/PixelUpdateMB';
import ChunkUpdate from './packets/ChunkUpdate'; import ChunkUpdate from './packets/ChunkUpdate';
import { pubsub } from '../data/redis/client'; import { pubsub } from '../data/redis/client';
import { combineObjects } from '../core/utils';
/*
* channel that all shards share and listen to
*/
const BROADCAST_CHAN = 'bc';
/*
* prefix of channel that a specific shard listens to,
* for receiving targeted messages
*/
const LISTEN_PREFIX = 'l';
/*
* channel where only one shard sends to is the name
* of the shard and has no prefix
*/
class MessageBroker extends SocketEvents { class MessageBroker extends SocketEvents {
@ -39,17 +54,20 @@ class MessageBroker extends SocketEvents {
setInterval(this.checkHealth, 10000); setInterval(this.checkHealth, 10000);
} }
// TODO imprement shared storage that is run by main shard
async initialize() { async initialize() {
/*
* broadcast channel for staus messages between shards
*/
this.publisher = pubsub.publisher; this.publisher = pubsub.publisher;
this.subscriber = pubsub.subscriber; this.subscriber = pubsub.subscriber;
await this.subscriber.subscribe('bc', (...args) => { // broadcast chan
await this.subscriber.subscribe(BROADCAST_CHAN, (...args) => {
this.onShardBCMessage(...args); this.onShardBCMessage(...args);
}); });
// shard specific listener
await this.subscriber.subscribe(
`${LISTEN_PREFIX}:${this.thisShard}`,
(...args) => {
this.onShardListenMessage(...args);
},
);
// give other shards 30s to announce themselves // give other shards 30s to announce themselves
await new Promise((resolve) => { await new Promise((resolve) => {
setTimeout(resolve, 25000); setTimeout(resolve, 25000);
@ -70,16 +88,38 @@ class MessageBroker extends SocketEvents {
} }
const comma = message.indexOf(','); const comma = message.indexOf(',');
/* /*
* any other package in the form of 'shard:type,JSONArrayData' * any message in the form of 'shard:type,JSONArrayData'
* straight sent over websocket * straight emitted as socket event
*/ */
if (~comma) { if (~comma) {
const key = message.slice(message.indexOf(':') + 1, comma); const key = message.slice(message.indexOf(':') + 1, comma);
console.log('CLUSTER: Broadcast', key); console.log('CLUSTER: Broadcast', key);
const val = JSON.parse(message.slice(comma + 1)); const val = JSON.parse(message.slice(comma + 1));
if (key.startsWith('req:')) {
/*
* if its a request send the response to shard
* specific channel to avoid broadcast spam
*/
try {
const shard = message.slice(0, message.indexOf(':'));
const chan = val.shift();
const ret = await super.req(key.slice(4), ...val);
this.publisher.publish(
`${LISTEN_PREFIX}:${shard}`,
`res:${chan},${JSON.stringify([ret])}`,
);
} catch {
// nothing
}
return;
}
super.emit(key, ...val); super.emit(key, ...val);
return; return;
} }
/*
* other messages are shard names that announce the existence
* of a shard
*/
if (!this.shards[message]) { if (!this.shards[message]) {
console.log(`CLUSTER: Shard ${message} connected`); console.log(`CLUSTER: Shard ${message} connected`);
this.shards[message] = Date.now(); this.shards[message] = Date.now();
@ -89,16 +129,31 @@ class MessageBroker extends SocketEvents {
true, true,
); );
// immediately give new shards informations // immediately give new shards informations
this.publisher.publish('bc', this.thisShard); this.publisher.publish(BROADCAST_CHAN, this.thisShard);
return; return;
} }
this.shards[message] = Date.now(); this.shards[message] = Date.now();
return;
} catch (err) { } catch (err) {
console.error(`CLUSTER: Error on broadcast message: ${err.message}`); console.error(`CLUSTER: Error on broadcast message: ${err.message}`);
} }
} }
/*
* messages on shard specific listener channel
* messages in form `type,JSONArrayData`
* straight emitted as socket event
*/
async onShardListenMessage(message) {
try {
const comma = message.indexOf(',');
const key = message.slice(0, comma);
const val = JSON.parse(message.slice(comma + 1));
super.emit(key, ...val);
} catch (err) {
console.error(`CLUSTER: Error on listener message: ${err.message}`);
}
}
getLowestActiveShard() { getLowestActiveShard() {
let lowest = 0; let lowest = 0;
let lShard = null; let lShard = null;
@ -122,46 +177,37 @@ class MessageBroker extends SocketEvents {
} }
/* /*
* requests that go over all shards and wait for responses from all * requests that go over all shards and combine responses from all
*/ */
reqOnShards(type, ...args) { req(type, ...args) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const chan = Math.floor(Math.random() * 100000).toString() const chan = Math.floor(Math.random() * 100000).toString()
+ Math.floor(Math.random() * 100000).toString(); + Math.floor(Math.random() * 100000).toString();
const chankey = `res:${chan}`; const chankey = `res:${chan}`;
let amountOtherShards = this.shardOnlineCounters.length - 1;
let id; let id;
let amountOtherShards = this.shardOnlineCounters.length;
let ret = null; let ret = null;
await this.subscriber.subscribe(chankey, (message) => { const callback = (retn) => {
amountOtherShards -= 1; amountOtherShards -= 1;
const retn = JSON.parse(message); ret = combineObjects(ret, retn);
combineObjects(ret, retn);
if (!amountOtherShards) { if (!amountOtherShards) {
this.subscriber.unsubscribe(chankey); this.off(chankey, callback);
clearTimeout(id); clearTimeout(id);
resolve(ret); resolve(ret);
} }
}); };
id = setTimeout(() => { id = setTimeout(() => {
this.subscriber.unsubscribe(chankey); this.off(chankey, callback);
reject(new Error(`Timeout on req ${type}`)); if (ret) {
resolve(ret);
} else {
reject(new Error(`Timeout on req ${type}`));
}
}, 45000); }, 45000);
this.on(chankey, callback);
this.emit(`req:${type}`, chan, ...args); this.emit(`req:${type}`, chan, ...args);
}); });
} }
async reqOnShards(...args) {
const ret = Promise.all([
this.reqShard(...args),
super.req(...args),
]);
}
onReq(type, cb) {
this.on(`req:${type}`, async (chan, ...args) => {
const ret = await cb(...args);
this.emit(`res:${chan}`, ret);
});
}
updateShardOnlineCounter(shard, cnt) { updateShardOnlineCounter(shard, cnt) {
const shardCounter = this.shardOnlineCounters.find( const shardCounter = this.shardOnlineCounters.find(
@ -235,8 +281,13 @@ class MessageBroker extends SocketEvents {
*/ */
emit(key, ...args) { emit(key, ...args) {
super.emit(key, ...args); super.emit(key, ...args);
if (key.startsWith('res:')) {
// responses of requests are handled specifically
// and not broadcasted
return;
}
const msg = `${this.thisShard}:${key},${JSON.stringify(args)}`; const msg = `${this.thisShard}:${key},${JSON.stringify(args)}`;
this.publisher.publish('bc', msg); this.publisher.publish(BROADCAST_CHAN, msg);
} }
/* /*
@ -316,7 +367,7 @@ class MessageBroker extends SocketEvents {
} }
}); });
// send keep alive to others // send keep alive to others
this.publisher.publish('bc', this.thisShard); this.publisher.publish(BROADCAST_CHAN, this.thisShard);
} }
} }

View File

@ -52,6 +52,7 @@ class SocketEvents extends EventEmitter {
* requests that expect a response * requests that expect a response
* req(type, args) can be awaited * req(type, args) can be awaited
* it will return a response from whatever listenes on onReq(type, cb(args)) * it will return a response from whatever listenes on onReq(type, cb(args))
* Keep the arguments serializable for shard support
*/ */
req(type, ...args) { req(type, ...args) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {