跳转至

【动手】Advanced RAG:重排序 + 查询改写

2.5 【动手】Advanced RAG:重排序 + 查询改写

实验目标

完成本节后,你能够:

  1. 理解 Naive RAG 的核心缺陷,并用 HyDE 与 Multi-Query 两种策略从查询端提升召回质量;
  2. 接入 DashScope qwen3-rerank API 对候选文档做精排,解决向量检索"语义相近但答非所问"的问题;
  3. 用摘要压缩对检索结果做上下文裁剪,在不损失关键信息的前提下节约 Token;
  4. 通过量化对比实验,直观感受每个优化环节的实际收益,建立"哪步值得加、哪步可以省"的工程判断。

架构总览

graph TD
    Q[用户原始 Query] --> QR[查询改写层]

    subgraph QR[查询改写层]
        direction LR
        HyDE[HyDE\n生成假设文档]
        MQ[Multi-Query\n生成多路查询]
    end

    QR -->|多路 Query| RET[向量检索层\nQdrant HNSW]
    RET -->|Top-K 候选 x N路| MERGE[结果合并去重\nRRF 融合]
    MERGE -->|Top-20 候选| RERANK[重排序层\nDashScope qwen3-rerank]
    RERANK -->|Top-5 精排结果| COMPRESS[上下文压缩层\n摘要压缩]
    COMPRESS -->|压缩后上下文| GEN[LLM 生成答案]
    GEN --> ANS[最终答案]

    style QR fill:#e8f4f8,stroke:#4a9eca
    style RERANK fill:#fff3e0,stroke:#f39c12
    style COMPRESS fill:#e8f5e9,stroke:#27ae60

整条链路分为四层,每层独立可替换。本节将逐层实现,并在最后做端到端对比实验。


环境准备

# 创建虚拟环境(uv)
uv venv --python 3.11 && source .venv/bin/activate

# 安装依赖(锁定版本)
uv pip install \
  openai>=1.51.0 \
  qdrant-client>=1.11.0 \
  python-dotenv>=1.0.0 \
  httpx>=0.27.0 \
  llmlingua==0.2.2          # 可选,仅在使用 LLMLinguaCompressor 时需要

Colab 用户!pip install openai qdrant-client python-dotenv httpx 即可,llmlingua 按需安装。

创建 .env 文件:

# DashScope API Key(Embedding + Reranker 共用)
DASHSCOPE_API_KEY=sk-...

# 聊天模型 API Key(按 ACTIVE_MODEL_KEY 选择)
DEEPSEEK_API_KEY=sk-...
# DASHSCOPE_API_KEY=sk-...  # 若使用 Qwen-Max 作为聊天模型
# OPENAI_API_KEY=sk-...     # 若使用 OpenAI-GPT-4o-mini 作为聊天模型

统一配置(core_config.py)

项目使用 core_config.py 统一管理所有模型配置,包括聊天模型、Embedding 模型和 Reranker 模型。

# core_config.py
"""全局配置:模型注册表、Embedding、Reranker 与定价信息"""
import os
from typing import TypedDict


# ── 聊天模型注册表 ──────────────────────────────────────────────────
class ModelConfig(TypedDict, total=False):
    litellm_id: str          # LiteLLM SDK 使用的模型 ID(含 provider 前缀)
    chat_model_id: str       # OpenAI SDK 直连时使用的模型名(无前缀)
    price_in: float          # 每 1K input tokens 价格(美元)
    price_out: float         # 每 1K output tokens 价格(美元)
    max_tokens_limit: int    # 模型支持的最大 max_tokens
    api_key_env: str | None  # API Key 环境变量名
    base_url: str | None     # API 基础 URL(None 表示使用默认)


MODEL_REGISTRY: dict[str, ModelConfig] = {
    "DeepSeek-V3": {
        "litellm_id": "deepseek/deepseek-chat",
        "chat_model_id": "deepseek-v4-flash",
        "price_in": 0.00027,
        "price_out": 0.0011,
        "max_tokens_limit": 4096,
        "api_key_env": "DEEPSEEK_API_KEY",
        "base_url": "https://api.deepseek.com/v1",
    },
    "Qwen-Max": {
        "litellm_id": "qwen/qwen-plus",
        "chat_model_id": "qwen-plus",
        "price_in": 0.001,
        "price_out": 0.004,
        "max_tokens_limit": 4096,
        "api_key_env": "DASHSCOPE_API_KEY",
        "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
    },
    "OpenAI-GPT-4o-mini": {
        "litellm_id": "gpt-4o-mini",
        "chat_model_id": "gpt-4o-mini",
        "price_in": 0.00015,
        "price_out": 0.0006,
        "max_tokens_limit": 16384,
        "api_key_env": "OPENAI_API_KEY",
        "base_url": None,
    },
}

