Django即时聊天应用实战
一、今日实战内容概览
功能模块 | 核心技术 |
---|---|
聊天室 | Django Channels, WebSocket, Redis |
在线用户列表 | Channel Layers, 实时更新 |
消息持久化 | Django Models, Database |
用户界面 | HTML, CSS, JavaScript |
让我们创建一个完整的即时聊天应用流程图:
二、项目结构
chat_project/
├── chat/
│ ├── __init__.py
│ ├── consumers.py
│ ├── models.py
│ ├── routing.py
│ ├── urls.py
│ ├── views.py
│ └── templates/
│ └── chat/
│ ├── index.html
│ └── room.html
├── static/
│ └── css/
│ └── chat.css
└── chat_project/├── __init__.py├── asgi.py├── settings.py└── urls.py
三、完整代码实现
1. 模型设计
# chat/models.py
from django.db import models
from django.contrib.auth.models import Userclass ChatRoom(models.Model):name = models.CharField(max_length=100, unique=True)created_at = models.DateTimeField(auto_now_add=True)def __str__(self):return self.nameclass ChatMessage(models.Model):room = models.ForeignKey(ChatRoom, on_delete=models.CASCADE)user = models.ForeignKey(User, on_delete=models.CASCADE)content = models.TextField()timestamp = models.DateTimeField(auto_now_add=True)class Meta:ordering = ['timestamp']def __str__(self):return f'{self.user.username}: {self.content[:50]}'
2. Consumer实现
# chat/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from .models import ChatRoom, ChatMessageclass ChatConsumer(AsyncWebsocketConsumer):async def connect(self):self.room_name = self.scope['url_route']['kwargs']['room_name']self.room_group_name = f'chat_{self.room_name}'self.user = self.scope['user']# 加入房间组await self.channel_layer.group_add(self.room_group_name,self.channel_name)# 连接建立await self.accept()# 获取在线用户列表await self.update_user_list(True)# 获取历史消息await self.send_chat_history()async def disconnect(self, close_code):# 离开房间组await self.channel_layer.group_discard(self.room_group_name,self.channel_name)# 更新在线用户列表await self.update_user_list(False)async def receive(self, text_data):text_data_json = json.loads(text_data)message_type = text_data_json.get('type', 'chat_message')if message_type == 'chat_message':message = text_data_json['message']# 保存消息到数据库await self.save_message(message)# 发送消息到房间组await self.channel_layer.group_send(self.room_group_name,{'type': 'chat_message','message': message,'username': self.user.username})async def chat_message(self, event):message = event['message']username = event['username']# 发送消息到WebSocketawait self.send(text_data=json.dumps({'type': 'chat_message','message': message,'username': username,'timestamp': event.get('timestamp', '')}))@database_sync_to_asyncdef save_message(self, message):room = ChatRoom.objects.get(name=self.room_name)ChatMessage.objects.create(room=room,user=self.user,content=message)@database_sync_to_asyncdef get_chat_history(self):room = ChatRoom.objects.get(name=self.room_name)messages = ChatMessage.objects.filter(room=room).select_related('user')return [(msg.user.username, msg.content, msg.timestamp.isoformat()) for msg in messages]async def send_chat_history(self):history = await self.get_chat_history()await self.send(text_data=json.dumps({'type': 'chat_history','messages': history}))async def update_user_list(self, joined):# 更新在线用户列表await self.channel_layer.group_send(self.room_group_name,{'type': 'user_list_changed','username': self.user.username,'joined': joined})async def user_list_changed(self, event):await self.send(text_data=json.dumps({'type': 'user_list_changed','username': event['username'],'joined': event['joined']}))
3. 路由配置
# chat/routing.py
from django.urls import re_path
from . import consumerswebsocket_urlpatterns = [re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]
4. URLs配置
# chat/urls.py
from django.urls import path
from . import viewsapp_name = 'chat'urlpatterns = [path('', views.index, name='index'),path('room/<str:room_name>/', views.room, name='room'),
]
5. 视图实现
# chat/views.py
from django.shortcuts import render, redirect
from django.contrib.auth.decorators import login_required
from .models import ChatRoom, ChatMessage@login_required
def index(request):rooms = ChatRoom.objects.all()return render(request, 'chat/index.html', {'rooms': rooms})@login_required
def room(request, room_name):room, created = ChatRoom.objects.get_or_create(name=room_name)return render(request, 'chat/room.html', {'room': room})
6. 模板实现
<!-- chat/templates/chat/room.html -->
<!DOCTYPE html>
<html>
<head><title>Chat Room - {{ room.name }}</title><link rel="stylesheet" href="/static/css/chat.css">
</head>
<body><div class="chat-container"><div class="sidebar"><h3>Online Users</h3><div id="online-users"></div></div><div class="main-content"><div id="chat-messages"></div><div class="chat-input"><input type="text" id="chat-message-input" placeholder="Type a message..."><button id="chat-message-submit">Send</button></div></div></div><script>const roomName = {{ room.name|safe }};const username = {{ request.user.username|safe }};const chatSocket = new WebSocket('ws://' + window.location.host +'/ws/chat/' + roomName + '/');const onlineUsers = new Set();const messagesDiv = document.querySelector('#chat-messages');const onlineUsersDiv = document.querySelector('#online-users');chatSocket.onmessage = function(e) {const data = JSON.parse(e.data);switch(data.type) {case 'chat_message':appendMessage(data.username, data.message);break;case 'chat_history':data.messages.forEach(msg => {appendMessage(msg[0], msg[1], msg[2]);});break;case 'user_list_changed':if(data.joined) {onlineUsers.add(data.username);} else {onlineUsers.delete(data.username);}updateOnlineUsers();break;}};function appendMessage(username, message, timestamp = null) {const messageDiv = document.createElement('div');messageDiv.className = 'message';const time = timestamp ? new Date(timestamp).toLocaleTimeString() : new Date().toLocaleTimeString();messageDiv.innerHTML = `<span class="username">${username}</span><span class="time">${time}</span><p>${message}</p>`;messagesDiv.appendChild(messageDiv);messagesDiv.scrollTop = messagesDiv.scrollHeight;}function updateOnlineUsers() {onlineUsersDiv.innerHTML = Array.from(onlineUsers).map(user => `<div class="user">${user}</div>`).join('');}document.querySelector('#chat-message-input').focus();document.querySelector('#chat-message-input').onkeyup = function(e) {if (e.keyCode === 13) {document.querySelector('#chat-message-submit').click();}};document.querySelector('#chat-message-submit').onclick = function(e) {const messageInputDom = document.querySelector('#chat-message-input');const message = messageInputDom.value;if(message) {chatSocket.send(JSON.stringify({'type': 'chat_message','message': message}));messageInputDom.value = '';}};</script>
</body>
</html>
7. 样式实现
/* static/css/chat.css */
.chat-container {display: flex;height: 100vh;padding: 20px;box-sizing: border-box;
}.sidebar {width: 200px;background: #f5f5f5;padding: 15px;margin-right: 20px;border-radius: 5px;
}.main-content {flex: 1;display: flex;flex-direction: column;
}#chat-messages {flex: 1;overflow-y: auto;padding: 15px;background: #fff;border: 1px solid #ddd;border-radius: 5px;margin-bottom: 15px;
}.chat-input {display: flex;gap: 10px;
}#chat-message-input {flex: 1;padding: 10px;border: 1px solid #ddd;border-radius: 5px;
}#chat-message-submit {padding: 10px 20px;background: #007bff;color: white;border: none;border-radius: 5px;cursor: pointer;
}.message {margin-bottom: 15px;padding: 10px;background: #f9f9f9;border-radius: 5px;
}.message .username {font-weight: bold;color: #007bff;margin-right: 10px;
}.message .time {color: #666;font-size: 0.8em;
}.user {padding: 5px;margin: 5px 0;background: #fff;border-radius: 3px;
}
四、性能优化
- 消息分页加载:
@database_sync_to_async
def get_chat_history(self):room = ChatRoom.objects.get(name=self.room_name)messages = ChatMessage.objects.filter(room=room)\.select_related('user')\.order_by('-timestamp')[:50]\.reverse()return list(messages)
- Redis连接池配置:
# settings.py
CHANNEL_LAYERS = {"default": {"BACKEND": "channels_redis.core.RedisChannelLayer","CONFIG": {"hosts": [("localhost", 6379)],"capacity": 1500,"expiry": 10,},},
}
- 消息批处理:
class ChatConsumer(AsyncWebsocketConsumer):message_buffer = []async def receive(self, text_data):# ... 现有代码 ...self.message_buffer.append(message)if len(self.message_buffer) >= 10:await self.flush_messages()async def flush_messages(self):if self.message_buffer:messages = self.message_buffer[:]self.message_buffer = []# 批量保存消息await self.save_messages(messages)
五、单元测试
# chat/tests.py
import pytest
from channels.testing import WebsocketCommunicator
from channels.routing import URLRouter
from django.contrib.auth.models import User
from chat.routing import websocket_urlpatterns
from chat.models import ChatRoom, ChatMessage@pytest.mark.asyncio
async def test_chat_consumer():# 创建测试用户和房user = await database_sync_to_async(User.objects.create_user)(username='testuser',password='testpass')room = await database_sync_to_async(ChatRoom.objects.create)(name='testroom')# 创建WebSocket通讯器application = URLRouter(websocket_urlpatterns)communicator = WebsocketCommunicator(application=application,path=f'/ws/chat/testroom/')# 测试连接connected, _ = await communicator.connect()assert connected# 测试发送消息await communicator.send_json_to({'type': 'chat_message','message': 'Hello, World!'})# 测试接收消息response = await communicator.receive_json_from()assert response['type'] == 'chat_message'assert response['message'] == 'Hello, World!'# 测试断开连接await communicator.disconnect()# 验证消息是否保存到数据库messages = await database_sync_to_async(ChatMessage.objects.filter)(room=room).count()assert messages == 1@pytest.mark.asyncio
async def test_user_list_updates():# 创建两个测试用户user1 = await database_sync_to_async(User.objects.create_user)(username='user1',password='testpass')user2 = await database_sync_to_async(User.objects.create_user)(username='user2',password='testpass')# 创建两个通讯器application = URLRouter(websocket_urlpatterns)communicator1 = WebsocketCommunicator(application=application,path='/ws/chat/testroom/')communicator2 = WebsocketCommunicator(application=application,path='/ws/chat/testroom/')# 测试用户1连接await communicator1.connect()response1 = await communicator1.receive_json_from()assert response1['type'] == 'user_list_changed'assert response1['username'] == 'user1'assert response1['joined'] == True# 测试用户2连接await communicator2.connect()response2 = await communicator2.receive_json_from()assert response2['type'] == 'user_list_changed'assert response2['username'] == 'user2'assert response2['joined'] == True# 测试断开连接更新用户列表await communicator1.disconnect()response2 = await communicator2.receive_json_from()assert response2['type'] == 'user_list_changed'assert response2['username'] == 'user1'assert response2['joined'] == Falseawait communicator2.disconnect()
六、部署注意事项
- ASGI服务器配置
# asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from chat.routing import websocket_urlpatternsos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'chat_project.settings')application = ProtocolTypeRouter({"http": get_asgi_application(),"websocket": AuthMiddlewareStack(URLRouter(websocket_urlpatterns)),
})
- Daphne服务器启动
daphne -b 0.0.0.0 -p 8000 chat_project.asgi:application
- Nginx配置
upstream channels-backend {server localhost:8000;
}server {listen 80;server_name example.com;location / {proxy_pass http://channels-backend;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;}
}
七、进阶功能建议
- 添加私聊功能:
- 创建私聊房间
- 一对一消息发送
- 未读消息提醒
- 消息富文本支持:
- Markdown格式
- 表情支持
- 图片上传
- 房间管理功能:
- 房间创建/删除
- 成员管理
- 权限控制
- 消息搜索功能:
- 全文搜索
- 按时间筛选
- 按用户筛选
八、总结
今天我们实现了一个完整的即时聊天应用,包含以下核心功能:
- 实时消息发送和接收
- 在线用户列表管理
- 消息持久化存储
- 聊天历史记录
- 用户认证和授权
实现过程中的关键点:
- WebSocket连接管理
- Channel Layer消息广播
- 数据库操作异步化
- 前端实时更新
- 性能优化
建议练习:
- 添加消息编辑和删除功能
- 实现文件上传和共享
- 添加用户输入状态提示
- 实现消息已读确认
- 添加群组管理功能
通过这个项目,你应该更好地理解了Django Channels的工作原理和实时应用的开发流程。
怎么样今天的内容还满意吗?再次感谢朋友们的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!