Guest
Der socket.io-Client kann keine Ereignisse empfangen und funktioniert nach einiger Zeit nicht mehr. Aber wenn der Client
Post
by Guest » 20 Jan 2025, 14:34
Ich arbeite an einer Chat-App. Dabei verwende ich verschiedene Socket-Ereignisse, um Nachrichten über Sockets vom Client an den Server zu senden. So funktioniert es: Der Client sendet seine Verbindungsanfrage mit einem JWT an den Socket. Ich prüfe zunächst, ob für den Benutzer bereits eine Verbindung besteht; falls ja, lösche ich diese Sitzung, erstelle eine neue Sitzung und speichere ihre Informationen in Redis. Hier ist der Code meiner Serverdatei:
Code: Select all
require("dotenv").config();
const port = 8181;
const cluster = require("cluster");
const io_redis = require("socket.io-redis");
const num_processes = require("os").cpus().length;
import * as net from "net";
import cors from "cors";
import * as http from "http";
import router from "./routes";
import { PeerServer } from "peer";
import express from "express";
import * as socket from "socket.io";
import * as farmhash from "farmhash";
import cookieParser from "cookie-parser";
import { socketMain } from "./socket.io/socketMain";
import { inititalizeMongoDb } from "./database/mongoInstance";
import { isAuthSocket } from "./middlewares/isAuthSocket.middleware";
import { deleteOldMessageCron } from './services/deletemessagecron';
import { createClient, RedisClientType } from "redis";
// Create the Redis client using a URL.
// NOTE(review): the URL is an empty string here, while the Socket.IO adapter
// below reads process.env.REDIS_URL — confirm which Redis server is intended.
const redisClient: RedisClientType = createClient({ url: '' });
// Log client errors through a listener instead of leaving the "error" event
// without any handler.
redisClient.on("error", (err: unknown) => {
  console.error("Redis client error:", err);
});
// connect() returns a promise; handle its rejection explicitly.
redisClient.connect().catch((err: unknown) => {
  console.error("Redis connect failed:", err);
});

/**
 * Process entry point.
 *
 * Master: forks `num_processes` workers, re-forks a worker when it exits, and
 * listens on `port`, handing each paused TCP connection to the worker selected
 * by a hash of the remote address.
 *
 * Worker: builds the Express app and the Socket.IO server on an ephemeral
 * localhost port and accepts the connections forwarded by the master.
 */
