【动手】Advanced RAG:重排序 + 查询改写
2.5 【动手】Advanced RAG:重排序 + 查询改写
实验目标
完成本节后,你能够:
- 理解 Naive RAG 的核心缺陷,并用 HyDE 与 Multi-Query 两种策略从查询端提升召回质量;
- 接入 DashScope qwen3-rerank API 对候选文档做精排,解决向量检索"语义相近但答非所问"的问题;
- 用摘要压缩对检索结果做上下文裁剪,在不损失关键信息的前提下节约 Token;
- 通过量化对比实验,直观感受每个优化环节的实际收益,建立"哪步值得加、哪步可以省"的工程判断。
架构总览
graph TD
Q[用户原始 Query] --> QR[查询改写层]
subgraph QR[查询改写层]
direction LR
HyDE[HyDE\n生成假设文档]
MQ[Multi-Query\n生成多路查询]
end
QR -->|多路 Query| RET[向量检索层\nQdrant HNSW]
RET -->|Top-K 候选 x N路| MERGE[结果合并去重\nRRF 融合]
MERGE -->|Top-20 候选| RERANK[重排序层\nDashScope qwen3-rerank]
RERANK -->|Top-5 精排结果| COMPRESS[上下文压缩层\n摘要压缩]
COMPRESS -->|压缩后上下文| GEN[LLM 生成答案]
GEN --> ANS[最终答案]
style QR fill:#e8f4f8,stroke:#4a9eca
style RERANK fill:#fff3e0,stroke:#f39c12
style COMPRESS fill:#e8f5e9,stroke:#27ae60
整条链路分为四层,每层独立可替换。本节将逐层实现,并在最后做端到端对比实验。
环境准备
# 创建虚拟环境(uv)
uv venv --python 3.11 && source .venv/bin/activate
# 安装依赖(锁定版本)
uv pip install \
openai>=1.51.0 \
qdrant-client>=1.11.0 \
python-dotenv>=1.0.0 \
httpx>=0.27.0 \
llmlingua==0.2.2 # 可选,仅在使用 LLMLinguaCompressor 时需要
Colab 用户:
!pip install openai qdrant-client python-dotenv httpx即可,llmlingua按需安装。
创建 .env 文件:
# DashScope API Key(Embedding + Reranker 共用)
DASHSCOPE_API_KEY=sk-...
# 聊天模型 API Key(按 ACTIVE_MODEL_KEY 选择)
DEEPSEEK_API_KEY=sk-...
# DASHSCOPE_API_KEY=sk-... # 若使用 Qwen-Max 作为聊天模型
# OPENAI_API_KEY=sk-... # 若使用 OpenAI-GPT-4o-mini 作为聊天模型
统一配置(core_config.py)
项目使用 core_config.py 统一管理所有模型配置,包括聊天模型、Embedding 模型和 Reranker 模型。
# core_config.py
"""全局配置:模型注册表、Embedding、Reranker 与定价信息"""
import os
from typing import TypedDict
# ── 聊天模型注册表 ──────────────────────────────────────────────────
class ModelConfig(TypedDict, total=False):
litellm_id: str # LiteLLM SDK 使用的模型 ID(含 provider 前缀)
chat_model_id: str # OpenAI SDK 直连时使用的模型名(无前缀)
price_in: float # 每 1K input tokens 价格(美元)
price_out: float # 每 1K output tokens 价格(美元)
max_tokens_limit: int # 模型支持的最大 max_tokens
api_key_env: str | None # API Key 环境变量名
base_url: str | None # API 基础 URL(None 表示使用默认)
MODEL_REGISTRY: dict[str, ModelConfig] = {
"DeepSeek-V3": {
"litellm_id": "deepseek/deepseek-chat",
"chat_model_id": "deepseek-v4-flash",
"price_in": 0.00027,
"price_out": 0.0011,
"max_tokens_limit": 4096,
"api_key_env": "DEEPSEEK_API_KEY",
"base_url": "https://api.deepseek.com/v1",
},
"Qwen-Max": {
"litellm_id": "qwen/qwen-plus",
"chat_model_id": "qwen-plus",
"price_in": 0.001,
"price_out": 0.004,
"max_tokens_limit": 4096,
"api_key_env": "DASHSCOPE_API_KEY",
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
},
"OpenAI-GPT-4o-mini": {
"litellm_id": "gpt-4o-mini",
"chat_model_id": "gpt-4o-mini",
"price_in": 0.00015,
"price_out": 0.0006,
"max_tokens_limit": 16384,
"api_key_env": "OPENAI_API_KEY",
"base_url": None,
},
}
# ✅ 当前激活聊天模型 key — 修改此处全局生效
ACTIVE_MODEL_KEY: str = "DeepSeek-V3"
# ── Embedding 模型配置(DashScope)─────────────────────────────────
EMBEDDING_MODEL: str = "text-embedding-v4"
EMBEDDING_DIM: int = 1024 # text-embedding-v4 维度
EMBEDDING_BASE_URL: str = "https://dashscope.aliyuncs.com/compatible-mode/v1"
# ── Reranker 模型配置(DashScope OpenAI 兼容接口)─────────────────
RERANKER_MODEL: str = "qwen3-rerank"
RERANKER_TOP_N: int = 5 # 默认精排后保留的数量
RERANKER_BASE_URL: str = "https://dashscope.aliyuncs.com/compatible-mode/v1"
辅助函数包括:
- 聊天模型:get_litellm_id(), get_chat_model_id(), get_api_key(), get_base_url(), get_active_config(), get_model_list(), estimate_cost()
- Embedding:get_dashscope_api_key(), get_embedding_model(), get_embedding_dim(), get_embedding_base_url()
- Reranker:get_reranker_model(), get_reranker_top_n(), get_reranker_base_url()
修改
ACTIVE_MODEL_KEY即可全局切换聊天模型,无需改动任何业务代码。
Step-by-Step 实现
Step 1:构建基础 RAG 基线(Naive RAG)
目标:先搭一个最简单的 Naive RAG 作为对照基线,后续所有优化都在此基础上叠加,对比才有说服力。
# baseline_rag.py
"""Naive RAG 基线实现,作为对比实验的起点。"""
from typing import Optional
from dataclasses import dataclass
from dotenv import load_dotenv
from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import (
Distance, VectorParams, PointStruct
)
from core_config import (
get_dashscope_api_key, get_embedding_base_url,
get_embedding_model, get_embedding_dim,
get_chat_model_id, get_api_key, get_base_url,
ACTIVE_MODEL_KEY,
)
load_dotenv()
@dataclass
class RetrievedChunk:
"""检索结果的数据结构,贯穿整条链路。"""
text: str
score: float
chunk_id: str
source: str = ""
class NaiveRAG:
"""
Naive RAG 基线:Embedding → 向量检索 → 直接生成。
不做任何查询改写、重排或压缩。
"""
COLLECTION = "naive_rag_demo"
def __init__(self) -> None:
# Embedding 客户端(DashScope,text-embedding-v4)
self.embed_client = OpenAI(
api_key=get_dashscope_api_key(),
base_url=get_embedding_base_url(),
)
# 聊天模型客户端(通过 core_config 统一配置)
self.chat_client = OpenAI(
api_key=get_api_key(),
base_url=get_base_url(),
)
# 使用内存模式的 Qdrant,无需启动独立服务
self.qdrant = QdrantClient(":memory:")
self._init_collection()
def _init_collection(self) -> None:
"""初始化向量集合(幂等操作)。"""
existing = [c.name for c in self.qdrant.get_collections().collections]
if self.COLLECTION not in existing:
self.qdrant.create_collection(
collection_name=self.COLLECTION,
vectors_config=VectorParams(
size=get_embedding_dim(), distance=Distance.COSINE
),
)
def embed(self, texts: list[str]) -> list[list[float]]:
"""批量 Embedding,使用配置的 Embedding 模型。"""
resp = self.embed_client.embeddings.create(
model=get_embedding_model(), input=texts
)
return [item.embedding for item in resp.data]
def index_documents(self, chunks: list[str]) -> None:
"""将文档切块写入向量库。"""
embeddings = self.embed(chunks)
points = [
PointStruct(
id=i,
vector=emb,
payload={"text": text, "chunk_id": str(i)},
)
for i, (text, emb) in enumerate(zip(chunks, embeddings))
]
self.qdrant.upsert(collection_name=self.COLLECTION, points=points)
print(f"已索引 {len(chunks)} 个切块")
def retrieve(self, query: str, top_k: int = 5) -> list[RetrievedChunk]:
"""向量检索,返回 Top-K 候选。"""
q_vec = self.embed([query])[0]
response = self.qdrant.query_points(
collection_name=self.COLLECTION,
query=q_vec,
limit=top_k,
)
return [
RetrievedChunk(
text=h.payload["text"],
score=h.score,
chunk_id=h.payload["chunk_id"],
)
for h in response.points
]
def generate(self, query: str, context_chunks: list[RetrievedChunk]) -> str:
"""基于检索结果生成答案。"""
context = "\n\n---\n\n".join(c.text for c in context_chunks)
messages = [
{
"role": "system",
"content": (
"你是一个精准的问答助手。请严格基于以下上下文回答问题,"
"如果上下文中没有相关信息,直接说'不知道',不要编造。"
),
},
{
"role": "user",
"content": f"上下文:\n{context}\n\n问题:{query}",
},
]
resp = self.chat_client.chat.completions.create(
model=get_chat_model_id(),
messages=messages,
temperature=0.1,
)
return resp.choices[0].message.content
def query(self, question: str, top_k: int = 5) -> dict:
"""完整的 RAG 查询链路,返回答案和检索到的切块。"""
chunks = self.retrieve(question, top_k=top_k)
answer = self.generate(question, chunks)
return {"answer": answer, "retrieved_chunks": chunks}
与初版 RAG 相比的关键变化:
- 不再硬编码模型名和 API Key,全部通过 core_config 统一获取
- 使用两个独立客户端:embed_client(DashScope text-embedding-v4)和 chat_client(可切换 DeepSeek/Qwen/OpenAI)
- Embedding 维度由配置决定(1024),而非写死 1536
- Qdrant API 从 search() 更新为 query_points()(新版 qdrant-client 兼容)
Step 2:查询改写——HyDE 与 Multi-Query
目标:原始查询往往是短句,语义稀疏,导致向量检索"找近义词"而非"找答案"。查询改写从用户意图出发,生成更丰富的检索信号,是 Advanced RAG 收益最高的单点优化。
两种策略各有侧重: - HyDE(Hypothetical Document Embeddings):让 LLM 先凭空"想象"一篇能回答该问题的文档,用这篇假设文档的 Embedding 去检索——因为假设文档在语义空间上比短查询更接近真实答案文档; - Multi-Query:让 LLM 从不同角度生成 3~5 个语义相近但措辞各异的查询,分别检索后合并——覆盖原始查询可能遗漏的语义变体。
# query_rewriter.py
"""查询改写模块:HyDE 与 Multi-Query 双策略。"""
import json
from openai import OpenAI
from core_config import get_chat_model_id
class QueryRewriter:
"""
封装 HyDE 和 Multi-Query 两种改写策略。
设计原则:策略可单独使用,也可组合使用。
"""
def __init__(self, client: OpenAI, model: str | None = None) -> None:
self.client = client
self.model = model or get_chat_model_id()
def hyde(self, query: str) -> str:
"""
HyDE:生成一篇假设性的"理想答案文档"。
为何 HyDE 有效:
向量模型训练时见过大量文档,对"文档风格"的 Embedding 比对"问题风格"
的 Embedding 更稳定。用假设文档去检索,相当于把查询从问题空间
映射到了文档空间。
局限:HyDE 依赖 LLM 生成质量,若 LLM 对领域不熟悉,
假设文档可能引入噪声,反而拉低召回质量。
"""
prompt = (
f"请为以下问题撰写一段简洁的参考答案(100字以内),"
f"语气像专业文档,不必完全准确,只需覆盖关键概念:\n\n问题:{query}"
)
resp = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=0.3, # 稍高温度增加多样性,但不宜过高
)
return resp.choices[0].message.content.strip()
def multi_query(self, query: str, n: int = 3) -> list[str]:
"""
Multi-Query:生成 N 个语义相近但措辞不同的查询。
返回的列表包含原始查询(第一个),确保原始意图不丢失。
"""
prompt = (
f"请将以下问题改写为 {n} 个语义相近但措辞不同的查询,"
f"以 JSON 数组格式返回,每个元素是一个字符串,不要加任何解释:\n\n"
f"原始问题:{query}"
)
resp = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=0.5,
response_format={"type": "json_object"},
)
try:
raw = json.loads(resp.choices[0].message.content)
# 兼容模型返回 {"queries": [...]} 或直接 [...] 两种格式
variants: list[str] = (
raw if isinstance(raw, list)
else next(iter(raw.values()))
)
except (json.JSONDecodeError, StopIteration):
variants = []
# 原始查询始终放第一位,保证召回的下界
return [query] + [v for v in variants if v != query]
def rrf_merge(
self,
results_list: list[list["RetrievedChunk"]], # 多路检索结果
k: int = 60,
top_n: int = 20,
) -> list["RetrievedChunk"]:
"""
Reciprocal Rank Fusion(RRF):将多路检索结果融合去重。
RRF 公式:score(d) = Σ 1 / (k + rank_i(d))
k=60 是 Cormack et al. 2009 论文推荐的默认值,实践中无需调整。
优势:不依赖各路检索的分值量纲,天然适合混合不同类型的检索器。
"""
from collections import defaultdict
chunk_scores: dict[str, float] = defaultdict(float)
chunk_map: dict[str, "RetrievedChunk"] = {}
for results in results_list:
for rank, chunk in enumerate(results, start=1):
chunk_scores[chunk.chunk_id] += 1.0 / (k + rank)
chunk_map[chunk.chunk_id] = chunk
merged = sorted(
chunk_map.values(),
key=lambda c: chunk_scores[c.chunk_id],
reverse=True,
)
return merged[:top_n]
⚠️ 生产注意:Multi-Query 会增加 N 倍的 Embedding 调用开销。若 Embedding API 有速率限制,应对多路查询做并发控制(asyncio.gather + Semaphore)。对于延迟敏感的场景,可仅用 Multi-Query 而不用 HyDE,节省一次 LLM 调用。
Step 3:重排序——DashScope qwen3-rerank API
目标:向量检索是"粗排",用内积/余弦快速从千万文档中找到 Top-20 候选;Reranker 是"精排",它将 Query 和每个候选文档一起送入重排模型,直接对相关性打分,准确度远高于向量相似度。
为什么需要两阶段?精排模型对全库检索的复杂度是 O(N × doc_len),代价太高;但对 Top-20 候选精排,延迟完全可以接受。
# reranker.py
"""Reranker 模块:使用 DashScope qwen3-rerank API 进行精排。"""
import httpx
from typing import TYPE_CHECKING
from core_config import (
get_dashscope_api_key, get_reranker_model,
get_reranker_top_n,
)
if TYPE_CHECKING:
from baseline_rag import RetrievedChunk
class DashScopeReranker:
"""
使用 DashScope qwen3-rerank 对候选文档精排。
优势:
- 无需加载本地模型,减少显存/内存占用
- qwen3-rerank 是阿里云最新发布的中文重排序模型
- 通过 API 调用,首次运行即可使用
模型选型说明:
- qwen3-rerank:中文场景推荐,效果好,无需本地 GPU
- bge-reranker-v2-m3(本地):如需离线使用可切换回本地方案
"""
def __init__(
self,
model: str | None = None,
api_key: str | None = None,
top_n: int | None = None,
) -> None:
self.model = model or get_reranker_model()
self.api_key = api_key or get_dashscope_api_key()
self.top_n = top_n if top_n is not None else get_reranker_top_n()
# DashScope 原生 rerank API 端点(非 OpenAI 兼容)
self.api_url = "https://dashscope.aliyuncs.com/api/v1/services/rerank/text-rerank/text-rerank"
print(f"✅ Reranker 已配置:{self.model}")
def rerank(
self,
query: str,
chunks: list["RetrievedChunk"],
top_n: int | None = None,
) -> list["RetrievedChunk"]:
"""
对候选切块重新打分并排序。
Args:
query: 用户原始查询(不是改写后的查询)
chunks: 向量检索的候选切块(通常 Top-20)
top_n: 精排后保留的数量(送入 LLM 的上下文)
Returns:
按分值降序排列的 Top-N 切块,score 字段更新为 reranker 分值。
"""
if not chunks:
return []
n = top_n if top_n is not None else self.top_n
documents = [chunk.text for chunk in chunks]
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
payload = {
"model": self.model,
"input": {
"query": query,
"documents": documents,
},
}
try:
with httpx.Client(timeout=30.0) as client:
response = client.post(self.api_url, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
# DashScope 返回格式: output.results[].relevance_score
results = data.get("output", {}).get("results", [])
# results 是无序的,需要按 index 映射回原始文档顺序
score_map = {}
for item in results:
idx = item.get("index", 0)
score = item.get("relevance_score", 0)
score_map[idx] = score
scores = [score_map.get(i, 0) for i in range(len(chunks))]
except Exception as e:
# 降级方案:回退到向量检索原始分数排序
print(f"⚠️ Reranker API 调用失败,回退到向量检索分数排序: {e}")
return sorted(chunks, key=lambda c: c.score, reverse=True)[:n]
# 将 reranker 分值回写到 RetrievedChunk
for chunk, score in zip(chunks, scores):
chunk.score = score
# 按分值降序,取 Top-N
reranked = sorted(chunks, key=lambda c: c.score, reverse=True)
return reranked[:n]
⚠️ 生产注意:DashScope rerank API 对 20 个候选的响应时间通常在 100~300ms。若调用失败,代码已内置降级逻辑——回退到向量检索原始分数排序,保证链路不中断。
Step 4:上下文压缩——LLMLingua 与摘要压缩
目标:重排后的 Top-5 文档往往仍有大量与问题无关的句子,直接塞入 LLM 既浪费 Token,也会因"Lost in the Middle"效应降低答案质量。上下文压缩的目标是:在不丢失关键信息的前提下,最大化删除噪声。
两种方案各有适用场景: - LLMLingua:基于小语言模型(如 Llama-2-7B)对每个 Token 计算困惑度,删掉困惑度低(即模型认为"废话")的 Token。优点是压缩率可控(可指定目标比例),缺点是首次加载模型较慢; - 摘要压缩:直接让 LLM 对检索结果做摘要,保留与查询最相关的内容。实现最简单,适合对延迟不敏感、但 Token 成本敏感的离线场景。
# context_compressor.py
"""上下文压缩模块:LLMLingua 与 LLM 摘要压缩两种实现。"""
from openai import OpenAI
from core_config import get_chat_model_id
class LLMLinguaCompressor:
"""
基于 LLMLingua 的 Token 级别压缩。
需要额外安装:uv pip install llmlingua==0.2.2
"""
def __init__(self, rate: float = 0.5) -> None:
"""
Args:
rate: 压缩率,0.5 表示保留 50% 的 Token。
实验建议:0.4~0.6 之间,低于 0.3 可能损失关键信息。
"""
from llmlingua import PromptCompressor
# 使用轻量级模型做 Token 打分,不参与最终生成
# 国内可替换为 Qwen/Qwen1.5-1.8B-Chat 等本地模型
self.compressor = PromptCompressor(
model_name="microsoft/llmlingua-2-xlm-roberta-large-meetingbank",
use_llmlingua2=True,
)
self.rate = rate
def compress(self, query: str, context: str) -> str:
"""
压缩上下文文本,保留与 query 最相关的 Token。
LLMLingua2 使用 query-aware 压缩:
会优先保留与 query 语义相关的 Token。
"""
result = self.compressor.compress_prompt(
context,
rate=self.rate,
question=query,
)
compressed = result["compressed_prompt"]
original_tokens = result["origin_tokens"]
compressed_tokens = result["compressed_tokens"]
ratio = 1 - compressed_tokens / max(original_tokens, 1)
print(
f"📉 LLMLingua 压缩:{original_tokens} → {compressed_tokens} tokens "
f"(节省 {ratio:.1%})"
)
return compressed
class SummaryCompressor:
"""
基于 LLM 摘要的上下文压缩。
实现最简单,适合快速集成,无需额外加载本地模型。
"""
def __init__(self, client: OpenAI, model: str | None = None) -> None:
self.client = client
self.model = model or get_chat_model_id()
def compress(
self,
query: str,
chunks: list["RetrievedChunk"],
max_words: int = 300,
) -> str:
"""
让 LLM 对多个检索切块做摘要,只保留与 query 相关的内容。
注意:这会引入额外的 LLM 调用。适合最终上下文需要控制在
1000 Token 以内的场景,不适合实时对话(延迟会增加 500ms+)。
"""
context = "\n\n---\n\n".join(
f"[切块 {i+1}]\n{c.text}" for i, c in enumerate(chunks)
)
prompt = (
f"以下是检索到的多段文本,请提取与问题直接相关的信息,"
f"压缩为不超过 {max_words} 字的摘要,保留具体数字和关键术语:\n\n"
f"问题:{query}\n\n文本:\n{context}"
)
resp = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=0.1,
)
return resp.choices[0].message.content.strip()
Step 5:组装 Advanced RAG 完整流水线
目标:将上述所有模块串联为一个可配置的 AdvancedRAG 类,支持按需开关各优化环节,方便做消融实验。
# advanced_rag.py
"""Advanced RAG 完整流水线,整合查询改写、重排序与上下文压缩。"""
from dataclasses import dataclass, field
from baseline_rag import NaiveRAG, RetrievedChunk
from query_rewriter import QueryRewriter
from reranker import DashScopeReranker
from context_compressor import SummaryCompressor
@dataclass
class AdvancedRAGConfig:
"""Advanced RAG 配置,每个优化环节可独立开关,便于消融实验。"""
# 查询改写配置
use_hyde: bool = True
use_multi_query: bool = True
multi_query_n: int = 3 # Multi-Query 生成几个变体
# 检索配置
retrieval_top_k: int = 20 # 粗排候选数,给重排留充足空间
# 重排配置
use_reranker: bool = True
reranker_top_n: int = 5 # 精排后送入 LLM 的切块数
# 压缩配置
use_compression: bool = False # 默认关闭,延迟敏感场景不建议开启
compression_max_words: int = 300
class AdvancedRAG(NaiveRAG):
"""
继承 NaiveRAG,叠加高级检索能力。
继承而非组合,是为了复用索引与 Embedding 逻辑,
避免重复代码,同时保持 Naive RAG 的基线可对比性。
"""
def __init__(self, config: AdvancedRAGConfig | None = None) -> None:
super().__init__()
self.config = config or AdvancedRAGConfig()
self.rewriter = QueryRewriter(self.chat_client)
self.reranker = DashScopeReranker() if self.config.use_reranker else None
self.compressor = SummaryCompressor(self.chat_client)
def _multi_retrieve(self, queries: list[str]) -> list[RetrievedChunk]:
"""
对多个查询分别检索,用 RRF 融合结果。
每路检索独立执行,后续 RRF 去重。
"""
all_results: list[list[RetrievedChunk]] = []
for q in queries:
results = self.retrieve(q, top_k=self.config.retrieval_top_k)
all_results.append(results)
if len(all_results) == 1:
return all_results[0]
return self.rewriter.rrf_merge(
all_results,
top_n=self.config.retrieval_top_k,
)
def advanced_query(self, question: str) -> dict:
"""
Advanced RAG 完整查询链路:
查询改写 → 多路检索 → RRF 融合 → 重排 → 压缩 → 生成
"""
cfg = self.config
queries = [question]
steps_log: list[str] = []
# ① 查询改写
if cfg.use_multi_query:
queries = self.rewriter.multi_query(question, n=cfg.multi_query_n)
steps_log.append(f"Multi-Query 生成 {len(queries)} 路查询")
if cfg.use_hyde:
hyde_doc = self.rewriter.hyde(question)
# HyDE 生成的假设文档作为额外的检索查询加入
queries.append(hyde_doc)
steps_log.append("HyDE 生成假设文档")
# ② 多路检索 + RRF 融合
candidates = self._multi_retrieve(queries)
steps_log.append(f"RRF 融合后候选数:{len(candidates)}")
# ③ Cross-Encoder 重排
if cfg.use_reranker and self.reranker:
candidates = self.reranker.rerank(
question, candidates, top_n=cfg.reranker_top_n
)
steps_log.append(f"重排后保留 Top-{cfg.reranker_top_n}")
else:
candidates = candidates[: cfg.reranker_top_n]
# ④ 上下文压缩(可选)
if cfg.use_compression and candidates:
compressed_ctx = self.compressor.compress(
question, candidates, max_words=cfg.compression_max_words
)
# 将压缩结果包装为单个 chunk,保持 generate 接口统一
candidates = [
RetrievedChunk(
text=compressed_ctx, score=1.0, chunk_id="compressed"
)
]
steps_log.append("上下文压缩完成")
# ⑤ LLM 生成
answer = self.generate(question, candidates)
return {
"answer": answer,
"retrieved_chunks": candidates,
"steps": steps_log,
"queries_used": queries,
}
完整运行验证
# e2e_test.py
"""端到端验证:对比 Naive RAG vs Advanced RAG 效果。"""
import time
from advanced_rag import AdvancedRAG, AdvancedRAGConfig
from baseline_rag import NaiveRAG
# ── 准备测试语料(模拟企业知识库切块)──────────────────────────────────────
DOCS = [
"向量数据库 Qdrant 支持 HNSW 索引,查询延迟在毫秒级别,适合大规模相似度检索。",
"BM25 是经典的稀疏检索算法,基于词频和逆文档频率,擅长精确关键词匹配。",
"混合检索结合稠密向量检索和稀疏 BM25 检索,通过 RRF 融合提升召回率。",
"RAG 的核心挑战是检索精度,向量检索可能因语义漂移返回不相关文档。",
"Cross-Encoder 重排序通过联合编码 Query 和文档来计算精确相关性分数,准确度高于 Bi-Encoder。",
"HyDE(Hypothetical Document Embeddings)通过 LLM 生成假设答案文档来改善查询表示。",
"LLMLingua 利用小语言模型压缩 Prompt,可将上下文 Token 减少 50% 同时保留关键信息。",
"BGE-Reranker 是 BAAI 发布的双语重排序模型,在 BEIR 基准上表现优异。",
"Naive RAG 直接将检索结果喂给 LLM,没有重排和压缩,容易引入噪声。",
"查询改写(Query Rewriting)通过生成多角度查询来覆盖更广泛的语义空间。",
]
TEST_QUESTIONS = [
"为什么 Cross-Encoder 的重排效果比向量检索更准确?",
"HyDE 和 Multi-Query 分别是如何改进检索的?",
"如何在 RAG 中控制输入 LLM 的 Token 数量?",
]
def run_experiment() -> None:
print("=" * 60)
print("🚀 初始化 Naive RAG 基线...")
naive = NaiveRAG()
naive.index_documents(DOCS)
print("\n🚀 初始化 Advanced RAG(仅开启重排+改写,关闭压缩)...")
cfg = AdvancedRAGConfig(
use_hyde=True,
use_multi_query=True,
use_reranker=True,
use_compression=False, # 首次验证先关闭,减少调用
retrieval_top_k=10,
reranker_top_n=3,
)
advanced = AdvancedRAG(config=cfg)
advanced.index_documents(DOCS) # 共用同一份文档
print("\n" + "=" * 60)
for q in TEST_QUESTIONS:
print(f"\n📌 问题:{q}")
print("-" * 40)
# Naive RAG
t0 = time.time()
naive_result = naive.query(q, top_k=3)
naive_time = time.time() - t0
# Advanced RAG
t0 = time.time()
adv_result = advanced.advanced_query(q)
adv_time = time.time() - t0
print(f"[Naive RAG | {naive_time:.2f}s] {naive_result['answer'][:120]}...")
print(f"[Advanced | {adv_time:.2f}s] {adv_result['answer'][:120]}...")
print(f" 优化步骤:{' → '.join(adv_result['steps'])}")
print(f" 使用查询数:{len(adv_result['queries_used'])}")
if __name__ == "__main__":
run_experiment()
预期输出:
============================================================
🚀 初始化 Naive RAG 基线...
已索引 10 个切块
🚀 初始化 Advanced RAG(仅开启重排+改写,关闭压缩)...
✅ Reranker 已配置:qwen3-rerank
已索引 10 个切块
============================================================
📌 问题:为什么 Cross-Encoder 的重排效果比向量检索更准确?
----------------------------------------
[Naive RAG | 1.23s] Cross-Encoder通过将Query和文档拼接后联合编码,能直接建模两者的交互...
[Advanced | 2.87s] Cross-Encoder重排序准确度高于Bi-Encoder的原因在于其联合编码机制——它将查询...
优化步骤:Multi-Query 生成 4 路查询 → HyDE 生成假设文档 → RRF 融合后候选数:10 → 重排后保留 Top-3
使用查询数:5
快速演示(main.py)
项目提供了 main.py 作为快速演示入口,使用精简的文档集和默认配置:
# main.py
"""主入口:演示 Advanced RAG 查询流程。"""
from advanced_rag import AdvancedRAG, AdvancedRAGConfig
from baseline_rag import NaiveRAG
# ── 示例文档 ───────────────────────────────────────────────────────────────────
DOCS = [
"向量数据库 Qdrant 支持 HNSW 索引,查询延迟在毫秒级别,适合大规模相似度检索。",
"BM25 是经典的稀疏检索算法,基于词频和逆文档频率,擅长精确关键词匹配。",
"混合检索结合稠密向量检索和稀疏 BM25 检索,通过 RRF 融合提升召回率。",
"Cross-Encoder 重排序通过联合编码 Query 和文档来计算精确相关性分数。",
"HyDE 通过 LLM 生成假设答案文档来改善查询表示。",
"查询改写通过生成多角度查询来覆盖更广泛的语义空间。",
]
def main() -> None:
print("=" * 60)
print("Advanced RAG 演示(重排序 + 查询改写)")
print("=" * 60)
# 初始化 Advanced RAG
cfg = AdvancedRAGConfig(
use_hyde=True,
use_multi_query=True,
multi_query_n=2,
use_reranker=True,
use_compression=True,
retrieval_top_k=10,
reranker_top_n=3,
)
rag = AdvancedRAG(config=cfg)
rag.index_documents(DOCS)
# 演示查询
question = "为什么 Cross-Encoder 的重排效果比向量检索更准确?"
print(f"\n问题:{question}")
result = rag.advanced_query(question)
print(f"答案:{result['answer']}")
print(f"优化步骤:{' → '.join(result['steps'])}")
if __name__ == "__main__":
main()
常见报错与解决方案
| 报错信息 | 原因 | 解决方案 |
|---|---|---|
DashScope API key not found |
未设置 DASHSCOPE_API_KEY 环境变量 |
在 .env 中添加 DASHSCOPE_API_KEY=sk-... |
Reranker API 调用失败 |
DashScope 网络不通或模型未开通 | 确认 qwen3-rerank 已在阿里云控制台开通;检查网络连接 |
json.JSONDecodeError in multi_query |
模型返回非标准 JSON | 已在代码中做了兼容处理;若仍报错,将 response_format 改为 text 并手动 parse |
RateLimitError: 429 |
Embedding 并发过高 | 对多路 Embedding 调用添加 asyncio.Semaphore(5) 限制并发数 |
qdrant_client.http.exceptions.UnexpectedResponse |
Qdrant 集合不存在 | 确认 _init_collection() 已在 index_documents 前调用 |
ImportError: llmlingua requires torch |
llmlingua 依赖 PyTorch | uv pip install torch==2.4.0 --index-url https://download.pytorch.org/whl/cpu |
扩展练习(可选)
-
🟡 中等:为本节的 Advanced RAG 接入 BM25 稀疏检索(使用
rank-bm25库),与向量检索做混合,观察 RRF 融合后召回率是否进一步提升。参考指标:构建一个包含 20 个问题的评估集,统计"正确切块在 Top-5 内"的比例作为 Recall@5。 -
🔴 困难:实现一个自动化消融实验框架——枚举
AdvancedRAGConfig的所有布尔开关组合(共 \(2^3 = 8\) 种),对同一评估集跑全部组合,输出一张 Recall@5 vs 平均延迟的散点图,帮助团队在"效果-性能"权衡中做出数据驱动的选型决策。