跳转至

3.2 【动手二】数据库查询 MCP Server

难度 ⭐⭐⭐ | 类型 Tools + Resources | 适合人群 后端开发者


实验目标

本节结束后,你将能够: 1. 独立实现一个支持 SQLite 的数据库查询 MCP Server,让 Claude 可以直接理解并查询数据库; 2. 掌握 MCP Resources 的设计模式——用 db://schema 等 URI 向 LLM 注入结构化上下文,而不是每次调用都靠 Prompt 塞表结构; 3. 理解 Text-to-SQL 场景下的只读安全保护、结果分页、Schema 感知三个核心工程问题,以及为什么它们在生产中不可忽视。


架构总览

graph TD
    User["👤 用户自然语言提问"] --> Claude["🤖 Claude\n(MCP Host)"]

    Claude -->|"读取 Resources\ndb://schema\ndb://tables"| MCPServer["🔧 Database MCP Server\n(FastMCP)"]
    Claude -->|"调用 Tools\nexecute_query / describe_table / get_sample_data"| MCPServer

    MCPServer --> Guard["🛡️ SQL 安全守卫\n只允许 SELECT\nAST 语法解析验证"]
    Guard --> DB[(":floppy_disk: SQLite 数据库")]
    DB --> Formatter["📋 结果格式化器\n行数截断 + JSON 序列化\n+ 列类型推断"]
    Formatter --> MCPServer
    MCPServer -->|"结构化结果"| Claude
    Claude -->|"自然语言回答 + SQL 引用"| User

    style Guard fill:#ff9999,stroke:#cc0000
    style DB fill:#99ccff,stroke:#0066cc
    style MCPServer fill:#99ff99,stroke:#006600

数据流说明:Claude 首先通过 Resources 拉取 Schema(这是被动注入,无需用户触发),再通过 Tools 执行具体查询。两种能力分工明确——Resources 负责"让 LLM 知道有什么",Tools 负责"让 LLM 拿到具体数据"。


环境准备

# 创建虚拟环境
uv venv && source .venv/bin/activate  # Windows: .venv\Scripts\activate

# 安装依赖
uv pip install "mcp>=1.0.0" "sqlparse>=0.4.0" "python-dotenv>=1.0.0"

# 验证安装
python -c "import mcp, sqlparse; print('依赖安装成功')"

Colab 用户!pip install "mcp>=1.0.0" "sqlparse>=0.4.0" 即可,SQLite 内置无需额外安装。

# 创建项目结构
mkdir -p mcp-database-server/data
cd mcp-database-server

Step-by-Step 实现

Step 1:生成示例数据库

目标:创建一个包含真实业务场景数据的 SQLite 数据库,让后续的 SQL 生成有实质意义可验证。

# scripts_create_sample_db.py
"""
生成示例电商数据库,包含:用户、商品、订单、订单明细 四张表
字段设计参考真实业务场景,包含外键关系和字段注释
"""
import sqlite3
import random
from datetime import datetime, timedelta
from pathlib import Path

DB_PATH = Path("data/sample.db")
DB_PATH.parent.mkdir(exist_ok=True)