(async () => {
  if (cluster.isMaster) {
    const workers: any[] = [];

    // Fork the worker for slot `i`; fork a replacement when it exits.
    const spawn = (i: number): void => {
      workers[i] = cluster.fork();
      workers[i].on("exit", () => {
        console.log("respawning worker", i);
        spawn(i);
      });
    };
    for (let i = 0; i < num_processes; i++) {
      spawn(i);
    }

    // Map a remote address to a worker slot.
    const worker_index = (ip: string, len: number): number =>
      farmhash.fingerprint32(ip) % len;

    const server: net.Server = net.createServer(
      { pauseOnConnect: true },
      (connection: net.Socket) => {
        // remoteAddress can be undefined (e.g. the peer is already gone);
        // hash an empty string in that case instead of passing undefined.
        const slot = worker_index(connection.remoteAddress ?? "", num_processes);
        workers[slot].send("sticky-session:connection", connection);
      }
    );
    server.listen(port);
    console.log(`Master listening on port ${port}`);
  } else {
    const allowedOrigins = ["file://", "http://localhost", "http://localhost:3000"];

    const app = express();
    app.use(express.json({ limit: '50mb' }));
    app.use(express.urlencoded({ limit: '50mb', extended: true }));
    app.use(cookieParser());
    app.use(cors({ origin: allowedOrigins, credentials: true }));
    app.use("/", router);

    // Port 0: the worker is not reached directly; connections arrive from the
    // master through the "message" listener below.
    const server: http.Server = app.listen(0, "localhost");
    console.log("Worker listening...");

    // Register the IPC listener BEFORE any await. Connections the master
    // forwards while this worker is still starting are queued and handed to
    // the HTTP server once setup has finished, rather than being missed and
    // left paused.
    let ready = false;
    const pending: net.Socket[] = [];
    const accept = (connection: net.Socket): void => {
      server.emit("connection", connection);
      // The master created the socket with pauseOnConnect; resume reading.
      connection.resume();
    };
    process.on("message", (message: unknown, connection: unknown) => {
      if (message !== "sticky-session:connection") {
        return;
      }
      if (!(connection instanceof net.Socket)) {
        return;
      }
      if (ready) {
        accept(connection);
      } else {
        pending.push(connection);
      }
    });

    const io = new socket.Server(server, {
      cors: { origin: allowedOrigins, credentials: true },
      pingTimeout: 120000,
      pingInterval: 25000,
    });
    io.adapter(io_redis({
      url: process.env.REDIS_URL,
      // Retry delay grows by 50 ms per attempt, capped at 2 s.
      retryStrategy: (times: number) => Math.min(times * 50, 2000),
    }));

    await inititalizeMongoDb();
    deleteOldMessageCron();

    io.use(isAuthSocket);
    io.on("error", (err: any) => {
      console.log("Socket.io Error:", err);
    });
    io.on("connection", (socket: socket.Socket) => {
      console.log("connected to socket server", socket.id);
      socketMain(io, socket, redisClient);
      console.log(`connected to worker: ${cluster.worker.id}`);
    });

    // Setup is complete: start serving, beginning with the queued connections.
    ready = true;
    for (const connection of pending.splice(0)) {
      accept(connection);
    }
  }
})().catch((err: unknown) => {
  // A failed start-up is logged and ends the process with a non-zero code
  // rather than surfacing as an unhandled rejection.
  console.error("Fatal startup error:", err);
  process.exit(1);
});
Meine Socket-Hauptdatei, in der alle Ereignisse verarbeitet werden:
Code: Select all
import { callOtherUser } from "./handlers/callOtherUser.handler";
import { disconnectVideoCall } from "./handlers/disconnectCall.handler";
import { getTotalUsers } from "./handlers/getTotalUsers.handler";
import { handleActiveSession } from "./handlers/handleActiveSession";
import { initialSocketConfig } from "./handlers/initialVerification";
import { iTextMessage } from "./handlers/iTextMessage.handler";
import { joinVideoRoom } from "./handlers/joinVideoRoom.handler";
import { rejectVideoCall } from "./handlers/rejectCall.handler";
import { socketDisconnect } from "./handlers/socketDisconnect.handler";
import { updateGroupInfo } from "./handlers/updateGroupInfo.handler";
import { updateOthersChats } from "./handlers/updateOthersChats.handler";
import { updateUserProfile } from "./handlers/updateUserProfile.handler";
import { userOnCall } from "./handlers/userOnCall.handler";
import { MarkAsReadMessage } from './handlers/MarkAsReadMessage.handler';
import { NickName } from './handlers/NickName.handler';
import { switchActiveChat } from "./handlers/switchActiveChat.handler";
import { RedisClientType } from "redis";
import { blockUser } from './blockUser.handler';
/**
 * Wires up one freshly connected socket: runs `initialSocketConfig`, records
 * the session via `handleActiveSession`, emits "signInSuccess" to the client,
 * broadcasts "updateTotalUsers" and "online" to the other sockets, and
 * registers every per-socket event handler.
 *
 * @param io          Socket.IO server instance, passed through to the handlers.
 * @param socket      The socket that has just connected.
 * @param redisClient Redis client passed to the session and disconnect handlers.
 *
 * Never rejects: a failure during setup is logged and the socket is
 * disconnected.
 */
