From 1bf8169c577785fc7544df35b2326a7a34ee1fb4 Mon Sep 17 00:00:00 2001 From: HF Date: Sat, 17 Sep 2022 23:34:21 +0200 Subject: [PATCH] broadcasst req answer to all shards for now --- src/core/utils.js | 47 +++++++++++++++++++++++++++++++++++++ src/socket/MessageBroker.js | 16 +++++-------- 2 files changed, 53 insertions(+), 10 deletions(-) diff --git a/src/core/utils.js b/src/core/utils.js index a70ac06..3ec32ca 100644 --- a/src/core/utils.js +++ b/src/core/utils.js @@ -497,6 +497,50 @@ export function parseInterval(interval) { return (num * factor); } +/* + * combines tables + * a tables is an object with { + * columns: Array, + * types: Array, + * rows: Array, + * } + */ +export function combineTables(a, b) { + if (a.columns.length === b.columns.length) { + a.rows = a.rows.concat(b.rows); + return a; + } + let bTable; + let sTable; + if (a.columns.length < b.columns.length) { + bTable = b; + sTable = a; + } else { + bTable = a; + sTable = b; + } + if (!sTable.rows.length) { + return bTable; + } + const newRows = []; + for (let i = 0; i < sTable.rows.length; i += 1) { + newRows.push([]); + } + for (let i = 0; i < bTable.columns.length; i += 1) { + const colInd = sTable.columns.indexOf(bTable.columns[i]); + if (~colInd) { + for (let u = 0; u < sTable.rows.length; u += 1) { + newRows[u].push(sTable.rows[u][colInd]); + } + } + for (let u = 0; u < sTable.rows.length; u += 1) { + newRows[u].push(null); + } + } + bTable.rows = bTable.rows.concat(newRows); + return bTable; +} + /* * conbine two similar objects */ @@ -512,6 +556,9 @@ export function combineObjects(a, b) { } if (typeof a === 'object') { const keys = Object.keys(a); + if (keys.includes('columns')) { + return combineTables(a, b); + } keys.forEach((key) => { const u = a[key]; const v = b[key]; diff --git a/src/socket/MessageBroker.js b/src/socket/MessageBroker.js index f18b834..65fcf1b 100644 --- a/src/socket/MessageBroker.js +++ b/src/socket/MessageBroker.js @@ -95,11 +95,8 @@ class MessageBroker extends SocketEvents { const key = message.slice(message.indexOf(':') + 1, comma); console.log('CLUSTER: Broadcast', key); const val = JSON.parse(message.slice(comma + 1)); + /* if (key.startsWith('req:')) { - /* - * if its a request send the response to shard - * specific channel to avoid broadcast spam - */ try { const shard = message.slice(0, message.indexOf(':')); const chan = val.shift(); @@ -113,6 +110,7 @@ class MessageBroker extends SocketEvents { } return; } + */ super.emit(key, ...val); return; } @@ -190,7 +188,10 @@ class MessageBroker extends SocketEvents { const callback = (retn) => { amountOtherShards -= 1; ret = combineObjects(ret, retn); - if (!amountOtherShards) { + // eslint-disable-next-line + console.log(`CLUSTER got res:${type} from shard, ${amountOtherShards} still left`); + if (amountOtherShards <= 0) { + console.log(`CLUSTER res:${type} finished`); this.off(chankey, callback); clearTimeout(id); resolve(ret); @@ -281,11 +282,6 @@ class MessageBroker extends SocketEvents { */ emit(key, ...args) { super.emit(key, ...args); - if (key.startsWith('res:')) { - // responses of requests are handled specifically - // and not broadcasted - return; - } const msg = `${this.thisShard}:${key},${JSON.stringify(args)}`; this.publisher.publish(BROADCAST_CHAN, msg); }