6.6 【动手】构建带监控的生产级 Agent 服务
实验目标
本节结束后,你将能够:

- 用 FastAPI + ARQ 搭建一个支持异步任务的 Agent 服务,不再被 LLM 调用的高延迟阻塞主线程
- 通过 LangFuse 实现完整的 Agent 执行链路追踪(Trace),精确定位每次工具调用的耗时与 Token 消耗
- 用 Locust 对服务进行压测,找出 CPU/内存/LLM 调用三个维度的瓶颈,给出容量规划结论
核心学习点:① 异步任务队列解耦 Agent 长任务与 HTTP 响应;② 可观测性不是锦上添花,是生产稳定性的基础设施;③ 压测不是为了"跑一遍",是为了找到系统的承压边界。
架构总览
graph TD
Client["客户端 (Browser/API)"]
GW["FastAPI 网关\n/chat (同步)\n/task (异步)"]
Worker["ARQ Worker\nAgent 执行引擎"]
Redis["Redis\n任务队列 + 结果存储"]
LLM["LLM API\nOpenAI / Claude / Qwen / DeepSeek"]
Tools["工具层\n搜索 / 数据库 / 计算器"]
Prom["Prometheus\n指标采集"]
Graf["Grafana\n可视化大盘"]
Locust["Locust\n压测客户端"]
Client -->|HTTP POST| GW
GW -->|入队| Redis
Redis -->|消费| Worker
Worker -->|调用| LLM
Worker -->|调用| Tools
GW -->|/metrics| Prom
Worker -->|/metrics| Prom
Prom --> Graf
Locust -->|并发请求| GW
style Graf fill:#2a9d8f,color:#fff
style Redis fill:#e63946,color:#fff
整体思路:HTTP 网关只负责接收请求和返回 task_id,实际的 Agent 执行由 ARQ Worker 异步完成。这样即使 Agent 跑了 2 分钟,HTTP 连接也不会超时。Prometheus + Grafana 负责运营指标监控。模型调用通过 core_config.py 统一管理,支持 DeepSeek、Qwen 等多种模型自由切换。
环境准备
# 创建虚拟环境(uv)
uv venv --python 3.11 && source .venv/bin/activate
# 安装依赖(声明最低版本)
uv pip install \
fastapi>=0.100.0 \
uvicorn>=0.20.0 \
arq>=0.25.0 \
redis>=4.0.0 \
langchain>=1.0.0 \
langchain-openai>=0.1.0 \
langchain-litellm>=0.1.0 \
langfuse>=2.0.0 \
prometheus-client>=0.17.0 \
python-dotenv>=1.0.0 \
pydantic>=2.0.0 \
pydantic-settings>=2.0.0 \
httpx>=0.24.0 \
openai>=1.0.0 \
litellm>=1.40.0 \
locust>=2.0.0 \
pytest>=7.0.0
Colab 用户:
运行 !pip install fastapi uvicorn arq redis langchain langchain-openai langchain-litellm langfuse prometheus-client locust litellm python-dotenv pydantic-settings 即可;Redis 使用 !apt-get install -y redis-server && redis-server --daemonize yes 启动本地实例。
# 启动 Redis(本地 Docker)
docker run -d --name redis-agent -p 6379:6379 redis:7-alpine
# 环境变量配置(.env 文件)
cat > .env << 'EOF'
OPENAI_API_KEY=sk-xxxx
DEEPSEEK_API_KEY=sk-xxxx
DASHSCOPE_API_KEY=sk-xxxx
LANGFUSE_PUBLIC_KEY=pk-xxxx
LANGFUSE_SECRET_KEY=sk-xxxx
LANGFUSE_HOST=https://cloud.langfuse.com
REDIS_URL=redis://localhost:6379
EOF
⚠️ 生产注意:LangFuse 支持自托管(Docker Compose 一键启动),数据不出境场景务必自托管,参考 langfuse.com/docs/deployment/self-host。
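进入编码之前,可以先用一小段脚本确认关键环境变量已经加载。下面是一个示意脚本,required 中的变量名是假设值,以你实际 .env 中的键为准:

```python
import os

def preflight(required=("OPENAI_API_KEY", "REDIS_URL")) -> list[str]:
    """返回缺失的环境变量名列表,列表为空说明可以启动服务。"""
    return [name for name in required if not os.environ.get(name)]

missing = preflight()
if missing:
    print(f"缺少环境变量: {missing},请检查 .env 文件")
```

启动 FastAPI 和 Worker 之前各跑一次,可以避免"服务起来了但第一次调用才报 Key 为空"的尴尬。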
Step-by-Step 实现
Step 1:定义数据模型与配置
目标:集中管理所有配置,用 Pydantic 做类型安全保障,避免后续到处写 os.getenv。
# config.py
from __future__ import annotations
from functools import lru_cache
from pydantic_settings import BaseSettings # pydantic v2 拆包,需 pip install pydantic-settings
class Settings(BaseSettings):
"""应用配置,自动从 .env 文件读取。"""
openai_api_key: str = ""
langfuse_public_key: str = ""
langfuse_secret_key: str = ""
langfuse_host: str = "https://cloud.langfuse.com"
redis_url: str = "redis://localhost:6379"
# Agent 运行参数
agent_max_iterations: int = 10
agent_timeout_seconds: int = 120
# 成本控制:单次请求最大 Token 消耗
max_tokens_per_request: int = 4000
model_config = {
"env_file": ".env",
"env_file_encoding": "utf-8",
"extra": "allow",
}
@lru_cache(maxsize=1)
def get_settings() -> Settings:
"""全局单例配置,避免重复读取 .env 文件。"""
return Settings()
⚠️ 与旧版差异:Pydantic v2 中 class Config 写法已废弃,改用 model_config 字典。字段默认值从必填改为空字符串 "",配合 "extra": "allow" 允许额外环境变量传入。
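get_settings 的单例行为可以用一个不依赖 pydantic 的小例子验证。下面的 MiniSettings/get_mini_settings 是为演示 lru_cache 单例模式而虚构的简化版:

```python
import os
from dataclasses import dataclass
from functools import lru_cache

@dataclass(frozen=True)
class MiniSettings:
    redis_url: str

@lru_cache(maxsize=1)
def get_mini_settings() -> MiniSettings:
    # 只在首次调用时读取环境变量,之后直接命中缓存
    return MiniSettings(redis_url=os.environ.get("REDIS_URL", "redis://localhost:6379"))

# 两次调用返回同一个对象,说明 .env 只会被解析一次
assert get_mini_settings() is get_mini_settings()
```

这也是 FastAPI 官方推荐的配置注入写法:把 get_settings 当依赖传入路由,测试时用 cache_clear() 重置即可。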
# models.py
from __future__ import annotations
from datetime import datetime
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
class TaskStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
class ChatRequest(BaseModel):
"""同步对话请求体。"""
message: str = Field(..., min_length=1, max_length=2000, description="用户输入")
session_id: str = Field(default="default", description="会话 ID,用于多轮记忆")
user_id: str | None = Field(default=None, description="用户标识,用于成本归因")
class TaskRequest(BaseModel):
"""异步任务请求体。"""
message: str = Field(..., min_length=1, max_length=2000)
session_id: str = "default"
user_id: str | None = None
class TaskResponse(BaseModel):
"""提交任务后立即返回的响应。"""
task_id: str
status: TaskStatus = TaskStatus.PENDING
created_at: datetime = Field(default_factory=datetime.utcnow)
class TaskResult(BaseModel):
"""任务结果查询响应。"""
task_id: str
status: TaskStatus
result: str | None = None
error: str | None = None
duration_ms: int | None = None
token_usage: dict[str, int] | None = None
created_at: datetime
completed_at: datetime | None = None
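TaskStatus 的流转是一个简单的单向状态机。下面用纯 Python 把合法流转写清楚(示意代码,字符串值与上面的枚举一一对应):

```python
# pending -> running -> success / failed,终态不再流转
ALLOWED_TRANSITIONS = {
    "pending": {"running"},
    "running": {"success", "failed"},
    "success": set(),
    "failed": set(),
}

def can_transition(src: str, dst: str) -> bool:
    """判断任务状态能否从 src 流转到 dst。"""
    return dst in ALLOWED_TRANSITIONS.get(src, set())

assert can_transition("pending", "running")
assert can_transition("running", "failed")
assert not can_transition("success", "running")  # 终态不可回退
```

后面的 Worker 在写 Redis 时只会按这个方向推进状态,查询接口因此可以放心地把 success/failed 当作轮询的终止条件。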
Step 1.5:统一管理大模型配置(core_config.py)
目标:通过 core_config.py 建立模型注册表,集中管理所有 LLM 供应商的配置,实现模型切换只需改一行。
# core_config.py
"""全局配置:模型注册表与定价信息"""
import os
from typing import TypedDict
class ModelConfig(TypedDict):
litellm_id: str # LiteLLM 识别的模型字符串
price_in: float # 每 1K input tokens 价格(美元)
price_out: float # 每 1K output tokens 价格(美元)
max_tokens_limit: int # 模型支持的最大 max_tokens
api_key_env: str | None # API Key 环境变量名
base_url: str | None # API 基础 URL(None 表示使用默认)
# 注册表:key 是界面显示名,value 是调用配置
MODEL_REGISTRY: dict[str, ModelConfig] = {
"DeepSeek-V3": {
"litellm_id": "deepseek/deepseek-chat",
"price_in": 0.00027,
"price_out": 0.0011,
"max_tokens_limit": 4096,
"api_key_env": "DEEPSEEK_API_KEY",
"base_url": None,
},
"Qwen-Max": {
"litellm_id": "qwen/qwen-plus",
"price_in": 0.001,
"price_out": 0.004,
"max_tokens_limit": 4096,
"api_key_env": "DASHSCOPE_API_KEY",
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
},
}
# 当前激活模型 key — 修改此处全局生效,必须是 MODEL_REGISTRY 中的 key
ACTIVE_MODEL_KEY: str = "DeepSeek-V3"
def get_active_config() -> ModelConfig:
"""获取当前激活模型的完整配置"""
return MODEL_REGISTRY[ACTIVE_MODEL_KEY]
def get_litellm_id(model_key: str | None = None) -> str:
"""获取指定模型(默认激活模型)的 LiteLLM ID"""
key = model_key or ACTIVE_MODEL_KEY
return MODEL_REGISTRY[key]["litellm_id"]
def get_api_key(model_key: str | None = None) -> str | None:
"""从环境变量读取指定模型的 API Key"""
key = model_key or ACTIVE_MODEL_KEY
env_var = MODEL_REGISTRY[key]["api_key_env"]
return os.environ.get(env_var) if env_var else None
def get_base_url(model_key: str | None = None) -> str | None:
"""获取指定模型的 base_url(None 表示使用 SDK 默认值)"""
key = model_key or ACTIVE_MODEL_KEY
return MODEL_REGISTRY[key]["base_url"]
def get_model_list() -> list[str]:
"""获取所有已注册模型的显示名列表"""
return list(MODEL_REGISTRY.keys())
def estimate_cost(model_key: str, input_tokens: int, output_tokens: int) -> float:
"""根据 Token 数估算调用费用(美元)"""
cfg = MODEL_REGISTRY[model_key]
return (
input_tokens / 1000 * cfg["price_in"]
+ output_tokens / 1000 * cfg["price_out"]
)
切换模型只需修改 ACTIVE_MODEL_KEY 的值,所有业务代码无需改动。
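estimate_cost 的算法可以脱离注册表单独验算。下面把定价参数内联成函数入参(数值取自上表的 DeepSeek-V3 行):

```python
import math

def estimate_cost(price_in: float, price_out: float,
                  input_tokens: int, output_tokens: int) -> float:
    """按每 1K token 单价估算一次调用费用(美元)。"""
    return input_tokens / 1000 * price_in + output_tokens / 1000 * price_out

# DeepSeek-V3:input $0.00027/1K,output $0.0011/1K
# 1000 input + 500 output → 0.00027 + 0.00055 = $0.00082
cost = estimate_cost(0.00027, 0.0011, input_tokens=1000, output_tokens=500)
assert math.isclose(cost, 0.00082)
```

把这个数字乘上压测得到的 QPS 和平均 Token 数,就能推出每小时的 LLM 账单,这是后面容量规划时除延迟之外的另一条约束。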
Step 2:构建 Agent
目标:使用 LangChain 的 create_agent + ChatLiteLLM 构建 Agent,通过 core_config.py 统一获取模型配置,避免硬编码模型名称和 API Key。
# agent.py
from __future__ import annotations
import time
import logging
from typing import Any
from langchain.agents import create_agent
from langchain_core.tools import Tool
from langchain_litellm import ChatLiteLLM
from config import get_settings
from core_config import get_litellm_id, get_api_key, get_base_url
logger = logging.getLogger(__name__)
settings = get_settings()
def build_tools() -> list[Tool]:
"""
构建工具列表。
生产环境这里会接入 Tavily 搜索、数据库查询等真实工具。
本节用简单工具确保代码可立即运行。
"""
def calculator(expression: str) -> str:
"""安全计算数学表达式,仅允许数字和基本运算符。"""
allowed_chars = set("0123456789+-*/()., ")
if not all(c in allowed_chars for c in expression):
return "错误:表达式包含不允许的字符"
try:
result = eval(expression, {"__builtins__": {}}, {}) # noqa: S307
return str(result)
except Exception as e:
return f"计算错误:{e}"
def get_current_time(_: str) -> str:
"""返回当前时间,演示无参工具的用法。"""
from datetime import datetime
return datetime.now().strftime("%Y-%m-%d %H:%M:%S UTC+8")
def mock_search(query: str) -> str:
"""模拟搜索工具,生产环境替换为 Tavily/Brave Search。"""
return f"搜索结果(模拟):关于「{query}」,根据最新资料显示..."
return [
Tool(name="calculator", func=calculator, description="计算数学表达式,输入: 数学表达式字符串"),
Tool(name="get_time", func=get_current_time, description="获取当前时间,输入: 任意字符串"),
Tool(name="search", func=mock_search, description="搜索网络信息,输入: 搜索关键词"),
]
# ReAct 系统提示词
REACT_SYSTEM_PROMPT = """You are a helpful assistant. You have access to a set of tools that you can use to answer questions.
Use your tools to solve problems step by step. When you have a final answer, provide it clearly to the user.
Available tools:
{tools}"""
async def run_agent(
message: str,
session_id: str = "default",
user_id: str | None = None,
) -> dict[str, Any]:
"""
执行 Agent 并返回结果与元数据。
返回值包含:
- output: Agent 最终回答
- duration_ms: 总耗时
- token_usage: 各阶段 Token 消耗(由 LangFuse Handler 汇总)
"""
start_time = time.monotonic()
tools = build_tools()
tools_desc = "\n".join(f"- {t.name}: {t.description}" for t in tools)
system_prompt = REACT_SYSTEM_PROMPT.format(tools=tools_desc)
llm = ChatLiteLLM(
model=get_litellm_id(),
temperature=0,
max_tokens=settings.max_tokens_per_request,
api_key=get_api_key(),
api_base=get_base_url(),
streaming=False,
)
agent = create_agent(
model=llm,
tools=tools,
system_prompt=system_prompt,
)
try:
result = await agent.ainvoke(
{"messages": [("user", message)]},
)
duration_ms = int((time.monotonic() - start_time) * 1000)
logger.info(
"agent_run_success",
extra={"session_id": session_id, "duration_ms": duration_ms},
)
# 提取最终回复
messages = result.get("messages", [])
output = ""
for msg in reversed(messages):
if hasattr(msg, "content") and msg.content:
output = msg.content
break
return {
"output": output,
"duration_ms": duration_ms,
"token_usage": {},
}
except Exception as e:
duration_ms = int((time.monotonic() - start_time) * 1000)
logger.error(
"agent_run_failed",
extra={"session_id": session_id, "error": str(e), "duration_ms": duration_ms},
)
raise
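上面 calculator 工具用"字符白名单 + eval"挡住了大多数注入,但若想彻底避开 eval,可以改用 ast 手写一个只支持四则运算的求值器。以下是一个示意实现(safe_eval 是本节虚构的替代函数名):

```python
import ast
import operator

_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.USub: operator.neg,
    ast.UAdd: operator.pos,
}

def safe_eval(expression: str) -> float:
    """只允许数字字面量、+ - * / 和括号,遇到其他 AST 节点一律拒绝。"""
    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](_eval(node.operand))
        raise ValueError(f"不允许的表达式节点: {type(node).__name__}")
    return _eval(ast.parse(expression, mode="eval"))

assert safe_eval("(123 * 456 + 789) / 3") == 18959.0
```

函数调用、属性访问、变量名都会落到 raise 分支,因此不需要再维护字符白名单。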
关键点:
- ChatLiteLLM 取代了 ChatOpenAI,通过 core_config.py 的 get_litellm_id()、get_api_key()、get_base_url() 获取模型配置,不再硬编码模型名和 API Key。这意味着你可以在 core_config.py 中将 ACTIVE_MODEL_KEY 改为 "Qwen-Max" 即可一键切换到通义千问,无需改动任何业务代码。
- create_agent 是 LangChain 推荐的新一代 Agent 构建方式,相比旧版 create_react_agent + AgentExecutor 更简洁。输入格式从 {"input": message} 变为 {"messages": [("user", message)]},符合 LangGraph 标准的消息协议。
- ReAct 提示词采用本地常量 REACT_SYSTEM_PROMPT,将工具描述动态注入到 {tools} 占位符中。相比 hub.pull("hwchase17/react"),本地化方案消除了对网络拉取 Prompt 的依赖,生产环境不会因网络故障导致服务启动失败。
- 结果提取方式从 result["output"] 改为遍历 messages 列表反向查找最后一个有 content 的消息,这是 create_agent 返回结构的特点。
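其中"反向查找最后一条有 content 的消息"这一步逻辑独立,可以抽成纯函数单独测试。下面用一个虚构的 FakeMessage dataclass 模拟消息对象:

```python
from dataclasses import dataclass

@dataclass
class FakeMessage:
    content: str

def extract_final_output(messages) -> str:
    """从 create_agent 返回的 messages 列表中反向找到最后一条非空回复。"""
    for msg in reversed(messages):
        if getattr(msg, "content", None):
            return msg.content
    return ""

history = [FakeMessage("用户提问"), FakeMessage(""), FakeMessage("最终答案")]
assert extract_final_output(history) == "最终答案"
assert extract_final_output([]) == ""
```

抽成纯函数后,run_agent 的提取逻辑可以直接复用它,也避免了在异步测试里 mock 整个 Agent。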
Step 3:构建 ARQ 异步任务 Worker
目标:将 Agent 执行从 HTTP 请求中解耦。ARQ 相比 Celery 的优势是原生 asyncio,与 FastAPI 的异步生态天然契合,且无需 Kombu/Billiard 等重依赖。
# worker.py
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
from typing import Any
from arq.connections import RedisSettings
from agent import run_agent
from config import get_settings
logger = logging.getLogger(__name__)
settings = get_settings()
def _redis_settings() -> RedisSettings:
"""从 REDIS_URL 解析 ARQ 需要的 RedisSettings 对象。"""
from urllib.parse import urlparse
parsed = urlparse(settings.redis_url)
return RedisSettings(
host=parsed.hostname or "localhost",
port=parsed.port or 6379,
password=parsed.password,
)
async def execute_agent_task(
ctx: dict[str, Any],
task_id: str,
message: str,
session_id: str,
user_id: str | None,
) -> None:
"""
ARQ Worker 执行的任务函数。
ctx 由 ARQ 注入,包含 Redis 连接等上下文。
任务结果通过 Redis 存储,供 FastAPI 查询接口读取。
"""
redis = ctx["redis"]
result_key = f"task_result:{task_id}"
created_at = datetime.now(timezone.utc).isoformat()
# 更新状态为 running
await redis.set(
result_key,
json.dumps({
"task_id": task_id,
"status": "running",
"created_at": created_at,
}),
ex=3600, # 结果保留 1 小时
)
try:
agent_result = await run_agent(
message=message,
session_id=session_id,
user_id=user_id,
)
await redis.set(
result_key,
json.dumps({
"task_id": task_id,
"status": "success",
"result": agent_result["output"],
"duration_ms": agent_result["duration_ms"],
"token_usage": agent_result.get("token_usage"),
"created_at": created_at,
"completed_at": datetime.now(timezone.utc).isoformat(),
}),
ex=3600,
)
except Exception as e:
logger.exception("task_failed", extra={"task_id": task_id})
await redis.set(
result_key,
json.dumps({
"task_id": task_id,
"status": "failed",
"error": str(e),
"created_at": created_at,
"completed_at": datetime.now(timezone.utc).isoformat(),
}),
ex=3600,
)
class WorkerSettings:
"""ARQ Worker 全局配置。"""
# 注册所有任务函数
functions = [execute_agent_task]
# Redis 连接配置
redis_settings = _redis_settings()
# 并发控制:同时最多执行 5 个 Agent 任务
# 核心考量:LLM API 有速率限制,并发过高会触发 429
max_jobs = 5
# 任务超时:超过 2 分钟强制终止
job_timeout = 120
# 心跳间隔
health_check_interval = 30
关键点:
- max_jobs = 5 的数字不是拍脑袋定的。假设每个 Agent 任务平均 1 分钟完成、期间发起 3 次 LLM 调用,则 5 并发 × 3 调用 ≈ 15 次/分钟;对照 OpenAI gpt-4o-mini 常见的 500 RPM 限制,余量非常充足。实际值应根据你的 API Tier 计算。
- 任务结果存 Redis 而非数据库,是因为结果时效性短(1小时即过期),不需要持久化。如果业务需要查历史任务,才值得写入 PostgreSQL。
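上面的并发推算可以写成一个小函数,按你自己的 RPM 限额和任务画像代入。函数名与 safety 系数都是本节假设的示意值:

```python
def max_concurrent_jobs(rpm_limit: int, calls_per_task: float,
                        avg_task_minutes: float, safety: float = 0.5) -> int:
    """根据 LLM API 的 RPM 限额反推 Worker 并发上限。
    safety 是余量系数:只占用一半额度,给重试和突发流量留空间。"""
    # 每个并发槽位每分钟发起的 LLM 调用数
    calls_per_slot_per_min = calls_per_task / avg_task_minutes
    return int(rpm_limit * safety / calls_per_slot_per_min)

# 500 RPM、每任务 3 次调用、平均 1 分钟完成 → 上限约 83,取 max_jobs=5 非常保守
assert max_concurrent_jobs(500, 3, 1.0) == 83
```

当任务变慢(avg_task_minutes 变大)时,同样的并发消耗的 RPM 反而更少,这也是长任务走异步队列的另一个好处。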
Step 4:构建 FastAPI 网关与 Prometheus 指标
目标:提供同步(短任务)和异步(长任务)两种接口,并暴露 Prometheus metrics endpoint 供监控系统抓取。
# main.py
from __future__ import annotations
import json
import logging
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import AsyncGenerator
from arq import create_pool
from arq.connections import ArqRedis, RedisSettings
from fastapi import FastAPI, HTTPException, Request
from prometheus_client import (
    Counter,
    Histogram,
    generate_latest,
    CONTENT_TYPE_LATEST,
)
from starlette.responses import Response
from agent import run_agent
from config import get_settings
from models import (
ChatRequest,
TaskRequest,
TaskResponse,
TaskResult,
TaskStatus,
)
logger = logging.getLogger(__name__)
settings = get_settings()
# ─── Prometheus 指标定义 ────────────────────────────────────────────────────
# 每个指标的 label 设计直接影响 Grafana 的查询灵活度
REQUEST_COUNTER = Counter(
"agent_requests_total",
"Total number of agent requests",
["endpoint", "status"], # 按接口和状态码分层
)
LATENCY_HISTOGRAM = Histogram(
"agent_request_duration_seconds",
"Agent request duration in seconds",
["endpoint"],
buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0], # 适配 LLM 长尾延迟
)
# 注意:变量名带 GAUGE,但入队总量只增不减,语义上必须用 Counter
TASK_QUEUE_GAUGE = Counter(
    "agent_tasks_enqueued_total",
    "Total tasks enqueued to ARQ worker",
)
# ─── 应用生命周期管理 ─────────────────────────────────────────────────────────
arq_pool: ArqRedis | None = None
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
"""FastAPI 生命周期管理:启动时建立 Redis 连接池,关闭时清理。"""
global arq_pool
logger.info("connecting_to_redis", extra={"url": settings.redis_url})
from urllib.parse import urlparse
parsed = urlparse(settings.redis_url)
arq_pool = await create_pool(
RedisSettings(
host=parsed.hostname or "localhost",
port=parsed.port or 6379,
password=parsed.password,
)
)
logger.info("redis_connected")
yield
# 关闭时释放连接池
await arq_pool.close()
logger.info("redis_disconnected")
app = FastAPI(
title="Production Agent Service",
version="1.0.0",
lifespan=lifespan,
)
# ─── 中间件:自动记录延迟 ─────────────────────────────────────────────────────
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
"""对每个请求自动记录延迟和状态码,无需在每个路由里手动埋点。"""
import time
start = time.monotonic()
response = await call_next(request)
duration = time.monotonic() - start
endpoint = request.url.path
LATENCY_HISTOGRAM.labels(endpoint=endpoint).observe(duration)
REQUEST_COUNTER.labels(endpoint=endpoint, status=str(response.status_code)).inc()
return response
# ─── 路由 ─────────────────────────────────────────────────────────────────────
@app.get("/health")
async def health_check() -> dict:
"""健康检查接口,供 K8s/ECS 探针和 docker-compose healthcheck 使用。"""
return {"status": "healthy", "timestamp": datetime.now(timezone.utc).isoformat()}
@app.get("/metrics")
async def prometheus_metrics() -> Response:
"""Prometheus 抓取接口,返回 text/plain 格式指标数据。"""
return Response(
content=generate_latest(),
media_type=CONTENT_TYPE_LATEST,
)
@app.post("/chat", summary="同步对话接口(适合 <30s 的短任务)")
async def chat(request: ChatRequest) -> dict:
"""
同步执行 Agent 并返回结果。
⚠️ 适用场景:预期响应时间 < 30s 的请求。
超过这个阈值建议切换 /task 异步接口,否则客户端容易超时。
"""
try:
result = await run_agent(
message=request.message,
session_id=request.session_id,
user_id=request.user_id,
)
return {
"output": result["output"],
"duration_ms": result["duration_ms"],
"session_id": request.session_id,
}
except Exception as e:
logger.exception("chat_endpoint_error")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/task", response_model=TaskResponse, summary="异步任务接口(适合长任务)")
async def submit_task(request: TaskRequest) -> TaskResponse:
"""
将 Agent 任务提交到 ARQ 队列,立即返回 task_id。
客户端通过 GET /task/{task_id} 轮询结果。
"""
if arq_pool is None:
raise HTTPException(status_code=503, detail="任务队列未就绪")
task_id = str(uuid.uuid4())
# 在 Redis 中预先写入 pending 状态
result_key = f"task_result:{task_id}"
await arq_pool.set(
result_key,
json.dumps({
"task_id": task_id,
"status": "pending",
"created_at": datetime.now(timezone.utc).isoformat(),
}),
ex=3600,
)
# 入队
await arq_pool.enqueue_job(
"execute_agent_task",
task_id=task_id,
message=request.message,
session_id=request.session_id,
user_id=request.user_id,
)
TASK_QUEUE_GAUGE.inc()
return TaskResponse(task_id=task_id)
@app.get("/task/{task_id}", response_model=TaskResult, summary="查询异步任务结果")
async def get_task_result(task_id: str) -> TaskResult:
"""轮询任务状态。前端建议以 2s 间隔轮询,任务完成后停止。"""
if arq_pool is None:
raise HTTPException(status_code=503, detail="服务未就绪")
result_key = f"task_result:{task_id}"
raw = await arq_pool.get(result_key)
if raw is None:
raise HTTPException(status_code=404, detail=f"任务 {task_id} 不存在或已过期")
data = json.loads(raw)
return TaskResult(
task_id=data["task_id"],
status=TaskStatus(data["status"]),
result=data.get("result"),
error=data.get("error"),
duration_ms=data.get("duration_ms"),
token_usage=data.get("token_usage"),
created_at=datetime.fromisoformat(data["created_at"]),
completed_at=(
datetime.fromisoformat(data["completed_at"])
if data.get("completed_at")
else None
),
)
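客户端侧的轮询逻辑可以抽象成一个可注入时钟的小函数,便于脱离网络做测试。以下是示意实现,clock/sleep 参数是为可测试性引入的假设设计:

```python
import time

def poll_until_done(fetch_status, interval: float = 2.0, timeout: float = 120.0,
                    clock=time.monotonic, sleep=time.sleep) -> str:
    """反复调用 fetch_status() 直到返回终态(success/failed)或超时。
    clock/sleep 可注入,测试时传入假实现即可不真实等待。"""
    deadline = clock() + timeout
    while clock() < deadline:
        status = fetch_status()
        if status in ("success", "failed"):
            return status
        sleep(interval)
    return "timeout"
```

真实调用时,把 fetch_status 实现为 GET /task/{task_id} 并取回 status 字段即可;interval 取 2 秒与接口文档里的轮询建议一致。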
Step 5:配置 Grafana Dashboard
目标:用 docker-compose 一键启动 Prometheus + Grafana,导入预配置 Dashboard,5 分钟内看到核心指标。
# docker-compose.monitoring.yml
version: "3.8"
services:
prometheus:
image: prom/prometheus:v2.55.1
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.retention.time=7d'
grafana:
image: grafana/grafana:11.4.0
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_PASSWORD: admin123
GF_USERS_ALLOW_SIGN_UP: "false"
volumes:
- grafana_data:/var/lib/grafana
- ./monitoring/grafana/provisioning:/etc/grafana/provisioning:ro
volumes:
grafana_data:
# monitoring/prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
- job_name: "agent-service"
static_configs:
- targets: ["host.docker.internal:8000"] # macOS/Windows Docker 访问宿主机
# Linux 环境改为宿主机实际 IP,如 172.17.0.1:8000
# monitoring/import_dashboard.py
"""
自动向 Grafana 导入 Agent 服务监控 Dashboard。
运行:python monitoring/import_dashboard.py
"""
import json
import httpx
GRAFANA_URL = "http://localhost:3000"
AUTH = ("admin", "admin123")
# 核心 Dashboard 配置(精简版,聚焦最重要的 4 个指标)
dashboard_config = {
"dashboard": {
"title": "Agent Service Overview",
"refresh": "30s",
"panels": [
{
"title": "请求 QPS(按接口)",
"type": "graph",
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 0},
"targets": [{
"expr": 'rate(agent_requests_total[1m])',
"legendFormat": "{{endpoint}} - {{status}}",
}],
},
{
"title": "P50/P90/P99 延迟(秒)",
"type": "graph",
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 0},
"targets": [
{
"expr": 'histogram_quantile(0.50, rate(agent_request_duration_seconds_bucket[5m]))',
"legendFormat": "P50",
},
{
"expr": 'histogram_quantile(0.90, rate(agent_request_duration_seconds_bucket[5m]))',
"legendFormat": "P90",
},
{
"expr": 'histogram_quantile(0.99, rate(agent_request_duration_seconds_bucket[5m]))',
"legendFormat": "P99",
},
],
},
{
"title": "错误率(5xx)",
"type": "singlestat",
"gridPos": {"h": 4, "w": 6, "x": 0, "y": 8},
"targets": [{
"expr": 'rate(agent_requests_total{status=~"5.."}[5m]) / rate(agent_requests_total[5m]) * 100',
"legendFormat": "错误率 %",
}],
},
{
"title": "异步任务入队总量",
"type": "singlestat",
"gridPos": {"h": 4, "w": 6, "x": 6, "y": 8},
"targets": [{
"expr": 'agent_tasks_enqueued_total',
"legendFormat": "累计入队",
}],
},
],
"schemaVersion": 39,
},
"overwrite": True,
"folderId": 0,
}
resp = httpx.post(
f"{GRAFANA_URL}/api/dashboards/db",
json=dashboard_config,
auth=AUTH,
timeout=10,
)
print(f"Dashboard 导入结果:{resp.status_code} - {resp.json()}")
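Dashboard 里的 histogram_quantile 本质是在累计桶上做线性插值。下面用纯 Python 还原它的计算思路(简化示意,非 Prometheus 源码实现,边界情况有省略):

```python
def histogram_quantile(q: float, buckets: list[tuple[float, int]]) -> float:
    """buckets 为 (le 上界, 累计计数) 的升序列表,末尾可含 float('inf') 桶。
    找到目标分位点所在的桶,在桶内做线性插值。"""
    total = buckets[-1][1]
    if total == 0:
        return float("nan")
    rank = q * total
    lower, prev_count = 0.0, 0
    for le, count in buckets:
        if count >= rank:
            if le == float("inf"):
                return lower  # 落在 +Inf 桶时返回最后一个有限上界
            span = count - prev_count
            frac = (rank - prev_count) / span if span else 1.0
            return lower + (le - lower) * frac
        lower, prev_count = le, count
    return lower

# 示例:100 个请求,50 个 ≤0.5s,90 个 ≤1.0s,全部 ≤2.0s
b = [(0.5, 50), (1.0, 90), (2.0, 100), (float("inf"), 100)]
assert histogram_quantile(0.5, b) == 0.5
assert histogram_quantile(0.75, b) == 0.8125
```

这也解释了为什么 LATENCY_HISTOGRAM 的 buckets 要贴合 LLM 的长尾延迟:分位点只能在桶边界之间插值,桶切得不合理,P99 就会严重失真。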
Step 6:Locust 压测与容量规划
目标:用真实流量模型模拟并发,找出系统在哪里先撑不住——是 FastAPI Worker 进程、Redis 连接、还是 LLM API 速率限制。
# locustfile.py
from __future__ import annotations
import json
import time
import random
from locust import HttpUser, task, between, events
# 测试用的问题集,覆盖不同复杂度
QUESTIONS = [
"现在几点了?", # 简单:单工具调用
"计算 (123 * 456 + 789) / 3 的结果", # 中等:计算器工具
"搜索一下最新的 AI Agent 进展,并计算如果每天学习 2 小时,30 天能学多少小时", # 复杂:多工具
]
class AgentUser(HttpUser):
    """模拟真实用户行为:提交任务 → 轮询结果。"""
    # 每个虚拟用户在两次请求之间等待 1-3 秒,模拟真实用户节奏
    wait_time = between(1, 3)

    def on_start(self):
        # HttpUser 没有内置 user_id 属性,启动时为每个虚拟用户生成随机标识
        self.user_id = f"{random.randint(0, 999999):06d}"
@task(3)
def test_sync_chat(self):
"""测试同步接口(权重 3:高频)。"""
question = random.choice(QUESTIONS[:2]) # 同步接口只测简单问题
with self.client.post(
"/chat",
json={
"message": question,
"session_id": f"locust-{self.user_id}",
"user_id": f"user-{self.user_id}",
},
timeout=60,
catch_response=True,
) as response:
if response.status_code == 200:
response.success()
else:
response.failure(f"状态码 {response.status_code}: {response.text[:200]}")
@task(1)
def test_async_task(self):
"""测试异步接口(权重 1:低频):提交任务 → 轮询结果。"""
question = random.choice(QUESTIONS)
# Step 1: 提交任务
submit_resp = self.client.post(
"/task",
json={
"message": question,
"session_id": f"locust-async-{self.user_id}",
},
timeout=10,
)
if submit_resp.status_code != 200:
return
task_id = submit_resp.json().get("task_id")
if not task_id:
return
# Step 2: 轮询结果(最多等 120s)
max_polls = 40
for _ in range(max_polls):
time.sleep(3)
poll_resp = self.client.get(f"/task/{task_id}", timeout=10)
if poll_resp.status_code == 200:
data = poll_resp.json()
if data["status"] in ("success", "failed"):
break
@events.quitting.add_listener
def on_locust_quit(environment, **kwargs):
"""压测结束时打印容量规划建议。"""
stats = environment.runner.stats.total
print("\n" + "="*60)
print("📊 压测结论与容量规划建议")
print("="*60)
print(f"总请求数: {stats.num_requests}")
print(f"失败数: {stats.num_failures}")
print(f"失败率: {stats.fail_ratio:.2%}")
print(f"RPS (avg): {stats.current_rps:.1f}")
print(f"P50 延迟: {stats.get_response_time_percentile(0.50):.0f}ms")
print(f"P90 延迟: {stats.get_response_time_percentile(0.90):.0f}ms")
print(f"P99 延迟: {stats.get_response_time_percentile(0.99):.0f}ms")
# 容量规划:根据压测结果推算生产所需实例数
target_rps = 100 # 生产目标 QPS
current_rps = max(stats.current_rps, 0.1)
scale_factor = target_rps / current_rps
print(f"\n若目标 QPS={target_rps},当前压测 RPS={current_rps:.1f}")
print(f"建议实例数倍数: {scale_factor:.1f}x(含 20% buffer 则 {scale_factor*1.2:.1f}x)")
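上面打印的"实例数倍数"也可以换成更直接的算法:给定目标 QPS 和单实例实测 RPS,直接算出需要几个实例(示意函数,数值为假设的压测结果):

```python
import math

def plan_instances(target_rps: float, rps_per_instance: float, buffer: float = 0.2) -> int:
    """容量规划:目标 QPS ×(1 + 余量系数)÷ 单实例实测 RPS,向上取整。"""
    return math.ceil(target_rps * (1 + buffer) / rps_per_instance)

# 目标 100 QPS,单实例压测出 8 RPS,预留 20% 余量 → 100 × 1.2 / 8 = 15 个实例
assert plan_instances(100, 8.0) == 15
```

注意这个公式假设瓶颈在本服务实例;如果压测显示瓶颈在 LLM API 的速率限制上,加实例无济于事,应优先提升 API Tier 或降低单任务调用次数。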
启动压测:
# 先启动服务(在另一个终端)
# 注意:--workers 4 时 prometheus_client 各进程独立计数,/metrics 只返回被命中的那个进程的指标;
# 压测时若要观测准确指标,建议先用单 worker,或启用 prometheus_client 的 multiprocess 模式
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
# 启动 ARQ Worker(在另一个终端)
arq worker.WorkerSettings
# 运行压测:10 个并发用户,在 30s 内爬坡,持续 5 分钟
locust -f locustfile.py \
--host http://localhost:8000 \
--users 10 \
--spawn-rate 2 \
--run-time 5m \
--headless \
--html locust_report.html
💡 也可以使用
manage.py统一管理服务的启动和停止,见下方 Step 7。
Step 7:服务管理脚本(manage.py)
目标:提供一个统一的命令行工具来管理 FastAPI 和 ARQ Worker 的启动、停止、重启和状态检查,避免手动管理多个终端。
# manage.py
#!/usr/bin/env python3
"""
服务管理脚本 - 统一管理 FastAPI 和 ARQ Worker 的启动、停止和重启
使用方式:
python manage.py start # 启动所有服务
python manage.py stop # 停止所有服务
python manage.py restart # 重启所有服务
python manage.py status # 查看服务状态
"""
import os
import sys
import signal
import subprocess
import time
import argparse
from typing import Optional
# 配置参数
FASTAPI_PORT = 8000
WORK_DIR = os.path.dirname(os.path.abspath(__file__))
# 默认使用当前解释器,也可通过 PYTHON_BIN 环境变量覆盖
PYTHON_BIN = os.environ.get("PYTHON_BIN", sys.executable)
# 进程 ID 文件路径
PID_FILES = {
"fastapi": os.path.join(WORK_DIR, "fastapi.pid"),
"worker": os.path.join(WORK_DIR, "worker.pid"),
}
def get_pid(pid_file: str) -> Optional[int]:
"""从 pid 文件读取进程 ID"""
if os.path.exists(pid_file):
try:
with open(pid_file, "r") as f:
return int(f.read().strip())
except (ValueError, IOError):
return None
return None
def is_process_running(pid: int) -> bool:
"""检查进程是否运行"""
try:
os.kill(pid, 0)
return True
except OSError:
return False
def stop_process(pid_file: str, service_name: str) -> bool:
    """停止指定服务:先发 SIGTERM 等待优雅退出,5 秒仍未退出则 SIGKILL 强制终止"""
    pid = get_pid(pid_file)
    if pid and is_process_running(pid):
        print(f"⏹️ 正在停止 {service_name} (PID: {pid})...")
        try:
            os.kill(pid, signal.SIGTERM)
            # 等待进程优雅退出
            for _ in range(10):
                if not is_process_running(pid):
                    break
                time.sleep(0.5)
            else:
                # 超时仍在运行,强制终止
                os.kill(pid, signal.SIGKILL)
            if os.path.exists(pid_file):
                os.remove(pid_file)
            print(f" ✅ {service_name} 已停止")
            return True
        except OSError as e:
            print(f" ❌ 停止 {service_name} 失败: {e}")
            return False
    else:
        print(f" ℹ️ {service_name} 未运行")
        if os.path.exists(pid_file):
            os.remove(pid_file)
        return True
def start_fastapi() -> bool:
"""启动 FastAPI 服务"""
pid_file = PID_FILES["fastapi"]
# 检查是否已运行
pid = get_pid(pid_file)
if pid and is_process_running(pid):
print(f" ⚠️ FastAPI 已在运行 (PID: {pid})")
return True
print("🚀 启动 FastAPI 服务...")
env = os.environ.copy()
env["PYTHONPATH"] = WORK_DIR
try:
# 使用 nohup 后台运行
cmd = [
"nohup",
PYTHON_BIN, "-m", "uvicorn", "main:app",
"--host", "0.0.0.0",
"--port", str(FASTAPI_PORT),
"--reload"
]
proc = subprocess.Popen(
cmd,
cwd=WORK_DIR,
env=env,
stdout=open(os.path.join(WORK_DIR, "fastapi.log"), "w"),
stderr=subprocess.STDOUT,
preexec_fn=os.setsid
)
# 保存 PID
with open(pid_file, "w") as f:
f.write(str(proc.pid))
# 等待启动
time.sleep(3)
# 检查是否启动成功
if is_process_running(proc.pid):
print(f" ✅ FastAPI 启动成功 (PID: {proc.pid}, 端口: {FASTAPI_PORT})")
return True
else:
print(" ❌ FastAPI 启动失败,请查看 fastapi.log")
if os.path.exists(pid_file):
os.remove(pid_file)
return False
except Exception as e:
print(f" ❌ 启动 FastAPI 失败: {e}")
return False
def start_worker() -> bool:
"""启动 ARQ Worker"""
pid_file = PID_FILES["worker"]
# 检查是否已运行
pid = get_pid(pid_file)
if pid and is_process_running(pid):
print(f" ⚠️ ARQ Worker 已在运行 (PID: {pid})")
return True
print("🚀 启动 ARQ Worker...")
env = os.environ.copy()
env["PYTHONPATH"] = WORK_DIR
try:
# 使用 nohup 后台运行
cmd = [
"nohup",
PYTHON_BIN, "-m", "arq", "worker.WorkerSettings"
]
proc = subprocess.Popen(
cmd,
cwd=WORK_DIR,
env=env,
stdout=open(os.path.join(WORK_DIR, "worker.log"), "w"),
stderr=subprocess.STDOUT,
preexec_fn=os.setsid
)
# 保存 PID
with open(pid_file, "w") as f:
f.write(str(proc.pid))
# 等待启动
time.sleep(2)
# 检查是否启动成功
if is_process_running(proc.pid):
print(f" ✅ ARQ Worker 启动成功 (PID: {proc.pid})")
return True
else:
print(" ❌ ARQ Worker 启动失败,请查看 worker.log")
if os.path.exists(pid_file):
os.remove(pid_file)
return False
except Exception as e:
print(f" ❌ 启动 ARQ Worker 失败: {e}")
return False
def check_health() -> bool:
"""检查服务健康状态"""
try:
import httpx
response = httpx.get(f"http://localhost:{FASTAPI_PORT}/health", timeout=5)
if response.status_code == 200:
data = response.json()
print(f" 🩺 健康检查通过: {data}")
return True
else:
print(f" ❌ 健康检查失败: HTTP {response.status_code}")
return False
except Exception as e:
print(f" ❌ 健康检查失败: {e}")
return False
def status():
"""查看服务状态"""
print("📊 服务状态检查:")
# FastAPI 状态
pid = get_pid(PID_FILES["fastapi"])
if pid and is_process_running(pid):
print(f" FastAPI: 🟢 运行中 (PID: {pid}, 端口: {FASTAPI_PORT})")
else:
print(" FastAPI: 🔴 未运行")
if os.path.exists(PID_FILES["fastapi"]):
os.remove(PID_FILES["fastapi"])
# ARQ Worker 状态
pid = get_pid(PID_FILES["worker"])
if pid and is_process_running(pid):
print(f" ARQ Worker: 🟢 运行中 (PID: {pid})")
else:
print(" ARQ Worker: 🔴 未运行")
if os.path.exists(PID_FILES["worker"]):
os.remove(PID_FILES["worker"])
def stop():
"""停止所有服务"""
print("⏹️ 停止所有服务:")
stop_process(PID_FILES["fastapi"], "FastAPI")
stop_process(PID_FILES["worker"], "ARQ Worker")
def start():
"""启动所有服务"""
print("🚀 启动所有服务:")
# 确保环境变量正确加载
env_file = os.path.join(WORK_DIR, ".env")
if os.path.exists(env_file):
with open(env_file, "r") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
if "=" in line:
key, value = line.split("=", 1)
os.environ[key] = value.strip('"').strip("'")
# 启动服务
success = True
success &= start_fastapi()
success &= start_worker()
# 健康检查
if success:
print("\n🩺 健康检查:")
check_health()
print("\n📋 服务启动完成!")
print(f" API 文档: http://localhost:{FASTAPI_PORT}/docs")
print(f" 健康检查: http://localhost:{FASTAPI_PORT}/health")
print(f" 指标接口: http://localhost:{FASTAPI_PORT}/metrics")
def restart():
"""重启所有服务"""
print("🔄 重启所有服务:")
print("-" * 50)
# 先停止
stop()
print()
# 等待清理
time.sleep(1)
# 再启动
start()
def main():
parser = argparse.ArgumentParser(description="服务管理脚本")
parser.add_argument("command", choices=["start", "stop", "restart", "status"],
help="操作命令: start|stop|restart|status")
args = parser.parse_args()
# 确保在正确的工作目录
os.chdir(WORK_DIR)
if args.command == "start":
start()
elif args.command == "stop":
stop()
elif args.command == "restart":
restart()
elif args.command == "status":
status()
if __name__ == "__main__":
main()
manage.py 使用 PID 文件管理进程,停止或启动失败时会自动清理 .pid 文件。注意确认 PYTHON_BIN 指向可用的 Python 解释器。
完整运行验证
将以下所有组件按顺序启动后,执行端到端冒烟测试:
# 方式一:使用 manage.py 一键管理(推荐)
python manage.py start # 启动 FastAPI + ARQ Worker
python manage.py status # 查看服务状态
python manage.py stop # 停止所有服务
# 方式二:手动启动(在多个终端)
# Terminal 1: 启动 FastAPI 服务
uvicorn main:app --reload --port 8000
# Terminal 2: 启动 ARQ Worker
arq worker.WorkerSettings
# Terminal 3: 启动监控栈
docker-compose -f docker-compose.monitoring.yml up -d
python monitoring/import_dashboard.py
# smoke_test.py — 端到端冒烟测试,验证完整链路
import asyncio
import httpx
import time
import os
BASE_URL = "http://localhost:8000"
# 禁用代理,避免连接本地服务时走代理(httpx 会同时读取大小写两种环境变量)
for var in ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY"):
    os.environ.pop(var, None)
async def smoke_test():
async with httpx.AsyncClient(timeout=120) as client:
print("1️⃣ 健康检查...")
resp = await client.get(f"{BASE_URL}/health")
assert resp.status_code == 200, f"健康检查失败: {resp.text}"
print(f" ✅ {resp.json()}")
print("\n2️⃣ 同步对话接口...")
resp = await client.post(f"{BASE_URL}/chat", json={
"message": "现在几点了?",
"session_id": "smoke-test-001",
"user_id": "tester",
})
assert resp.status_code == 200, f"同步接口失败: {resp.text}"
data = resp.json()
print(f" ✅ 回答: {data['output'][:50]}...")
print(f" ✅ 耗时: {data['duration_ms']}ms")
print("\n3️⃣ 异步任务接口...")
resp = await client.post(f"{BASE_URL}/task", json={
"message": "计算 42 * 1337 + 100 的结果",
"session_id": "smoke-test-002",
})
assert resp.status_code == 200, f"提交任务失败: {resp.text}"
task_id = resp.json()["task_id"]
print(f" ✅ 任务 ID: {task_id}")
print(" ⏳ 轮询任务结果...")
for attempt in range(30):
await asyncio.sleep(3)
poll_resp = await client.get(f"{BASE_URL}/task/{task_id}")
result = poll_resp.json()
status = result["status"]
print(f" 第 {attempt+1} 次轮询,状态: {status}")
if status == "success":
print(f" ✅ 任务完成: {result['result'][:80]}...")
print(f" ✅ 耗时: {result['duration_ms']}ms")
break
elif status == "failed":
raise AssertionError(f"任务失败: {result['error']}")
else:
raise AssertionError("任务超时(90s),请检查 ARQ Worker 是否正常运行")
print("\n4️⃣ Prometheus 指标验证...")
resp = await client.get(f"{BASE_URL}/metrics")
assert "agent_requests_total" in resp.text, "Prometheus 指标未找到"
print(" ✅ 指标上报正常")
print("\n🎉 所有冒烟测试通过!打开 http://localhost:3000 查看 Grafana Dashboard")
asyncio.run(smoke_test())
预期输出:
1️⃣ 健康检查...
✅ {'status': 'healthy', 'timestamp': '2025-01-15T10:23:45.123456+00:00'}
2️⃣ 同步对话接口...
✅ 回答: 当前时间是 2025-01-15 18:23:46 UTC+8...
✅ 耗时: 2341ms
3️⃣ 异步任务接口...
✅ 任务 ID: f7a3c2e1-8b4d-4f9a-a123-456789abcdef
⏳ 轮询任务结果...
第 1 次轮询,状态: running
第 2 次轮询,状态: success
✅ 任务完成: 42 * 1337 + 100 = 56254 + 100 = 56354...
✅ 耗时: 4872ms
4️⃣ Prometheus 指标验证...
✅ 指标上报正常
🎉 所有冒烟测试通过!打开 http://localhost:3000 查看 Grafana Dashboard
常见报错与解决方案
| 报错信息 | 原因 | 解决方案 |
|---|---|---|
| arq.jobs.JobNotFound | task_id 对应的 Redis key 已过期(默认 1 小时) | 增加 ex 参数,或将结果持久化到数据库 |
| langfuse.errors.AuthorizationError | LANGFUSE_PUBLIC_KEY 或 SECRET_KEY 配置错误 | 检查 .env 文件,LangFuse Cloud 的 key 前缀分别为 pk-lf- 和 sk-lf- |
| redis.exceptions.ConnectionError | Redis 未启动或端口不通 | 执行 docker ps 确认 Redis 容器运行,或 redis-cli ping 测试连通性 |
| langchain_core.exceptions.OutputParserException | LLM 输出不符合 ReAct 格式 | create_agent 自带更好的容错机制,若频繁触发,考虑升级模型 |
| httpx.ReadTimeout 压测时大量出现 | LLM 调用超时,单请求超过 Locust 的 timeout 设置 | 将 Locust 的 timeout 参数从 60 提高到 120,或降低并发数 |
| ModuleNotFoundError: No module named 'langchain_litellm' | 未安装 langchain-litellm 包 | pip install langchain-litellm,注意 pip 包名是 langchain-litellm 而非 langchain_litellm |
| API Key 为空 | 未设置对应模型的环境变量 | 检查 .env 文件,确认 DEEPSEEK_API_KEY 或 DASHSCOPE_API_KEY 已正确配置 |
注意:当前版本已将 ReAct 提示词本地化(REACT_SYSTEM_PROMPT 常量),不再依赖 hub.pull 网络拉取,因此 hub.pull 卡住的问题已不复存在。
扩展练习(可选)

- 🟡 中等:在 LATENCY_HISTOGRAM 的 label 中增加 model 维度(记录每次 LLM 调用使用的模型名),并在 Grafana 中创建按模型分组的延迟对比图,量化不同模型(如 DeepSeek-V3 vs Qwen-Max)的延迟差异。
- 🔴 困难:实现 SSE 流式输出版本的 /chat/stream 接口——LangChain Agent 的中间 Thought 步骤实时推送给客户端(而不是等最终答案),并在 LangFuse 中正确记录流式调用的 Token 消耗。提示:需要用 astream_events API 配合 StreamingResponse。