def create_database():
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()

    # 启用外键约束(SQLite 默认关闭)
    cur.execute("PRAGMA foreign_keys = ON")

    # ── 建表 ──────────────────────────────────────────────
    cur.executescript("""
        CREATE TABLE IF NOT EXISTS users (
            user_id     INTEGER PRIMARY KEY AUTOINCREMENT,
            username    TEXT    NOT NULL UNIQUE,
            email       TEXT    NOT NULL UNIQUE,
            region      TEXT    NOT NULL,               -- 大区:华东/华北/华南/海外
            created_at  TEXT    NOT NULL,
            is_vip      INTEGER NOT NULL DEFAULT 0      -- 0普通 1VIP
        );

        CREATE TABLE IF NOT EXISTS products (
            product_id   INTEGER PRIMARY KEY AUTOINCREMENT,
            name         TEXT    NOT NULL,
            category     TEXT    NOT NULL,              -- 数码/服装/食品/家居
            price        REAL    NOT NULL,
            stock        INTEGER NOT NULL DEFAULT 0,
            supplier_id  INTEGER NOT NULL
        );

        CREATE TABLE IF NOT EXISTS orders (
            order_id     INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id      INTEGER NOT NULL REFERENCES users(user_id),
            status       TEXT    NOT NULL,              -- pending/paid/shipped/done/cancelled
            total_amount REAL    NOT NULL,
            created_at   TEXT    NOT NULL,
            shipped_at   TEXT
        );

        CREATE TABLE IF NOT EXISTS order_items (
            item_id    INTEGER PRIMARY KEY AUTOINCREMENT,
            order_id   INTEGER NOT NULL REFERENCES orders(order_id),
            product_id INTEGER NOT NULL REFERENCES products(product_id),
            quantity   INTEGER NOT NULL,
            unit_price REAL    NOT NULL                 -- 下单时快照价格,避免价格变更影响历史记录
        );
    """)

    # ── 写入种子数据 ──────────────────────────────────────
    regions = ["华东", "华北", "华南", "海外"]
    categories = ["数码", "服装", "食品", "家居"]
    statuses = ["pending", "paid", "shipped", "done", "cancelled"]

    # 50 名用户
    users = [
        (f"user_{i:03d}", f"user{i}@example.com",
         random.choice(regions),
         (datetime.now() - timedelta(days=random.randint(1, 365))).isoformat(),
         random.randint(0, 1))
        for i in range(1, 51)
    ]
    cur.executemany(
        "INSERT OR IGNORE INTO users (username,email,region,created_at,is_vip) VALUES (?,?,?,?,?)",
        users
    )

    # 100 件商品
    products = [
        (f"商品_{i:03d}", random.choice(categories),
         round(random.uniform(9.9, 9999.0), 2),
         random.randint(0, 500),
         random.randint(1, 10))
        for i in range(1, 101)
    ]
    cur.executemany(
        "INSERT OR IGNORE INTO products (name,category,price,stock,supplier_id) VALUES (?,?,?,?,?)",
        products
    )

    # 200 笔订单 + 明细
    for i in range(1, 201):
        user_id = random.randint(1, 50)
        status = random.choice(statuses)
        created_at = (datetime.now() - timedelta(days=random.randint(1, 180))).isoformat()
        shipped_at = (datetime.now() - timedelta(days=random.randint(0, 10))).isoformat() \
            if status in ("shipped", "done") else None

        # 每笔订单 1-5 件商品
        items = []
        total = 0.0
        for _ in range(random.randint(1, 5)):
            product_id = random.randint(1, 100)
            qty = random.randint(1, 3)
            price = round(random.uniform(9.9, 999.0), 2)
            items.append((product_id, qty, price))
            total += qty * price

        cur.execute(
            "INSERT INTO orders (user_id,status,total_amount,created_at,shipped_at) VALUES (?,?,?,?,?)",
            (user_id, status, round(total, 2), created_at, shipped_at)
        )
        order_id = cur.lastrowid
        cur.executemany(
            "INSERT INTO order_items (order_id,product_id,quantity,unit_price) VALUES (?,?,?,?)",
            [(order_id, *item) for item in items]
        )

    conn.commit()
    conn.close()
    print(f"✅ 数据库已生成:{DB_PATH.resolve()}")
    print("   表:users(50行) / products(100行) / orders(200行) / order_items(~600行)")

if __name__ == "__main__":
    create_database()
python scripts_create_sample_db.py

关键点: - order_items.unit_price 存快照价格而非引用当前商品价格,这是真实电商系统的标准设计——LLM 生成 SQL 时如果不理解这一点会写出错误的 JOIN。后续的 describe_table Tool 会把这个字段注释暴露给模型。 - ⚠️ SQLite 默认不开启外键约束,不加 PRAGMA foreign_keys = ON 插入违规数据不会报错,会造成数据不一致。


Step 2:SQL 安全守卫模块

目标:在正式写 MCP Server 之前,先把"只读保护"逻辑单独封装成模块,因为这是本 Server 最重要的安全边界,需要比关键字匹配更可靠的实现。

