首页 关于 文章 作品集 工具 联系
返回文章列表

Node.js + Socket.io 实现实时通信

实时通信是现代应用的核心能力之一——即时聊天、在线状态、实时通知、协作编辑都依赖它。本文用 Node.js + Socket.io 从零搭建一套完整的实时通信系统,并在 Flutter 端完成对接。

一、WebSocket vs HTTP

HTTP 是请求-响应模式,客户端发起请求,服务器返回响应,一次就结束。而 WebSocket 建立的是一个持久连接,双方可以随时主动发送数据:

  • HTTP:半双工、短连接、开销大(每次都要握手)
  • WebSocket:全双工、长连接、开销小(一次握手持续通信)

Socket.io 在原生 WebSocket 之上做了封装,提供了自动重连、房间管理、命名空间、降级到轮询等能力,让实时通信的开发体验好很多。

二、后端搭建

2.1 基础服务器

const express = require('express');
const http = require('http');
const { Server } = require('socket.io');
const jwt = require('jsonwebtoken');

const app = express();
const server = http.createServer(app);
const io = new Server(server, {
  cors: {
    origin: process.env.CLIENT_URL || '*',
    methods: ['GET', 'POST'],
  },
  // 传输配置
  transports: ['websocket', 'polling'],
  pingInterval: 10000,
  pingTimeout: 5000,
});

// JWT 认证中间件
io.use((socket, next) => {
  try {
    const token = socket.handshake.auth.token;
    if (!token) return next(new Error('未提供认证 Token'));

    const decoded = jwt.verify(token, process.env.JWT_SECRET);
    socket.user = decoded;
    next();
  } catch (error) {
    next(new Error('认证失败'));
  }
});

// 在线用户管理
const onlineUsers = new Map();

io.on('connection', (socket) => {
  const userId = socket.user.id;
  

  // 上线:加入个人房间 + 更新在线状态
  onlineUsers.set(userId, {
    socketId: socket.id,
    joinedAt: Date.now(),
  });
  socket.join(`user_${userId}`);

  // 广播上线通知
  io.emit('user:online', { userId, online: true });
  io.emit('online:count', { count: onlineUsers.size });

  // ========== 聊天消息 ==========

  socket.on('chat:send', async (data) => {
    const { conversationId, content, type = 'text' } = data;

    // 验证消息
    if (!conversationId || !content || content.trim().length === 0) {
      return;
    }
    if (content.length > 5000) {
      socket.emit('chat:error', { message: '消息长度不能超过5000字' });
      return;
    }

    // 存储消息到数据库
    const message = await Message.create({
      conversationId,
      sender: userId,
      content: content.trim(),
      type,
      status: 'sent',
    });

    // 发送给对话中的所有用户
    io.to(`conversation_${conversationId}`).emit('chat:newMessage', {
      _id: message._id,
      conversationId,
      sender: userId,
      content: message.content,
      type: message.type,
      status: 'sent',
      createdAt: message.createdAt,
    });

    // 更新对话的最后一条消息
    await Conversation.findByIdAndUpdate(conversationId, {
      lastMessage: message.content,
      lastMessageAt: message.createdAt,
    });
  });

  // ========== 消息已读 ==========

  socket.on('chat:read', async ({ conversationId, messageIds }) => {
    await Message.updateMany(
      {
        _id: { $in: messageIds },
        conversationId,
        sender: { $ne: userId },
      },
      { status: 'read' }
    );

    // 通知发送者:消息已读
    socket.to(`conversation_${conversationId}`).emit('chat:messagesRead', {
      conversationId,
      readBy: userId,
    });
  });

  // ========== 正在输入 ==========

  socket.on('chat:typing', ({ conversationId }) => {
    socket.to(`conversation_${conversationId}`).emit('chat:userTyping', {
      userId,
      conversationId,
    });
  });

  // ========== 加入对话房间 ==========

  socket.on('conversation:join', (conversationId) => {
    socket.join(`conversation_${conversationId}`);
    
  });

  socket.on('conversation:leave', (conversationId) => {
    socket.leave(`conversation_${conversationId}`);
  });

  // ========== 断线处理 ==========

  socket.on('disconnect', (reason) => {
    
    onlineUsers.delete(userId);

    // 广播下线通知(延迟5秒,避免短暂断线闪烁)
    setTimeout(() => {
      if (!onlineUsers.has(userId)) {
        io.emit('user:online', { userId, online: false });
        io.emit('online:count', { count: onlineUsers.size });
      }
    }, 5000);
  });

  // 错误处理
  socket.on('error', (error) => {
    console.error(`Socket 错误 [用户 ${userId}]:`, error.message);
  });
});

const PORT = process.env.PORT || 3001;
server.listen(PORT, () => {
  
});

2.2 Express 路由集成

// 将 io 实例共享给 Express 路由
app.set('io', io);

// 在 Express 路由中发送通知
const sendNotification = (userId, event, data) => {
  const io = req.app.get('io');
  io.to(`user_${userId}`).emit(event, data);
};

// 使用示例:系统通知
app.post('/api/v1/notifications/send', protect, async (req, res) => {
  const { userId, title, body, type } = req.body;

  // 存入数据库
  const notification = await Notification.create({
    user: userId,
    title,
    body,
    type,
  });

  // 通过 Socket.io 实时推送
  const io = req.app.get('io');
  io.to(`user_${userId}`).emit('notification:new', {
    _id: notification._id,
    title: notification.title,
    body: notification.body,
    type: notification.type,
  });

  successResponse(res, notification);
});