# ✅ 当前激活聊天模型 key — 修改此处全局生效
ACTIVE_MODEL_KEY: str = "DeepSeek-V3"


# ── Embedding 模型配置(DashScope)─────────────────────────────────
EMBEDDING_MODEL: str = "text-embedding-v4"
EMBEDDING_DIM: int = 1024          # text-embedding-v4 维度
EMBEDDING_BASE_URL: str = "https://dashscope.aliyuncs.com/compatible-mode/v1"


# ── Reranker 模型配置(DashScope OpenAI 兼容接口)─────────────────
RERANKER_MODEL: str = "qwen3-rerank"
RERANKER_TOP_N: int = 5            # 默认精排后保留的数量
RERANKER_BASE_URL: str = "https://dashscope.aliyuncs.com/compatible-mode/v1"

辅助函数包括: - 聊天模型get_litellm_id(), get_chat_model_id(), get_api_key(), get_base_url(), get_active_config(), get_model_list(), estimate_cost() - Embeddingget_dashscope_api_key(), get_embedding_model(), get_embedding_dim(), get_embedding_base_url() - Rerankerget_reranker_model(), get_reranker_top_n(), get_reranker_base_url()

修改 ACTIVE_MODEL_KEY 即可全局切换聊天模型,无需改动任何业务代码。


Step-by-Step 实现

Step 1:构建基础 RAG 基线(Naive RAG)

目标:先搭一个最简单的 Naive RAG 作为对照基线,后续所有优化都在此基础上叠加,对比才有说服力。

# baseline_rag.py
"""Naive RAG 基线实现,作为对比实验的起点。"""

from typing import Optional
from dataclasses import dataclass

from dotenv import load_dotenv
from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, VectorParams, PointStruct
)
from core_config import (
    get_dashscope_api_key, get_embedding_base_url,
    get_embedding_model, get_embedding_dim,
    get_chat_model_id, get_api_key, get_base_url,
    ACTIVE_MODEL_KEY,
)

load_dotenv()


@dataclass
class RetrievedChunk:
    """检索结果的数据结构,贯穿整条链路。"""
    text: str
    score: float
    chunk_id: str
    source: str = ""


