broadcasst req answer to all shards for now

This commit is contained in:
HF 2022-09-17 23:34:21 +02:00
parent 96c0744d2e
commit 1bf8169c57
2 changed files with 53 additions and 10 deletions

View File

@ -497,6 +497,50 @@ export function parseInterval(interval) {
return (num * factor); return (num * factor);
} }
/*
* combines tables
* a tables is an object with {
* columns: Array,
* types: Array,
* rows: Array,
* }
*/
export function combineTables(a, b) {
if (a.columns.length === b.columns.length) {
a.rows = a.rows.concat(b.rows);
return a;
}
let bTable;
let sTable;
if (a.columns.length < b.columns.length) {
bTable = b;
sTable = a;
} else {
bTable = a;
sTable = b;
}
if (!sTable.rows.length) {
return bTable;
}
const newRows = [];
for (let i = 0; i < sTable.rows.length; i += 1) {
newRows.push([]);
}
for (let i = 0; i < bTable.columns.length; i += 1) {
const colInd = sTable.columns.indexOf(bTable.columns[i]);
if (~colInd) {
for (let u = 0; u < sTable.rows.length; u += 1) {
newRows[u].push(sTable.rows[u][colInd]);
}
}
for (let u = 0; u < sTable.rows.length; u += 1) {
newRows[u].push(null);
}
}
bTable.rows = bTable.rows.concat(newRows);
return bTable;
}
/* /*
* conbine two similar objects * conbine two similar objects
*/ */
@ -512,6 +556,9 @@ export function combineObjects(a, b) {
} }
if (typeof a === 'object') { if (typeof a === 'object') {
const keys = Object.keys(a); const keys = Object.keys(a);
if (keys.includes('columns')) {
return combineTables(a, b);
}
keys.forEach((key) => { keys.forEach((key) => {
const u = a[key]; const u = a[key];
const v = b[key]; const v = b[key];

View File

@ -95,11 +95,8 @@ class MessageBroker extends SocketEvents {
const key = message.slice(message.indexOf(':') + 1, comma); const key = message.slice(message.indexOf(':') + 1, comma);
console.log('CLUSTER: Broadcast', key); console.log('CLUSTER: Broadcast', key);
const val = JSON.parse(message.slice(comma + 1)); const val = JSON.parse(message.slice(comma + 1));
/*
if (key.startsWith('req:')) { if (key.startsWith('req:')) {
/*
* if its a request send the response to shard
* specific channel to avoid broadcast spam
*/
try { try {
const shard = message.slice(0, message.indexOf(':')); const shard = message.slice(0, message.indexOf(':'));
const chan = val.shift(); const chan = val.shift();
@ -113,6 +110,7 @@ class MessageBroker extends SocketEvents {
} }
return; return;
} }
*/
super.emit(key, ...val); super.emit(key, ...val);
return; return;
} }
@ -190,7 +188,10 @@ class MessageBroker extends SocketEvents {
const callback = (retn) => { const callback = (retn) => {
amountOtherShards -= 1; amountOtherShards -= 1;
ret = combineObjects(ret, retn); ret = combineObjects(ret, retn);
if (!amountOtherShards) { // eslint-disable-next-line
console.log(`CLUSTER got res:${type} from shard, ${amountOtherShards} still left`);
if (amountOtherShards <= 0) {
console.log(`CLUSTER res:${type} finished`);
this.off(chankey, callback); this.off(chankey, callback);
clearTimeout(id); clearTimeout(id);
resolve(ret); resolve(ret);
@ -281,11 +282,6 @@ class MessageBroker extends SocketEvents {
*/ */
emit(key, ...args) { emit(key, ...args) {
super.emit(key, ...args); super.emit(key, ...args);
if (key.startsWith('res:')) {
// responses of requests are handled specifically
// and not broadcasted
return;
}
const msg = `${this.thisShard}:${key},${JSON.stringify(args)}`; const msg = `${this.thisShard}:${key},${JSON.stringify(args)}`;
this.publisher.publish(BROADCAST_CHAN, msg); this.publisher.publish(BROADCAST_CHAN, msg);
} }