# db_guard.py
"""
SQL 安全守卫:使用 sqlparse 进行 AST 级别的语句类型验证
为何不用简单的 startswith("SELECT"):
  攻击者可以绕过:WITH evil AS (DELETE ...) SELECT 1
  sqlparse 解析后能拿到真实的 statement type
"""
import sqlparse
from sqlparse.sql import Statement
from sqlparse.tokens import Keyword, DDL, DML


class SQLSecurityError(ValueError):
    """SQL 安全违规异常,区别于普通的 ValueError"""
    pass


# 白名单:只允许这些 DML 操作
ALLOWED_STATEMENT_TYPES = {"SELECT"}

# 黑名单关键字(用于双重检查,防止 sqlparse 解析漏洞)
FORBIDDEN_KEYWORDS = {
    "INSERT", "UPDATE", "DELETE", "DROP", "CREATE",
    "ALTER", "TRUNCATE", "REPLACE", "MERGE", "EXEC",
    "EXECUTE", "GRANT", "REVOKE", "ATTACH", "DETACH",
}


def validate_sql(sql: str) -> str:
    """
    验证 SQL 语句安全性,返回规范化后的 SQL。

    Args:
        sql: 待验证的 SQL 字符串

    Returns:
        规范化后的 SQL(去除首尾空白,统一大小写关键字)

    Raises:
        SQLSecurityError: 包含非 SELECT 操作时抛出
        ValueError: SQL 为空或无法解析时抛出
    """
    sql = sql.strip()
    if not sql:
        raise ValueError("SQL 不能为空")

    # ── 第一层:sqlparse AST 解析 ────────────────────────
    parsed = sqlparse.parse(sql)
    if not parsed:
        raise ValueError("无法解析 SQL 语句")

    for statement in parsed:
        stmt_type = statement.get_type()
        # get_type() 返回 None 表示无法识别,拒绝放行(fail-safe 原则)
        if stmt_type is None or stmt_type.upper() not in ALLOWED_STATEMENT_TYPES:
            raise SQLSecurityError(
                f"安全拒绝:检测到非 SELECT 操作(类型:{stmt_type})。"
                f"本 Server 仅允许只读查询。"
            )

    # ── 第二层:关键字黑名单兜底 ────────────────────────
    # 防御 sqlparse 对复杂 CTE 解析不准确的边界情况
    tokens = sqlparse.parse(sql)[0].flatten()
    for token in tokens:
        if token.ttype in (Keyword, DDL, DML):
            if token.normalized.upper() in FORBIDDEN_KEYWORDS:
                raise SQLSecurityError(
                    f"安全拒绝:检测到禁止关键字 '{token.normalized}'。"
                )

    return sql


def sanitize_identifier(name: str) -> str:
    """
    验证表名/列名是否为合法标识符,防止 SQL 注入。
    仅允许字母、数字、下划线,且不能以数字开头。
    """
    import re
    if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
        raise ValueError(f"非法标识符:'{name}'。只允许字母、数字和下划线。")
    return name

关键点: - 用 sqlparse 做 AST 级解析而不是 startswith("SELECT"),原因是 WITH malicious AS (DELETE FROM users) SELECT 1 这类 CTE 注入用字符串匹配无法拦截。 - 双层防御:AST 检查 + 关键字黑名单,任一层触发就拒绝。fail-safe 原则——当 get_type() 返回 None(无法识别)时选择拒绝而非放行。 - ⚠️ 生产环境还应限制 SQL 执行时间(conn.set_timeout())防止慢查询拖垮服务。


Step 3:数据库连接与结果格式化

目标:封装 SQLite 数据库连接工厂,以及统一的查询结果格式化逻辑。

# db_backend.py
"""
数据库后端抽象层:支持 SQLite(本地开发)和 PostgreSQL(生产)双模式
通过环境变量 DATABASE_URL 切换,不需要改代码
"""
import json
import os
import sqlite3
from contextlib import contextmanager
from typing import Generator, Any

# 结果截断阈值(行数):防止意外的全表扫描把几十万行数据塞入 LLM 上下文
MAX_ROWS = 100
# 单个字段值的最大字符数:防止 TEXT 字段里的长文本撑爆上下文
MAX_FIELD_LENGTH = 500