class NaiveRAG:
    """
    Naive RAG 基线:Embedding → 向量检索 → 直接生成。
    不做任何查询改写、重排或压缩。
    """

    COLLECTION = "naive_rag_demo"

    def __init__(self) -> None:
        # Embedding 客户端(DashScope,text-embedding-v4)
        self.embed_client = OpenAI(
            api_key=get_dashscope_api_key(),
            base_url=get_embedding_base_url(),
        )
        # 聊天模型客户端(通过 core_config 统一配置)
        self.chat_client = OpenAI(
            api_key=get_api_key(),
            base_url=get_base_url(),
        )
        # 使用内存模式的 Qdrant,无需启动独立服务
        self.qdrant = QdrantClient(":memory:")
        self._init_collection()

    def _init_collection(self) -> None:
        """初始化向量集合(幂等操作)。"""
        existing = [c.name for c in self.qdrant.get_collections().collections]
        if self.COLLECTION not in existing:
            self.qdrant.create_collection(
                collection_name=self.COLLECTION,
                vectors_config=VectorParams(
                    size=get_embedding_dim(), distance=Distance.COSINE
                ),
            )

    def embed(self, texts: list[str]) -> list[list[float]]:
        """批量 Embedding,使用配置的 Embedding 模型。"""
        resp = self.embed_client.embeddings.create(
            model=get_embedding_model(), input=texts
        )
        return [item.embedding for item in resp.data]

    def index_documents(self, chunks: list[str]) -> None:
        """将文档切块写入向量库。"""
        embeddings = self.embed(chunks)
        points = [
            PointStruct(
                id=i,
                vector=emb,
                payload={"text": text, "chunk_id": str(i)},
            )
            for i, (text, emb) in enumerate(zip(chunks, embeddings))
        ]
        self.qdrant.upsert(collection_name=self.COLLECTION, points=points)
        print(f"已索引 {len(chunks)} 个切块")

    def retrieve(self, query: str, top_k: int = 5) -> list[RetrievedChunk]:
        """向量检索,返回 Top-K 候选。"""
        q_vec = self.embed([query])[0]
        response = self.qdrant.query_points(
            collection_name=self.COLLECTION,
            query=q_vec,
            limit=top_k,
        )
        return [
            RetrievedChunk(
                text=h.payload["text"],
                score=h.score,
                chunk_id=h.payload["chunk_id"],
            )
            for h in response.points
        ]

    def generate(self, query: str, context_chunks: list[RetrievedChunk]) -> str:
        """基于检索结果生成答案。"""
        context = "\n\n---\n\n".join(c.text for c in context_chunks)
        messages = [
            {
                "role": "system",
                "content": (
                    "你是一个精准的问答助手。请严格基于以下上下文回答问题,"
                    "如果上下文中没有相关信息,直接说'不知道',不要编造。"
                ),
            },
            {
                "role": "user",
                "content": f"上下文:\n{context}\n\n问题:{query}",
            },
        ]
        resp = self.chat_client.chat.completions.create(
            model=get_chat_model_id(),
            messages=messages,
            temperature=0.1,
        )
        return resp.choices[0].message.content

    def query(self, question: str, top_k: int = 5) -> dict:
        """完整的 RAG 查询链路,返回答案和检索到的切块。"""
        chunks = self.retrieve(question, top_k=top_k)
        answer = self.generate(question, chunks)
        return {"answer": answer, "retrieved_chunks": chunks}

与初版 RAG 相比的关键变化: - 不再硬编码模型名和 API Key,全部通过 core_config 统一获取 - 使用两个独立客户端embed_client(DashScope text-embedding-v4)和 chat_client(可切换 DeepSeek/Qwen/OpenAI) - Embedding 维度由配置决定(1024),而非写死 1536 - Qdrant API 从 search() 更新为 query_points()(新版 qdrant-client 兼容)


Step 2:查询改写——HyDE 与 Multi-Query

目标:原始查询往往是短句,语义稀疏,导致向量检索"找近义词"而非"找答案"。查询改写从用户意图出发,生成更丰富的检索信号,是 Advanced RAG 收益最高的单点优化。

两种策略各有侧重: - HyDE(Hypothetical Document Embeddings):让 LLM 先凭空"想象"一篇能回答该问题的文档,用这篇假设文档的 Embedding 去检索——因为假设文档在语义空间上比短查询更接近真实答案文档; - Multi-Query:让 LLM 从不同角度生成 3~5 个语义相近但措辞各异的查询,分别检索后合并——覆盖原始查询可能遗漏的语义变体。

# query_rewriter.py
"""查询改写模块:HyDE 与 Multi-Query 双策略。"""

import json
from openai import OpenAI
from core_config import get_chat_model_id


