本文面向需要长连接与多人交互的实时应用,总结“会话管理 → 匹配流程 → 房间模型”三方面的工程实践与可扩展方案。
| 对象 | 职责 | 关键状态 |
|---|---|---|
| 会话 Session | 承载一条长连接,保存身份、最近心跳、所在房间 | CONNECTED / AUTHED / CLOSED |
| 匹配 Match | 把多个待服务的用户组织成一组 | QUEUED / MATCHED / CANCELLED |
| 房间 Room | 组内协作空间,管理成员与广播 | OPEN / ACTIVE / CLOSED |
建议分离多种索引,降低耦合并便于快速定位:
import java.util.concurrent.*;
import javax.websocket.Session;
class SessionHub {
// 连接 ID -> Session
static final ConcurrentHashMap SESSIONS = new ConcurrentHashMap<>();
// 用户 ID -> 连接 ID
static final ConcurrentHashMap USER2SID = new ConcurrentHashMap<>();
// 连接 ID -> 最近心跳时间
static final ConcurrentHashMap LAST_PING = new ConcurrentHashMap<>();
static void onOpen(String sid, Session s) {
SESSIONS.put(sid, s);
LAST_PING.put(sid, System.currentTimeMillis());
}
static void onClose(String sid) {
SESSIONS.remove(sid);
LAST_PING.remove(sid);
// 同时清理与用户 ID 的关联
USER2SID.entrySet().removeIf(e -> sid.equals(e.getValue()));
}
}
心跳策略:客户端每 30s 发送一次 ping,服务端回复 pong 并更新时间戳;守护线程定期扫描超时连接并关闭。
class Matcher {
private static final ConcurrentLinkedQueue WAIT = new ConcurrentLinkedQueue<>();
static void enqueue(String userId) { WAIT.offer(userId); }
// 周期调度:每 X 毫秒尝试组队
static void tick() {
int size = 2; // 例如两人一组
while (WAIT.size() >= size) {
String a = WAIT.poll(), b = WAIT.poll();
RoomHub.createRoom(a, b);
}
}
static boolean cancel(String userId) {
return WAIT.remove(userId);
}
}
实践要点:可根据等级/地区/延时等维度分段队列;超时升级策略避免长时间等待。
广播时采用快照避免与成员增删并发冲突,成员集合宜用写时复制或在广播前拷贝快照。
class RoomHub {
static final ConcurrentHashMap> ROOM_MEMBERS = new ConcurrentHashMap<>();
static String createRoom(String... users) {
String roomId = "r-" + System.currentTimeMillis();
CopyOnWriteArraySet set = new CopyOnWriteArraySet<>();
for (String u: users) set.add(u);
ROOM_MEMBERS.put(roomId, set);
broadcast(roomId, "{\\"type\\":\\"room.ready\\",\\"roomId\\":\\""+roomId+"\\"}");
return roomId;
}
static void broadcast(String roomId, String msg) {
CopyOnWriteArraySet members = ROOM_MEMBERS.get(roomId);
if (members == null) return;
for (String uid : members) {
String sid = SessionHub.USER2SID.get(uid);
Session s = sid == null ? null : SessionHub.SESSIONS.get(sid);
if (s != null && s.isOpen()) {
try { s.getBasicRemote().sendText(msg); } catch (Exception ignored) {}
}
}
}
static void leave(String roomId, String uid) {
CopyOnWriteArraySet members = ROOM_MEMBERS.get(roomId);
if (members != null) {
members.remove(uid);
if (members.isEmpty()) ROOM_MEMBERS.remove(roomId);
}
}
}
本文示例为教学用途,可按业务场景扩展鉴权、路由与跨节点广播等能力。