def _truncate_value(v: Any) -> Any:
    """截断过长的字段值,并标注截断标记方便 LLM 理解"""
    if isinstance(v, str) and len(v) > MAX_FIELD_LENGTH:
        return v[:MAX_FIELD_LENGTH] + f"... [截断,原始长度 {len(v)} 字符]"
    return v


def format_query_result(rows: list[dict], total_fetched: int) -> str:
    """
    将查询结果格式化为 JSON 字符串,附带元信息。

    元信息对 LLM 很重要:知道"只返回了 100 行中的 100 行"
    有助于它告知用户结果可能被截断,而不是编造"共有100条记录"。
    """
    result = {
        "row_count": len(rows),
        "truncated": total_fetched > MAX_ROWS,
        "rows": [{k: _truncate_value(v) for k, v in row.items()} for row in rows],
    }
    if result["truncated"]:
        result["note"] = f"查询结果已截断至 {MAX_ROWS} 行,请添加 LIMIT 子句获取精确结果"
    return json.dumps(result, ensure_ascii=False, indent=2, default=str)


# ── SQLite 后端 ──────────────────────────────────────────

@contextmanager
def get_sqlite_conn(db_path: str) -> Generator[sqlite3.Connection, None, None]:
    """SQLite 连接上下文管理器,自动处理关闭和异常回滚"""
    conn = sqlite3.connect(db_path, timeout=10)  # 10秒锁等待超时
    conn.row_factory = sqlite3.Row  # 让结果支持按列名访问
    conn.execute("PRAGMA query_only = ON")  # 数据库级只读保护(双重保险)
    try:
        yield conn
    finally:
        conn.close()


def sqlite_execute(db_path: str, sql: str) -> str:
    """在 SQLite 上执行已验证的 SELECT 查询"""
    with get_sqlite_conn(db_path) as conn:
        cursor = conn.execute(sql)
        # fetchmany 避免把百万行数据全部加载到内存
        rows = [dict(r) for r in cursor.fetchmany(MAX_ROWS + 1)]
        total = len(rows)
        return format_query_result(rows[:MAX_ROWS], total)


def sqlite_get_schema(db_path: str) -> dict:
    """提取 SQLite 完整 Schema:表名 → 列信息列表"""
    with get_sqlite_conn(db_path) as conn:
        tables = conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
        ).fetchall()
        schema = {}
        for (table_name,) in tables:
            cols = conn.execute(f"PRAGMA table_info({table_name})").fetchall()
            fks = conn.execute(f"PRAGMA foreign_key_list({table_name})").fetchall()
            fk_map = {fk[3]: f"{fk[2]}.{fk[4]}" for fk in fks}  # col → ref_table.ref_col

            schema[table_name] = [
                {
                    "column": col[1],
                    "type": col[2],
                    "nullable": not col[3],
                    "default": col[4],
                    "primary_key": bool(col[5]),
                    "references": fk_map.get(col[1]),  # 外键引用,帮助 LLM 理解关联关系
                }
                for col in cols
            ]
        return schema


def sqlite_describe_table(db_path: str, table_name: str) -> dict:
    """获取单表的详细结构信息"""
    schema = sqlite_get_schema(db_path)
    if table_name not in schema:
        available = list(schema.keys())
        raise ValueError(f"表 '{table_name}' 不存在。可用的表:{available}")
    return {"table": table_name, "columns": schema[table_name]}


def sqlite_get_sample(db_path: str, table_name: str, limit: int) -> str:
    """获取样例数据(复用 execute,但 table_name 需要额外验证)"""
    from db_guard import sanitize_identifier
    safe_name = sanitize_identifier(table_name)
    safe_limit = min(max(1, limit), 20)  # 样例数据强制限制 1-20 行
    sql = f"SELECT * FROM {safe_name} LIMIT {safe_limit}"
    with get_sqlite_conn(db_path) as conn:
        cursor = conn.execute(sql)
        rows = [dict(r) for r in cursor.fetchall()]
        return format_query_result(rows, len(rows))