class QueryRewriter:
    """
    封装 HyDE 和 Multi-Query 两种改写策略。
    设计原则:策略可单独使用,也可组合使用。
    """

    def __init__(self, client: OpenAI, model: str | None = None) -> None:
        self.client = client
        self.model = model or get_chat_model_id()

    def hyde(self, query: str) -> str:
        """
        HyDE:生成一篇假设性的"理想答案文档"。

        为何 HyDE 有效:
        向量模型训练时见过大量文档,对"文档风格"的 Embedding 比对"问题风格"
        的 Embedding 更稳定。用假设文档去检索,相当于把查询从问题空间
        映射到了文档空间。

        局限:HyDE 依赖 LLM 生成质量,若 LLM 对领域不熟悉,
        假设文档可能引入噪声,反而拉低召回质量。
        """
        prompt = (
            f"请为以下问题撰写一段简洁的参考答案(100字以内),"
            f"语气像专业文档,不必完全准确,只需覆盖关键概念:\n\n问题:{query}"
        )
        resp = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3,  # 稍高温度增加多样性,但不宜过高
        )
        return resp.choices[0].message.content.strip()

    def multi_query(self, query: str, n: int = 3) -> list[str]:
        """
        Multi-Query:生成 N 个语义相近但措辞不同的查询。

        返回的列表包含原始查询(第一个),确保原始意图不丢失。
        """
        prompt = (
            f"请将以下问题改写为 {n} 个语义相近但措辞不同的查询,"
            f"以 JSON 数组格式返回,每个元素是一个字符串,不要加任何解释:\n\n"
            f"原始问题:{query}"
        )
        resp = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.5,
            response_format={"type": "json_object"},
        )
        try:
            raw = json.loads(resp.choices[0].message.content)
            # 兼容模型返回 {"queries": [...]} 或直接 [...] 两种格式
            variants: list[str] = (
                raw if isinstance(raw, list)
                else next(iter(raw.values()))
            )
        except (json.JSONDecodeError, StopIteration):
            variants = []

        # 原始查询始终放第一位,保证召回的下界
        return [query] + [v for v in variants if v != query]

    def rrf_merge(
        self,
        results_list: list[list["RetrievedChunk"]],  # 多路检索结果
        k: int = 60,
        top_n: int = 20,
    ) -> list["RetrievedChunk"]:
        """
        Reciprocal Rank Fusion(RRF):将多路检索结果融合去重。

        RRF 公式:score(d) = Σ 1 / (k + rank_i(d))
        k=60 是 Cormack et al. 2009 论文推荐的默认值,实践中无需调整。

        优势:不依赖各路检索的分值量纲,天然适合混合不同类型的检索器。
        """
        from collections import defaultdict

        chunk_scores: dict[str, float] = defaultdict(float)
        chunk_map: dict[str, "RetrievedChunk"] = {}

        for results in results_list:
            for rank, chunk in enumerate(results, start=1):
                chunk_scores[chunk.chunk_id] += 1.0 / (k + rank)
                chunk_map[chunk.chunk_id] = chunk

        merged = sorted(
            chunk_map.values(),
            key=lambda c: chunk_scores[c.chunk_id],
            reverse=True,
        )
        return merged[:top_n]

⚠️ 生产注意:Multi-Query 会增加 N 倍的 Embedding 调用开销。若 Embedding API 有速率限制,应对多路查询做并发控制(asyncio.gather + Semaphore)。对于延迟敏感的场景,可仅用 Multi-Query 而不用 HyDE,节省一次 LLM 调用。


Step 3:重排序——DashScope qwen3-rerank API

目标:向量检索是"粗排",用内积/余弦快速从千万文档中找到 Top-20 候选;Reranker 是"精排",它将 Query 和每个候选文档一起送入重排模型,直接对相关性打分,准确度远高于向量相似度。

为什么需要两阶段?精排模型对全库检索的复杂度是 O(N × doc_len),代价太高;但对 Top-20 候选精排,延迟完全可以接受。

# reranker.py
"""Reranker 模块:使用 DashScope qwen3-rerank API 进行精排。"""

import httpx
from typing import TYPE_CHECKING

from core_config import (
    get_dashscope_api_key, get_reranker_model,
    get_reranker_top_n,
)

if TYPE_CHECKING:
    from baseline_rag import RetrievedChunk


