socket

使用socket来实时通信是很常见的行为

某些场景下为了降低后端的压力,需要实现多个前端页面共用一个socket连接

得益于最近写了写浏览器插件,对通信方式略有了解,使用BroadcastChannel可以实现前文所述的需求,大致的交互是这样的

图1:交互示意图
图1:交互示意图

挑选一个页面A和服务器进行数据交互,其他页面通过BroadcastChannel页面A和后台服务进行交互

如何建立这样的机制

  1. 询问是否有页面已建立连接
this.channel = new BroadcastChannel("shared-socket");
this.channel.postMessage({
    type: "who-is-master"
})
this.channel.onmessage = (event)=>{
    const { type, tabInfo, data, newTabInfo } = event.data;
    switch (type) {
        case 'i-am-master':{
            // 说明有人连上了
        }break;
        case 'who-is-master':{
            // 如果连上了就告诉他
        }
    }
}
  1. 如果一直没人回复?那就自己上
this.setupTimer = setTimeout(() => {
    const flag = '没有人回复'
    if (flag) {
        // 自己连
    }
}, 1000);

怎么区分谁是谁呢

回复是回复了,我怎么知道你是谁呢?只能整个身份信息了

this.tabInfo = {
    tabId: uuid()
};

这下好了,大家交流的时候都带上身份信息,实名交流

this.channel.postMessage({
    type: "who-is-master",
    tabInfo: this.tabInfo
})

如果在我连的过程中有人发问咋办

如题,页面A正在连接,但是还没连上,页面B开始问,有没有人连上了,那自然是回复不了他

很好,页面B也开始连接了,

有两个方案:

  1. 方案一:加个状态,页面A开始连之后,就可以回复我在连了,阻止其他人连
  2. 方案二:页面A连上之后广播通知一下我连上了,其他人就可以取消连接了
    • 如果其他人也已经连上了咋整?

    只能熬资历了,后连的自己断掉!

如果建立连接的页面关掉了咋弄

大家都靠页面A和服务器联系,页面A突然说不干了!

只能让下一位选手去连接了,又担心大家打起来,只能让页面A在撂挑子之前挑一个资历最老的了