关键点: - PRAGMA query_only = ON 是 SQLite 层面的只读保护,配合应用层的 validate_sql 形成双重防御——即使应用层守卫被绕过,数据库驱动层也会拒绝写操作。 - fetchmany(MAX_ROWS + 1) 而非 fetchall():多取一行是为了判断"是否有更多数据"(通过 total > MAX_ROWS 判断),避免触发全表加载。 - 外键信息(PRAGMA foreign_key_list)对 LLM 生成准确 JOIN 语句至关重要,不能省略。


Step 4:MCP Server 主体

目标:将安全守卫和数据库后端组装成完整的 MCP Server,注册 Tools 和 Resources。

# server.py
"""
数据库查询 MCP Server
支持 Claude 通过自然语言查询 SQLite / PostgreSQL 数据库

运行方式:
  python server.py                    # stdio 模式(配合 Claude Desktop)
  python server.py --transport sse    # SSE 模式(配合 Web 客户端,默认端口 8000)
"""
import json
import os
import sys
from pathlib import Path

from mcp.server.fastmcp import FastMCP

from db_backend import (
    sqlite_execute,
    sqlite_get_schema,
    sqlite_describe_table,
    sqlite_get_sample,
)
from db_guard import SQLSecurityError, validate_sql

# ── 配置 ─────────────────────────────────────────────────
# 优先读环境变量,方便在不同环境(本地/容器/云函数)切换数据库路径
DB_PATH = os.environ.get("DB_PATH", "data/sample.db")

# 验证数据库文件存在(早失败原则:启动时就报错,而不是在第一次请求时才发现)
if not Path(DB_PATH).exists():
    sys.exit(f"❌ 数据库文件不存在:{DB_PATH}\n请先运行 python scripts_create_sample_db.py")

# ── 创建 MCP Server 实例 ──────────────────────────────────
mcp = FastMCP(
    name="database-server",
    # Server 的 instructions 会被注入到 Claude 的系统提示中
    # 告诉 Claude 这个 Server 的能力边界和使用约定
    instructions="""
    你有权限查询一个电商业务数据库(SQLite)。

    工作流程建议:
    1. 先通过 db://schema Resource 了解完整表结构
    2. 不确定字段含义时,用 describe_table 获取详细说明
    3. 不熟悉数据分布时,用 get_sample_data 查看样例
    4. 用 execute_query 执行最终的 SELECT 查询

    注意事项:
    - 仅支持 SELECT 查询,写操作会被拒绝
    - 结果最多返回 100 行,超出会被截断并提示
    - 涉及金额计算时注意使用 order_items.unit_price(快照价格)而非 products.price(当前价格)
    """,
)


# ═══════════════════════════════════════════════════════════
# TOOLS:Claude 主动调用,执行具体操作
# ═══════════════════════════════════════════════════════════

@mcp.tool()
def execute_query(sql: str) -> str:
    """
    执行 SQL SELECT 查询并返回结果。

    Args:
        sql: 标准 SQL SELECT 语句。支持 JOIN、子查询、聚合函数、CTE(WITH 子句)。
             示例:SELECT u.username, COUNT(o.order_id) as order_count
                   FROM users u LEFT JOIN orders o ON u.user_id = o.user_id
                   GROUP BY u.user_id ORDER BY order_count DESC LIMIT 10

    Returns:
        JSON 格式的查询结果,包含 row_count、truncated 标志和 rows 数组。
        若结果超过 100 行会被截断,truncated 字段为 true。
    """
    try:
        validated_sql = validate_sql(sql)
    except SQLSecurityError as e:
        # 安全拒绝:返回明确的错误信息让 Claude 理解原因
        return json.dumps({"error": "SECURITY_REJECTED", "message": str(e)}, ensure_ascii=False)
    except ValueError as e:
        return json.dumps({"error": "INVALID_SQL", "message": str(e)}, ensure_ascii=False)

    try:
        return sqlite_execute(DB_PATH, validated_sql)
    except Exception as e:
        # 数据库执行错误:把原始报错返回给 Claude,让它自行修正 SQL
        # 这是 Text-to-SQL 自修正循环的关键——错误信息本身就是上下文
        return json.dumps({
            "error": "EXECUTION_ERROR",
            "message": str(e),
            "hint": "请检查表名、列名是否正确,可以用 describe_table 确认表结构",
        }, ensure_ascii=False)


