-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
139 lines (110 loc) · 5.04 KB
/
server.py
File metadata and controls
139 lines (110 loc) · 5.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""
飞书访谈预约机器人 — FastAPI 主服务。
路由:
POST /calendly — Calendly webhook(invitee.created)
POST /feishu-event — 飞书事件订阅(@mention 问答)
GET /health — 健康检查
"""
import logging
import sys
from contextlib import asynccontextmanager
import uvicorn
from fastapi import FastAPI, HTTPException, Request, Response
import config
from handlers.calendly import handle_invitee_created, verify_calendly_signature
from handlers.feishu_event import handle_message_receive, handle_url_verification
# ── 日志配置 ──────────────────────────────────────────────────────────────────
logging.basicConfig(
level=getattr(logging, config.LOG_LEVEL.upper(), logging.INFO),
format="%(asctime)s [%(levelname)s] %(name)s — %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)
# ── 应用生命周期 ───────────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("飞书访谈机器人启动")
_check_config()
yield
logger.info("飞书访谈机器人关闭")
def _check_config() -> None:
missing = []
required = {
"FEISHU_APP_ID": config.FEISHU_APP_ID,
"FEISHU_APP_SECRET": config.FEISHU_APP_SECRET,
"FEISHU_GROUP_CHAT_ID": config.FEISHU_GROUP_CHAT_ID,
"ANTHROPIC_API_KEY": config.ANTHROPIC_API_KEY,
}
for name, val in required.items():
if not val:
missing.append(name)
if missing:
logger.warning(f"以下环境变量未设置,部分功能可能无法使用: {', '.join(missing)}")
app = FastAPI(
title="飞书访谈预约机器人",
version="1.0.0",
lifespan=lifespan,
)
# ── /health ────────────────────────────────────────────────────────────────────
@app.api_route("/health", methods=["GET", "HEAD"])
async def health():
import state
interview = state.get_interview()
return {
"status": "ok",
"current_interview": {
"name": interview.name,
"email": interview.email,
"scheduled_at": interview.scheduled_at,
} if interview else None,
}
# ── /webhooks/calendly ─────────────────────────────────────────────────────────
@app.post("/webhooks/calendly")
async def calendly_webhook(request: Request):
raw_body = await request.body()
sig_header = request.headers.get("Calendly-Webhook-Signature", "")
# 签名验证
if sig_header and not verify_calendly_signature(raw_body, sig_header):
raise HTTPException(status_code=401, detail="Invalid Calendly signature")
try:
payload = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON body")
event_type = payload.get("event", "")
logger.info(f"收到 Calendly 事件: {event_type}")
if event_type == "invitee.created":
# 异步处理,避免阻塞 webhook 响应(Calendly 要求 200 内 3s 返回)
import asyncio
asyncio.create_task(_run_in_thread(handle_invitee_created, payload))
return Response(status_code=200)
async def _run_in_thread(func, *args):
"""在线程池中运行同步函数,避免阻塞事件循环。"""
import asyncio
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, func, *args)
# ── /feishu-event ──────────────────────────────────────────────────────────────
@app.post("/feishu-event")
async def feishu_event(request: Request):
try:
body = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON body")
# 飞书事件订阅握手验证
if body.get("type") == "url_verification":
return handle_url_verification(body)
# 事件头
header = body.get("header", {})
event_type = header.get("event_type", "")
if event_type == "im.message.receive_v1":
import asyncio
asyncio.create_task(_run_in_thread(handle_message_receive, body))
return {"code": 0}
# ── 启动入口 ───────────────────────────────────────────────────────────────────
if __name__ == "__main__":
uvicorn.run(
"server:app",
host="0.0.0.0",
port=config.PORT,
reload=False,
log_level=config.LOG_LEVEL.lower(),
)