export const socketMain = async (io: any, socket: any, redisClient: RedisClientType) => {
  let heartbeatTimeout: NodeJS.Timeout | undefined;

  // Registers `handler` for `event` so that a thrown error or rejected promise
  // is logged here instead of escaping the listener as an unhandled rejection.
  const on = (event: string, handler: (...args: any[]) => unknown): void => {
    socket.on(event, async (...args: any[]) => {
      try {
        await handler(...args);
      } catch (err) {
        console.error(`Socket handler "${event}" failed:`, err);
      }
    });
  };

  try {
    const { _id, db, userPayload } = await initialSocketConfig(io, socket);

    // Still not awaited (as before), but a rejection is now logged.
    handleActiveSession(io, socket, _id, userPayload?.displayName, redisClient).catch(
      (err: unknown) => console.error("handleActiveSession failed:", err)
    );

    socket.emit("signInSuccess", {
      objectId: _id,
      displayName: userPayload?.displayName,
      email: userPayload?.email,
      avatar: userPayload?.avatar,
      createdOn: userPayload?.createdOn,
      about: userPayload?.about,
      lastSeen: userPayload?.lastSeen,
    });

    on("callOtherUser", (payload: any) => callOtherUser(io, _id, db, payload));
    on("join-vc-room", (roomId: string, peerUserId: string) =>
      joinVideoRoom(socket, roomId, peerUserId)
    );
    // The event name (including its spelling) is part of the client protocol.
    on("diconnect-from-call", (roomId: string, peerUserId: string) =>
      disconnectVideoCall(socket, roomId, peerUserId)
    );
    on("reject-call", (roomId: string) => rejectVideoCall(socket, roomId));
    on("user-on-call", (roomId: string) => userOnCall(socket, roomId));
    on("getTotalUsers", () => getTotalUsers(db, socket, _id));

    socket.broadcast.emit("updateTotalUsers", {
      objectId: userPayload?._id,
      displayName: userPayload?.displayName,
      avatar: userPayload?.avatar,
      createdOn: userPayload?.createdOn,
      about: userPayload?.about,
      lastSeen: userPayload?.lastSeen,
    });

    on("updateUserProfile", (payload: any) => updateUserProfile(socket, _id, payload, db));
    on("updateGroupInfo", (payload: any) => updateGroupInfo(io, _id, payload, db));
    on("updateOthersChats", (payload: any) => {
      console.log("updateOthersChats", payload);
      return updateOthersChats(db, io, _id, payload);
    });
    on("iTextMessage", (payload: any) => iTextMessage(io, socket, payload, db, _id));
    on("markAsReadMessage", (payload: any) =>
      MarkAsReadMessage(io, socket, payload, db, _id)
    );
    on("setNickname", (payload: any) => NickName(io, payload, db, _id));

    socket.broadcast.emit("online", _id);

    on("switchActiveChat", (payload: any) =>
      switchActiveChat(io, socket, _id, db, payload)
    );
    on("user-blocked", (payload: any) => blockUser(io, socket, _id, db, payload));

    on("heartbeat", () => {
      console.log('Client heartbeat:', socket.id);
      // Respond immediately to client
      socket.emit('heartbeat_received');
      // Restart the missed-heartbeat timer. It is first armed by the first
      // heartbeat, so a client that never sends one is not timed out here.
      if (heartbeatTimeout) {
        clearTimeout(heartbeatTimeout);
      }
      heartbeatTimeout = setTimeout(() => {
        console.log('Client heartbeat timeout:', socket.id);
        socket.disconnect(true);
      }, 90000); // 90 seconds (allowing for network delays)
    });

    on("disconnect", async (reason: any) => {
      if (heartbeatTimeout) {
        clearTimeout(heartbeatTimeout);
      }
      console.log("DISCONNECT REASON:", reason);
      console.log("Socket Namespace:", socket.nsp.name);
      console.log("Socket ID:", socket.id);
      try {
        await socketDisconnect(socket, _id, db, redisClient);
      } catch (err) {
        console.error("Error during socket disconnect:", err);
      }
    });
  } catch (err) {
    console.log("MAIN SOCKET ERR", err);
    // Setup failed, so some or all handlers are missing. Close the connection
    // instead of leaving the client attached to a socket that will not respond.
    if (heartbeatTimeout) {
      clearTimeout(heartbeatTimeout);
    }
    socket.disconnect(true);
  }
};
Die handleActiveSession-Datei, in der Benutzerverbindungen in Redis gespeichert oder daraus entfernt werden:
Code: Select all
import * as socket from "socket.io";
import {
getActiveUserByObjectId,
removeActiveUserByObjectId,
addToActiveUsers,
getActiveUsers
} from "../../utils/activeUsers";
import { RedisClientType } from "redis";
/**
 * Records the session of a newly connected socket.
 *
 * Writes `{ socketId, objectId, Name }` as JSON to the Redis key `user:<_id>`,
 * then updates the active-users registry. If the registry already holds an
 * entry for `_id` and that entry's socket is present in `io.sockets.sockets`,
 * the old socket receives "multipleSession" and is disconnected before the
 * entry is replaced.
 *
 * @param io          Socket.IO server used to look up the previous socket.
 * @param socket      The socket that has just connected.
 * @param _id         Object id of the user owning the session.
 * @param Name        Name stored with the session.
 * @param redisClient Redis client used to persist the session.
 */