@mcp.tool()
def describe_table(table_name: str) -> str:
    """
    获取指定表的详细结构信息,包括列名、数据类型、是否可空、主键和外键关系。

    当你不确定某个表有哪些字段,或需要了解表间关联关系时使用此工具。

    Args:
        table_name: 表名(区分大小写)。如:users、orders、order_items、products

    Returns:
        JSON 格式的表结构描述,包含所有列的详细信息和外键引用关系。
    """
    try:
        result = sqlite_describe_table(DB_PATH, table_name)
        return json.dumps(result, ensure_ascii=False, indent=2)
    except ValueError as e:
        return json.dumps({"error": "TABLE_NOT_FOUND", "message": str(e)}, ensure_ascii=False)


@mcp.tool()
def get_sample_data(table_name: str, limit: int = 5) -> str:
    """
    获取指定表的样例数据,帮助理解数据分布和字段取值范围。

    适用场景:
    - 不确定某个枚举字段有哪些可能值(如 status、region、category)
    - 需要了解数据的大致规模和质量
    - 构造 WHERE 条件前先看看实际数据长什么样

    Args:
        table_name: 表名
        limit: 返回行数,范围 1-20,默认 5

    Returns:
        JSON 格式的样例数据
    """
    try:
        return sqlite_get_sample(DB_PATH, table_name, limit)
    except ValueError as e:
        return json.dumps({"error": str(e)}, ensure_ascii=False)


# ═══════════════════════════════════════════════════════════
# RESOURCES:被动注入,Claude 主动读取作为背景知识
# ═══════════════════════════════════════════════════════════

@mcp.resource("db://schema")
def get_full_schema() -> str:
    """
    返回完整数据库 Schema,供 Claude 在生成 SQL 前参考。

    这是一个 Resource 而非 Tool:Claude 在对话开始时会自动读取它,
    不需要用户每次都在 Prompt 里粘贴表结构。

    Returns:
        JSON 格式的完整 Schema,结构为 {表名: [{列信息}]}
    """
    schema = sqlite_get_schema(DB_PATH)
    # 附加 Schema 使用说明,引导 LLM 正确理解
    result = {
        "database_type": "SQLite",
        "database_path": DB_PATH,
        "schema": schema,
        "important_notes": [
            "order_items.unit_price 是下单时的快照价格,用于历史金额计算",
            "products.price 是当前售价,可能与历史订单价格不同",
            "orders.status 枚举值:pending/paid/shipped/done/cancelled",
            "users.is_vip: 0=普通用户, 1=VIP用户",
        ],
    }
    return json.dumps(result, ensure_ascii=False, indent=2)


@mcp.resource("db://tables")
def get_table_list() -> str:
    """
    返回数据库中所有表的名称列表,适合快速了解数据库结构。

    Returns:
        JSON 格式的表名列表及简要说明
    """
    schema = sqlite_get_schema(DB_PATH)
    # 为每张表附加简要业务说明,帮助 LLM 快速定位目标表
    table_descriptions = {
        "users": "用户信息,包含大区、VIP等级",
        "products": "商品信息,包含分类、价格、库存",
        "orders": "订单主表,包含状态、总金额、时间",
        "order_items": "订单明细,记录每笔订单的商品、数量、成交价",
    }
    result = {
        "tables": [
            {
                "name": t,
                "description": table_descriptions.get(t, ""),
                "column_count": len(cols),
            }
            for t, cols in schema.items()
        ]
    }
    return json.dumps(result, ensure_ascii=False, indent=2)


# ═══════════════════════════════════════════════════════════
# 入口
# ═══════════════════════════════════════════════════════════

if __name__ == "__main__":
    transport = "stdio"
    if "--transport" in sys.argv:
        idx = sys.argv.index("--transport")
        transport = sys.argv[idx + 1]

    print(f"🚀 Database MCP Server 启动中(transport={transport}, db={DB_PATH})", file=sys.stderr)
    mcp.run(transport=transport)

