This repository was archived by the owner on Feb 6, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
150 lines (124 loc) · 4.36 KB
/
server.py
File metadata and controls
150 lines (124 loc) · 4.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import asyncio
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Optional, Set
# Chat history file (JSON Lines); a relative path, so it resolves against the
# process's current working directory.
HISTORY_FILE = Path("history.log")
# Listen address handed to asyncio.start_server; "0.0.0.0" is the IPv4
# wildcard address (all interfaces).
HOST = "0.0.0.0"
# TCP port the server listens on.
PORT = 9999
class ChatServer:
    """
    Asynchronous chat-room server.

    - Built on asyncio streams.
    - JSON-lines wire protocol: every message is a single line of JSON.
    - Structured history is appended to history.log (JSON Lines).

    Plain TCP is used by default. To enable TLS, see the comments in main():
    configure an ssl.SSLContext and pass it to start_server.
    """

    def __init__(self) -> None:
        # Stream writers of all currently connected clients (broadcast targets).
        self._clients: Set[asyncio.StreamWriter] = set()

    async def handle_client(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        """
        Serve one client connection until it closes or fails.

        Registers the client, announces the join, then relays every valid
        JSON-object line to all clients and appends it to the history file.
        On exit the client is unregistered, its stream is closed and a
        "left the chat" notice is broadcast.

        Args:
            reader: Stream the client's newline-delimited JSON is read from.
            writer: Stream used to send broadcasts back to this client.
        """
        addr = writer.get_extra_info("peername")
        client_id = f"{addr[0]}:{addr[1]}" if addr else "unknown"

        self._clients.add(writer)
        try:
            # The join announcement lives inside the try block so that the
            # cleanup in ``finally`` also runs if this task is cancelled here.
            await self._broadcast(
                {
                    "type": "system",
                    "from": "server",
                    "text": f"{client_id} joined the chat",
                    "timestamp": self._now(),
                }
            )
            print(f"Client {client_id} connected")

            while True:
                line = await reader.readline()
                if not line:
                    # Empty read means EOF: the peer closed the connection.
                    break
                message = self._parse_message(line, client_id)
                if message is None:
                    # Malformed input is ignored; the connection stays open.
                    continue
                await self._broadcast(message)
                self._append_history(message)
        except Exception as exc:  # noqa: BLE001
            print(f"Error handling client {client_id}: {exc}")
        finally:
            self._clients.discard(writer)
            writer.close()
            try:
                await writer.wait_closed()
            except Exception:  # noqa: BLE001
                # Best effort: a reset connection can surface its error here,
                # and it must not prevent the "left the chat" notice below.
                pass
            print(f"Client {client_id} disconnected")
            await self._broadcast(
                {
                    "type": "system",
                    "from": "server",
                    "text": f"{client_id} left the chat",
                    "timestamp": self._now(),
                }
            )

    @staticmethod
    def _parse_message(line: bytes, client_id: str) -> Optional[Dict]:
        """
        Decode one received line into a normalized message dict.

        Args:
            line: Raw bytes of one line as read from the client.
            client_id: Fallback sender name when the payload has no "from".

        Returns:
            A dict with "type", "from", "text" and a server-side "timestamp",
            or None when the line is not valid UTF-8, not valid JSON, or not
            a JSON object.
        """
        try:
            data = json.loads(line.decode("utf-8").strip())
        except (UnicodeDecodeError, json.JSONDecodeError):
            return None
        if not isinstance(data, dict):
            # Valid JSON such as a list or number has no fields to read.
            return None
        # NOTE(review): "type" and "from" are taken from the client payload
        # as-is, so a client can label its message as "system"/"server".
        return {
            "type": data.get("type", "message"),
            "from": data.get("from", client_id),
            "text": data.get("text", ""),
            "timestamp": ChatServer._now(),
        }

    async def _broadcast(self, message: Dict) -> None:
        """
        Send a JSON message to every connected client.

        Clients whose write or drain fails are unregistered and closed.

        Args:
            message: JSON-serializable dict; sent as one newline-terminated
                UTF-8 line.
        """
        if not self._clients:
            return

        data = (json.dumps(message) + "\n").encode("utf-8")
        dead_clients: Set[asyncio.StreamWriter] = set()

        # Iterate over a snapshot: each ``await`` below yields to the event
        # loop, where other handlers may add or remove clients. Mutating the
        # live set during iteration raises RuntimeError.
        for writer in tuple(self._clients):
            if writer not in self._clients:
                # Disconnected while an earlier client was being drained.
                continue
            try:
                writer.write(data)
                await writer.drain()
            except Exception:  # noqa: BLE001
                dead_clients.add(writer)

        for writer in dead_clients:
            self._clients.discard(writer)
            writer.close()
            try:
                await writer.wait_closed()
            except Exception:  # noqa: BLE001
                # The connection is already broken; nothing more to clean up.
                pass

    @staticmethod
    def _append_history(message: Dict) -> None:
        """
        Append a message to the history file as one JSON Lines record.

        Args:
            message: JSON-serializable dict to record.
        """
        HISTORY_FILE.parent.mkdir(parents=True, exist_ok=True)
        with HISTORY_FILE.open("a", encoding="utf-8") as f:
            f.write(json.dumps(message, ensure_ascii=False) + "\n")

    @staticmethod
    def _now() -> str:
        """Return the current UTC time as an ISO 8601 string ending in "Z"."""
        # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
        # time and drop tzinfo so the text keeps the original "...Z" format
        # rather than gaining a "+00:00" offset.
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
async def main() -> None:
    """Start the chat server on HOST:PORT and serve until cancelled."""
    chat = ChatServer()

    # To enable TLS:
    #   1. Import the ssl module:
    #        import ssl
    #   2. Create an SSL context and load the certificate:
    #        ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    #        ssl_ctx.load_cert_chain(certfile="cert.pem", keyfile="key.pem")
    #   3. Pass ssl=ssl_ctx as an extra argument to start_server below.
    listener = await asyncio.start_server(chat.handle_client, host=HOST, port=PORT)

    # One entry per listening socket; ``sockets`` may be empty or None.
    bound = [str(sock.getsockname()) for sock in (listener.sockets or [])]
    addr = ", ".join(bound)
    print(f"Chat server started on {addr}")

    async with listener:
        await listener.serve_forever()
if __name__ == "__main__":
    # Script entry point: run the server until the process is interrupted.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the server; exit without a traceback.
        print("\nServer stopped by user.")