三、Flutter 端对接

3.1 Socket 服务封装

import 'package:socket_io_client/socket_io_client.dart' as IO;

class SocketService {
  static final SocketService _instance = SocketService._();
  factory SocketService() => _instance;

  IO.Socket? _socket;
  bool _isConnected = false;

  bool get isConnected => _isConnected;

  void connect(String token) {
    _socket = IO.io(
      'http://your-server.com',
      IO.OptionBuilder()
          .setTransports(['websocket'])
          .enableAutoReconnect()
          .setReconnectionAttempts(5)
          .setReconnectionDelay(2000)
          .setQuery({'token': token})
          .build(),
    );

    _socket!.onConnect((_) {
      print('Socket 已连接');
      _isConnected = true;
    });

    _socket!.onDisconnect((_) {
      print('Socket 已断开');
      _isConnected = false;
    });

    _socket!.onConnectError((error) {
      print('Socket 连接错误: $error');
    });
  }

  // 加入对话
  void joinConversation(String conversationId) {
    _socket?.emit('conversation:join', conversationId);
  }

  // 离开对话
  void leaveConversation(String conversationId) {
    _socket?.emit('conversation:leave', conversationId);
  }

  // 发送消息
  void sendMessage({
    required String conversationId,
    required String content,
    String type = 'text',
  }) {
    _socket?.emit('chat:send', {
      'conversationId': conversationId,
      'content': content,
      'type': type,
    });
  }

  // 正在输入
  void sendTyping(String conversationId) {
    _socket?.emit('chat:typing', {
      'conversationId': conversationId,
    });
  }

  // 消息已读
  void markAsRead(String conversationId, List<String> messageIds) {
    _socket?.emit('chat:read', {
      'conversationId': conversationId,
      'messageIds': messageIds,
    });
  }

  // 监听新消息
  void onNewMessage(Function(dynamic) callback) {
    _socket?.on('chat:newMessage', callback);
  }

  // 监听用户正在输入
  void onUserTyping(Function(dynamic) callback) {
    _socket?.on('chat:userTyping', callback);
  }

  // 监听消息已读
  void onMessagesRead(Function(dynamic) callback) {
    _socket?.on('chat:messagesRead', callback);
  }

  // 监听通知
  void onNotification(Function(dynamic) callback) {
    _socket?.on('notification:new', callback);
  }

  void disconnect() {
    _socket?.dispose();
    _socket = null;
    _isConnected = false;
  }
}

3.2 在聊天页面中使用

class ChatPage extends StatefulWidget {
  @override
  State<ChatPage> createState() => _ChatPageState();
}

class _ChatPageState extends State<ChatPage> {
  final _socket = SocketService();
  final _textController = TextEditingController();
  final _scrollController = ScrollController();
  List<Message> _messages = [];
  String _conversationId = '';

  @override
  void initState() {
    super.initState();
    _setupSocket();
  }

  void _setupSocket() {
    // 加入对话房间
    _socket.joinConversation(_conversationId);

    // 监听新消息
    _socket.onNewMessage((data) {
      setState(() {
        _messages.add(Message.fromJson(data));
      });
      _scrollToBottom();
    });

    // 监听对方正在输入
    _socket.onUserTyping((data) {
      // 显示"对方正在输入..."
    });

    // 监听消息已读
    _socket.onMessagesRead((data) {
      // 更新消息状态为已读
    });
  }

  void _sendMessage() {
    final text = _textController.text.trim();
    if (text.isEmpty) return;

    _socket.sendMessage(
      conversationId: _conversationId,
      content: text,
    );
    _textController.clear();
  }

  void _scrollToBottom() {
    WidgetsBinding.instance.addPostFrameCallback((_) {
      _scrollController.animateTo(
        _scrollController.position.maxScrollExtent,
        duration: Duration(milliseconds: 300),
        curve: Curves.easeOut,
      );
    });
  }

  @override
  void dispose() {
    _socket.leaveConversation(_conversationId);
    _textController.dispose();
    _scrollController.dispose();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('聊天')),
      body: Column(
        children: [
          Expanded(
            child: ListView.builder(
              controller: _scrollController,
              itemCount: _messages.length,
              itemBuilder: (context, index) {
                final msg = _messages[index];
                return MessageBubble(message: msg);
              },
            ),
          ),
          _buildInputArea(),
        ],
      ),
    );
  }
}

四、生产环境注意事项

  • Redis Adapter:多实例部署时必须用 Redis 作为消息中间件
  • 心跳检测:配置合理的 pingInterval 和 pingTimeout
  • 限流:单用户每秒消息频率上限,防止刷屏
  • 消息持久化:离线消息需要在用户上线后推送
  • 断线重连:客户端自动重连 + 重连后拉取离线消息
// Redis Adapter(多实例部署)
const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');

const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();

Promise.all([pubClient.connect(), subClient.connect()]).then(() => {
  io.adapter(createAdapter(pubClient, subClient));
});

总结

Socket.io 大幅降低了 WebSocket 的开发门槛,配合 JWT 认证、房间管理、消息持久化,可以快速构建一套生产级的实时通信系统。Flutter 端的 socket_io_client 插件也足够成熟,支持自动重连和事件监听。希望这篇文章能帮你在自己的项目中跑通实时通信。