关键点: - Tools 的 docstring 不只是给开发者看的——FastMCP 会把它注入到 Claude 的工具描述里。写清楚"什么情况用这个工具、参数格式、返回格式"直接影响 Claude 能否正确调用。 - 执行错误的完整信息要透传给 Claude:{"error": "EXECUTION_ERROR", "message": "no such column: usr.name"} 这样的错误信息能让 Claude 理解哪里写错了,自动重试修正——这是 Text-to-SQL 自修正循环的基础。 - ⚠️ 生产注意:instructions 字段的内容会被注入系统提示,注意不要在这里写入任何敏感的业务逻辑或配置。 - 当前 FastMCP 构造中不包含 version 参数(代码中未设置),与部分文档中可能出现的 version="1.0.0" 不一致。


Step 5:配置接入 Claude Desktop

目标:将 MCP Server 注册到 Claude Desktop,实现真实对话驱动的数据库查询。

// ~/Library/Application Support/Claude/claude_desktop_config.json(macOS)
// %APPDATA%\Claude\claude_desktop_config.json(Windows)
{
  "mcpServers": {
    "database": {
      "command": "/path/to/mcp-database-server/.venv/bin/python",
      "args": ["/path/to/mcp-database-server/server.py"],
      "env": {
        "DB_PATH": "/path/to/mcp-database-server/data/sample.db"
      }
    }
  }
}
# 快速获取 Python 路径
which python  # 或: source .venv/bin/activate && which python

⚠️ 生产注意:command 必须使用虚拟环境内的 Python 绝对路径,使用系统 Python 会因为缺少依赖而启动失败。配置修改后需完全退出并重启 Claude Desktop。


完整运行验证

除了接入 Claude Desktop,还可以用 MCP 官方命令行工具做自动化验证:

