1.3 【动手二】构建一个提示词调试器
实验目标
本节结束后,你将拥有一个可以本地运行的多模型 Prompt 并发调试工具,能够同时对 DeepSeek-V3、Qwen-Max 等模型发送相同的 Prompt,在浏览器界面中并排对比输出、记录评分,并将历史实验持久化为 JSONL 文件供后续分析。
核心学习点(3 个):
- 受控实验思维:优化 Prompt 的本质是科学实验——每次只改一个变量(System Prompt / Temperature / 模型),才能得出可信结论;本工具在界面层强制实现这一约束。
- asyncio 并发调用:串行调用 2 个模型需等待 2 倍时间;用
asyncio.gather并发后总耗时约等于最慢那个模型,是 LLM 应用中最高性价比的优化手段。 - 轻量级实验追踪:不引入数据库,用 JSONL 追加写入做版本历史,学习"够用就好"的工程取舍。
架构总览
graph TD
subgraph UI["Gradio 前端 (app.py)"]
A[System Prompt 输入] --> E
B[User Prompt 输入] --> E
C[模型多选框] --> E
D[Temperature / Max Tokens 滑块] --> E
E[🚀 运行实验按钮]
end
subgraph Core["核心调用层 (core/caller.py)"]
E --> F[call_all]
F -->|asyncio.gather| G1[call_single DeepSeek-V3]
F -->|asyncio.gather| G2[call_single Qwen-Max]
G1 --> H[聚合结果 List]
G2 --> H
end
subgraph Storage["持久化层 (core/history.py)"]
H --> I[save_run → history.jsonl 追加写入]
I --> J[load_history → DataFrame(带缓存)]
J --> K[历史记录表格展示]
K -->|点击行| L[回填 Prompt 参数]
end
subgraph Output["结果展示"]
H --> M1[模型A输出 + 耗时 + Token + 费用]
H --> M2[模型B输出 + 耗时 + Token + 费用]
M1 --> N[手动评分 1-5 + 备注]
M2 --> N
N --> O[导出 Markdown 对比报告]
end
环境准备
# 创建项目目录与虚拟环境(uv)
mkdir prompt-debugger && cd prompt-debugger
uv venv --python 3.11 && source .venv/bin/activate
# 安装依赖
uv pip install \
litellm>=1.40.0 \
gradio>=4.0.0 \
python-dotenv>=1.0.0 \
pandas>=2.0.0 \
openai>=1.0.0 \
pytest>=7.0.0
# 项目结构
mkdir -p core tests
touch core/__init__.py core/caller.py core/history.py core/config.py app.py smoke_test.py .env
Colab 用户:
!pip install litellm gradio python-dotenv pandas openai pytest即可,无需虚拟环境。
配置 .env 文件(不要提交到 Git):
# .env — 复制 .env.example 并填入真实 Key
DEEPSEEK_API_KEY=sk-... # DeepSeek 模型必填
DASHSCOPE_API_KEY=sk-... # Qwen 模型必填
获取 API Key:DeepSeek https://platform.deepseek.com/api_keys ;Qwen(通义千问)https://help.aliyun.com/zh/dashscope/developer-reference/api-keys
Step-by-Step 实现
Step 1:项目骨架与配置加载
目标:建立清晰的目录结构,并在程序入口处统一加载所有 API Key。模型配置通过根目录的 core_config.py 集中管理,core/config.py 作为兼容层重新导出所有符号。
prompt-debugger/
├── core_config.py # 模型注册表与定价(根目录)
├── core/
│ ├── __init__.py # 触发 dotenv 加载 + sys.path 修正
│ ├── config.py # 兼容层:从 core_config 重新导出
│ ├── caller.py # LLM 并发调用层
│ └── history.py # 实验历史持久化层
├── app.py # Gradio 界面入口
├── smoke_test.py # 端到端冒烟测试
├── main.py # 运行入口(从 core_config 导入并启动 app)
├── tests/
│ ├── __init__.py
│ └── test_main.py # pytest 测试
├── requirements.txt
├── .env.example # 环境变量模板
└── .env # 真实配置(不提交)
# core/__init__.py
"""
prompt-debugger 核心包
模块说明:
caller.py - LLM 并发调用层
history.py - 实验历史持久化层
"""
import os
import sys
from dotenv import load_dotenv
# 确保项目根目录在 sys.path 中,使 core_config.py 可被 core/ 下模块导入
_project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _project_root not in sys.path:
sys.path.insert(0, _project_root)
# 包加载时立即读取 .env,确保后续所有模块都能拿到环境变量
load_dotenv()
# core_config.py — 模型注册表(项目根目录)
"""全局配置:模型注册表与定价信息"""
import os
from typing import TypedDict
class ModelConfig(TypedDict):
litellm_id: str # LiteLLM 识别的模型字符串
price_in: float # 每 1K input tokens 价格(美元)
price_out: float # 每 1K output tokens 价格(美元)
max_tokens_limit: int # 模型支持的最大 max_tokens
api_key_env: str | None # API Key 环境变量名
base_url: str | None # API 基础 URL(None 表示使用默认)
# 注册表:key 是界面显示名,value 是调用配置
MODEL_REGISTRY: dict[str, ModelConfig] = {
"DeepSeek-V3": {
"litellm_id": "deepseek/deepseek-chat",
"price_in": 0.00027,
"price_out": 0.0011,
"max_tokens_limit": 4096,
"api_key_env": "DEEPSEEK_API_KEY",
"base_url": None,
},
"Qwen-Max": {
"litellm_id": "openai/qwen-plus",
"price_in": 0.001,
"price_out": 0.004,
"max_tokens_limit": 4096,
"api_key_env": "DASHSCOPE_API_KEY",
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
},
}
# 当前激活模型 key — 修改此处全局生效,必须是 MODEL_REGISTRY 中的 key
ACTIVE_MODEL_KEY: str = "DeepSeek-V3"
def get_active_config() -> ModelConfig:
"""获取当前激活模型的完整配置"""
return MODEL_REGISTRY[ACTIVE_MODEL_KEY]
def get_litellm_id(model_key: str | None = None) -> str:
"""获取指定模型(默认激活模型)的 LiteLLM ID"""
key = model_key or ACTIVE_MODEL_KEY
return MODEL_REGISTRY[key]["litellm_id"]
def get_api_key(model_key: str | None = None) -> str | None:
"""从环境变量读取指定模型的 API Key"""
key = model_key or ACTIVE_MODEL_KEY
env_var = MODEL_REGISTRY[key]["api_key_env"]
return os.environ.get(env_var) if env_var else None
def get_base_url(model_key: str | None = None) -> str | None:
"""获取指定模型的 base_url(None 表示使用 SDK 默认值)"""
key = model_key or ACTIVE_MODEL_KEY
return MODEL_REGISTRY[key]["base_url"]
def get_model_list() -> list[str]:
"""获取所有已注册模型的显示名列表"""
return list(MODEL_REGISTRY.keys())
def estimate_cost(model_key: str, input_tokens: int, output_tokens: int) -> float:
"""根据 Token 数估算调用费用(美元)"""
cfg = MODEL_REGISTRY[model_key]
return (
input_tokens / 1000 * cfg["price_in"]
+ output_tokens / 1000 * cfg["price_out"]
)
# core/config.py — 兼容层(从 core_config 重新导出)
"""
向后兼容模块 — 实际配置已迁移到项目根目录的 core_config.py。
此模块重新导出所有符号,确保 core/caller.py 等原有导入无需修改。
"""
from core_config import (
ModelConfig,
MODEL_REGISTRY,
ACTIVE_MODEL_KEY,
estimate_cost,
get_litellm_id,
get_api_key,
get_base_url,
get_model_list,
get_active_config,
)
__all__ = [
"MODEL_REGISTRY",
"ACTIVE_MODEL_KEY",
"estimate_cost",
"get_litellm_id",
"get_api_key",
"get_base_url",
"get_model_list",
"get_active_config",
]
关键点:
- core_config.py 放在根目录,解决 core/ 子包内模块无法直接导入同级目录外文件的问题。
- core/config.py 是兼容层,保留原有的 from core.config import MODEL_REGISTRY 导入路径不被破坏。
- 模型配置比之前增加了 api_key_env 和 base_url 字段,支持通过环境变量动态读取 API Key,以及为 Qwen 等非标准端点的模型指定 base_url。
- ⚠️ 价格数据会随时调整,建议使用前确认各模型官网最新定价。
Step 2:核心调用层(异步并发)
目标:封装单模型调用并暴露并发入口。这是整个工具的核心引擎——让多个 HTTP 请求同时飞出去,而不是排队等待。
# core/caller.py
"""
LLM 并发调用层
设计原则:
1. 每个 call_single 是独立协程,互不干扰
2. call_all 用 asyncio.gather 并发,总耗时 ≈ 最慢模型的单次耗时
3. return_exceptions=True 确保一个模型出错不影响其他模型的结果
"""
import asyncio
import os
import time
from dataclasses import dataclass, field
from typing import Any
from litellm import acompletion
from core.config import MODEL_REGISTRY, estimate_cost
@dataclass
class CallResult:
"""单次模型调用的结构化结果"""
model: str
output: str
latency: float # 秒
input_tokens: int
output_tokens: int
total_tokens: int
estimated_cost: float # 美元
error: str | None = None # 非 None 表示调用失败
async def call_single(
model_key: str,
system_prompt: str,
user_prompt: str,
temperature: float,
max_tokens: int,
) -> CallResult:
"""
调用单个模型并返回结构化结果。
Args:
model_key: MODEL_REGISTRY 中的键名(如 "DeepSeek-V3")
system_prompt: 系统提示词
user_prompt: 用户输入
temperature: 采样温度 [0, 2]
max_tokens: 最大输出 Token 数
Returns:
CallResult 对象,error 字段非 None 表示失败
"""
cfg = MODEL_REGISTRY[model_key]
start = time.perf_counter()
kwargs: dict[str, Any] = {
"model": cfg["litellm_id"],
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
"temperature": temperature,
"max_tokens": max_tokens,
}
if cfg.get("api_key_env"):
kwargs["api_key"] = os.environ.get(cfg["api_key_env"])
if cfg.get("base_url"):
kwargs["base_url"] = cfg["base_url"]
try:
resp = await acompletion(**kwargs)
latency = round(time.perf_counter() - start, 2)
usage = resp.usage
input_tok = usage.prompt_tokens
output_tok = usage.completion_tokens
return CallResult(
model=model_key,
output=resp.choices[0].message.content or "",
latency=latency,
input_tokens=input_tok,
output_tokens=output_tok,
total_tokens=usage.total_tokens,
estimated_cost=round(estimate_cost(model_key, input_tok, output_tok), 6),
)
except Exception as exc:
# 捕获所有异常(限流、超时、Key 失效等),不让单个失败拖垮整批
latency = round(time.perf_counter() - start, 2)
return CallResult(
model=model_key,
output="",
latency=latency,
input_tokens=0,
output_tokens=0,
total_tokens=0,
estimated_cost=0.0,
error=_friendly_error(exc),
)
def _friendly_error(exc: Exception) -> str:
"""将技术性异常转换为对用户友好的提示"""
msg = str(exc).lower()
if "rate limit" in msg or "429" in msg:
return "❌ API 限流,请稍后重试(建议降低并发频率)"
if "auth" in msg or "401" in msg or "invalid api key" in msg:
return "❌ API Key 无效,请检查 .env 配置"
if "timeout" in msg:
return "❌ 请求超时,模型响应过慢"
if "model_not_found" in msg or "404" in msg:
return "❌ 模型不可用,请检查模型名称或账户权限"
return f"❌ 调用失败:{exc.__class__.__name__}: {str(exc)[:100]}"
async def call_all(
selected_models: list[str],
system_prompt: str,
user_prompt: str,
temperature: float,
max_tokens: int,
) -> list[CallResult]:
"""
并发调用所有选中的模型。
关键设计:asyncio.gather 让所有请求同时发出,
总耗时 ≈ max(各模型耗时),而非 sum(各模型耗时)。
例如:DeepSeek-V3 需 2s,Qwen-Max 需 3s
串行总计 5s,并发只需约 3s。
"""
tasks = [
call_single(m, system_prompt, user_prompt, temperature, max_tokens)
for m in selected_models
]
# return_exceptions=False 已被上层 try/except 处理,此处无需重复兜底
results: list[CallResult] = await asyncio.gather(*tasks)
return results
关键点:
- call_single 通过 kwargs 字典动态组装调用参数,从 core_config.py 中读取 api_key_env 和 base_url,支持不同模型使用不同的认证和端点配置。
- asyncio.gather 的 return_exceptions=False(默认)配合内层 try/except 是推荐组合——异常被"就地消化"成 CallResult.error,而不是让 gather 抛出,调用方代码更简洁。
- ⚠️ 注意 acompletion 是 LiteLLM 的异步版本,不能和同步的 completion 混用。如果你在 Jupyter/Colab 里运行,需要用 nest_asyncio 解决事件循环嵌套问题(见"常见报错"节)。
Step 3:历史持久化层
目标:用 JSONL(每行一个 JSON 对象)格式记录每次实验,追加写入天然防止数据损坏,无需数据库,重启后历史依然存在。相比上一版,增加了内存缓存机制避免频繁读取文件。
# core/history.py
"""
实验历史持久化
格式选择:JSONL(JSON Lines)
- 追加写入:每次 save_run 只 append 一行,不重写整个文件
- 可读性强:每行独立,可用任何文本工具查看
- 容错性好:某行损坏不影响其他行的读取
"""
import json
import os
from datetime import datetime, timezone
from functools import lru_cache
from pathlib import Path
import pandas as pd
from core.caller import CallResult
HISTORY_FILE = Path("history.jsonl")
_history_cache = None
_cache_timestamp = None
def _invalidate_cache():
global _history_cache, _cache_timestamp
_history_cache = None
_cache_timestamp = None
def save_run(
system_prompt: str,
user_prompt: str,
selected_models: list[str],
temperature: float,
max_tokens: int,
results: list[CallResult],
scores: dict[str, int] | None = None,
notes: str = "",
) -> str:
"""
将一次实验追加写入 history.jsonl。
Returns:
本次实验的唯一 ID(时间戳格式)
"""
run_id = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")
record = {
"run_id": run_id,
"timestamp": datetime.now(timezone.utc).isoformat(),
"params": {
"system_prompt": system_prompt,
"user_prompt": user_prompt,
"selected_models": selected_models,
"temperature": temperature,
"max_tokens": max_tokens,
},
"results": [
{
"model": r.model,
"output": r.output,
"latency": r.latency,
"input_tokens": r.input_tokens,
"output_tokens": r.output_tokens,
"total_tokens": r.total_tokens,
"estimated_cost": r.estimated_cost,
"error": r.error,
"score": (scores or {}).get(r.model, -1),
}
for r in results
],
"notes": notes,
}
with open(HISTORY_FILE, "a", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=False) + "\n")
_invalidate_cache()
return run_id
def load_history(use_cache: bool = True) -> pd.DataFrame:
"""
读取 history.jsonl 并转换为 DataFrame,供 Gradio 表格展示。
使用内存缓存避免重复读取文件。
Args:
use_cache: 是否使用缓存,False 时强制重新读取文件
Returns:
DataFrame,每行对应一次模型调用(非实验),按时间倒序排列
"""
global _history_cache, _cache_timestamp
columns = [
"run_id", "timestamp", "模型", "耗时(s)", "Tokens",
"费用($)", "评分", "User Prompt 预览", "备注"
]
if not HISTORY_FILE.exists():
return pd.DataFrame(columns=columns)
current_mtime = HISTORY_FILE.stat().st_mtime if HISTORY_FILE.exists() else 0
if use_cache and _history_cache is not None and _cache_timestamp == current_mtime:
return _history_cache
rows = []
with open(HISTORY_FILE, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
record = json.loads(line)
except json.JSONDecodeError:
continue
if "results" not in record:
continue
for result in record["results"]:
rows.append({
"run_id": record["run_id"],
"timestamp": record["timestamp"][:19].replace("T", " "),
"模型": result["model"],
"耗时(s)": result["latency"],
"Tokens": result["total_tokens"],
"费用($)": result["estimated_cost"],
"评分": result["score"] if result["score"] != -1 else "—",
"User Prompt 预览": record["params"]["user_prompt"][:40] + "...",
"备注": record.get("notes", ""),
})
df = pd.DataFrame(rows)
if not df.empty:
df = df.sort_values("timestamp", ascending=False).reset_index(drop=True)
else:
df = pd.DataFrame(columns=columns)
_history_cache = df
_cache_timestamp = current_mtime
return df
def get_run_by_id(run_id: str) -> dict | None:
"""通过 run_id 查找完整实验记录,用于历史回填功能"""
if not HISTORY_FILE.exists():
return None
with open(HISTORY_FILE, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
record = json.loads(line)
if record["run_id"] == run_id:
return record
except json.JSONDecodeError:
continue
return None
def export_comparison_report(run_ids: list[str]) -> str:
"""
将多条历史记录生成 Markdown 对比报告。
Args:
run_ids: 要对比的实验 ID 列表
Returns:
Markdown 格式的对比报告字符串
"""
records = [r for rid in run_ids if (r := get_run_by_id(rid))]
if not records:
return "未找到指定实验记录"
lines = ["# Prompt 实验对比报告\n"]
lines.append(f"生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
lines.append(f"对比实验数:{len(records)}\n\n---\n")
for rec in records:
p = rec["params"]
lines.append(f"## 实验 `{rec['run_id']}`\n")
lines.append(f"**System Prompt**:{p['system_prompt'][:100]}...\n\n")
lines.append(f"**User Prompt**:{p['user_prompt']}\n\n")
lines.append(f"**参数**:Temperature={p['temperature']}, Max Tokens={p['max_tokens']}\n\n")
for r in rec["results"]:
score_str = f"⭐ {r['score']}/5" if r["score"] != -1 else "未评分"
lines.append(f"### {r['model']} — {score_str}\n")
if r["error"]:
lines.append(f"> {r['error']}\n\n")
else:
lines.append(
f"⏱ {r['latency']}s | 🪙 {r['total_tokens']} tokens | "
f"💰 ${r['estimated_cost']}\n\n"
)
lines.append(f"{r['output']}\n\n")
lines.append("---\n")
return "\n".join(lines)
关键点:
- JSONL 追加写入是"最简单的实验追踪系统"——MLflow、W&B 做的事情在这里用 50 行实现了 80%。
- ensure_ascii=False 必须加,否则中文 Prompt 会被转义为 \uXXXX,JSONL 文件人眼不可读。
- 新增 use_cache 参数和文件 mtime 缓存机制:load_history(use_cache=True) 只在文件未修改时返回缓存结果,load_history(use_cache=False) 强制重新读取。刷新历史按钮使用 False,确保看到最新数据。
- load_history 增加了对非实验记录行(如评分更新行 "results" not in record)的跳过逻辑。
- ⚠️ 多进程同时写入同一 JSONL 可能导致行交叉损坏,本工具为单用户单进程,无此风险。生产环境多进程写入需要文件锁或改用数据库。
Step 4:Gradio 界面搭建
目标:将核心逻辑包裹成交互界面。Gradio Blocks 允许自由布局——横向排列多模型输出是关键 UX 设计,让用户眼睛不用上下滚动即可对比差异。当前支持 DeepSeek-V3 和 Qwen-Max 两个模型,输出区固定为 2 列。
# app.py
"""
Prompt 调试器主程序
运行:python app.py
然后打开浏览器访问 http://localhost:7861
支持 DeepSeek 和 Qwen 模型。
"""
import asyncio
import json
from datetime import datetime
import gradio as gr
import pandas as pd
import core # 触发 dotenv 加载
from core.caller import call_all, CallResult, MODEL_REGISTRY
from core.history import (
export_comparison_report,
get_run_by_id,
load_history,
save_run,
)
ALL_MODELS = list(MODEL_REGISTRY.keys())
def format_result_markdown(r: CallResult) -> str:
"""将 CallResult 格式化为 Markdown,供 Gradio Markdown 组件展示"""
if r.error:
return f"## ❌ {r.model}\n\n{r.error}"
return (
f"## ✅ {r.model}\n\n"
f"⏱ **{r.latency}s** | "
f"🪙 **{r.total_tokens}** tokens "
f"({r.input_tokens} in / {r.output_tokens} out) | "
f"💰 **${r.estimated_cost}**\n\n"
f"---\n\n"
f"{r.output}"
)
def run_experiment(
system_prompt: str,
user_prompt: str,
selected_models: list[str],
temperature: float,
max_tokens: int,
) -> tuple:
"""
Gradio 事件处理函数:触发并发调用,返回各模型结果。
Returns:
2 个 Markdown 内容 + 状态信息,对应界面上 2 个输出列
"""
if not selected_models:
return ("⚠️ 请至少选择一个模型", "", "未选择模型")
if not user_prompt.strip():
return ("⚠️ User Prompt 不能为空", "", "Prompt 为空")
# 在同步函数中运行异步代码
# 注意:Gradio 5.x 在内部线程中运行事件处理器,asyncio.run() 是安全的
results: list[CallResult] = asyncio.run(
call_all(selected_models, system_prompt, user_prompt, temperature, int(max_tokens))
)
# 将结果映射到固定的 2 个输出槽
model_to_result = {r.model: r for r in results}
outputs = []
for model in ALL_MODELS: # 固定顺序:DeepSeek-V3, Qwen-Max
if model in model_to_result:
outputs.append(format_result_markdown(model_to_result[model]))
else:
outputs.append("") # 未选择则留空
# 保存历史(评分在 UI 中单独触发,此处先存 -1)
run_id = save_run(
system_prompt, user_prompt, selected_models,
temperature, int(max_tokens), results
)
status = (
f"✅ 实验完成 [{run_id}] — "
f"共 {len(results)} 个模型,"
f"总费用约 ${sum(r.estimated_cost for r in results):.6f}"
)
return tuple(outputs) + (status,)
def save_scores_and_notes(
run_id_input: str,
score_deepseek: int,
score_qwen: int,
notes: str,
) -> str:
"""将用户评分写回 history.jsonl(通过重写对应行实现)"""
record = {
"type": "score_update",
"target_run_id": run_id_input.strip(),
"timestamp": datetime.utcnow().isoformat(),
"scores": {
"DeepSeek-V3": score_deepseek,
"Qwen-Max": score_qwen,
},
"notes": notes,
}
with open("history.jsonl", "a", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=False) + "\n")
return f"✅ 评分已保存到实验 {run_id_input.strip()}"
def refresh_history() -> pd.DataFrame:
"""刷新历史记录表格(强制重新读取文件)"""
return load_history(use_cache=False)
def fill_from_history(evt: gr.SelectData, df: pd.DataFrame):
"""
点击历史记录表格某行时,回填 Prompt 和参数到输入区。
Gradio SelectData 包含 index(行号)和 value(单元格值)。
我们通过行号找到 run_id,再从文件读取完整记录。
"""
if evt.index is None or df.empty:
return gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
row_idx = evt.index[0]
if row_idx >= len(df):
return gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
run_id = df.iloc[row_idx]["run_id"]
record = get_run_by_id(run_id)
if not record:
return gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
p = record["params"]
return (
gr.update(value=p["system_prompt"]),
gr.update(value=p["user_prompt"]),
gr.update(value=p["selected_models"]),
gr.update(value=p["temperature"]),
gr.update(value=p["max_tokens"]),
)
# ─────────────── Gradio Blocks UI ───────────────
with gr.Blocks(
title="🔬 Prompt 调试器",
) as demo:
gr.Markdown("# 🔬 Prompt 调试器\n> 改变一个变量,观察输出变化,记录结论\n\n**支持的模型:DeepSeek-V3, Qwen-Max**")
# ── 输入区 ──
with gr.Row():
with gr.Column(scale=2):
system_box = gr.Textbox(
label="System Prompt",
placeholder="你是一个专业的代码审查员...",
lines=4,
value="You are a helpful assistant. Be concise and precise.",
)
with gr.Column(scale=2):
user_box = gr.Textbox(
label="User Prompt",
placeholder="在这里输入你的问题或指令...",
lines=4,
)
# ── 参数控制区 ──
with gr.Row():
with gr.Column(scale=2):
model_check = gr.CheckboxGroup(
choices=ALL_MODELS,
value=["DeepSeek-V3"],
label="选择模型(可多选,并发调用)",
)
with gr.Column(scale=1):
temp_slider = gr.Slider(
minimum=0.0, maximum=2.0, value=0.7, step=0.1,
label="Temperature(越高越随机)",
)
with gr.Column(scale=1):
token_slider = gr.Slider(
minimum=100, maximum=4000, value=1000, step=100,
label="Max Tokens(输出上限)",
)
run_btn = gr.Button("🚀 运行实验", variant="primary", size="lg")
status_box = gr.Textbox(label="实验状态", interactive=False)
# ── 输出区(2 列固定对应 2 个模型)──
gr.Markdown("## 📊 模型输出对比")
with gr.Row(equal_height=False):
out_deepseek = gr.Markdown(elem_classes=["output-col"])
out_qwen = gr.Markdown(elem_classes=["output-col"])
# ── 评分区 ──
with gr.Accordion("📝 手动评分(可选)", open=False):
gr.Markdown("打分后点击保存,评分会关联到本次实验 run_id")
run_id_input = gr.Textbox(
label="Run ID(从实验状态栏复制)", placeholder="20241201_143022_123456"
)
with gr.Row():
score_deepseek = gr.Slider(1, 5, value=3, step=1, label="DeepSeek-V3 评分")
score_qwen = gr.Slider(1, 5, value=3, step=1, label="Qwen-Max 评分")
notes_box = gr.Textbox(label="备注", placeholder="DeepSeek 格式更规范,但少了一个边界条件...")
save_score_btn = gr.Button("💾 保存评分")
score_status = gr.Textbox(label="保存状态", interactive=False)
# ── 历史记录区 ──
with gr.Accordion("📚 历史记录", open=False):
refresh_btn = gr.Button("🔄 刷新历史")
history_df = gr.DataFrame(
value=load_history(),
label="实验历史(点击行可回填参数)",
interactive=False,
wrap=True,
)
gr.Markdown("*点击表格中任意行,Prompt 和参数会自动回填到输入区*")
# ── 报告导出区 ──
with gr.Accordion("📄 导出对比报告", open=False):
export_ids = gr.Textbox(
label="输入 Run ID(多个用逗号分隔)",
placeholder="20241201_143022_123456, 20241201_150033_654321",
)
export_btn = gr.Button("📥 生成 Markdown 报告")
report_output = gr.Markdown()
# ─── 事件绑定 ───
run_btn.click(
fn=run_experiment,
inputs=[system_box, user_box, model_check, temp_slider, token_slider],
outputs=[out_deepseek, out_qwen, status_box],
)
save_score_btn.click(
fn=save_scores_and_notes,
inputs=[run_id_input, score_deepseek, score_qwen, notes_box],
outputs=[score_status],
)
refresh_btn.click(fn=refresh_history, outputs=[history_df])
history_df.select(
fn=fill_from_history,
inputs=[history_df],
outputs=[system_box, user_box, model_check, temp_slider, token_slider],
)
export_btn.click(
fn=lambda ids: export_comparison_report(
[x.strip() for x in ids.split(",") if x.strip()]
),
inputs=[export_ids],
outputs=[report_output],
)
if __name__ == "__main__":
demo.launch(
server_name="127.0.0.1",
server_port=7861,
theme=gr.themes.Soft(),
css=".output-col { min-height: 300px; }",
share=False,
show_error=True,
)
关键点:
- 输出固定为 2 个 Markdown 组件对应 2 个模型(DeepSeek-V3、Qwen-Max),而非动态创建组件——Gradio 的输出数量必须在 launch 前静态确定,这是一个常见的架构约束。
- 相比之前的 3 模型版本,输出从 (out_gpt, out_claude, out_deepseek, status_box) 缩减为 (out_deepseek, out_qwen, status_box),run_experiment 返回值从 4 元组变为 3 元组。
- 评分区相应缩减为 2 个滑块(DeepSeek-V3 评分、Qwen-Max 评分)。
- 服务器绑定 127.0.0.1:7861(原为 0.0.0.0:7860),theme 和 css 从 Blocks() 构造函数移到 launch() 参数中。
- ⚠️ asyncio.run() 在 Jupyter/Colab 中会报 This event loop is already running,因为 Jupyter 自带运行中的事件循环。解决方案见"常见报错"。
完整运行验证
冒烟测试(命令行)
# smoke_test.py — 端到端冒烟测试(不启动 UI,仅测试核心逻辑)
"""
运行:python smoke_test.py
预期:在 ~5 秒内并发拿到所有选中模型的响应
"""
import asyncio
import os
import sys
# 确保能找到 core 模块
sys.path.insert(0, os.path.dirname(__file__))
from dotenv import load_dotenv
load_dotenv() # 加载环境变量
from core.caller import call_all
from core.history import save_run, load_history
async def main():
print("🔬 Prompt 调试器 — 冒烟测试\n")
system = "You are a concise assistant. Answer in one sentence."
user = "What is the capital of France?"
models = ["DeepSeek-V3", "Qwen-Max"]
print(f"📤 发送到模型:{models}")
print(f"📝 User Prompt:{user}\n")
results = await call_all(
selected_models=models,
system_prompt=system,
user_prompt=user,
temperature=0.0, # 确定性输出便于验证
max_tokens=100,
)
print("=" * 60)
for r in results:
if r.error:
print(f"❌ {r.model}: {r.error}")
else:
print(f"✅ {r.model}")
print(f" 输出: {r.output.strip()}")
print(f" 耗时: {r.latency}s | Tokens: {r.total_tokens} | 费用: ${r.estimated_cost}")
print("=" * 60)
# 测试历史存储
run_id = save_run(system, user, models, 0.0, 100, results)
print(f"\n💾 历史已保存,Run ID: {run_id}")
df = load_history()
print(f"📚 当前历史记录数(行数): {len(df)}")
print(df.head(3).to_string())
print("\n✅ 冒烟测试通过!运行 `python app.py` 启动完整 UI")
if __name__ == "__main__":
asyncio.run(main())
预期输出:
🔬 Prompt 调试器 — 冒烟测试
📤 发送到模型:['DeepSeek-V3', 'Qwen-Max']
📝 User Prompt:What is the capital of France?
============================================================
✅ DeepSeek-V3
输出: The capital of France is Paris.
耗时: 0.87s | Tokens: 25 | 费用: $0.000003
✅ Qwen-Max
输出: The capital of France is Paris.
耗时: 1.23s | Tokens: 28 | 费用: $0.000035
============================================================
💾 历史已保存,Run ID: 20241201_143022_123456
📚 当前历史记录数(行数): 2
run_id timestamp 模型 耗时(s) Tokens 费用($) 评分 User Prompt 预览 备注
0 20241201_... 2024-12-01 14:30:22 DeepSeek-V3 0.87 25 0.000003 — What is the capital...
1 20241201_... 2024-12-01 14:30:22 Qwen-Max 1.23 28 0.000035 — What is the capital...
✅ 冒烟测试通过!运行 `python app.py` 启动完整 UI
pytest 测试
cd prompt-debugger
python -m pytest tests/ -v
项目包含 tests/test_main.py,覆盖以下测试类别:
- TestCoreConfig — 模型注册表结构、各辅助函数正确性
- TestModuleImports — 核心模块可导入性
- TestLLMCall — Mock 下的单模型调用、并发调用、错误处理(异步测试)
- TestHistory — 历史保存与读取(使用临时目录,不污染项目文件)
- TestAppHelpers — format_result_markdown 正常/异常场景
常见报错与解决方案
| 报错信息 | 原因 | 解决方案 |
|---|---|---|
This event loop is already running |
Colab/Jupyter 已有事件循环,asyncio.run() 冲突 |
在 Colab 首行运行 !pip install nest_asyncio,然后 import nest_asyncio; nest_asyncio.apply(),之后 asyncio.run() 即可正常使用 |
litellm.AuthenticationError: OpenAIException - Incorrect API key |
.env 未加载或 Key 填错 | 检查 .env 文件在项目根目录;确认 import core 在任何 litellm 调用之前执行 |
ModuleNotFoundError: No module named 'core' |
从错误目录启动 | 确保在项目根目录(包含 app.py 的目录)运行 python app.py |
gradio.Error: Cannot call ... as it has not been created yet |
Gradio 组件在 Blocks 上下文外创建 |
所有 gr.xxx() 组件定义必须在 with gr.Blocks() as demo: 缩进块内 |
TypeError: cannot unpack non-iterable NoneType |
模型未返回结果,run_experiment 返回了 None |
确认 run_experiment 函数所有分支都有显式 return,特别是异常路径 |
ConnectionError: ('Connection aborted.', RemoteDisconnected...) |
网络代理问题(国内访问 OpenAI) | Qwen 模型通过 DashScope 端点直接调用,无需代理;DeepSeek 若受限可单独使用 Qwen 测试 |
扩展练习(可选)
-
🟡 中等 — 增加 Diff 视图:在两个模型输出之间,用不同颜色高亮词语级别的差异。提示:Python 标准库
difflib.unified_diff可在字符级别计算差异,Gradio 支持 HTML 渲染,你可以用<span style="background:yellow">标记不同词。 -
🔴 困难 — Prompt 自动优化循环:接入 DSPy,将你手动评分的历史数据作为训练信号,让 DSPy 自动搜索更好的 Prompt 写法。核心思路:用
dspy.BootstrapFewShot以你的评分作为 metric,基于已有的(prompt, score)对优化出新的候选 Prompt,然后在调试器中一键测试新候选。 -
🟢 简单 — 增加更多模型:在
core_config.py的MODEL_REGISTRY中添加新模型条目(如通义千问的其他规格、MiniMax、Moonshot 等),app.py的输出列数量需要相应增加到与模型数一致。注意 Gradio 的 outputs 数量必须在launch前静态确定。
依赖说明
requirements.txt 内容(按实际代码使用):
litellm>=1.40.0
python-dotenv>=1.0.0
gradio>=4.0.0
pandas>=2.0.0
openai>=1.0.0
pytest>=7.0.0
环境变量说明
.env.example 定义了以下变量:
DEEPSEEK_API_KEY=your_deepseek_api_key_here # DeepSeek 模型必填
DASHSCOPE_API_KEY=your_dashscope_api_key_here # Qwen 模型必填
测试说明
tests/test_main.py 包含完整的 pytest 测试,类别如下:
- TestCoreConfig (9个用例):注册表结构、各辅助函数、schema 验证
- TestModuleImports (4个用例):caller、history、app 模块可导入性
- TestLLMCall (3个异步用例):Mock 单模型调用、并发调用、错误处理
- TestHistory (1个用例):使用临时目录的 save/load 测试
- TestAppHelpers (2个用例):format_result_markdown 正常/异常场景