使用socket
来实时通信是很常见的行为
某些场景下为了降低后端的压力,需要实现多个前端页面共用一个socket
连接
得益于最近写了写浏览器插件,对通信方式略有了解,使用BroadcastChannel
可以实现前文所述的需求,大致的交互是这样的
挑选一个页面A
和服务器进行数据交互,其他页面通过BroadcastChannel
和页面A
和后台服务进行交互
this.channel = new BroadcastChannel("shared-socket");
this.channel.postMessage({
type: "who-is-master"
})
this.channel.onmessage = (event)=>{
const { type, tabInfo, data, newTabInfo } = event.data;
switch (type) {
case 'i-am-master':{
// 说明有人连上了
}break;
case 'who-is-master':{
// 如果连上了就告诉他
}
}
}
this.setupTimer = setTimeout(() => {
const flag = '没有人回复'
if (flag) {
// 自己连
}
}, 1000);
回复是回复了,我怎么知道你是谁呢?只能整个身份信息了
this.tabInfo = {
tabId: uuid()
};
这下好了,大家交流的时候都带上身份信息,实名交流
this.channel.postMessage({
type: "who-is-master",
tabInfo: this.tabInfo
})
如题,页面A
正在连接,但是还没连上,页面B
开始问,有没有人连上了,那自然是回复不了他
很好,页面B
也开始连接了,寄
有两个方案:
页面A
开始连之后,就可以回复我在连了,阻止其他人连页面A
连上之后广播通知一下我连上了
,其他人就可以取消连接了
只能熬资历了,后连的自己断掉!
大家都靠页面A
和服务器联系,页面A
突然说不干了!
,寄
只能让下一位选手去连接了,又担心大家打起来,只能让页面A
在撂挑子之前挑一个资历最老的了
this.channel.postMessage({
type: "you-are-new-master",
tabInfo: this.tabInfo,
newTabInfo: 最老的家伙,
}
为了防止浏览器突然抽风最好还是加上心跳机制
,我比较懒,大家可以自己加一下
/**
* 共享Socket服务接口定义
*/
export interface ISharedSocket {
/**
* 初始化并连接到WebSocket服务器
* @param url - WebSocket服务器地址
* @param options - Socket.IO连接选项
*/
connect(url: string, options?: any): void;
/**
* 断开WebSocket连接
*/
disconnect(): void;
/**
* 发送消息到服务器
* @param event - 事件名称
* @param data - 消息数据
*/
send(event: string, data: any): void;
/**
* 监听服务器消息
* @param callback - 消息回调函数
* @returns 返回一个用于取消监听的函数
*/
onMessage(callback: (msg: any) => void): () => void;
/**
* 监听ws状态变化消息
* @param callback - 消息回调函数
* @returns 返回一个用于取消监听的函数
*/
onStatusChange(callback: (status: TSKStats) => void): () => void;
/**
* 获取当前连接状态
* @returns 连接状态码
*/
getStatus(): TSKStats;
/**
* 重连
*/
toReconnection(): void;
}
/** @params Tab连接状态:0-询问中 1-正在建立 2-已建立 3-已断开 4-已连接到主Tab 5-已断开连接,但在尝试中 6-参数异常无法连接 */
export type TSKStats = 0 | 1 | 2 | 3 | 4 | 5 | 6;
/**
* Tab信息结构
*/
export interface ITabInfo {
/** Tab唯一标识 */
tabId: string;
/** Tab创建时间戳 */
createtime: number;
/** Tab连接状态:0-询问中 1-正在建立 2-已建立 3-已断开 4-已连接到主Tab */
status: TSKStats;
/** 是否为主Tab */
isMaster?: boolean;
}
/**
* 跨Tab通信消息结构
*/
export interface ChannelMessage {
type: // 询问主tab
| "who-is-master"
// 告知我是主tab
| "i-am-master"
// ws信息广播
| "ws-message"
// 告诉继任者你是新的master
| "you-are-new-master"
// 主tab状态改变
| "master-stats-change"
// 通知主tab重连
| "reconnect-request";
tabInfo: ITabInfo;
data?: any;
newTabInfo?: ITabInfo;
}
export interface SingletonConstructor<T> {
instance: T;
getInstance(): T;
new (): Singleton;
}
export interface Singleton {}
export function createSingleton<T>(): SingletonConstructor<T> {
return class Singleton {
static instance: T;
static getInstance() {
if (!Singleton.instance) {
Singleton.instance = new this() as T;
}
return Singleton.instance as unknown as T;
}
};
}
import { io, ManagerOptions, Socket, SocketOptions } from "socket.io-client";
import { v4 as uuid } from "uuid";
import { createSingleton } from "../Singleton";
import { ChannelMessage, ISharedSocket, ITabInfo, TSKStats } from "./interface";
const MAX_RETRY_TIME = 10;
/**
* 共享WebSocket单例实现
*/
const SharedSocketBase = createSingleton<ISharedSocket>();
export class SharedSocket extends SharedSocketBase implements ISharedSocket {
private channel: BroadcastChannel;
private retryTime = 0;
private tabInfo: ITabInfo = {
tabId: uuid(),
createtime: Date.now(),
status: 0,
};
private socket: Socket | null = null;
private mainTabInfo: ITabInfo | null = null;
private clients: Map<string, ITabInfo> = new Map();
private listeners: Set<(msg: any) => void> = new Set();
private statusListeners: Set<(status: TSKStats) => void> = new Set();
private serverUrl: string = "http://localhost:3677";
private socketOptions: Partial<ManagerOptions & SocketOptions> = {
reconnectionAttempts: MAX_RETRY_TIME,
reconnection: true,
reconnectionDelay: 1000,
};
private setupTimer: ReturnType<typeof setTimeout> | null = null;
constructor() {
super();
this.channel = new BroadcastChannel("shared-socket");
this.setup();
}
private setup() {
this.requestMaster();
// 监听跨Tab消息
this.channel.onmessage = (event: MessageEvent<ChannelMessage>) => {
const { type, tabInfo, data, newTabInfo } = event.data;
switch (type) {
case "who-is-master":
{
// 如果我是主Tab,响应身份
if (this.isMaster() && this.tabInfo.status !== 3) {
this.channel.postMessage({
type: "i-am-master",
tabInfo: this.tabInfo,
} as ChannelMessage);
// 将询问者添加到客户端列表
if (tabInfo.tabId !== this.tabInfo.tabId) {
this.clients.set(tabInfo.tabId, tabInfo);
}
}
}
break;
case "reconnect-request":
{
// 主Tab收到重连请求
if (this.isMaster() && tabInfo.tabId === this.tabInfo.tabId) {
this.reconnect();
}
}
break;
case "you-are-new-master":
{
// 继承主Tab身份
if (
this.mainTabInfo &&
tabInfo.tabId === this.mainTabInfo.tabId &&
newTabInfo &&
newTabInfo.tabId === this.tabInfo.tabId
) {
this.becomeMaster();
} else {
this.mainTabInfo = null;
this.requestMaster();
}
}
break;
case "master-stats-change":
// 更新主Tab状态
if (
this.mainTabInfo &&
newTabInfo &&
newTabInfo.tabId === this.mainTabInfo.tabId
) {
this.mainTabInfo = newTabInfo;
this.statusListeners.forEach((listener) =>
listener(newTabInfo.status)
);
}
break;
default: {
// 更新master信息
if (this.isMaster()) {
if (
tabInfo.createtime < this.tabInfo.createtime ||
this.tabInfo.status === 3
) {
// 对方创建时间更早,或者我已经断开了,我放弃主Tab身份
this.disconnectSocket();
this.clients.clear();
this.mainTabInfo = tabInfo;
this.updateStatus(tabInfo.status === 2 ? 4 : tabInfo.status);
}
} else if (this.mainTabInfo) {
// 我不是主Tab,更新主Tab信息
if (
tabInfo.createtime < this.mainTabInfo.createtime ||
this.mainTabInfo.status === 3
) {
this.mainTabInfo = tabInfo;
this.updateStatus(tabInfo.status === 2 ? 4 : tabInfo.status);
}
} else {
this.mainTabInfo = tabInfo;
this.updateStatus(tabInfo.status === 2 ? 4 : tabInfo.status);
}
switch (type) {
case "ws-message":
// 转发主Tab的消息给所有监听者
if (
this.mainTabInfo &&
tabInfo.tabId === this.mainTabInfo.tabId
) {
this.listeners.forEach((listener) => listener(data));
}
break;
}
}
}
};
// 监听页面卸载事件
window.addEventListener("beforeunload", this.handleBeforeUnload.bind(this));
}
private requestMaster() {
// 发送询问主Tab的消息
this.channel.postMessage({
type: "who-is-master",
tabInfo: this.tabInfo,
} as ChannelMessage);
// 设置定时器检查是否有主Tab
this.setupTimer = setTimeout(() => {
if (!this.mainTabInfo && !this.isMaster()) {
this.becomeMaster();
}
}, 1000);
}
private handleBeforeUnload() {
if (this.isMaster() && this.clients.size > 0) {
// 找到最早创建的客户端Tab
const earliestClient = Array.from(this.clients.values()).sort(
(a, b) => a.createtime - b.createtime
)[0];
if (earliestClient) {
// 通知该Tab成为新的主Tab
this.channel.postMessage({
type: "you-are-new-master",
tabInfo: this.tabInfo,
newTabInfo: { ...earliestClient, status: 0 },
} as ChannelMessage);
}
}
// 断开连接
this.disconnectSocket();
}
private becomeMaster() {
this.retryTime = 0;
this.clients.clear();
this.updateStatus(1); // 正在建立连接
if (this.serverUrl) {
this.connectSocket();
} else {
console.error("Cannot become master: server URL not provided");
this.updateStatus(6); // 连接失败
}
}
private connectSocket() {
this.socket = io(this.serverUrl, this.socketOptions);
this.socket.on("connect", () => {
this.retryTime = 0;
this.updateStatus(2); // 已建立连接
this.channel.postMessage({
type: "i-am-master",
tabInfo: this.tabInfo,
} as ChannelMessage);
// 监听消息并转发给其他Tab和本地监听者
this.socket?.on("chat message", (msg: any) => {
// 转发给其他Tab
this.channel.postMessage({
type: "ws-message",
tabInfo: this.tabInfo,
data: msg,
} as ChannelMessage);
// 转发给本地监听者
this.listeners.forEach((listener) => listener(msg));
});
});
this.socket.on("connect_error", () => {
this.retryTime = this.retryTime + 1;
if (this.retryTime >= MAX_RETRY_TIME) {
this.updateStatus(3);
} else {
this.updateStatus(5); // 已断开连接
}
});
this.socket.on("disconnect", () => {
this.updateStatus(5); // 已断开连接
});
this.socket.on("reconnect", () => {
this.updateStatus(2); // 重新连接成功
});
}
private disconnectSocket() {
if (this.socket) {
this.socket.disconnect();
this.socket = null;
}
}
private updateStatus(newStatus: TSKStats) {
this.tabInfo.status = newStatus;
// 如果是主Tab,广播状态变化
if (this.isMaster()) {
this.channel.postMessage({
type: "master-stats-change",
tabInfo: this.tabInfo,
newTabInfo: { ...this.tabInfo, isMaster: true },
} as ChannelMessage);
}
// 转发给本地监听者
this.statusListeners.forEach((listener) => listener(newStatus));
}
private isMaster() {
return this.socket !== null;
}
private reconnect() {
if (this.socket) {
this.retryTime = 0;
this.updateStatus(5);
this.socket.connect();
} else if (this.serverUrl) {
this.connectSocket();
}
}
// 实现接口方法
connect(url: string, options?: any) {
this.serverUrl = url;
this.socketOptions = options || {};
if (!this.isMaster() && !this.mainTabInfo) {
// 如果没有主Tab且我也不是主Tab,尝试成为主Tab
this.becomeMaster();
}
}
disconnect() {
this.disconnectSocket();
this.updateStatus(3);
this.channel?.close?.();
}
send(event: string, data: any) {
if (this.isMaster()) {
this.socket?.emit(event, data);
} else if (this.mainTabInfo) {
// 如果有主Tab,转发消息给主Tab
this.channel.postMessage({
type: "ws-message",
tabInfo: this.tabInfo,
data: JSON.stringify({ event, data }),
} as ChannelMessage);
} else {
console.error("Cannot send message: no active connection");
}
}
onMessage(callback: (msg: any) => void): () => void {
this.listeners.add(callback);
// 返回取消监听的函数
return () => {
this.listeners.delete(callback);
};
}
onStatusChange(callback: (status: TSKStats) => void): () => void {
this.statusListeners.add(callback);
return () => {
this.statusListeners.delete(callback);
};
}
getStatus() {
return this.tabInfo.status;
}
toReconnection() {
if (this.socket && !this.socket.connected) {
this.reconnect();
return;
}
if (this.mainTabInfo) {
this.channel.postMessage({
type: "reconnect-request",
tabInfo: this.mainTabInfo,
});
return;
}
this.requestMaster();
}
}
const express = require("express");
const http = require("http");
const { Server } = require("socket.io");
const path = require("path");
const app = express();
const server = http.createServer(app);
const io = new Server(server, {
cors: { origin: "*" }, // 开发用,生产环境请配置白名单
});
// 静态资源托管(用于生产时打包后的前端页面)
app.use(express.static(path.join(__dirname, "client", "dist")));
io.on("connection", (socket) => {
let timer = null;
console.log("客户端已连接:", socket.id);
socket.on("chat message", (msg) => {
console.log(`收到消息: ${msg}`);
// 广播给所有客户端
io.emit("chat message", msg);
});
socket.on("disconnect", () => {
console.log("客户端断开:", socket.id);
clearInterval(timer);
});
timer = setInterval(() => {
const date = new Date();
const text = `${date.toLocaleDateString()} ${date.toLocaleTimeString()}`;
console.log({
id: socket.id,
date: text,
});
io.emit(
"chat message",
JSON.stringify({ type: "node", id: socket.id, date: text })
);
}, 10000);
});
const PORT = 3677;
server.listen(PORT, () => {
console.log(`服务器运行在 http://localhost:${PORT}`);
});