返回文章列表
Node.js + Socket.io 实现实时通信
实时通信是现代应用的核心能力之一——即时聊天、在线状态、实时通知、协作编辑都依赖它。本文用 Node.js + Socket.io 从零搭建一套完整的实时通信系统,并在 Flutter 端完成对接。
一、WebSocket vs HTTP
HTTP 是请求-响应模式,客户端发起请求,服务器返回响应,一次就结束。而 WebSocket 建立的是一个持久连接,双方可以随时主动发送数据:
- HTTP:半双工、短连接、开销大(每次都要握手)
- WebSocket:全双工、长连接、开销小(一次握手持续通信)
Socket.io 在原生 WebSocket 之上做了封装,提供了自动重连、房间管理、命名空间、降级到轮询等能力,让实时通信的开发体验好很多。
二、后端搭建
2.1 基础服务器
const express = require('express');
const http = require('http');
const { Server } = require('socket.io');
const jwt = require('jsonwebtoken');
const app = express();
const server = http.createServer(app);
const io = new Server(server, {
cors: {
origin: process.env.CLIENT_URL || '*',
methods: ['GET', 'POST'],
},
// 传输配置
transports: ['websocket', 'polling'],
pingInterval: 10000,
pingTimeout: 5000,
});
// JWT 认证中间件
io.use((socket, next) => {
try {
const token = socket.handshake.auth.token;
if (!token) return next(new Error('未提供认证 Token'));
const decoded = jwt.verify(token, process.env.JWT_SECRET);
socket.user = decoded;
next();
} catch (error) {
next(new Error('认证失败'));
}
});
// 在线用户管理
const onlineUsers = new Map();
io.on('connection', (socket) => {
const userId = socket.user.id;
// 上线:加入个人房间 + 更新在线状态
onlineUsers.set(userId, {
socketId: socket.id,
joinedAt: Date.now(),
});
socket.join(`user_${userId}`);
// 广播上线通知
io.emit('user:online', { userId, online: true });
io.emit('online:count', { count: onlineUsers.size });
// ========== 聊天消息 ==========
socket.on('chat:send', async (data) => {
const { conversationId, content, type = 'text' } = data;
// 验证消息
if (!conversationId || !content || content.trim().length === 0) {
return;
}
if (content.length > 5000) {
socket.emit('chat:error', { message: '消息长度不能超过5000字' });
return;
}
// 存储消息到数据库
const message = await Message.create({
conversationId,
sender: userId,
content: content.trim(),
type,
status: 'sent',
});
// 发送给对话中的所有用户
io.to(`conversation_${conversationId}`).emit('chat:newMessage', {
_id: message._id,
conversationId,
sender: userId,
content: message.content,
type: message.type,
status: 'sent',
createdAt: message.createdAt,
});
// 更新对话的最后一条消息
await Conversation.findByIdAndUpdate(conversationId, {
lastMessage: message.content,
lastMessageAt: message.createdAt,
});
});
// ========== 消息已读 ==========
socket.on('chat:read', async ({ conversationId, messageIds }) => {
await Message.updateMany(
{
_id: { $in: messageIds },
conversationId,
sender: { $ne: userId },
},
{ status: 'read' }
);
// 通知发送者:消息已读
socket.to(`conversation_${conversationId}`).emit('chat:messagesRead', {
conversationId,
readBy: userId,
});
});
// ========== 正在输入 ==========
socket.on('chat:typing', ({ conversationId }) => {
socket.to(`conversation_${conversationId}`).emit('chat:userTyping', {
userId,
conversationId,
});
});
// ========== 加入对话房间 ==========
socket.on('conversation:join', (conversationId) => {
socket.join(`conversation_${conversationId}`);
});
socket.on('conversation:leave', (conversationId) => {
socket.leave(`conversation_${conversationId}`);
});
// ========== 断线处理 ==========
socket.on('disconnect', (reason) => {
onlineUsers.delete(userId);
// 广播下线通知(延迟5秒,避免短暂断线闪烁)
setTimeout(() => {
if (!onlineUsers.has(userId)) {
io.emit('user:online', { userId, online: false });
io.emit('online:count', { count: onlineUsers.size });
}
}, 5000);
});
// 错误处理
socket.on('error', (error) => {
console.error(`Socket 错误 [用户 ${userId}]:`, error.message);
});
});
const PORT = process.env.PORT || 3001;
server.listen(PORT, () => {
});
2.2 Express 路由集成
// 将 io 实例共享给 Express 路由
app.set('io', io);
// 在 Express 路由中发送通知
const sendNotification = (userId, event, data) => {
const io = req.app.get('io');
io.to(`user_${userId}`).emit(event, data);
};
// 使用示例:系统通知
app.post('/api/v1/notifications/send', protect, async (req, res) => {
const { userId, title, body, type } = req.body;
// 存入数据库
const notification = await Notification.create({
user: userId,
title,
body,
type,
});
// 通过 Socket.io 实时推送
const io = req.app.get('io');
io.to(`user_${userId}`).emit('notification:new', {
_id: notification._id,
title: notification.title,
body: notification.body,
type: notification.type,
});
successResponse(res, notification);
});
三、Flutter 端对接
3.1 Socket 服务封装
import 'package:socket_io_client/socket_io_client.dart' as IO;
class SocketService {
static final SocketService _instance = SocketService._();
factory SocketService() => _instance;
IO.Socket? _socket;
bool _isConnected = false;
bool get isConnected => _isConnected;
void connect(String token) {
_socket = IO.io(
'http://your-server.com',
IO.OptionBuilder()
.setTransports(['websocket'])
.enableAutoReconnect()
.setReconnectionAttempts(5)
.setReconnectionDelay(2000)
.setQuery({'token': token})
.build(),
);
_socket!.onConnect((_) {
print('Socket 已连接');
_isConnected = true;
});
_socket!.onDisconnect((_) {
print('Socket 已断开');
_isConnected = false;
});
_socket!.onConnectError((error) {
print('Socket 连接错误: $error');
});
}
// 加入对话
void joinConversation(String conversationId) {
_socket?.emit('conversation:join', conversationId);
}
// 离开对话
void leaveConversation(String conversationId) {
_socket?.emit('conversation:leave', conversationId);
}
// 发送消息
void sendMessage({
required String conversationId,
required String content,
String type = 'text',
}) {
_socket?.emit('chat:send', {
'conversationId': conversationId,
'content': content,
'type': type,
});
}
// 正在输入
void sendTyping(String conversationId) {
_socket?.emit('chat:typing', {
'conversationId': conversationId,
});
}
// 消息已读
void markAsRead(String conversationId, List<String> messageIds) {
_socket?.emit('chat:read', {
'conversationId': conversationId,
'messageIds': messageIds,
});
}
// 监听新消息
void onNewMessage(Function(dynamic) callback) {
_socket?.on('chat:newMessage', callback);
}
// 监听用户正在输入
void onUserTyping(Function(dynamic) callback) {
_socket?.on('chat:userTyping', callback);
}
// 监听消息已读
void onMessagesRead(Function(dynamic) callback) {
_socket?.on('chat:messagesRead', callback);
}
// 监听通知
void onNotification(Function(dynamic) callback) {
_socket?.on('notification:new', callback);
}
void disconnect() {
_socket?.dispose();
_socket = null;
_isConnected = false;
}
}
3.2 在聊天页面中使用
class ChatPage extends StatefulWidget {
@override
State<ChatPage> createState() => _ChatPageState();
}
class _ChatPageState extends State<ChatPage> {
final _socket = SocketService();
final _textController = TextEditingController();
final _scrollController = ScrollController();
List<Message> _messages = [];
String _conversationId = '';
@override
void initState() {
super.initState();
_setupSocket();
}
void _setupSocket() {
// 加入对话房间
_socket.joinConversation(_conversationId);
// 监听新消息
_socket.onNewMessage((data) {
setState(() {
_messages.add(Message.fromJson(data));
});
_scrollToBottom();
});
// 监听对方正在输入
_socket.onUserTyping((data) {
// 显示"对方正在输入..."
});
// 监听消息已读
_socket.onMessagesRead((data) {
// 更新消息状态为已读
});
}
void _sendMessage() {
final text = _textController.text.trim();
if (text.isEmpty) return;
_socket.sendMessage(
conversationId: _conversationId,
content: text,
);
_textController.clear();
}
void _scrollToBottom() {
WidgetsBinding.instance.addPostFrameCallback((_) {
_scrollController.animateTo(
_scrollController.position.maxScrollExtent,
duration: Duration(milliseconds: 300),
curve: Curves.easeOut,
);
});
}
@override
void dispose() {
_socket.leaveConversation(_conversationId);
_textController.dispose();
_scrollController.dispose();
super.dispose();
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text('聊天')),
body: Column(
children: [
Expanded(
child: ListView.builder(
controller: _scrollController,
itemCount: _messages.length,
itemBuilder: (context, index) {
final msg = _messages[index];
return MessageBubble(message: msg);
},
),
),
_buildInputArea(),
],
),
);
}
}
四、生产环境注意事项
- Redis Adapter:多实例部署时必须用 Redis 作为消息中间件
- 心跳检测:配置合理的 pingInterval 和 pingTimeout
- 限流:单用户每秒消息频率上限,防止刷屏
- 消息持久化:离线消息需要在用户上线后推送
- 断线重连:客户端自动重连 + 重连后拉取离线消息
// Redis Adapter(多实例部署)
const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');
const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();
Promise.all([pubClient.connect(), subClient.connect()]).then(() => {
io.adapter(createAdapter(pubClient, subClient));
});
总结
Socket.io 大幅降低了 WebSocket 的开发门槛,配合 JWT 认证、房间管理、消息持久化,可以快速构建一套生产级的实时通信系统。Flutter 端的 socket_io_client 插件也足够成熟,支持自动重连和事件监听。希望这篇文章能帮你在自己的项目中跑通实时通信。