class DashScopeReranker:
    """
    使用 DashScope qwen3-rerank 对候选文档精排。

    优势:
    - 无需加载本地模型,减少显存/内存占用
    - qwen3-rerank 是阿里云最新发布的中文重排序模型
    - 通过 API 调用,首次运行即可使用

    模型选型说明:
    - qwen3-rerank:中文场景推荐,效果好,无需本地 GPU
    - bge-reranker-v2-m3(本地):如需离线使用可切换回本地方案
    """

    def __init__(
        self,
        model: str | None = None,
        api_key: str | None = None,
        top_n: int | None = None,
    ) -> None:
        self.model = model or get_reranker_model()
        self.api_key = api_key or get_dashscope_api_key()
        self.top_n = top_n if top_n is not None else get_reranker_top_n()
        # DashScope 原生 rerank API 端点(非 OpenAI 兼容)
        self.api_url = "https://dashscope.aliyuncs.com/api/v1/services/rerank/text-rerank/text-rerank"
        print(f"✅ Reranker 已配置:{self.model}")

    def rerank(
        self,
        query: str,
        chunks: list["RetrievedChunk"],
        top_n: int | None = None,
    ) -> list["RetrievedChunk"]:
        """
        对候选切块重新打分并排序。

        Args:
            query: 用户原始查询(不是改写后的查询)
            chunks: 向量检索的候选切块(通常 Top-20)
            top_n: 精排后保留的数量(送入 LLM 的上下文)

        Returns:
            按分值降序排列的 Top-N 切块,score 字段更新为 reranker 分值。
        """
        if not chunks:
            return []

        n = top_n if top_n is not None else self.top_n
        documents = [chunk.text for chunk in chunks]

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": self.model,
            "input": {
                "query": query,
                "documents": documents,
            },
        }

        try:
            with httpx.Client(timeout=30.0) as client:
                response = client.post(self.api_url, headers=headers, json=payload)
                response.raise_for_status()
                data = response.json()

            # DashScope 返回格式: output.results[].relevance_score
            results = data.get("output", {}).get("results", [])
            # results 是无序的,需要按 index 映射回原始文档顺序
            score_map = {}
            for item in results:
                idx = item.get("index", 0)
                score = item.get("relevance_score", 0)
                score_map[idx] = score

            scores = [score_map.get(i, 0) for i in range(len(chunks))]

        except Exception as e:
            # 降级方案:回退到向量检索原始分数排序
            print(f"⚠️ Reranker API 调用失败,回退到向量检索分数排序: {e}")
            return sorted(chunks, key=lambda c: c.score, reverse=True)[:n]

        # 将 reranker 分值回写到 RetrievedChunk
        for chunk, score in zip(chunks, scores):
            chunk.score = score

        # 按分值降序,取 Top-N
        reranked = sorted(chunks, key=lambda c: c.score, reverse=True)
        return reranked[:n]

⚠️ 生产注意:DashScope rerank API 对 20 个候选的响应时间通常在 100~300ms。若调用失败,代码已内置降级逻辑——回退到向量检索原始分数排序,保证链路不中断。


Step 4:上下文压缩——LLMLingua 与摘要压缩

目标:重排后的 Top-5 文档往往仍有大量与问题无关的句子,直接塞入 LLM 既浪费 Token,也会因"Lost in the Middle"效应降低答案质量。上下文压缩的目标是:在不丢失关键信息的前提下,最大化删除噪声

两种方案各有适用场景: - LLMLingua:基于小语言模型(如 Llama-2-7B)对每个 Token 计算困惑度,删掉困惑度低(即模型认为"废话")的 Token。优点是压缩率可控(可指定目标比例),缺点是首次加载模型较慢; - 摘要压缩:直接让 LLM 对检索结果做摘要,保留与查询最相关的内容。实现最简单,适合对延迟不敏感、但 Token 成本敏感的离线场景。

# context_compressor.py
"""上下文压缩模块:LLMLingua 与 LLM 摘要压缩两种实现。"""

from openai import OpenAI
from core_config import get_chat_model_id


class LLMLinguaCompressor:
    """
    基于 LLMLingua 的 Token 级别压缩。
    需要额外安装:uv pip install llmlingua==0.2.2
    """

    def __init__(self, rate: float = 0.5) -> None:
        """
        Args:
            rate: 压缩率,0.5 表示保留 50% 的 Token。
                  实验建议:0.4~0.6 之间,低于 0.3 可能损失关键信息。
        """
        from llmlingua import PromptCompressor

        # 使用轻量级模型做 Token 打分,不参与最终生成
        # 国内可替换为 Qwen/Qwen1.5-1.8B-Chat 等本地模型
        self.compressor = PromptCompressor(
            model_name="microsoft/llmlingua-2-xlm-roberta-large-meetingbank",
            use_llmlingua2=True,
        )
        self.rate = rate

    def compress(self, query: str, context: str) -> str:
        """
        压缩上下文文本,保留与 query 最相关的 Token。

        LLMLingua2 使用 query-aware 压缩:
        会优先保留与 query 语义相关的 Token。
        """
        result = self.compressor.compress_prompt(
            context,
            rate=self.rate,
            question=query,
        )
        compressed = result["compressed_prompt"]
        original_tokens = result["origin_tokens"]
        compressed_tokens = result["compressed_tokens"]

        ratio = 1 - compressed_tokens / max(original_tokens, 1)
        print(
            f"📉 LLMLingua 压缩:{original_tokens}{compressed_tokens} tokens "
            f"(节省 {ratio:.1%})"
        )
        return compressed


