From 1077831c23b28861753323abea53ab61f76f910c Mon Sep 17 00:00:00 2001 From: HF Date: Fri, 19 Jan 2024 14:52:24 +0100 Subject: [PATCH] detect broken redis PUBSUB channels and automatically reconnect --- src/socket/MessageBroker.js | 112 ++++++++++++++++++++++++++---------- src/socket/SocketServer.js | 2 + 2 files changed, 84 insertions(+), 30 deletions(-) diff --git a/src/socket/MessageBroker.js b/src/socket/MessageBroker.js index 4ea0edff..d1e3ae76 100644 --- a/src/socket/MessageBroker.js +++ b/src/socket/MessageBroker.js @@ -2,6 +2,11 @@ * sends messages to other ppfun instances * to work as cluster * If methods have no descriptions, they can be checked in ./SockEvents.js + * + * Subscribed PUB/SUB Channels: + * 'bc': text broadcast channel for everyone + * 'l:[thisShardName]': text channel specific shards are listening too + * '[otherShardName]': of every single other shard, where they send binary */ /* eslint-disable no-console */ @@ -56,8 +61,14 @@ class MessageBroker extends SocketEvents { * },...] */ this.csReq = []; + /* + * channel keepalive pings + * { [channelName]: lastPingTimestamp } + */ + this.pings = {}; /* * all other shards + * { [shardName]: lastBroadcastTimestamp, ... } */ this.shards = {}; /* @@ -78,17 +89,8 @@ class MessageBroker extends SocketEvents { async initialize() { this.publisher = pubsub.publisher; this.subscriber = pubsub.subscriber; - // broadcast chan - await this.subscriber.subscribe(BROADCAST_CHAN, (...args) => { - this.onShardBCMessage(...args); - }); - // shard specific listener - await this.subscriber.subscribe( - `${LISTEN_PREFIX}:${this.thisShard}`, - (...args) => { - this.onShardListenMessage(...args); - }, - ); + await this.connectBCChannel(); + await this.connectShardChannel(); // give other shards 30s to announce themselves await new Promise((resolve) => { setTimeout(resolve, 25000); @@ -96,15 +98,32 @@ class MessageBroker extends SocketEvents { console.log('CLUSTER: Initialized message broker'); } + async connectBCChannel() { + await this.subscriber.subscribe(BROADCAST_CHAN, (...args) => { + this.onShardBCMessage(...args); + }); + this.pings[BROADCAST_CHAN] = Date.now(); + } + + async connectShardChannel() { + const channel = `${LISTEN_PREFIX}:${this.thisShard}`; + await this.subscriber.subscribe(channel, (...args) => { + this.onShardListenMessage(...args); + }); + this.pings[channel] = Date.now(); + } + /* * messages on shared broadcast channels that every shard is listening to */ async onShardBCMessage(message) { try { + const curTime = Date.now(); /* - * messages from own shard get dropped + * messages from own shard get used as ping */ - if (!message || message.startsWith(this.thisShard)) { + if (message.startsWith(this.thisShard)) { + this.pings[BROADCAST_CHAN] = curTime; return; } const comma = message.indexOf(','); @@ -135,7 +154,6 @@ class MessageBroker extends SocketEvents { */ if (!this.shards[message]) { console.log(`CLUSTER: Shard ${message} connected`); - this.shards[message] = Date.now(); await this.subscriber.subscribe( message, (buffer) => this.onShardBinaryMessage(buffer, message), @@ -143,9 +161,9 @@ class MessageBroker extends SocketEvents { ); // immediately give new shards information this.publisher.publish(BROADCAST_CHAN, this.thisShard); - return; } - this.shards[message] = Date.now(); + this.pings[message] = curTime; + this.shards[message] = curTime; } catch (err) { console.error(`CLUSTER: Error on broadcast message: ${err.message}`); } @@ -256,6 +274,15 @@ class MessageBroker extends SocketEvents { this.sumOnlineCounters(); } + removeShardFromOnlienCounter(shard) { + const counterIndex = this.shardOnlineCounters.findIndex( + (c) => c[0] === shard, + ); + if (~counterIndex) { + this.shardOnlineCounters.splice(counterIndex, 1); + } + } + /* * messages on binary shard channels, where specific shards send from */ @@ -278,6 +305,8 @@ class MessageBroker extends SocketEvents { case ONLINE_COUNTER_OP: { const cnt = hydrateOnlineCounter(buffer); this.updateShardOnlineCounter(shard, cnt); + // use online counter as ping for binary shard channel + this.pings[shard] = Date.now(); break; } default: @@ -372,23 +401,46 @@ class MessageBroker extends SocketEvents { super.emit('onlineCounter', this.onlineCounter); } - checkHealth() { - // remove disconnected shards + async checkHealth() { let threshold = Date.now() - 30000; - const { shards } = this; - Object.keys(shards).forEach((shard) => { - if (shards[shard] < threshold) { - console.log(`CLUSTER: Shard ${shard} disconnected`); - delete shards[shard]; - const counterIndex = this.shardOnlineCounters.findIndex( - (c) => c[0] === shard, - ); - if (~counterIndex) { - this.shardOnlineCounters.splice(counterIndex, 1); + const { shards, pings } = this; + try { + // remove disconnected shards + for (const [shard, timeLastPing] of Object.entries(shards)) { + if (timeLastPing < threshold) { + console.log(`CLUSTER: Shard ${shard} disconnected`); + this.removeShardFromOnlienCounter(shard); + // eslint-disable-next-line no-await-in-loop + await this.subscriber.unsubscribe(shard); + delete pings[shard]; + delete shards[shard]; } - this.subscriber.unsubscribe(shard); } - }); + // check for disconnected redis channels + for (const [channel, timeLastPing] of Object.entries(pings)) { + if (timeLastPing < threshold) { + // eslint-disable-next-line no-await-in-loop + await this.subscriber.unsubscribe(channel); + delete pings[channel]; + if (channel === BROADCAST_CHAN) { + console.warn('CLUSTER: Broadcaset channel broken, reconnect'); + // eslint-disable-next-line no-await-in-loop + await this.connectBCChannel(); + } else if (channel.startsWith(`${LISTEN_PREFIX}:`)) { + console.warn('CLUSTER: Shard text channel broken, reconnect'); + // eslint-disable-next-line no-await-in-loop + await this.connectShardChannel(); + } else { + console.warn(`CLUSTER: Binary channel to shard ${channel} broken`); + // will connect again on next broadcast of shard + this.removeShardFromOnlienCounter(channel); + delete shards[channel]; + } + } + } + } catch (err) { + console.error(`CLUSTER: Error on health check: ${err.message}`); + } // send keep alive to others this.publisher.publish(BROADCAST_CHAN, this.thisShard); // clean up dead shard requests diff --git a/src/socket/SocketServer.js b/src/socket/SocketServer.js index 87db2b04..f56cb2c1 100644 --- a/src/socket/SocketServer.js +++ b/src/socket/SocketServer.js @@ -174,6 +174,8 @@ class SocketServer { } }); + // when changing interval, remember that online counter gets used as ping + // for binary sharded channels in MessageBroker.js setInterval(this.onlineCounterBroadcast, 20 * 1000); setInterval(this.checkHealth, 15 * 1000); }