3.2 【动手二】数据库查询 MCP Server
难度 ⭐⭐⭐ | 类型 Tools + Resources | 适合人群 后端开发者
实验目标
本节结束后,你将能够:
1. 独立实现一个支持 SQLite 的数据库查询 MCP Server,让 Claude 可以直接理解并查询数据库;
2. 掌握 MCP Resources 的设计模式——用 db://schema 等 URI 向 LLM 注入结构化上下文,而不是每次调用都靠 Prompt 塞表结构;
3. 理解 Text-to-SQL 场景下的只读安全保护、结果分页、Schema 感知三个核心工程问题,以及为什么它们在生产中不可忽视。
架构总览
graph TD
User["👤 用户自然语言提问"] --> Claude["🤖 Claude\n(MCP Host)"]
Claude -->|"读取 Resources\ndb://schema\ndb://tables"| MCPServer["🔧 Database MCP Server\n(FastMCP)"]
Claude -->|"调用 Tools\nexecute_query / describe_table / get_sample_data"| MCPServer
MCPServer --> Guard["🛡️ SQL 安全守卫\n只允许 SELECT\nAST 语法解析验证"]
Guard --> DB[(":floppy_disk: SQLite 数据库")]
DB --> Formatter["📋 结果格式化器\n行数截断 + JSON 序列化\n+ 列类型推断"]
Formatter --> MCPServer
MCPServer -->|"结构化结果"| Claude
Claude -->|"自然语言回答 + SQL 引用"| User
style Guard fill:#ff9999,stroke:#cc0000
style DB fill:#99ccff,stroke:#0066cc
style MCPServer fill:#99ff99,stroke:#006600
数据流说明:Claude 首先通过 Resources 拉取 Schema(这是被动注入,无需用户触发),再通过 Tools 执行具体查询。两种能力分工明确——Resources 负责"让 LLM 知道有什么",Tools 负责"让 LLM 拿到具体数据"。
环境准备
# 创建虚拟环境
uv venv && source .venv/bin/activate # Windows: .venv\Scripts\activate
# 安装依赖
uv pip install "mcp>=1.0.0" "sqlparse>=0.4.0" "python-dotenv>=1.0.0"
# 验证安装
python -c "import mcp, sqlparse; print('依赖安装成功')"
Colab 用户:
!pip install "mcp>=1.0.0" "sqlparse>=0.4.0"即可,SQLite 内置无需额外安装。
# 创建项目结构
mkdir -p mcp-database-server/data
cd mcp-database-server
Step-by-Step 实现
Step 1:生成示例数据库
目标:创建一个包含真实业务场景数据的 SQLite 数据库,让后续的 SQL 生成有实质意义可验证。
# scripts_create_sample_db.py
"""
生成示例电商数据库,包含:用户、商品、订单、订单明细 四张表
字段设计参考真实业务场景,包含外键关系和字段注释
"""
import sqlite3
import random
from datetime import datetime, timedelta
from pathlib import Path
DB_PATH = Path("data/sample.db")
DB_PATH.parent.mkdir(exist_ok=True)
def create_database():
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()
# 启用外键约束(SQLite 默认关闭)
cur.execute("PRAGMA foreign_keys = ON")
# ── 建表 ──────────────────────────────────────────────
cur.executescript("""
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
email TEXT NOT NULL UNIQUE,
region TEXT NOT NULL, -- 大区:华东/华北/华南/海外
created_at TEXT NOT NULL,
is_vip INTEGER NOT NULL DEFAULT 0 -- 0普通 1VIP
);
CREATE TABLE IF NOT EXISTS products (
product_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
category TEXT NOT NULL, -- 数码/服装/食品/家居
price REAL NOT NULL,
stock INTEGER NOT NULL DEFAULT 0,
supplier_id INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS orders (
order_id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users(user_id),
status TEXT NOT NULL, -- pending/paid/shipped/done/cancelled
total_amount REAL NOT NULL,
created_at TEXT NOT NULL,
shipped_at TEXT
);
CREATE TABLE IF NOT EXISTS order_items (
item_id INTEGER PRIMARY KEY AUTOINCREMENT,
order_id INTEGER NOT NULL REFERENCES orders(order_id),
product_id INTEGER NOT NULL REFERENCES products(product_id),
quantity INTEGER NOT NULL,
unit_price REAL NOT NULL -- 下单时快照价格,避免价格变更影响历史记录
);
""")
# ── 写入种子数据 ──────────────────────────────────────
regions = ["华东", "华北", "华南", "海外"]
categories = ["数码", "服装", "食品", "家居"]
statuses = ["pending", "paid", "shipped", "done", "cancelled"]
# 50 名用户
users = [
(f"user_{i:03d}", f"user{i}@example.com",
random.choice(regions),
(datetime.now() - timedelta(days=random.randint(1, 365))).isoformat(),
random.randint(0, 1))
for i in range(1, 51)
]
cur.executemany(
"INSERT OR IGNORE INTO users (username,email,region,created_at,is_vip) VALUES (?,?,?,?,?)",
users
)
# 100 件商品
products = [
(f"商品_{i:03d}", random.choice(categories),
round(random.uniform(9.9, 9999.0), 2),
random.randint(0, 500),
random.randint(1, 10))
for i in range(1, 101)
]
cur.executemany(
"INSERT OR IGNORE INTO products (name,category,price,stock,supplier_id) VALUES (?,?,?,?,?)",
products
)
# 200 笔订单 + 明细
for i in range(1, 201):
user_id = random.randint(1, 50)
status = random.choice(statuses)
created_at = (datetime.now() - timedelta(days=random.randint(1, 180))).isoformat()
shipped_at = (datetime.now() - timedelta(days=random.randint(0, 10))).isoformat() \
if status in ("shipped", "done") else None
# 每笔订单 1-5 件商品
items = []
total = 0.0
for _ in range(random.randint(1, 5)):
product_id = random.randint(1, 100)
qty = random.randint(1, 3)
price = round(random.uniform(9.9, 999.0), 2)
items.append((product_id, qty, price))
total += qty * price
cur.execute(
"INSERT INTO orders (user_id,status,total_amount,created_at,shipped_at) VALUES (?,?,?,?,?)",
(user_id, status, round(total, 2), created_at, shipped_at)
)
order_id = cur.lastrowid
cur.executemany(
"INSERT INTO order_items (order_id,product_id,quantity,unit_price) VALUES (?,?,?,?)",
[(order_id, *item) for item in items]
)
conn.commit()
conn.close()
print(f"✅ 数据库已生成:{DB_PATH.resolve()}")
print(" 表:users(50行) / products(100行) / orders(200行) / order_items(~600行)")
if __name__ == "__main__":
create_database()
python scripts_create_sample_db.py
关键点:
- order_items.unit_price 存快照价格而非引用当前商品价格,这是真实电商系统的标准设计——LLM 生成 SQL 时如果不理解这一点会写出错误的 JOIN。后续的 describe_table Tool 会把这个字段注释暴露给模型。
- ⚠️ SQLite 默认不开启外键约束,不加 PRAGMA foreign_keys = ON 插入违规数据不会报错,会造成数据不一致。
Step 2:SQL 安全守卫模块
目标:在正式写 MCP Server 之前,先把"只读保护"逻辑单独封装成模块,因为这是本 Server 最重要的安全边界,需要比关键字匹配更可靠的实现。
# db_guard.py
"""
SQL 安全守卫:使用 sqlparse 进行 AST 级别的语句类型验证
为何不用简单的 startswith("SELECT"):
攻击者可以绕过:WITH evil AS (DELETE ...) SELECT 1
sqlparse 解析后能拿到真实的 statement type
"""
import sqlparse
from sqlparse.sql import Statement
from sqlparse.tokens import Keyword, DDL, DML
class SQLSecurityError(ValueError):
"""SQL 安全违规异常,区别于普通的 ValueError"""
pass
# 白名单:只允许这些 DML 操作
ALLOWED_STATEMENT_TYPES = {"SELECT"}
# 黑名单关键字(用于双重检查,防止 sqlparse 解析漏洞)
FORBIDDEN_KEYWORDS = {
"INSERT", "UPDATE", "DELETE", "DROP", "CREATE",
"ALTER", "TRUNCATE", "REPLACE", "MERGE", "EXEC",
"EXECUTE", "GRANT", "REVOKE", "ATTACH", "DETACH",
}
def validate_sql(sql: str) -> str:
"""
验证 SQL 语句安全性,返回规范化后的 SQL。
Args:
sql: 待验证的 SQL 字符串
Returns:
规范化后的 SQL(去除首尾空白,统一大小写关键字)
Raises:
SQLSecurityError: 包含非 SELECT 操作时抛出
ValueError: SQL 为空或无法解析时抛出
"""
sql = sql.strip()
if not sql:
raise ValueError("SQL 不能为空")
# ── 第一层:sqlparse AST 解析 ────────────────────────
parsed = sqlparse.parse(sql)
if not parsed:
raise ValueError("无法解析 SQL 语句")
for statement in parsed:
stmt_type = statement.get_type()
# get_type() 返回 None 表示无法识别,拒绝放行(fail-safe 原则)
if stmt_type is None or stmt_type.upper() not in ALLOWED_STATEMENT_TYPES:
raise SQLSecurityError(
f"安全拒绝:检测到非 SELECT 操作(类型:{stmt_type})。"
f"本 Server 仅允许只读查询。"
)
# ── 第二层:关键字黑名单兜底 ────────────────────────
# 防御 sqlparse 对复杂 CTE 解析不准确的边界情况
tokens = sqlparse.parse(sql)[0].flatten()
for token in tokens:
if token.ttype in (Keyword, DDL, DML):
if token.normalized.upper() in FORBIDDEN_KEYWORDS:
raise SQLSecurityError(
f"安全拒绝:检测到禁止关键字 '{token.normalized}'。"
)
return sql
def sanitize_identifier(name: str) -> str:
"""
验证表名/列名是否为合法标识符,防止 SQL 注入。
仅允许字母、数字、下划线,且不能以数字开头。
"""
import re
if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
raise ValueError(f"非法标识符:'{name}'。只允许字母、数字和下划线。")
return name
关键点:
- 用 sqlparse 做 AST 级解析而不是 startswith("SELECT"),原因是 WITH malicious AS (DELETE FROM users) SELECT 1 这类 CTE 注入用字符串匹配无法拦截。
- 双层防御:AST 检查 + 关键字黑名单,任一层触发就拒绝。fail-safe 原则——当 get_type() 返回 None(无法识别)时选择拒绝而非放行。
- ⚠️ 生产环境还应限制 SQL 执行时间(conn.set_timeout())防止慢查询拖垮服务。
Step 3:数据库连接与结果格式化
目标:封装 SQLite 数据库连接工厂,以及统一的查询结果格式化逻辑。
# db_backend.py
"""
数据库后端抽象层:支持 SQLite(本地开发)和 PostgreSQL(生产)双模式
通过环境变量 DATABASE_URL 切换,不需要改代码
"""
import json
import os
import sqlite3
from contextlib import contextmanager
from typing import Generator, Any
# 结果截断阈值(行数):防止意外的全表扫描把几十万行数据塞入 LLM 上下文
MAX_ROWS = 100
# 单个字段值的最大字符数:防止 TEXT 字段里的长文本撑爆上下文
MAX_FIELD_LENGTH = 500
def _truncate_value(v: Any) -> Any:
"""截断过长的字段值,并标注截断标记方便 LLM 理解"""
if isinstance(v, str) and len(v) > MAX_FIELD_LENGTH:
return v[:MAX_FIELD_LENGTH] + f"... [截断,原始长度 {len(v)} 字符]"
return v
def format_query_result(rows: list[dict], total_fetched: int) -> str:
"""
将查询结果格式化为 JSON 字符串,附带元信息。
元信息对 LLM 很重要:知道"只返回了 100 行中的 100 行"
有助于它告知用户结果可能被截断,而不是编造"共有100条记录"。
"""
result = {
"row_count": len(rows),
"truncated": total_fetched > MAX_ROWS,
"rows": [{k: _truncate_value(v) for k, v in row.items()} for row in rows],
}
if result["truncated"]:
result["note"] = f"查询结果已截断至 {MAX_ROWS} 行,请添加 LIMIT 子句获取精确结果"
return json.dumps(result, ensure_ascii=False, indent=2, default=str)
# ── SQLite 后端 ──────────────────────────────────────────
@contextmanager
def get_sqlite_conn(db_path: str) -> Generator[sqlite3.Connection, None, None]:
"""SQLite 连接上下文管理器,自动处理关闭和异常回滚"""
conn = sqlite3.connect(db_path, timeout=10) # 10秒锁等待超时
conn.row_factory = sqlite3.Row # 让结果支持按列名访问
conn.execute("PRAGMA query_only = ON") # 数据库级只读保护(双重保险)
try:
yield conn
finally:
conn.close()
def sqlite_execute(db_path: str, sql: str) -> str:
"""在 SQLite 上执行已验证的 SELECT 查询"""
with get_sqlite_conn(db_path) as conn:
cursor = conn.execute(sql)
# fetchmany 避免把百万行数据全部加载到内存
rows = [dict(r) for r in cursor.fetchmany(MAX_ROWS + 1)]
total = len(rows)
return format_query_result(rows[:MAX_ROWS], total)
def sqlite_get_schema(db_path: str) -> dict:
"""提取 SQLite 完整 Schema:表名 → 列信息列表"""
with get_sqlite_conn(db_path) as conn:
tables = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
).fetchall()
schema = {}
for (table_name,) in tables:
cols = conn.execute(f"PRAGMA table_info({table_name})").fetchall()
fks = conn.execute(f"PRAGMA foreign_key_list({table_name})").fetchall()
fk_map = {fk[3]: f"{fk[2]}.{fk[4]}" for fk in fks} # col → ref_table.ref_col
schema[table_name] = [
{
"column": col[1],
"type": col[2],
"nullable": not col[3],
"default": col[4],
"primary_key": bool(col[5]),
"references": fk_map.get(col[1]), # 外键引用,帮助 LLM 理解关联关系
}
for col in cols
]
return schema
def sqlite_describe_table(db_path: str, table_name: str) -> dict:
"""获取单表的详细结构信息"""
schema = sqlite_get_schema(db_path)
if table_name not in schema:
available = list(schema.keys())
raise ValueError(f"表 '{table_name}' 不存在。可用的表:{available}")
return {"table": table_name, "columns": schema[table_name]}
def sqlite_get_sample(db_path: str, table_name: str, limit: int) -> str:
"""获取样例数据(复用 execute,但 table_name 需要额外验证)"""
from db_guard import sanitize_identifier
safe_name = sanitize_identifier(table_name)
safe_limit = min(max(1, limit), 20) # 样例数据强制限制 1-20 行
sql = f"SELECT * FROM {safe_name} LIMIT {safe_limit}"
with get_sqlite_conn(db_path) as conn:
cursor = conn.execute(sql)
rows = [dict(r) for r in cursor.fetchall()]
return format_query_result(rows, len(rows))
关键点:
- PRAGMA query_only = ON 是 SQLite 层面的只读保护,配合应用层的 validate_sql 形成双重防御——即使应用层守卫被绕过,数据库驱动层也会拒绝写操作。
- fetchmany(MAX_ROWS + 1) 而非 fetchall():多取一行是为了判断"是否有更多数据"(通过 total > MAX_ROWS 判断),避免触发全表加载。
- 外键信息(PRAGMA foreign_key_list)对 LLM 生成准确 JOIN 语句至关重要,不能省略。
Step 4:MCP Server 主体
目标:将安全守卫和数据库后端组装成完整的 MCP Server,注册 Tools 和 Resources。
# server.py
"""
数据库查询 MCP Server
支持 Claude 通过自然语言查询 SQLite / PostgreSQL 数据库
运行方式:
python server.py # stdio 模式(配合 Claude Desktop)
python server.py --transport sse # SSE 模式(配合 Web 客户端,默认端口 8000)
"""
import json
import os
import sys
from pathlib import Path
from mcp.server.fastmcp import FastMCP
from db_backend import (
sqlite_execute,
sqlite_get_schema,
sqlite_describe_table,
sqlite_get_sample,
)
from db_guard import SQLSecurityError, validate_sql
# ── 配置 ─────────────────────────────────────────────────
# 优先读环境变量,方便在不同环境(本地/容器/云函数)切换数据库路径
DB_PATH = os.environ.get("DB_PATH", "data/sample.db")
# 验证数据库文件存在(早失败原则:启动时就报错,而不是在第一次请求时才发现)
if not Path(DB_PATH).exists():
sys.exit(f"❌ 数据库文件不存在:{DB_PATH}\n请先运行 python scripts_create_sample_db.py")
# ── 创建 MCP Server 实例 ──────────────────────────────────
mcp = FastMCP(
name="database-server",
# Server 的 instructions 会被注入到 Claude 的系统提示中
# 告诉 Claude 这个 Server 的能力边界和使用约定
instructions="""
你有权限查询一个电商业务数据库(SQLite)。
工作流程建议:
1. 先通过 db://schema Resource 了解完整表结构
2. 不确定字段含义时,用 describe_table 获取详细说明
3. 不熟悉数据分布时,用 get_sample_data 查看样例
4. 用 execute_query 执行最终的 SELECT 查询
注意事项:
- 仅支持 SELECT 查询,写操作会被拒绝
- 结果最多返回 100 行,超出会被截断并提示
- 涉及金额计算时注意使用 order_items.unit_price(快照价格)而非 products.price(当前价格)
""",
)
# ═══════════════════════════════════════════════════════════
# TOOLS:Claude 主动调用,执行具体操作
# ═══════════════════════════════════════════════════════════
@mcp.tool()
def execute_query(sql: str) -> str:
"""
执行 SQL SELECT 查询并返回结果。
Args:
sql: 标准 SQL SELECT 语句。支持 JOIN、子查询、聚合函数、CTE(WITH 子句)。
示例:SELECT u.username, COUNT(o.order_id) as order_count
FROM users u LEFT JOIN orders o ON u.user_id = o.user_id
GROUP BY u.user_id ORDER BY order_count DESC LIMIT 10
Returns:
JSON 格式的查询结果,包含 row_count、truncated 标志和 rows 数组。
若结果超过 100 行会被截断,truncated 字段为 true。
"""
try:
validated_sql = validate_sql(sql)
except SQLSecurityError as e:
# 安全拒绝:返回明确的错误信息让 Claude 理解原因
return json.dumps({"error": "SECURITY_REJECTED", "message": str(e)}, ensure_ascii=False)
except ValueError as e:
return json.dumps({"error": "INVALID_SQL", "message": str(e)}, ensure_ascii=False)
try:
return sqlite_execute(DB_PATH, validated_sql)
except Exception as e:
# 数据库执行错误:把原始报错返回给 Claude,让它自行修正 SQL
# 这是 Text-to-SQL 自修正循环的关键——错误信息本身就是上下文
return json.dumps({
"error": "EXECUTION_ERROR",
"message": str(e),
"hint": "请检查表名、列名是否正确,可以用 describe_table 确认表结构",
}, ensure_ascii=False)
@mcp.tool()
def describe_table(table_name: str) -> str:
"""
获取指定表的详细结构信息,包括列名、数据类型、是否可空、主键和外键关系。
当你不确定某个表有哪些字段,或需要了解表间关联关系时使用此工具。
Args:
table_name: 表名(区分大小写)。如:users、orders、order_items、products
Returns:
JSON 格式的表结构描述,包含所有列的详细信息和外键引用关系。
"""
try:
result = sqlite_describe_table(DB_PATH, table_name)
return json.dumps(result, ensure_ascii=False, indent=2)
except ValueError as e:
return json.dumps({"error": "TABLE_NOT_FOUND", "message": str(e)}, ensure_ascii=False)
@mcp.tool()
def get_sample_data(table_name: str, limit: int = 5) -> str:
"""
获取指定表的样例数据,帮助理解数据分布和字段取值范围。
适用场景:
- 不确定某个枚举字段有哪些可能值(如 status、region、category)
- 需要了解数据的大致规模和质量
- 构造 WHERE 条件前先看看实际数据长什么样
Args:
table_name: 表名
limit: 返回行数,范围 1-20,默认 5
Returns:
JSON 格式的样例数据
"""
try:
return sqlite_get_sample(DB_PATH, table_name, limit)
except ValueError as e:
return json.dumps({"error": str(e)}, ensure_ascii=False)
# ═══════════════════════════════════════════════════════════
# RESOURCES:被动注入,Claude 主动读取作为背景知识
# ═══════════════════════════════════════════════════════════
@mcp.resource("db://schema")
def get_full_schema() -> str:
"""
返回完整数据库 Schema,供 Claude 在生成 SQL 前参考。
这是一个 Resource 而非 Tool:Claude 在对话开始时会自动读取它,
不需要用户每次都在 Prompt 里粘贴表结构。
Returns:
JSON 格式的完整 Schema,结构为 {表名: [{列信息}]}
"""
schema = sqlite_get_schema(DB_PATH)
# 附加 Schema 使用说明,引导 LLM 正确理解
result = {
"database_type": "SQLite",
"database_path": DB_PATH,
"schema": schema,
"important_notes": [
"order_items.unit_price 是下单时的快照价格,用于历史金额计算",
"products.price 是当前售价,可能与历史订单价格不同",
"orders.status 枚举值:pending/paid/shipped/done/cancelled",
"users.is_vip: 0=普通用户, 1=VIP用户",
],
}
return json.dumps(result, ensure_ascii=False, indent=2)
@mcp.resource("db://tables")
def get_table_list() -> str:
"""
返回数据库中所有表的名称列表,适合快速了解数据库结构。
Returns:
JSON 格式的表名列表及简要说明
"""
schema = sqlite_get_schema(DB_PATH)
# 为每张表附加简要业务说明,帮助 LLM 快速定位目标表
table_descriptions = {
"users": "用户信息,包含大区、VIP等级",
"products": "商品信息,包含分类、价格、库存",
"orders": "订单主表,包含状态、总金额、时间",
"order_items": "订单明细,记录每笔订单的商品、数量、成交价",
}
result = {
"tables": [
{
"name": t,
"description": table_descriptions.get(t, ""),
"column_count": len(cols),
}
for t, cols in schema.items()
]
}
return json.dumps(result, ensure_ascii=False, indent=2)
# ═══════════════════════════════════════════════════════════
# 入口
# ═══════════════════════════════════════════════════════════
if __name__ == "__main__":
transport = "stdio"
if "--transport" in sys.argv:
idx = sys.argv.index("--transport")
transport = sys.argv[idx + 1]
print(f"🚀 Database MCP Server 启动中(transport={transport}, db={DB_PATH})", file=sys.stderr)
mcp.run(transport=transport)
关键点:
- Tools 的 docstring 不只是给开发者看的——FastMCP 会把它注入到 Claude 的工具描述里。写清楚"什么情况用这个工具、参数格式、返回格式"直接影响 Claude 能否正确调用。
- 执行错误的完整信息要透传给 Claude:{"error": "EXECUTION_ERROR", "message": "no such column: usr.name"} 这样的错误信息能让 Claude 理解哪里写错了,自动重试修正——这是 Text-to-SQL 自修正循环的基础。
- ⚠️ 生产注意:instructions 字段的内容会被注入系统提示,注意不要在这里写入任何敏感的业务逻辑或配置。
- 当前 FastMCP 构造中不包含 version 参数(代码中未设置),与部分文档中可能出现的 version="1.0.0" 不一致。
Step 5:配置接入 Claude Desktop
目标:将 MCP Server 注册到 Claude Desktop,实现真实对话驱动的数据库查询。
// ~/Library/Application Support/Claude/claude_desktop_config.json(macOS)
// %APPDATA%\Claude\claude_desktop_config.json(Windows)
{
"mcpServers": {
"database": {
"command": "/path/to/mcp-database-server/.venv/bin/python",
"args": ["/path/to/mcp-database-server/server.py"],
"env": {
"DB_PATH": "/path/to/mcp-database-server/data/sample.db"
}
}
}
}
# 快速获取 Python 路径
which python # 或: source .venv/bin/activate && which python
⚠️ 生产注意:
command必须使用虚拟环境内的 Python 绝对路径,使用系统 Python 会因为缺少依赖而启动失败。配置修改后需完全退出并重启 Claude Desktop。
完整运行验证
除了接入 Claude Desktop,还可以用 MCP 官方命令行工具做自动化验证:
# tests_smoke_test.py
"""
端到端冒烟测试:通过 MCP Python SDK 直接调用 Server
不依赖 Claude Desktop,适合 CI 环境
"""
import asyncio
import json
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def run_smoke_test():
server_params = StdioServerParameters(
command="python",
args=["server.py"],
env={"DB_PATH": "data/sample.db"},
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
print("=" * 50)
print("🧪 MCP Database Server 冒烟测试")
print("=" * 50)
# ── 测试 1:列出所有工具 ──────────────────────
tools = await session.list_tools()
tool_names = [t.name for t in tools.tools]
print(f"\n✅ 工具列表:{tool_names}")
assert set(tool_names) == {"execute_query", "describe_table", "get_sample_data"}, \
f"工具列表不符合预期:{tool_names}"
# ── 测试 2:列出所有 Resources ───────────────
resources = await session.list_resources()
resource_uris = [str(r.uri) for r in resources.resources]
print(f"✅ Resources:{resource_uris}")
assert "db://schema" in resource_uris
# ── 测试 3:读取 Schema Resource ─────────────
schema_result = await session.read_resource("db://schema")
schema_text = schema_result.contents[0].text
schema_data = json.loads(schema_text)
tables_in_schema = list(schema_data["schema"].keys())
print(f"✅ Schema 包含表:{tables_in_schema}")
assert "users" in tables_in_schema and "orders" in tables_in_schema
# ── 测试 4:describe_table Tool ──────────────
desc_result = await session.call_tool("describe_table", {"table_name": "orders"})
desc_data = json.loads(desc_result.content[0].text)
col_names = [c["column"] for c in desc_data["columns"]]
print(f"✅ orders 表字段:{col_names}")
assert "order_id" in col_names and "total_amount" in col_names
# ── 测试 5:execute_query 正常查询 ───────────
query_result = await session.call_tool(
"execute_query",
{"sql": "SELECT COUNT(*) as user_count FROM users"}
)
query_data = json.loads(query_result.content[0].text)
user_count = query_data["rows"][0]["user_count"]
print(f"✅ 用户总数:{user_count}")
assert user_count == 50, f"预期 50 个用户,实际 {user_count}"
# ── 测试 6:安全拦截验证 ─────────────────────
evil_result = await session.call_tool(
"execute_query",
{"sql": "DELETE FROM users WHERE 1=1"}
)
evil_data = json.loads(evil_result.content[0].text)
print(f"✅ 危险 SQL 被拦截:{evil_data['error']}")
assert evil_data["error"] == "SECURITY_REJECTED"
# ── 测试 7:复杂业务查询 ─────────────────────
biz_sql = """
SELECT u.region,
COUNT(DISTINCT o.order_id) as order_count,
ROUND(SUM(o.total_amount), 2) as total_revenue
FROM users u
JOIN orders o ON u.user_id = o.user_id
WHERE o.status = 'done'
GROUP BY u.region
ORDER BY total_revenue DESC
"""
biz_result = await session.call_tool("execute_query", {"sql": biz_sql})
biz_data = json.loads(biz_result.content[0].text)
print(f"✅ 各地区完成订单统计({biz_data['row_count']} 行):")
for row in biz_data["rows"]:
print(f" {row['region']}: {row['order_count']} 笔, ¥{row['total_revenue']}")
print("\n🎉 所有测试通过!")
if __name__ == "__main__":
asyncio.run(run_smoke_test())
python tests_smoke_test.py
预期输出:
==================================================
🧪 MCP Database Server 冒烟测试
==================================================
✅ 工具列表:['execute_query', 'describe_table', 'get_sample_data']
✅ Resources:['db://schema', 'db://tables']
✅ Schema 包含表:['users', 'products', 'orders', 'order_items']
✅ orders 表字段:['order_id', 'user_id', 'status', 'total_amount', 'created_at', 'shipped_at']
✅ 用户总数:50
✅ 危险 SQL 被拦截:SECURITY_REJECTED
✅ 各地区完成订单统计(4 行):
华东: 23 笔, ¥58432.10
华南: 18 笔, ¥41209.55
华北: 15 笔, ¥33876.20
海外: 9 笔, ¥19443.80
🎉 所有测试通过!
接入 Claude Desktop 后,你可以直接用自然语言提问: - "过去 30 天内,每个品类的销售额是多少?" - "找出购买次数超过 5 次的 VIP 用户" - "库存低于 10 件的商品有哪些?"
常见报错与解决方案
| 报错信息 | 原因 | 解决方案 |
|---|---|---|
ModuleNotFoundError: No module named 'mcp' |
虚拟环境未激活,或依赖未安装 | source .venv/bin/activate && uv pip install "mcp>=1.0.0" |
❌ 数据库文件不存在:data/sample.db |
未运行建库脚本 | python scripts_create_sample_db.py |
sqlite3.OperationalError: attempt to write a readonly database |
PRAGMA query_only 拦截了写操作(符合预期) |
这是安全机制正常工作,检查 SQL 是否包含写操作 |
| Claude Desktop 看不到 MCP 工具 | claude_desktop_config.json 路径或 Python 路径错误 |
用 which python 确认路径,且必须是 .venv 内的 Python |
json.JSONDecodeError in smoke_test |
Server 启动失败,stderr 有报错信息被误读为 JSON | 检查 DB_PATH 环境变量是否正确,Server 启动日志写到 stderr 不会影响 MCP 通信 |
sqlparse.parse() 返回空列表 |
传入了空字符串或纯注释 SQL | 在 validate_sql 入口已有空检查,确认调用方没有传空字符串 |
扩展练习(可选)
-
🟡 中等:接入 PostgreSQL 后端。在
db_backend.py中新增postgres_execute等函数,通过DATABASE_URL环境变量(格式:postgresql://user:pass@host/dbname)自动选择后端。提示:用psycopg2.connect(DATABASE_URL)替换sqlite3.connect(),结果集处理逻辑基本相同,注意cursor.description替代PRAGMA table_info获取列信息。 -
🔴 困难:实现 SQL 执行时间统计与慢查询告警。在
execute_queryTool 中用time.perf_counter()记录执行耗时,超过 2 秒时在返回结果中附加"warning": "慢查询:执行耗时 {elapsed:.2f}s,建议添加索引或优化 SQL",同时将慢查询 SQL 和耗时写入本地日志文件,并新增一个get_slow_queries()Tool 供 Claude 主动查询慢查询历史。完成后思考:生产环境应该把这个日志发到哪里?