this.channel.postMessage({
    type: "you-are-new-master",
    tabInfo: this.tabInfo,
    newTabInfo: 最老的家伙,
}

为了防止浏览器突然抽风最好还是加上心跳机制,我比较懒,大家可以自己加一下

完整实现

接口定义

interface.ts
/**
 * 共享Socket服务接口定义
 */
export interface ISharedSocket {
  /**
   * 初始化并连接到WebSocket服务器
   * @param url - WebSocket服务器地址
   * @param options - Socket.IO连接选项
   */
  connect(url: string, options?: any): void;

  /**
   * 断开WebSocket连接
   */
  disconnect(): void;

  /**
   * 发送消息到服务器
   * @param event - 事件名称
   * @param data - 消息数据
   */
  send(event: string, data: any): void;

  /**
   * 监听服务器消息
   * @param callback - 消息回调函数
   * @returns 返回一个用于取消监听的函数
   */
  onMessage(callback: (msg: any) => void): () => void;
  /**
   * 监听ws状态变化消息
   * @param callback - 消息回调函数
   * @returns 返回一个用于取消监听的函数
   */
  onStatusChange(callback: (status: TSKStats) => void): () => void;
  /**
   * 获取当前连接状态
   * @returns 连接状态码
   */
  getStatus(): TSKStats;
  /**
   * 重连
   */
  toReconnection(): void;
}

/** @params Tab连接状态:0-询问中 1-正在建立 2-已建立 3-已断开 4-已连接到主Tab 5-已断开连接,但在尝试中 6-参数异常无法连接 */
export type TSKStats = 0 | 1 | 2 | 3 | 4 | 5 | 6;

/**
 * Tab信息结构
 */
export interface ITabInfo {
  /** Tab唯一标识 */
  tabId: string;
  /** Tab创建时间戳 */
  createtime: number;
  /** Tab连接状态:0-询问中 1-正在建立 2-已建立 3-已断开 4-已连接到主Tab */
  status: TSKStats;
  /** 是否为主Tab */
  isMaster?: boolean;
}

/**
 * 跨Tab通信消息结构
 */
export interface ChannelMessage {
  type: // 询问主tab
  | "who-is-master"
    // 告知我是主tab
    | "i-am-master"
    // ws信息广播
    | "ws-message"
    // 告诉继任者你是新的master
    | "you-are-new-master"
    // 主tab状态改变
    | "master-stats-change"
    // 通知主tab重连
    | "reconnect-request";
  tabInfo: ITabInfo;
  data?: any;
  newTabInfo?: ITabInfo;
}

单例函数

Singleton.ts
export interface SingletonConstructor<T> {
  instance: T;
  getInstance(): T;
  new (): Singleton;
}

export interface Singleton {}

export function createSingleton<T>(): SingletonConstructor<T> {
  return class Singleton {
    static instance: T;
    static getInstance() {
      if (!Singleton.instance) {
        Singleton.instance = new this() as T;
      }
      return Singleton.instance as unknown as T;
    }
  };
}

代码主体

sharedSocket.ts
import { io, ManagerOptions, Socket, SocketOptions } from "socket.io-client";
import { v4 as uuid } from "uuid";
import { createSingleton } from "../Singleton";
import { ChannelMessage, ISharedSocket, ITabInfo, TSKStats } from "./interface";

const MAX_RETRY_TIME = 10;

/**
 * 共享WebSocket单例实现
 */
const SharedSocketBase = createSingleton<ISharedSocket>();
export class SharedSocket extends SharedSocketBase implements ISharedSocket {
  private channel: BroadcastChannel;
  private retryTime = 0;
  private tabInfo: ITabInfo = {
    tabId: uuid(),
    createtime: Date.now(),
    status: 0,
  };
  private socket: Socket | null = null;
  private mainTabInfo: ITabInfo | null = null;
  private clients: Map<string, ITabInfo> = new Map();
  private listeners: Set<(msg: any) => void> = new Set();
  private statusListeners: Set<(status: TSKStats) => void> = new Set();
  private serverUrl: string = "http://localhost:3677";
  private socketOptions: Partial<ManagerOptions & SocketOptions> = {
    reconnectionAttempts: MAX_RETRY_TIME,
    reconnection: true,
    reconnectionDelay: 1000,
  };
  private setupTimer: ReturnType<typeof setTimeout> | null = null;

  constructor() {
    super();
    this.channel = new BroadcastChannel("shared-socket");
    this.setup();
  }

  private setup() {
    this.requestMaster();
    // 监听跨Tab消息
    this.channel.onmessage = (event: MessageEvent<ChannelMessage>) => {
      const { type, tabInfo, data, newTabInfo } = event.data;
      switch (type) {
        case "who-is-master":
          {
            // 如果我是主Tab,响应身份
            if (this.isMaster() && this.tabInfo.status !== 3) {
              this.channel.postMessage({
                type: "i-am-master",
                tabInfo: this.tabInfo,
              } as ChannelMessage);

              // 将询问者添加到客户端列表
              if (tabInfo.tabId !== this.tabInfo.tabId) {
                this.clients.set(tabInfo.tabId, tabInfo);
              }
            }
          }

          break;
        case "reconnect-request":
          {
            // 主Tab收到重连请求
            if (this.isMaster() && tabInfo.tabId === this.tabInfo.tabId) {
              this.reconnect();
            }
          }
          break;
        case "you-are-new-master":
          {
            // 继承主Tab身份
            if (
              this.mainTabInfo &&
              tabInfo.tabId === this.mainTabInfo.tabId &&
              newTabInfo &&
              newTabInfo.tabId === this.tabInfo.tabId
            ) {
              this.becomeMaster();
            } else {
              this.mainTabInfo = null;
              this.requestMaster();
            }
          }

          break;
        case "master-stats-change":
          // 更新主Tab状态
          if (
            this.mainTabInfo &&
            newTabInfo &&
            newTabInfo.tabId === this.mainTabInfo.tabId
          ) {
            this.mainTabInfo = newTabInfo;
            this.statusListeners.forEach((listener) =>
              listener(newTabInfo.status)
            );
          }
          break;
        default: {
          // 更新master信息
          if (this.isMaster()) {
            if (
              tabInfo.createtime < this.tabInfo.createtime ||
              this.tabInfo.status === 3
            ) {
              // 对方创建时间更早,或者我已经断开了,我放弃主Tab身份
              this.disconnectSocket();
              this.clients.clear();
              this.mainTabInfo = tabInfo;
              this.updateStatus(tabInfo.status === 2 ? 4 : tabInfo.status);
            }
          } else if (this.mainTabInfo) {
            // 我不是主Tab,更新主Tab信息
            if (
              tabInfo.createtime < this.mainTabInfo.createtime ||
              this.mainTabInfo.status === 3
            ) {
              this.mainTabInfo = tabInfo;
              this.updateStatus(tabInfo.status === 2 ? 4 : tabInfo.status);
            }
          } else {
            this.mainTabInfo = tabInfo;
            this.updateStatus(tabInfo.status === 2 ? 4 : tabInfo.status);
          }

          switch (type) {
            case "ws-message":
              // 转发主Tab的消息给所有监听者
              if (
                this.mainTabInfo &&
                tabInfo.tabId === this.mainTabInfo.tabId
              ) {
                this.listeners.forEach((listener) => listener(data));
              }
              break;
          }
        }
      }
    };

    // 监听页面卸载事件
    window.addEventListener("beforeunload", this.handleBeforeUnload.bind(this));
  }

  private requestMaster() {
    // 发送询问主Tab的消息
    this.channel.postMessage({
      type: "who-is-master",
      tabInfo: this.tabInfo,
    } as ChannelMessage);
    // 设置定时器检查是否有主Tab
    this.setupTimer = setTimeout(() => {
      if (!this.mainTabInfo && !this.isMaster()) {
        this.becomeMaster();
      }
    }, 1000);
  }

  private handleBeforeUnload() {
    if (this.isMaster() && this.clients.size > 0) {
      // 找到最早创建的客户端Tab
      const earliestClient = Array.from(this.clients.values()).sort(
        (a, b) => a.createtime - b.createtime
      )[0];

      if (earliestClient) {
        // 通知该Tab成为新的主Tab
        this.channel.postMessage({
          type: "you-are-new-master",
          tabInfo: this.tabInfo,
          newTabInfo: { ...earliestClient, status: 0 },
        } as ChannelMessage);
      }
    }

    // 断开连接
    this.disconnectSocket();
  }

  private becomeMaster() {
    this.retryTime = 0;
    this.clients.clear();

    this.updateStatus(1); // 正在建立连接

    if (this.serverUrl) {
      this.connectSocket();
    } else {
      console.error("Cannot become master: server URL not provided");
      this.updateStatus(6); // 连接失败
    }
  }

  private connectSocket() {
    this.socket = io(this.serverUrl, this.socketOptions);

    this.socket.on("connect", () => {
      this.retryTime = 0;
      this.updateStatus(2); // 已建立连接
      this.channel.postMessage({
        type: "i-am-master",
        tabInfo: this.tabInfo,
      } as ChannelMessage);

      // 监听消息并转发给其他Tab和本地监听者
      this.socket?.on("chat message", (msg: any) => {
        // 转发给其他Tab
        this.channel.postMessage({
          type: "ws-message",
          tabInfo: this.tabInfo,
          data: msg,
        } as ChannelMessage);

        // 转发给本地监听者
        this.listeners.forEach((listener) => listener(msg));
      });
    });

    this.socket.on("connect_error", () => {
      this.retryTime = this.retryTime + 1;
      if (this.retryTime >= MAX_RETRY_TIME) {
        this.updateStatus(3);
      } else {
        this.updateStatus(5); // 已断开连接
      }
    });

    this.socket.on("disconnect", () => {
      this.updateStatus(5); // 已断开连接
    });

    this.socket.on("reconnect", () => {
      this.updateStatus(2); // 重新连接成功
    });
  }

  private disconnectSocket() {
    if (this.socket) {
      this.socket.disconnect();
      this.socket = null;
    }
  }

  private updateStatus(newStatus: TSKStats) {
    this.tabInfo.status = newStatus;

    // 如果是主Tab,广播状态变化
    if (this.isMaster()) {
      this.channel.postMessage({
        type: "master-stats-change",
        tabInfo: this.tabInfo,
        newTabInfo: { ...this.tabInfo, isMaster: true },
      } as ChannelMessage);
    }

    // 转发给本地监听者
    this.statusListeners.forEach((listener) => listener(newStatus));
  }

  private isMaster() {
    return this.socket !== null;
  }

  private reconnect() {
    if (this.socket) {
      this.retryTime = 0;
      this.updateStatus(5);
      this.socket.connect();
    } else if (this.serverUrl) {
      this.connectSocket();
    }
  }

  // 实现接口方法
  connect(url: string, options?: any) {
    this.serverUrl = url;
    this.socketOptions = options || {};

    if (!this.isMaster() && !this.mainTabInfo) {
      // 如果没有主Tab且我也不是主Tab,尝试成为主Tab
      this.becomeMaster();
    }
  }

  disconnect() {
    this.disconnectSocket();
    this.updateStatus(3);
    this.channel?.close?.();
  }

  send(event: string, data: any) {
    if (this.isMaster()) {
      this.socket?.emit(event, data);
    } else if (this.mainTabInfo) {
      // 如果有主Tab,转发消息给主Tab
      this.channel.postMessage({
        type: "ws-message",
        tabInfo: this.tabInfo,
        data: JSON.stringify({ event, data }),
      } as ChannelMessage);
    } else {
      console.error("Cannot send message: no active connection");
    }
  }

  onMessage(callback: (msg: any) => void): () => void {
    this.listeners.add(callback);

    // 返回取消监听的函数
    return () => {
      this.listeners.delete(callback);
    };
  }

  onStatusChange(callback: (status: TSKStats) => void): () => void {
    this.statusListeners.add(callback);
    return () => {
      this.statusListeners.delete(callback);
    };
  }

  getStatus() {
    return this.tabInfo.status;
  }

  toReconnection() {
    if (this.socket && !this.socket.connected) {
      this.reconnect();
      return;
    }
    if (this.mainTabInfo) {
      this.channel.postMessage({
        type: "reconnect-request",
        tabInfo: this.mainTabInfo,
      });
      return;
    }

    this.requestMaster();
  }
}

测试使用的服务器代码

index.js
const express = require("express");
const http = require("http");
const { Server } = require("socket.io");
const path = require("path");

const app = express();
const server = http.createServer(app);
const io = new Server(server, {
  cors: { origin: "*" }, // 开发用,生产环境请配置白名单
});

// 静态资源托管(用于生产时打包后的前端页面)
app.use(express.static(path.join(__dirname, "client", "dist")));

io.on("connection", (socket) => {
  let timer = null;
  console.log("客户端已连接:", socket.id);

  socket.on("chat message", (msg) => {
    console.log(`收到消息: ${msg}`);
    // 广播给所有客户端
    io.emit("chat message", msg);
  });

  socket.on("disconnect", () => {
    console.log("客户端断开:", socket.id);
    clearInterval(timer);
  });

  timer = setInterval(() => {
    const date = new Date();
    const text = `${date.toLocaleDateString()} ${date.toLocaleTimeString()}`;
    console.log({
      id: socket.id,
      date: text,
    });
    io.emit(
      "chat message",
      JSON.stringify({ type: "node", id: socket.id, date: text })
    );
  }, 10000);
});

const PORT = 3677;
server.listen(PORT, () => {
  console.log(`服务器运行在 http://localhost:${PORT}`);
});