# tests_smoke_test.py
"""
端到端冒烟测试:通过 MCP Python SDK 直接调用 Server
不依赖 Claude Desktop,适合 CI 环境
"""
import asyncio
import json
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def run_smoke_test():
    server_params = StdioServerParameters(
        command="python",
        args=["server.py"],
        env={"DB_PATH": "data/sample.db"},
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            print("=" * 50)
            print("🧪 MCP Database Server 冒烟测试")
            print("=" * 50)

            # ── 测试 1:列出所有工具 ──────────────────────
            tools = await session.list_tools()
            tool_names = [t.name for t in tools.tools]
            print(f"\n✅ 工具列表:{tool_names}")
            assert set(tool_names) == {"execute_query", "describe_table", "get_sample_data"}, \
                f"工具列表不符合预期:{tool_names}"

            # ── 测试 2:列出所有 Resources ───────────────
            resources = await session.list_resources()
            resource_uris = [str(r.uri) for r in resources.resources]
            print(f"✅ Resources:{resource_uris}")
            assert "db://schema" in resource_uris

            # ── 测试 3:读取 Schema Resource ─────────────
            schema_result = await session.read_resource("db://schema")
            schema_text = schema_result.contents[0].text
            schema_data = json.loads(schema_text)
            tables_in_schema = list(schema_data["schema"].keys())
            print(f"✅ Schema 包含表:{tables_in_schema}")
            assert "users" in tables_in_schema and "orders" in tables_in_schema

            # ── 测试 4:describe_table Tool ──────────────
            desc_result = await session.call_tool("describe_table", {"table_name": "orders"})
            desc_data = json.loads(desc_result.content[0].text)
            col_names = [c["column"] for c in desc_data["columns"]]
            print(f"✅ orders 表字段:{col_names}")
            assert "order_id" in col_names and "total_amount" in col_names

            # ── 测试 5:execute_query 正常查询 ───────────
            query_result = await session.call_tool(
                "execute_query",
                {"sql": "SELECT COUNT(*) as user_count FROM users"}
            )
            query_data = json.loads(query_result.content[0].text)
            user_count = query_data["rows"][0]["user_count"]
            print(f"✅ 用户总数:{user_count}")
            assert user_count == 50, f"预期 50 个用户,实际 {user_count}"

            # ── 测试 6:安全拦截验证 ─────────────────────
            evil_result = await session.call_tool(
                "execute_query",
                {"sql": "DELETE FROM users WHERE 1=1"}
            )
            evil_data = json.loads(evil_result.content[0].text)
            print(f"✅ 危险 SQL 被拦截:{evil_data['error']}")
            assert evil_data["error"] == "SECURITY_REJECTED"

            # ── 测试 7:复杂业务查询 ─────────────────────
            biz_sql = """
                SELECT u.region,
                       COUNT(DISTINCT o.order_id) as order_count,
                       ROUND(SUM(o.total_amount), 2) as total_revenue
                FROM users u
                JOIN orders o ON u.user_id = o.user_id
                WHERE o.status = 'done'
                GROUP BY u.region
                ORDER BY total_revenue DESC
            """
            biz_result = await session.call_tool("execute_query", {"sql": biz_sql})
            biz_data = json.loads(biz_result.content[0].text)
            print(f"✅ 各地区完成订单统计({biz_data['row_count']} 行):")
            for row in biz_data["rows"]:
                print(f"   {row['region']}: {row['order_count']} 笔, ¥{row['total_revenue']}")

            print("\n🎉 所有测试通过!")


if __name__ == "__main__":
    asyncio.run(run_smoke_test())
python tests_smoke_test.py

预期输出

==================================================
🧪 MCP Database Server 冒烟测试
==================================================

✅ 工具列表:['execute_query', 'describe_table', 'get_sample_data']
✅ Resources:['db://schema', 'db://tables']
✅ Schema 包含表:['users', 'products', 'orders', 'order_items']
✅ orders 表字段:['order_id', 'user_id', 'status', 'total_amount', 'created_at', 'shipped_at']
✅ 用户总数:50
✅ 危险 SQL 被拦截:SECURITY_REJECTED
✅ 各地区完成订单统计(4 行):
   华东: 23 笔, ¥58432.10
   华南: 18 笔, ¥41209.55
   华北: 15 笔, ¥33876.20
   海外: 9 笔, ¥19443.80

🎉 所有测试通过!

接入 Claude Desktop 后,你可以直接用自然语言提问: - "过去 30 天内,每个品类的销售额是多少?" - "找出购买次数超过 5 次的 VIP 用户" - "库存低于 10 件的商品有哪些?"


常见报错与解决方案

报错信息 原因 解决方案
ModuleNotFoundError: No module named 'mcp' 虚拟环境未激活,或依赖未安装 source .venv/bin/activate && uv pip install "mcp>=1.0.0"
❌ 数据库文件不存在:data/sample.db 未运行建库脚本 python scripts_create_sample_db.py
sqlite3.OperationalError: attempt to write a readonly database PRAGMA query_only 拦截了写操作(符合预期) 这是安全机制正常工作,检查 SQL 是否包含写操作
Claude Desktop 看不到 MCP 工具 claude_desktop_config.json 路径或 Python 路径错误 which python 确认路径,且必须是 .venv 内的 Python
json.JSONDecodeError in smoke_test Server 启动失败,stderr 有报错信息被误读为 JSON 检查 DB_PATH 环境变量是否正确,Server 启动日志写到 stderr 不会影响 MCP 通信
sqlparse.parse() 返回空列表 传入了空字符串或纯注释 SQL validate_sql 入口已有空检查,确认调用方没有传空字符串

扩展练习(可选)

  1. 🟡 中等:接入 PostgreSQL 后端。在 db_backend.py 中新增 postgres_execute 等函数,通过 DATABASE_URL 环境变量(格式:postgresql://user:pass@host/dbname)自动选择后端。提示:用 psycopg2.connect(DATABASE_URL) 替换 sqlite3.connect(),结果集处理逻辑基本相同,注意 cursor.description 替代 PRAGMA table_info 获取列信息。

  2. 🔴 困难:实现 SQL 执行时间统计与慢查询告警。在 execute_query Tool 中用 time.perf_counter() 记录执行耗时,超过 2 秒时在返回结果中附加 "warning": "慢查询:执行耗时 {elapsed:.2f}s,建议添加索引或优化 SQL",同时将慢查询 SQL 和耗时写入本地日志文件,并新增一个 get_slow_queries() Tool 供 Claude 主动查询慢查询历史。完成后思考:生产环境应该把这个日志发到哪里?