detect broken redis PUBSUB channels and automatically reconnect

This commit is contained in:
HF 2024-01-19 14:52:24 +01:00
parent da5598863a
commit 1077831c23
2 changed files with 84 additions and 30 deletions

View File

@ -2,6 +2,11 @@
* sends messages to other ppfun instances
* to work as cluster
* If methods have no descriptions, they can be checked in ./SockEvents.js
*
* Subscribed PUB/SUB Channels:
* 'bc': text broadcast channel for everyone
* 'l:[thisShardName]': text channel specific shards are listening too
* '[otherShardName]': of every single other shard, where they send binary
*/
/* eslint-disable no-console */
@ -56,8 +61,14 @@ class MessageBroker extends SocketEvents {
* },...]
*/
this.csReq = [];
/*
* channel keepalive pings
* { [channelName]: lastPingTimestamp }
*/
this.pings = {};
/*
* all other shards
* { [shardName]: lastBroadcastTimestamp, ... }
*/
this.shards = {};
/*
@ -78,17 +89,8 @@ class MessageBroker extends SocketEvents {
async initialize() {
this.publisher = pubsub.publisher;
this.subscriber = pubsub.subscriber;
// broadcast chan
await this.subscriber.subscribe(BROADCAST_CHAN, (...args) => {
this.onShardBCMessage(...args);
});
// shard specific listener
await this.subscriber.subscribe(
`${LISTEN_PREFIX}:${this.thisShard}`,
(...args) => {
this.onShardListenMessage(...args);
},
);
await this.connectBCChannel();
await this.connectShardChannel();
// give other shards 30s to announce themselves
await new Promise((resolve) => {
setTimeout(resolve, 25000);
@ -96,15 +98,32 @@ class MessageBroker extends SocketEvents {
console.log('CLUSTER: Initialized message broker');
}
async connectBCChannel() {
await this.subscriber.subscribe(BROADCAST_CHAN, (...args) => {
this.onShardBCMessage(...args);
});
this.pings[BROADCAST_CHAN] = Date.now();
}
async connectShardChannel() {
const channel = `${LISTEN_PREFIX}:${this.thisShard}`;
await this.subscriber.subscribe(channel, (...args) => {
this.onShardListenMessage(...args);
});
this.pings[channel] = Date.now();
}
/*
* messages on shared broadcast channels that every shard is listening to
*/
async onShardBCMessage(message) {
try {
const curTime = Date.now();
/*
* messages from own shard get dropped
* messages from own shard get used as ping
*/
if (!message || message.startsWith(this.thisShard)) {
if (message.startsWith(this.thisShard)) {
this.pings[BROADCAST_CHAN] = curTime;
return;
}
const comma = message.indexOf(',');
@ -135,7 +154,6 @@ class MessageBroker extends SocketEvents {
*/
if (!this.shards[message]) {
console.log(`CLUSTER: Shard ${message} connected`);
this.shards[message] = Date.now();
await this.subscriber.subscribe(
message,
(buffer) => this.onShardBinaryMessage(buffer, message),
@ -143,9 +161,9 @@ class MessageBroker extends SocketEvents {
);
// immediately give new shards information
this.publisher.publish(BROADCAST_CHAN, this.thisShard);
return;
}
this.shards[message] = Date.now();
this.pings[message] = curTime;
this.shards[message] = curTime;
} catch (err) {
console.error(`CLUSTER: Error on broadcast message: ${err.message}`);
}
@ -256,6 +274,15 @@ class MessageBroker extends SocketEvents {
this.sumOnlineCounters();
}
removeShardFromOnlienCounter(shard) {
const counterIndex = this.shardOnlineCounters.findIndex(
(c) => c[0] === shard,
);
if (~counterIndex) {
this.shardOnlineCounters.splice(counterIndex, 1);
}
}
/*
* messages on binary shard channels, where specific shards send from
*/
@ -278,6 +305,8 @@ class MessageBroker extends SocketEvents {
case ONLINE_COUNTER_OP: {
const cnt = hydrateOnlineCounter(buffer);
this.updateShardOnlineCounter(shard, cnt);
// use online counter as ping for binary shard channel
this.pings[shard] = Date.now();
break;
}
default:
@ -372,23 +401,46 @@ class MessageBroker extends SocketEvents {
super.emit('onlineCounter', this.onlineCounter);
}
checkHealth() {
// remove disconnected shards
async checkHealth() {
let threshold = Date.now() - 30000;
const { shards } = this;
Object.keys(shards).forEach((shard) => {
if (shards[shard] < threshold) {
console.log(`CLUSTER: Shard ${shard} disconnected`);
delete shards[shard];
const counterIndex = this.shardOnlineCounters.findIndex(
(c) => c[0] === shard,
);
if (~counterIndex) {
this.shardOnlineCounters.splice(counterIndex, 1);
const { shards, pings } = this;
try {
// remove disconnected shards
for (const [shard, timeLastPing] of Object.entries(shards)) {
if (timeLastPing < threshold) {
console.log(`CLUSTER: Shard ${shard} disconnected`);
this.removeShardFromOnlienCounter(shard);
// eslint-disable-next-line no-await-in-loop
await this.subscriber.unsubscribe(shard);
delete pings[shard];
delete shards[shard];
}
this.subscriber.unsubscribe(shard);
}
});
// check for disconnected redis channels
for (const [channel, timeLastPing] of Object.entries(pings)) {
if (timeLastPing < threshold) {
// eslint-disable-next-line no-await-in-loop
await this.subscriber.unsubscribe(channel);
delete pings[channel];
if (channel === BROADCAST_CHAN) {
console.warn('CLUSTER: Broadcaset channel broken, reconnect');
// eslint-disable-next-line no-await-in-loop
await this.connectBCChannel();
} else if (channel.startsWith(`${LISTEN_PREFIX}:`)) {
console.warn('CLUSTER: Shard text channel broken, reconnect');
// eslint-disable-next-line no-await-in-loop
await this.connectShardChannel();
} else {
console.warn(`CLUSTER: Binary channel to shard ${channel} broken`);
// will connect again on next broadcast of shard
this.removeShardFromOnlienCounter(channel);
delete shards[channel];
}
}
}
} catch (err) {
console.error(`CLUSTER: Error on health check: ${err.message}`);
}
// send keep alive to others
this.publisher.publish(BROADCAST_CHAN, this.thisShard);
// clean up dead shard requests

View File

@ -174,6 +174,8 @@ class SocketServer {
}
});
// when changing interval, remember that online counter gets used as ping
// for binary sharded channels in MessageBroker.js
setInterval(this.onlineCounterBroadcast, 20 * 1000);
setInterval(this.checkHealth, 15 * 1000);
}