class SummaryCompressor:
    """
    基于 LLM 摘要的上下文压缩。
    实现最简单,适合快速集成,无需额外加载本地模型。
    """

    def __init__(self, client: OpenAI, model: str | None = None) -> None:
        self.client = client
        self.model = model or get_chat_model_id()

    def compress(
        self,
        query: str,
        chunks: list["RetrievedChunk"],
        max_words: int = 300,
    ) -> str:
        """
        让 LLM 对多个检索切块做摘要,只保留与 query 相关的内容。

        注意:这会引入额外的 LLM 调用。适合最终上下文需要控制在
        1000 Token 以内的场景,不适合实时对话(延迟会增加 500ms+)。
        """
        context = "\n\n---\n\n".join(
            f"[切块 {i+1}]\n{c.text}" for i, c in enumerate(chunks)
        )
        prompt = (
            f"以下是检索到的多段文本,请提取与问题直接相关的信息,"
            f"压缩为不超过 {max_words} 字的摘要,保留具体数字和关键术语:\n\n"
            f"问题:{query}\n\n文本:\n{context}"
        )
        resp = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
        )
        return resp.choices[0].message.content.strip()

Step 5:组装 Advanced RAG 完整流水线

目标:将上述所有模块串联为一个可配置的 AdvancedRAG 类,支持按需开关各优化环节,方便做消融实验。

# advanced_rag.py
"""Advanced RAG 完整流水线,整合查询改写、重排序与上下文压缩。"""

from dataclasses import dataclass, field
from baseline_rag import NaiveRAG, RetrievedChunk
from query_rewriter import QueryRewriter
from reranker import DashScopeReranker
from context_compressor import SummaryCompressor


@dataclass
class AdvancedRAGConfig:
    """Advanced RAG 配置,每个优化环节可独立开关,便于消融实验。"""

    # 查询改写配置
    use_hyde: bool = True
    use_multi_query: bool = True
    multi_query_n: int = 3          # Multi-Query 生成几个变体

    # 检索配置
    retrieval_top_k: int = 20       # 粗排候选数,给重排留充足空间

    # 重排配置
    use_reranker: bool = True
    reranker_top_n: int = 5         # 精排后送入 LLM 的切块数

    # 压缩配置
    use_compression: bool = False   # 默认关闭,延迟敏感场景不建议开启
    compression_max_words: int = 300


class AdvancedRAG(NaiveRAG):
    """
    继承 NaiveRAG,叠加高级检索能力。
    继承而非组合,是为了复用索引与 Embedding 逻辑,
    避免重复代码,同时保持 Naive RAG 的基线可对比性。
    """

    def __init__(self, config: AdvancedRAGConfig | None = None) -> None:
        super().__init__()
        self.config = config or AdvancedRAGConfig()
        self.rewriter = QueryRewriter(self.chat_client)
        self.reranker = DashScopeReranker() if self.config.use_reranker else None
        self.compressor = SummaryCompressor(self.chat_client)

    def _multi_retrieve(self, queries: list[str]) -> list[RetrievedChunk]:
        """
        对多个查询分别检索,用 RRF 融合结果。
        每路检索独立执行,后续 RRF 去重。
        """
        all_results: list[list[RetrievedChunk]] = []
        for q in queries:
            results = self.retrieve(q, top_k=self.config.retrieval_top_k)
            all_results.append(results)

        if len(all_results) == 1:
            return all_results[0]

        return self.rewriter.rrf_merge(
            all_results,
            top_n=self.config.retrieval_top_k,
        )

    def advanced_query(self, question: str) -> dict:
        """
        Advanced RAG 完整查询链路:
        查询改写 → 多路检索 → RRF 融合 → 重排 → 压缩 → 生成
        """
        cfg = self.config
        queries = [question]
        steps_log: list[str] = []

        # ① 查询改写
        if cfg.use_multi_query:
            queries = self.rewriter.multi_query(question, n=cfg.multi_query_n)
            steps_log.append(f"Multi-Query 生成 {len(queries)} 路查询")

        if cfg.use_hyde:
            hyde_doc = self.rewriter.hyde(question)
            # HyDE 生成的假设文档作为额外的检索查询加入
            queries.append(hyde_doc)
            steps_log.append("HyDE 生成假设文档")

        # ② 多路检索 + RRF 融合
        candidates = self._multi_retrieve(queries)
        steps_log.append(f"RRF 融合后候选数:{len(candidates)}")

        # ③ Cross-Encoder 重排
        if cfg.use_reranker and self.reranker:
            candidates = self.reranker.rerank(
                question, candidates, top_n=cfg.reranker_top_n
            )
            steps_log.append(f"重排后保留 Top-{cfg.reranker_top_n}")
        else:
            candidates = candidates[: cfg.reranker_top_n]

        # ④ 上下文压缩(可选)
        if cfg.use_compression and candidates:
            compressed_ctx = self.compressor.compress(
                question, candidates, max_words=cfg.compression_max_words
            )
            # 将压缩结果包装为单个 chunk,保持 generate 接口统一
            candidates = [
                RetrievedChunk(
                    text=compressed_ctx, score=1.0, chunk_id="compressed"
                )
            ]
            steps_log.append("上下文压缩完成")

        # ⑤ LLM 生成
        answer = self.generate(question, candidates)

        return {
            "answer": answer,
            "retrieved_chunks": candidates,
            "steps": steps_log,
            "queries_used": queries,
        }