export const handleActiveSession = async (
  io: socket.Server,
  socket: socket.Socket,
  _id: string,
  Name: string,
  redisClient: RedisClientType
) => {
  const session = { socketId: socket.id, objectId: _id, Name };
  await redisClient.set(`user:${_id}`, JSON.stringify(session));

  const existing = getActiveUserByObjectId(_id);
  if (existing) {
    console.log("Prev Disconnected, New session!");
    const staleSocketId = existing?.socketId;
    // The old socket is only looked up in this server instance's socket map.
    const staleSocket = io.sockets.sockets.get(staleSocketId);
    if (staleSocket) {
      console.log(staleSocketId + "multipleSession disconnected");
      staleSocket.emit("multipleSession");
      staleSocket.disconnect(true);
    }
    // Replace the registry entry with the new session.
    removeActiveUserByObjectId(_id);
    addToActiveUsers(session);
    return;
  }

  console.log("New session!");
  addToActiveUsers(session);
  console.log("Active Users: ", getActiveUsers());
};
1737380069
Guest
Ich arbeite an einer Chat-App. Dabei verwende ich verschiedene Socket-Ereignisse, um Nachrichten über Sockets vom Client an den Server zu senden. So funktioniert es: Client-Sendeanforderung mit JWT am Socket. Ich überprüfe vorher, ob eine Verbindung besteht oder nicht, ob ich nur diese Sitzung lösche, eine neue Sitzung erstelle und ihre Informationen in Redies speichere. Hier ist mein Serverdateicode wie folgt [code]require("dotenv").config(); const port = 8181; const cluster = require("cluster"); const io_redis = require("socket.io-redis"); const num_processes = require("os").cpus().length; import * as net from "net"; import cors from "cors"; import * as http from "http"; import router from "./routes"; import { PeerServer } from "peer"; import express from "express"; import * as socket from "socket.io"; import * as farmhash from "farmhash"; import cookieParser from "cookie-parser"; import { socketMain } from "./socket.io/socketMain"; import { inititalizeMongoDb } from "./database/mongoInstance"; import { isAuthSocket } from "./middlewares/isAuthSocket.middleware"; import { deleteOldMessageCron } from './services/deletemessagecron'; import { createClient, RedisClientType } from "redis"; // Create the Redis client using a URL const redisClient: RedisClientType = createClient({ url: '' }); redisClient.connect(); (async () => { if (cluster.isMaster) { const workers: any = []; const spawn = (i: number) => { workers[i] = cluster.fork(); workers[i].on("exit", () => { console.log("respawning worker", i); spawn(i); }); }; for (var i = 0; i < num_processes; i++) { spawn(i); } const worker_index = (ip: string, len: number) => { return farmhash.fingerprint32(ip) % len; }; const server: net.Server = net.createServer( { pauseOnConnect: true }, (connection: net.Socket) => { const worker = workers[worker_index(connection.remoteAddress, num_processes)]; worker.send("sticky-session:connection", connection); } ); server.listen(port); console.log(`Master listening on port ${port}`); 
} else { let app = express(); app.use(express.json({ limit: '50mb' })); app.use(express.urlencoded({ limit: '50mb', extended: true })); app.use(cookieParser()); app.use( cors({ origin: [ "file://", "http://localhost", "http://localhost:3000"], credentials: true, }) ); app.use("/", router); const server: http.Server = app.listen(0, "localhost"); console.log("Worker listening..."); const io = new socket.Server(server, { cors: { origin: [ "file://", "http://localhost", "http://localhost:3000"], credentials: true, }, pingTimeout: 120000, pingInterval: 25000, }); io.adapter(io_redis({ url: process.env.REDIS_URL, retryStrategy: (times: any) => { const delay = Math.min(times * 50, 2000); return delay; } })); await inititalizeMongoDb(); deleteOldMessageCron(); io.use(isAuthSocket); io.on("error", (err: any) => { console.log("Socket.io Error:", err); }); io.on("connection", (socket: socket.Socket) => { console.log("connected to socket server", socket.id); socketMain(io, socket, redisClient); console.log(`connected to worker: ${cluster.worker.id}`); }); process.on("message", (message, connection) => { if (message !== "sticky-session:connection") { return; } server.emit("connection", connection); //@ts-ignore connection.resume(); }); } })();[/code] meine SocketHauptdatei, in der alle Ereignisse verarbeitet werden [code]import { callOtherUser } from "./handlers/callOtherUser.handler"; import { disconnectVideoCall } from "./handlers/disconnectCall.handler"; import { getTotalUsers } from "./handlers/getTotalUsers.handler"; import { handleActiveSession } from "./handlers/handleActiveSession"; import { initialSocketConfig } from "./handlers/initialVerification"; import { iTextMessage } from "./handlers/iTextMessage.handler"; import { joinVideoRoom } from "./handlers/joinVideoRoom.handler"; import { rejectVideoCall } from "./handlers/rejectCall.handler"; import { socketDisconnect } from "./handlers/socketDisconnect.handler"; import { updateGroupInfo } from 
"./handlers/updateGroupInfo.handler"; import { updateOthersChats } from "./handlers/updateOthersChats.handler"; import { updateUserProfile } from "./handlers/updateUserProfile.handler"; import { userOnCall } from "./handlers/userOnCall.handler"; import { MarkAsReadMessage } from './handlers/MarkAsReadMessage.handler'; import { NickName } from './handlers/NickName.handler'; import { switchActiveChat } from "./handlers/switchActiveChat.handler"; import { RedisClientType } from "redis"; import { blockUser } from './blockUser.handler'; export const socketMain = async (io: any, socket: any, redisClient: RedisClientType) => { try { let heartbeatTimeout: NodeJS.Timeout; const { _id, db, userPayload } = await initialSocketConfig( io, socket ); handleActiveSession(io, socket, _id, userPayload?.displayName, redisClient); socket.emit("signInSuccess", { objectId: _id, displayName: userPayload?.displayName, email: userPayload?.email, avatar: userPayload?.avatar, createdOn: userPayload?.createdOn, about: userPayload?.about, lastSeen: userPayload?.lastSeen, }); socket.on("callOtherUser", (payload: any) => callOtherUser(io, _id, db, payload) ); socket.on("join-vc-room", (roomId: string, peerUserId: string) => joinVideoRoom(socket, roomId, peerUserId) ); socket.on("diconnect-from-call", (roomId: string, peerUserId: string) => disconnectVideoCall(socket, roomId, peerUserId) ); socket.on("reject-call", (roomId: string) => rejectVideoCall(socket, roomId) ); socket.on("user-on-call", (roomId: string) => userOnCall(socket, roomId)); socket.on("getTotalUsers", () => getTotalUsers(db, socket, _id)); socket.broadcast.emit("updateTotalUsers", { objectId: userPayload?._id, displayName: userPayload?.displayName, avatar: userPayload?.avatar, createdOn: userPayload?.createdOn, about: userPayload?.about, lastSeen: userPayload?.lastSeen, }); socket.on( "updateUserProfile", async (payload: any) => await updateUserProfile(socket, _id, payload, db) ); socket.on("updateGroupInfo", async (payload: 
any) => updateGroupInfo(io, _id, payload, db) ); socket.on("updateOthersChats", (payload: any) => { console.log("updateOthersChats", payload) updateOthersChats(db, io, _id, payload) } ); socket.on("iTextMessage", async (payload: any) => iTextMessage(io, socket, payload, db, _id) ); socket.on("markAsReadMessage", async (payload: any) => MarkAsReadMessage(io, socket, payload, db, _id) ); socket.on("setNickname", async (payload: any) => NickName(io, payload, db, _id) ); socket.broadcast.emit("online", _id); socket.on("switchActiveChat", async (payload: any) => switchActiveChat(io, socket, _id, db, payload) ); socket.on("user-blocked", async (payload: any) => { blockUser(io, socket, _id, db, payload) }) socket.on('heartbeat', (data: any) => { console.log('Client heartbeat:', socket.id); // Respond immediately to client socket.emit('heartbeat_received'); // Clear existing timeout if (heartbeatTimeout) { clearTimeout(heartbeatTimeout); } // Set new timeout for missed heartbeat heartbeatTimeout = setTimeout(() => { console.log('Client heartbeat timeout:', socket.id); socket.disconnect(true); }, 90000); // 90 seconds (allowing for network delays) }); socket.on("disconnect", async (reason: any) => { if (heartbeatTimeout) { clearTimeout(heartbeatTimeout); } console.log("DISCONNECT REASON:", reason); console.log("Socket Namespace:", socket.nsp.name); console.log("Socket ID:", socket.id); try { await socketDisconnect(socket, _id, db, redisClient); } catch (err) { console.error("Error during socket disconnect:", err); } }); } catch (err) { console.log("MAIN SOCKET ERR", err); } };[/code] handleActiveSession-Datei, in der Benutzerverbindungen gespeichert oder aus Redies entfernt werden [code]import * as socket from "socket.io"; import { getActiveUserByObjectId, removeActiveUserByObjectId, addToActiveUsers, getActiveUsers } from "../../utils/activeUsers"; import { RedisClientType } from "redis"; export const handleActiveSession = async ( io: socket.Server, socket: socket.Socket, 
_id: string, Name: string, redisClient: RedisClientType ) => { const userKey = `user:${_id}`; const userSession = { socketId: socket.id, objectId: _id, Name: Name }; await redisClient.set(userKey, JSON.stringify(userSession)); if (!getActiveUserByObjectId(_id)) { console.log("New session!"); addToActiveUsers(userSession); console.log("Active Users: ", getActiveUsers()); } else { console.log("Prev Disconnected, New session!"); const prevSocketId = getActiveUserByObjectId(_id)?.socketId; if (io.sockets.sockets.get(prevSocketId)) { console.log(prevSocketId + "multipleSession disconnected"); io.sockets.sockets.get(prevSocketId).emit("multipleSession"); io.sockets.sockets.get(prevSocketId).disconnect(true); } removeActiveUserByObjectId(_id); addToActiveUsers(userSession); } };[/code]