-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathapp.py
More file actions
159 lines (136 loc) · 5.24 KB
/
app.py
File metadata and controls
159 lines (136 loc) · 5.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import json
import logging
from typing import AsyncGenerator
from fastapi.responses import StreamingResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi import FastAPI, Request, UploadFile, File
from pydantic import BaseModel
import os
import yaml
from core.agent import TinyAgent
# 加载配置文件
config_path = "config.yaml"
if os.path.exists(config_path):
with open(config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f) or {}
else:
config = {}
llm_config = config.get("llm", {})
workspace_path = "./workspace"
outputs_path = os.path.join(workspace_path, "outputs")
os.makedirs(outputs_path, exist_ok=True)
agent = TinyAgent(
workspace_dir=workspace_path,
openai_api_key=llm_config.get("api_key"),
base_url=llm_config.get("base_url"),
model=llm_config.get("model", "gpt-4o-mini")
)
app = FastAPI(title="Tiny Agent Backend")
# 挂载静态资源
app.mount("/static", StaticFiles(directory="static"), name="static")
app.mount("/outputs", StaticFiles(directory=outputs_path), name="outputs")
@app.get("/")
async def root():
"""返回前端主页"""
return FileResponse("static/index.html")
class ChatRequest(BaseModel):
message: str
@app.post("/api/chat")
async def chat_endpoint(req: ChatRequest):
"""
流式对话接口。使用 GET / POST 无所谓,这里为了获取 query 用 POST 接收 message 后,
将其转换成 SSE (Server-Sent Events) 返回。
"""
async def sse_generator() -> AsyncGenerator[str, None]:
# 遍历 agent_loop 的每一个步骤触发的字典事件
async for event in agent.chat_stream(req.message):
# 将 python 字典格式化为 JSON 字符串
data_str = json.dumps(event, ensure_ascii=False)
# SSE 要求格式以 data: 开头,以 \n\n 结尾
yield f"data: {data_str}\n\n"
# 指定媒体类型为 text/event-stream 这是 SSE 标准的配置
return StreamingResponse(sse_generator(), media_type="text/event-stream")
@app.get("/api/status")
async def get_status():
"""获取侧边栏展示的相关状态(刷新并返回技能和支持的工具)"""
agent.skills.load_all_skills() # Dynamic reload
return {
"skills": agent.get_skills_summary(),
"tools": agent.get_tools_summary()
}
@app.get("/api/memory")
async def get_memory():
"""获取当前 agent 的上下文和长期记忆"""
messages = agent.memory.get_messages(window_size=20)
system_prompt = agent.context.build_system_prompt()
long_term_memory = agent.memory.get_long_term_memory()
# 统计信息
stats = {
"total_messages_in_window": len(messages),
"has_long_term_memory": bool(long_term_memory)
}
return {
"stats": stats,
"long_term_memory": long_term_memory,
}
@app.get("/api/history")
async def get_history():
"""获取完整的历史会话和累积 token 消耗用于前端恢复渲染"""
return {
"messages": agent.memory.messages,
"tokens": agent.memory.get_tokens()
}
@app.get("/api/outputs")
async def list_outputs():
"""获取工作区所有的输出文件列表"""
files = []
if os.path.exists(outputs_path):
for f in os.listdir(outputs_path):
file_path = os.path.join(outputs_path, f)
if os.path.isfile(file_path):
stat = os.stat(file_path)
files.append({
"name": f,
"size": stat.st_size,
"mtime": stat.st_mtime
})
# 按修改时间倒序(最新的在前面)
files.sort(key=lambda x: x["mtime"], reverse=True)
return {"files": files}
@app.delete("/api/outputs/{filename}")
async def delete_output(filename: str):
"""Delete a specific file from the workspace outputs directory"""
# Security: Prevent directory traversal
if ".." in filename or "/" in filename or "\\" in filename:
return {"status": "error", "message": "Invalid filename"}
file_path = os.path.join(outputs_path, filename)
if os.path.exists(file_path) and os.path.isfile(file_path):
try:
os.remove(file_path)
return {"status": "success", "message": f"Deleted {filename}"}
except Exception as e:
return {"status": "error", "message": str(e)}
else:
return {"status": "error", "message": "File not found"}
@app.post("/api/upload")
async def upload_file(file: UploadFile = File(...)):
"""上传文件到 workspace outputs 目录"""
if not os.path.exists(outputs_path):
os.makedirs(outputs_path, exist_ok=True)
file_path = os.path.join(outputs_path, file.filename)
try:
content = await file.read()
with open(file_path, "wb") as f:
f.write(content)
return {"status": "success", "filename": file.filename}
except Exception as e:
return {"status": "error", "message": str(e)}
@app.post("/api/clear")
async def clear_memory():
"""清理内存会话记录"""
agent.clear_memory()
return {"status": "ok"}
if __name__ == "__main__":
import uvicorn
logging.info("Starting Tiny Agent server on http://localhost:8000")
uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)