完整运行验证

# e2e_test.py
"""端到端验证:对比 Naive RAG vs Advanced RAG 效果。"""

import time
from advanced_rag import AdvancedRAG, AdvancedRAGConfig
from baseline_rag import NaiveRAG

# ── 准备测试语料(模拟企业知识库切块)──────────────────────────────────────
DOCS = [
    "向量数据库 Qdrant 支持 HNSW 索引,查询延迟在毫秒级别,适合大规模相似度检索。",
    "BM25 是经典的稀疏检索算法,基于词频和逆文档频率,擅长精确关键词匹配。",
    "混合检索结合稠密向量检索和稀疏 BM25 检索,通过 RRF 融合提升召回率。",
    "RAG 的核心挑战是检索精度,向量检索可能因语义漂移返回不相关文档。",
    "Cross-Encoder 重排序通过联合编码 Query 和文档来计算精确相关性分数,准确度高于 Bi-Encoder。",
    "HyDE(Hypothetical Document Embeddings)通过 LLM 生成假设答案文档来改善查询表示。",
    "LLMLingua 利用小语言模型压缩 Prompt,可将上下文 Token 减少 50% 同时保留关键信息。",
    "BGE-Reranker 是 BAAI 发布的双语重排序模型,在 BEIR 基准上表现优异。",
    "Naive RAG 直接将检索结果喂给 LLM,没有重排和压缩,容易引入噪声。",
    "查询改写(Query Rewriting)通过生成多角度查询来覆盖更广泛的语义空间。",
]

TEST_QUESTIONS = [
    "为什么 Cross-Encoder 的重排效果比向量检索更准确?",
    "HyDE 和 Multi-Query 分别是如何改进检索的?",
    "如何在 RAG 中控制输入 LLM 的 Token 数量?",
]


def run_experiment() -> None:
    print("=" * 60)
    print("🚀 初始化 Naive RAG 基线...")
    naive = NaiveRAG()
    naive.index_documents(DOCS)

    print("\n🚀 初始化 Advanced RAG(仅开启重排+改写,关闭压缩)...")
    cfg = AdvancedRAGConfig(
        use_hyde=True,
        use_multi_query=True,
        use_reranker=True,
        use_compression=False,  # 首次验证先关闭,减少调用
        retrieval_top_k=10,
        reranker_top_n=3,
    )
    advanced = AdvancedRAG(config=cfg)
    advanced.index_documents(DOCS)  # 共用同一份文档

    print("\n" + "=" * 60)
    for q in TEST_QUESTIONS:
        print(f"\n📌 问题:{q}")
        print("-" * 40)

        # Naive RAG
        t0 = time.time()
        naive_result = naive.query(q, top_k=3)
        naive_time = time.time() - t0

        # Advanced RAG
        t0 = time.time()
        adv_result = advanced.advanced_query(q)
        adv_time = time.time() - t0

        print(f"[Naive  RAG | {naive_time:.2f}s] {naive_result['answer'][:120]}...")
        print(f"[Advanced   | {adv_time:.2f}s] {adv_result['answer'][:120]}...")
        print(f"  优化步骤:{' → '.join(adv_result['steps'])}")
        print(f"  使用查询数:{len(adv_result['queries_used'])}")


if __name__ == "__main__":
    run_experiment()

预期输出:

============================================================
🚀 初始化 Naive RAG 基线...
已索引 10 个切块

🚀 初始化 Advanced RAG(仅开启重排+改写,关闭压缩)...
✅ Reranker 已配置:qwen3-rerank
已索引 10 个切块

============================================================

📌 问题:为什么 Cross-Encoder 的重排效果比向量检索更准确?
----------------------------------------
[Naive  RAG | 1.23s] Cross-Encoder通过将Query和文档拼接后联合编码,能直接建模两者的交互...
[Advanced   | 2.87s] Cross-Encoder重排序准确度高于Bi-Encoder的原因在于其联合编码机制——它将查询...
  优化步骤:Multi-Query 生成 4 路查询 → HyDE 生成假设文档 → RRF 融合后候选数:10 → 重排后保留 Top-3
  使用查询数:5

快速演示(main.py)

项目提供了 main.py 作为快速演示入口,使用精简的文档集和默认配置:

# main.py
"""主入口:演示 Advanced RAG 查询流程。"""

from advanced_rag import AdvancedRAG, AdvancedRAGConfig
from baseline_rag import NaiveRAG

# ── 示例文档 ───────────────────────────────────────────────────────────────────
DOCS = [
    "向量数据库 Qdrant 支持 HNSW 索引,查询延迟在毫秒级别,适合大规模相似度检索。",
    "BM25 是经典的稀疏检索算法,基于词频和逆文档频率,擅长精确关键词匹配。",
    "混合检索结合稠密向量检索和稀疏 BM25 检索,通过 RRF 融合提升召回率。",
    "Cross-Encoder 重排序通过联合编码 Query 和文档来计算精确相关性分数。",
    "HyDE 通过 LLM 生成假设答案文档来改善查询表示。",
    "查询改写通过生成多角度查询来覆盖更广泛的语义空间。",
]


def main() -> None:
    print("=" * 60)
    print("Advanced RAG 演示(重排序 + 查询改写)")
    print("=" * 60)

    # 初始化 Advanced RAG
    cfg = AdvancedRAGConfig(
        use_hyde=True,
        use_multi_query=True,
        multi_query_n=2,
        use_reranker=True,
        use_compression=True,
        retrieval_top_k=10,
        reranker_top_n=3,
    )
    rag = AdvancedRAG(config=cfg)
    rag.index_documents(DOCS)

    # 演示查询
    question = "为什么 Cross-Encoder 的重排效果比向量检索更准确?"
    print(f"\n问题:{question}")
    result = rag.advanced_query(question)
    print(f"答案:{result['answer']}")
    print(f"优化步骤:{' → '.join(result['steps'])}")


if __name__ == "__main__":
    main()

常见报错与解决方案

报错信息 原因 解决方案
DashScope API key not found 未设置 DASHSCOPE_API_KEY 环境变量 .env 中添加 DASHSCOPE_API_KEY=sk-...
Reranker API 调用失败 DashScope 网络不通或模型未开通 确认 qwen3-rerank 已在阿里云控制台开通;检查网络连接
json.JSONDecodeError in multi_query 模型返回非标准 JSON 已在代码中做了兼容处理;若仍报错,将 response_format 改为 text 并手动 parse
RateLimitError: 429 Embedding 并发过高 对多路 Embedding 调用添加 asyncio.Semaphore(5) 限制并发数
qdrant_client.http.exceptions.UnexpectedResponse Qdrant 集合不存在 确认 _init_collection() 已在 index_documents 前调用
ImportError: llmlingua requires torch llmlingua 依赖 PyTorch uv pip install torch==2.4.0 --index-url https://download.pytorch.org/whl/cpu

扩展练习(可选)

  1. 🟡 中等:为本节的 Advanced RAG 接入 BM25 稀疏检索(使用 rank-bm25 库),与向量检索做混合,观察 RRF 融合后召回率是否进一步提升。参考指标:构建一个包含 20 个问题的评估集,统计"正确切块在 Top-5 内"的比例作为 Recall@5。

  2. 🔴 困难:实现一个自动化消融实验框架——枚举 AdvancedRAGConfig 的所有布尔开关组合(共 \(2^3 = 8\) 种),对同一评估集跑全部组合,输出一张 Recall@5 vs 平均延迟的散点图,帮助团队在"效果-性能"权衡中做出数据驱动的选型决策。