Agent 记忆模块深度技术文档
一、记忆模块概述
1.1 为什么 Agent 需要记忆?
在传统软件开发中,系统状态存储在数据库、缓存或文件系统中。但当系统从"规则驱动"转向"AI 驱动"时,记忆成为Agent 的核心能力之一:
| 维度 | 传统系统 | Agent 系统 |
|---|---|---|
| 状态存储 | 结构化数据库 | 非结构化向量存储 + Context Window |
| 知识获取 | 硬编码规则 | 从对话、文档中学习 |
| 上下文理解 | Session 管理 | 记忆检索 + LLM 推理 |
| 个性化能力 | 用户配置表 | 长期记忆 + 用户画像 |
核心价值:
- 连续性:跨对话保持上下文,记住用户偏好
- 学习能力:从历史交互中积累知识
- 个性化:针对不同用户提供定制化服务
- 效率优化:避免重复询问相同信息
1.2 记忆的分类维度
从不同维度理解记忆系统:
┌─────────────────────────────────────────────────────────┐
│ 记忆系统分类矩阵 │
├─────────────────────────────────────────────────────────┤
│ │
│ 时效维度 存储维度 │
│ ├─ 短期记忆 ├─ 内存存储 │
│ ├─ 长期记忆 ├─ 向量数据库 │
│ └─ 工作记忆 └─ 关系数据库 │
│ │
│ 内容维度 访问维度 │
│ ├─ 对话历史 ├─ 精确匹配 │
│ ├─ 知识事实 ├─ 语义检索 │
│ ├─ 用户画像 └─ 混合检索 │
│ └─ 任务状态 │
│ │
└─────────────────────────────────────────────────────────┘
二、短期记忆(Short-term Memory)
2.1 核心概念
短期记忆存储在 LLM 的 Context Window(上下文窗口)中,随对话结束而消失。
关键参数:
- Context Window 大小:不同模型的支持能力
- GPT-4 Turbo:128K tokens
- GPT-4:8K tokens
- Claude 3 Opus:200K tokens
- Claude 3.5 Sonnet:200K tokens
核心挑战:
- Token 限制:长对话会超出限制导致截断
- 成本问题:每次请求都要传递完整历史
- 注意力稀释:上下文过长影响模型推理质量
2.2 实现方案对比
方案一:完整对话历史(ConversationBufferMemory)
原理:保留所有历史对话
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain
# 初始化记忆
memory = ConversationBufferMemory()
# 对话过程
memory.save_context({"input": "你好"}, {"output": "你好!我是AI助手"})
memory.save_context({"input": "我叫张三"}, {"output": "你好张三!很高兴认识你"})
# 查看记忆内容
print(memory.load_memory_variables({}))
# 输出:{'history': 'Human: 你好\nAI: 你好!我是AI助手\nHuman: 我叫张三\nAI: 你好张三!很高兴认识你'}
优点:
- 实现简单
- 不丢失任何信息
缺点:
- Token 消耗线性增长
- 长对话会超出 Context Window
适用场景:
- 短对话(< 20 轮)
- 需要完整上下文的场景
方案二:滑动窗口(ConversationBufferWindowMemory)
原理:只保留最近 K 轮对话
from langchain.memory import ConversationBufferWindowMemory
# 只保留最近 5 轮对话
memory = ConversationBufferWindowMemory(k=5)
# 模拟 10 轮对话
for i in range(10):
memory.save_context(
{"input": f"问题 {i+1}"},
{"output": f"回答 {i+1}"}
)
# 查看记忆(只保留最后 5 轮)
print(memory.load_memory_variables({}))
# 输出:最近 5 轮的对话内容
优点:
- Token 消耗可控
- 实现简单
缺点:
- 会丢失早期对话
- 无法回溯远期信息
适用场景:
- 中等长度对话
- 更关注最近上下文的场景
Token 计算:
def estimate_tokens_window(k: int, avg_tokens_per_turn: int = 50) -> int:
"""
估算滑动窗口的 Token 消耗
Args:
k: 保留的对话轮数
avg_tokens_per_turn: 平均每轮对话的 Token 数
Returns:
总 Token 数
"""
return k * avg_tokens_per_turn
# 示例:5 轮对话,平均每轮 50 tokens
print(f"预估 Token: {estimate_tokens_window(5)}") # 250 tokens
方案三:Token 限制截断(ConversationTokenBufferMemory)
原理:根据 Token 数量动态截断
from langchain.memory import ConversationTokenBufferMemory
from langchain_openai import OpenAI
llm = OpenAI()
# 限制最大 2000 tokens
memory = ConversationTokenBufferMemory(
llm=llm,
max_token_limit=2000
)
# 添加对话
memory.save_context({"input": "长文本..." * 100}, {"output": "回答..."})
# 自动截断超出部分
print(memory.load_memory_variables({}))
优点:
- 精确控制 Token 消耗
- 最大化利用 Context Window
缺点:
- 截断位置可能不自然
- 需要额外的 Token 计算
实现细节:
from langchain_core.messages import BaseMessage, get_buffer_string
class CustomTokenBufferMemory:
"""自定义 Token 限制记忆"""
def __init__(self, llm, max_token_limit: int = 4000):
self.llm = llm
self.max_token_limit = max_token_limit
self.buffer: list[BaseMessage] = []
def save_context(self, inputs: dict, outputs: dict):
"""保存对话上下文"""
from langchain_core.messages import HumanMessage, AIMessage
self.buffer.append(HumanMessage(content=inputs["input"]))
self.buffer.append(AIMessage(content=outputs["output"]))
# 检查并截断
self._trim_buffer()
def _trim_buffer(self):
"""截断超出 Token 限制的部分"""
current_tokens = self._count_tokens()
while current_tokens > self.max_token_limit and len(self.buffer) > 0:
# 从最旧的对话开始删除
removed = self.buffer.pop(0)
current_tokens -= self._count_message_tokens(removed)
def _count_tokens(self) -> int:
"""计算当前 Buffer 的 Token 数"""
buffer_str = get_buffer_string(self.buffer)
return len(self.llm.get_num_tokens(buffer_str))
def _count_message_tokens(self, message: BaseMessage) -> int:
"""计算单条消息的 Token 数"""
return len(self.llm.get_num_tokens(message.content))
方案四:摘要记忆(ConversationSummaryMemory)
原理:定期对历史对话生成摘要,用摘要替代原始对话
from langchain.memory import ConversationSummaryMemory
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4")
memory = ConversationSummaryMemory(llm=llm)
# 添加对话
memory.save_context(
{"input": "我最近在学习 Python,有什么建议吗?"},
{"output": "建议从基础语法开始,然后学习数据结构和算法..."}
)
memory.save_context(
{"input": "有没有推荐的书籍?"},
{"output": "推荐《Python编程:从入门到实践》和《流畅的Python》..."}
)
# 查看摘要
print(memory.load_memory_variables({}))
# 输出:{'history': '用户正在学习Python,我推荐了从基础语法开始学习,并建议了两本书籍...'}
优点:
- 大幅压缩 Token 消耗
- 保留关键信息
缺点:
- 摘要过程需要额外的 LLM 调用(成本)
- 可能丢失细节信息
- 摘要质量依赖模型能力
成本分析:
def calculate_summary_cost(
conversation_turns: int,
avg_tokens_per_turn: int = 50,
summary_interval: int = 10,
compression_ratio: float = 0.2
) -> dict:
"""
计算摘要记忆的成本
Args:
conversation_turns: 总对话轮数
avg_tokens_per_turn: 平均每轮对话的 Token 数
summary_interval: 每隔多少轮生成一次摘要
compression_ratio: 摘要压缩比
Returns:
成本分析字典
"""
# 原始对话的 Token 数
original_tokens = conversation_turns * avg_tokens_per_turn
# 摘要次数
summary_count = conversation_turns // summary_interval
# 摘要生成成本(输入 tokens + 输出 tokens)
summary_generation_cost = summary_count * summary_interval * avg_tokens_per_turn
# 最终保留的摘要 Token 数
final_summary_tokens = original_tokens * compression_ratio
# 最近未摘要的对话
recent_unsummarized = (conversation_turns % summary_interval) * avg_tokens_per_turn
# 总 Token 消耗(包括摘要生成 + 最终传递)
total_tokens = summary_generation_cost + final_summary_tokens + recent_unsummarized
return {
"original_tokens": original_tokens,
"total_tokens_consumed": total_tokens,
"compression_rate": (1 - (final_summary_tokens + recent_unsummarized) / original_tokens),
"summary_count": summary_count
}
# 示例:100 轮对话
cost = calculate_summary_cost(
conversation_turns=100,
avg_tokens_per_turn=50,
summary_interval=10,
compression_ratio=0.2
)
print(f"压缩率: {cost['compression_rate']:.1%}") # 压缩率: 72.0%
方案五:混合记忆(ConversationSummaryBufferMemory)
原理:结合摘要和完整对话,保留最近 K 轮 + 历史摘要
from langchain.memory import ConversationSummaryBufferMemory
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4")
# 保留最近 5 轮 + 历史摘要
memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=2000
)
# 添加 20 轮对话
for i in range(20):
memory.save_context(
{"input": f"问题 {i+1}"},
{"output": f"回答 {i+1}"}
)
# 查看记忆:早期对话会被摘要,最近几轮保持完整
print(memory.load_memory_variables({}))
优点:
- 平衡信息完整性和 Token 效率
- 保留最近关键上下文
缺点:
- 实现复杂
- 需要调优参数
架构图:
┌─────────────────────────────────────────────────────────┐
│ 混合记忆结构 │
├─────────────────────────────────────────────────────────┤
│ │
│ [对话历史] │
│ ├─ 第 1-10 轮 ──→ 摘要生成 ──→ 摘要 A (200 tokens) │
│ ├─ 第 11-15 轮 ──→ 摘要生成 ──→ 摘要 B (200 tokens) │
│ └─ 第 16-20 轮 ──→ 完整保留 ──→ 5 轮对话 (500 tokens)│
│ │
│ [最终传递给 LLM] │
│ └─ 摘要 A + 摘要 B + 最近 5 轮 = 900 tokens │
│ │
└─────────────────────────────────────────────────────────┘
2.3 短期记忆最佳实践
实践一:动态调整窗口大小
根据任务复杂度动态调整保留的对话轮数:
class DynamicWindowMemory:
"""动态调整窗口大小的记忆"""
def __init__(self, initial_k: int = 5, max_k: int = 20):
self.k = initial_k
self.max_k = max_k
self.buffer = []
self.complexity_scores = []
def save_context(self, inputs: dict, outputs: dict):
"""保存上下文并评估复杂度"""
self.buffer.append((inputs, outputs))
# 评估对话复杂度
complexity = self._estimate_complexity(inputs["input"])
self.complexity_scores.append(complexity)
# 动态调整窗口
self._adjust_window()
def _estimate_complexity(self, text: str) -> float:
"""估算对话复杂度"""
# 简单启发式:基于文本长度和关键词
score = 0.0
# 长度因素
if len(text) > 200:
score += 0.3
# 关键词因素
complex_keywords = ["详细", "解释", "为什么", "如何", "分析"]
if any(kw in text for kw in complex_keywords):
score += 0.3
# 多问题因素
if text.count("?") > 1 or text.count("?") > 1:
score += 0.4
return min(score, 1.0)
def _adjust_window(self):
"""根据复杂度调整窗口大小"""
if len(self.complexity_scores) < 5:
return
# 计算最近 5 轮的平均复杂度
avg_complexity = sum(self.complexity_scores[-5:]) / 5
# 动态调整 k
if avg_complexity > 0.6:
self.k = min(self.k + 2, self.max_k) # 增加窗口
elif avg_complexity < 0.3:
self.k = max(self.k - 1, 3) # 减小窗口
实践二:重要信息提取与保留
识别并优先保留重要信息:
from langchain_core.messages import HumanMessage, AIMessage
import re
class ImportanceAwareMemory:
"""重要信息感知的记忆"""
def __init__(self, max_tokens: int = 4000):
self.max_tokens = max_tokens
self.buffer = []
self.important_facts = []
def save_context(self, inputs: dict, outputs: dict):
"""保存上下文并提取重要信息"""
# 提取重要信息
important_info = self._extract_important_info(inputs["input"], outputs["output"])
if important_info:
self.important_facts.extend(important_info)
# 保存完整对话
self.buffer.append((inputs, outputs))
# 检查并压缩
self._compress_if_needed()
def _extract_important_info(self, user_input: str, ai_output: str) -> list:
"""提取重要信息"""
important = []
# 模式 1:用户偏好
preference_patterns = [
r"我喜欢(.+)",
r"我偏好(.+)",
r"我希望(.+)",
r"我叫(.+)",
r"我的(.+)是"
]
for pattern in preference_patterns:
matches = re.findall(pattern, user_input)
important.extend(matches)
# 模式 2:关键决策
decision_keywords = ["决定", "确认", "选择", "同意"]
if any(kw in user_input for kw in decision_keywords):
important.append(f"决策: {user_input}")
return important
def _compress_if_needed(self):
"""如果超出限制则压缩"""
current_tokens = self._count_tokens()
if current_tokens > self.max_tokens:
# 保留重要信息 + 最近对话
recent_conversations = self.buffer[-5:]
# 构建压缩后的记忆
compressed = {
"important_facts": self.important_facts,
"recent_conversations": recent_conversations
}
# 替换 buffer
self.buffer = compressed
def load_memory(self) -> str:
"""加载记忆"""
if isinstance(self.buffer, list):
# 正常模式
return self._format_conversations(self.buffer)
else:
# 压缩模式
facts_str = "\n".join([f"- {fact}" for fact in self.buffer["important_facts"]])
recent_str = self._format_conversations(self.buffer["recent_conversations"])
return f"【重要信息】\n{facts_str}\n\n【最近对话】\n{recent_str}"
实践三:多级记忆架构
构建多级记忆系统,平衡效率和完整性:
class MultiLevelMemory:
"""多级记忆架构"""
def __init__(self):
# Level 1: 工作记忆(当前对话)
self.working_memory = []
# Level 2: 短期记忆(滑动窗口)
self.short_term_memory = []
self.short_term_limit = 10
# Level 3: 摘要记忆
self.summary_memory = ""
# Level 4: 重要信息
self.key_facts = []
def add_message(self, role: str, content: str):
"""添加消息"""
# 添加到工作记忆
self.working_memory.append({"role": role, "content": content})
# 如果工作记忆达到阈值,推送到短期记忆
if len(self.working_memory) >= 2:
self._push_to_short_term()
# 如果短期记忆达到阈值,生成摘要
if len(self.short_term_memory) >= self.short_term_limit:
self._summarize_short_term()
def _push_to_short_term(self):
"""推送到短期记忆"""
# 提取重要信息
self._extract_key_facts()
# 移动到短期记忆
self.short_term_memory.extend(self.working_memory)
self.working_memory = []
def _extract_key_facts(self):
"""提取关键事实"""
# 简化版:实际应该用 LLM 提取
for msg in self.working_memory:
if "我叫" in msg["content"] or "我喜欢" in msg["content"]:
self.key_facts.append(msg["content"])
def _summarize_short_term(self):
"""摘要短期记忆"""
# 生成摘要(实际应该调用 LLM)
summary = f"过去 {len(self.short_term_memory)} 条消息的摘要..."
# 更新摘要记忆
if self.summary_memory:
self.summary_memory += "\n" + summary
else:
self.summary_memory = summary
# 清空短期记忆
self.short_term_memory = []
def get_context_for_llm(self, max_tokens: int = 4000) -> str:
"""获取传递给 LLM 的上下文"""
context_parts = []
# 1. 关键事实(优先级最高)
if self.key_facts:
context_parts.append("【用户信息】\n" + "\n".join(self.key_facts))
# 2. 历史摘要
if self.summary_memory:
context_parts.append("【历史摘要】\n" + self.summary_memory)
# 3. 短期记忆
if self.short_term_memory:
short_term_str = self._format_messages(self.short_term_memory)
context_parts.append("【近期对话】\n" + short_term_str)
# 4. 工作记忆(当前对话)
if self.working_memory:
working_str = self._format_messages(self.working_memory)
context_parts.append("【当前对话】\n" + working_str)
return "\n\n".join(context_parts)
def _format_messages(self, messages: list) -> str:
"""格式化消息"""
return "\n".join([
f"{msg['role']}: {msg['content']}"
for msg in messages
])
三、长期记忆(Long-term Memory)
3.1 核心概念
长期记忆用于存储跨对话的信息,需要外部存储系统支持。
关键特性:
- 持久化:数据永久存储
- 跨会话:不同对话可以访问相同记忆
- 可扩展:理论上无限容量
核心技术:
- 向量数据库(Vector Database)
- 嵌入模型(Embedding Model)
- 检索算法(Retrieval Algorithm)
3.2 向量数据库选型
主流向量数据库对比
| 数据库 | 类型 | 特点 | 适用场景 | 性能 |
|---|---|---|---|---|
| Pinecone | 云服务 | 托管式,零运维 | 生产环境,快速上手 | ⭐⭐⭐⭐⭐ |
| Milvus | 开源 | 高性能,可扩展 | 大规模生产环境 | ⭐⭐⭐⭐⭐ |
| Weaviate | 开源 | 语义搜索强 | 知识图谱场景 | ⭐⭐⭐⭐ |
| ChromaDB | 开源 | 轻量级,易用 | 开发测试,小规模应用 | ⭐⭐⭐ |
| Qdrant | 开源 | Rust 实现,高性能 | 性能敏感场景 | ⭐⭐⭐⭐⭐ |
| FAISS | 库 | Meta 开源,纯算法 | 本地嵌入,极致性能 | ⭐⭐⭐⭐⭐ |
选型决策树
需要长期记忆?
├─ 是
│ ├─ 团队有运维能力?
│ │ ├─ 是
│ │ │ ├─ 数据规模 > 1000万向量?
│ │ │ │ ├─ 是 → Milvus
│ │ │ │ └─ 否 → Qdrant
│ │ │ └─ 需要 RAG + 知识图谱?
│ │ │ ├─ 是 → Weaviate
│ │ │ └─ 否 → Qdrant
│ │ └─ 否(无运维能力)
│ │ └─ Pinecone(托管服务)
│ └─ 开发测试阶段?
│ └─ ChromaDB(最简单)
└─ 否(仅需短期记忆)
└─ 使用内存存储
3.3 嵌入模型选型
主流嵌入模型对比
| 模型 | 提供商 | 维度 | 性能 | 成本 |
|---|---|---|---|---|
| text-embedding-3-small | OpenAI | 1536 | ⭐⭐⭐⭐ | $0.02/1M tokens |
| text-embedding-3-large | OpenAI | 3072 | ⭐⭐⭐⭐⭐ | $0.13/1M tokens |
| text-embedding-ada-002 | OpenAI | 1536 | ⭐⭐⭐ | $0.10/1M tokens |
| bge-large-zh-v1.5 | BGE | 1024 | ⭐⭐⭐⭐ | 免费(本地) |
| bge-m3 | BGE | 1024 | ⭐⭐⭐⭐⭐ | 免费(本地) |
| Cohere embed-v3 | Cohere | 1024 | ⭐⭐⭐⭐⭐ | $0.10/1M tokens |
选择建议
# 场景 1:追求最佳性能
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
# 场景 2:成本敏感
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# 场景 3:中文场景,本地部署
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
embeddings = HuggingFaceBgeEmbeddings(
model_name="BAAI/bge-large-zh-v1.5",
model_kwargs={'device': 'cuda'},
encode_kwargs={'normalize_embeddings': True}
)
# 场景 4:多语言混合
from langchain_cohere import CohereEmbeddings
embeddings = CohereEmbeddings(model="embed-multilingual-v3.0")
3.4 实现方案详解
方案一:基于向量检索的记忆
架构图:
┌─────────────────────────────────────────────────────────┐
│ 向量检索记忆架构 │
├─────────────────────────────────────────────────────────┤
│ │
│ 用户输入 │
│ ↓ │
│ Embedding 模型 │
│ ↓ │
│ 查询向量 │
│ ↓ │
│ 向量数据库 ──→ 相似度检索 ──→ Top-K 记忆 │
│ ↓ │
│ 相关记忆 + 用户输入 ──→ LLM ──→ 回复 │
│ ↓ │
│ 回复 + 输入 ──→ Embedding ──→ 存入向量数据库 │
│ │
└─────────────────────────────────────────────────────────┘
实现代码:
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain.chains import ConversationChain
from langchain_openai import ChatOpenAI
# 1. 初始化向量数据库
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
embedding_function=embeddings,
persist_directory="./chroma_memory_db"
)
# 2. 创建检索器
retriever = vectorstore.as_retriever(
search_kwargs={"k": 5} # 检索最相关的 5 条记忆
)
# 3. 创建记忆
memory = VectorStoreRetrieverMemory(retriever=retriever)
# 4. 使用记忆
# 保存对话
memory.save_context(
{"input": "我喜欢吃苹果"},
{"output": "好的,我会记住你喜欢吃苹果"}
)
memory.save_context(
{"input": "我叫张三"},
{"output": "你好张三!很高兴认识你"}
)
# 5. 检索相关记忆
relevant_memories = memory.load_memory_variables({"prompt": "我的名字是什么?"})
print(relevant_memories)
# 输出:最相关的记忆,包含"我叫张三"的对话
高级实现:带元数据的记忆:
from datetime import datetime
from typing import Optional
from langchain_core.documents import Document
class AdvancedVectorMemory:
"""高级向量记忆系统"""
def __init__(self, vectorstore, embeddings, user_id: str):
self.vectorstore = vectorstore
self.embeddings = embeddings
self.user_id = user_id
def save_memory(
self,
user_input: str,
ai_output: str,
importance: str = "normal", # high, normal, low
category: Optional[str] = None
):
"""保存记忆"""
# 构建文档
document = Document(
page_content=f"User: {user_input}\nAI: {ai_output}",
metadata={
"user_id": self.user_id,
"timestamp": datetime.now().isoformat(),
"importance": importance,
"category": category or "general",
"type": "conversation"
}
)
# 添加到向量数据库
self.vectorstore.add_documents([document])
def save_fact(self, fact: str, category: str = "fact"):
"""保存事实性知识"""
document = Document(
page_content=fact,
metadata={
"user_id": self.user_id,
"timestamp": datetime.now().isoformat(),
"type": "fact",
"category": category
}
)
self.vectorstore.add_documents([document])
def retrieve_relevant(
self,
query: str,
k: int = 5,
filters: Optional[dict] = None
) -> list:
"""检索相关记忆"""
# 基础过滤条件
base_filter = {"user_id": self.user_id}
# 合并额外过滤条件
if filters:
base_filter.update(filters)
# 检索
results = self.vectorstore.similarity_search(
query,
k=k,
filter=base_filter
)
return results
def retrieve_by_time_range(
self,
start_time: datetime,
end_time: datetime,
k: int = 10
) -> list:
"""按时间范围检索"""
# 注意:不同向量数据库的过滤语法不同
# 这里以 Chroma 为例
results = self.vectorstore.similarity_search(
"", # 空查询
k=k,
filter={
"user_id": self.user_id,
"timestamp": {
"$gte": start_time.isoformat(),
"$lte": end_time.isoformat()
}
}
)
return results
def get_user_preferences(self) -> list:
"""获取用户偏好"""
return self.retrieve_relevant(
query="用户偏好",
k=10,
filters={"category": "preference"}
)
def get_important_memories(self) -> list:
"""获取重要记忆"""
return self.retrieve_relevant(
query="",
k=10,
filters={"importance": "high"}
)
方案二:混合检索记忆
结合向量检索和关键词检索,提高召回率:
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from langchain_community.vectorstores import Chroma
class HybridMemory:
"""混合检索记忆"""
def __init__(self, vectorstore, documents):
# 向量检索器
self.vector_retriever = vectorstore.as_retriever(
search_kwargs={"k": 5}
)
# 关键词检索器(BM25)
self.keyword_retriever = BM25Retriever.from_documents(documents)
self.keyword_retriever.k = 5
# 混合检索器
self.ensemble_retriever = EnsembleRetriever(
retrievers=[self.vector_retriever, self.keyword_retriever],
weights=[0.6, 0.4] # 向量检索权重更高
)
def retrieve(self, query: str) -> list:
"""混合检索"""
return self.ensemble_retriever.invoke(query)
混合检索的优势:
查询:"Python 异常处理"
┌─────────────────────────────────────────────────────────┐
│ 向量检索结果 │
├─────────────────────────────────────────────────────────┤
│ 1. 如何捕获 Python 异常? │
│ 2. try-except 语句的使用 │
│ 3. Python 错误处理最佳实践 │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ 关键词检索结果 │
├─────────────────────────────────────────────────────────┤
│ 1. Python 异常类型列表 │
│ 2. 异常处理的代码示例 │
│ 3. raise 语句的用法 │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ 混合检索结果 │
├─────────────────────────────────────────────────────────┤
│ 去重 + 加权排序 → 更全面的记忆召回 │
└─────────────────────────────────────────────────────────┘
方案三:知识图谱增强记忆
将记忆组织成知识图谱,支持复杂推理:
from langchain.graphs import Neo4jGraph
from langchain.chains import GraphQAChain
class KnowledgeGraphMemory:
"""知识图谱记忆"""
def __init__(self, neo4j_url: str, neo4j_user: str, neo4j_password: str):
self.graph = Neo4jGraph(
url=neo4j_url,
username=neo4j_user,
password=neo4j_password
)
def save_entity(self, entity_type: str, entity_name: str, properties: dict):
"""保存实体"""
props_str = ", ".join([f"{k}: {v}" for k, v in properties.items()])
query = f"""
MERGE (e:{entity_type} {{name: $name}})
SET e += {{{props_str}}}
"""
self.graph.query(query, params={"name": entity_name})
def save_relation(self, from_entity: str, relation: str, to_entity: str):
"""保存关系"""
query = f"""
MATCH (a {{name: $from_name}})
MATCH (b {{name: $to_name}})
MERGE (a)-[r:{relation}]->(b)
"""
self.graph.query(query, params={
"from_name": from_entity,
"to_name": to_entity
})
def save_conversation_memory(
self,
user_id: str,
entities: list,
relations: list,
conversation_id: str
):
"""保存对话记忆到知识图谱"""
# 保存实体
for entity in entities:
self.save_entity(
entity_type=entity["type"],
entity_name=entity["name"],
properties=entity.get("properties", {})
)
# 保存关系
for relation in relations:
self.save_relation(
from_entity=relation["from"],
relation=relation["type"],
to_entity=relation["to"]
)
# 关联到对话
query = """
MATCH (u:User {id: $user_id})
MATCH (c:Conversation {id: $conv_id})
MERGE (u)-[:HAD]->(c)
"""
self.graph.query(query, params={
"user_id": user_id,
"conv_id": conversation_id
})
def query_related_knowledge(self, entity_name: str, depth: int = 2) -> list:
"""查询相关知识"""
query = f"""
MATCH path = (e {{name: $name}})-[*1..{depth}]-(related)
RETURN path
"""
return self.graph.query(query, params={"name": entity_name})
知识图谱记忆示例:
用户说:"我和李四在讨论 Python 项目"
知识图谱存储:
┌─────────────────────────────────────────────────────────┐
│ │
│ (User:张三) ──[:DISCUSSED_WITH]──→ (User:李四) │
│ │ │
│ └──[:DISCUSSED_ABOUT]──→ (Topic:Python) │
│ │ │
│ └──[:RELATED_TO] │
│ ↓ │
│ (Project:XXX) │
│ │
└─────────────────────────────────────────────────────────┘
下次用户问:"上次我和谁讨论 Python?"
→ 查询图谱 → 返回"李四"
方案四:层级化长期记忆
模拟人类记忆的层级结构:
class HierarchicalLongTermMemory:
"""层级化长期记忆"""
def __init__(self):
# Level 1: 工作记忆(当前会话)
self.working_memory = []
# Level 2: 情景记忆(Episodic Memory)
# 存储具体事件和经历
self.episodic_store = {} # {session_id: [events]}
# Level 3: 语义记忆(Semantic Memory)
# 存储事实和知识
self.semantic_store = {} # {concept: facts}
# Level 4: 程序记忆(Procedural Memory)
# 存储技能和规则
self.procedural_store = {} # {skill: rules}
def add_to_working_memory(self, event: dict):
"""添加到工作记忆"""
self.working_memory.append(event)
def consolidate_to_episodic(self, session_id: str):
"""将工作记忆巩固到情景记忆"""
if session_id not in self.episodic_store:
self.episodic_store[session_id] = []
self.episodic_store[session_id].extend(self.working_memory)
self.working_memory = []
def extract_to_semantic(self, concept: str, facts: list):
"""提取事实到语义记忆"""
if concept not in self.semantic_store:
self.semantic_store[concept] = []
self.semantic_store[concept].extend(facts)
def learn_procedure(self, skill: str, rules: list):
"""学习程序性知识"""
self.procedural_store[skill] = rules
def retrieve(self, query: str, context: dict) -> dict:
"""检索记忆"""
results = {
"episodic": self._search_episodic(query),
"semantic": self._search_semantic(query),
"procedural": self._search_procedural(context)
}
return results
def _search_episodic(self, query: str) -> list:
"""搜索情景记忆"""
# 实际应该用向量检索
results = []
for session_id, events in self.episodic_store.items():
for event in events:
if query.lower() in str(event).lower():
results.append(event)
return results
def _search_semantic(self, query: str) -> list:
"""搜索语义记忆"""
results = []
for concept, facts in self.semantic_store.items():
if query.lower() in concept.lower():
results.extend(facts)
return results
def _search_procedural(self, context: dict) -> list:
"""搜索程序记忆"""
# 根据上下文匹配相关技能
results = []
for skill, rules in self.procedural_store.items():
if self._is_skill_relevant(skill, context):
results.extend(rules)
return results
def _is_skill_relevant(self, skill: str, context: dict) -> bool:
"""判断技能是否相关"""
# 简化实现
return skill in str(context)
记忆层级示意:
┌─────────────────────────────────────────────────────────┐
│ 记忆层级结构 │
├─────────────────────────────────────────────────────────┤
│ │
│ Level 1: 工作记忆(秒级) │
│ └─ 当前对话中的最近几轮 │
│ │
│ Level 2: 情景记忆(小时-天) │
│ └─ 具体事件:"昨天讨论了 Python 项目" │
│ │
│ Level 3: 语义记忆(永久) │
│ └─ 事实知识:"用户喜欢 Python"、"用户叫张三" │
│ │
│ Level 4: 程序记忆(永久) │
│ └─ 技能规则:"遇到代码问题 → 先检查语法错误" │
│ │
└─────────────────────────────────────────────────────────┘
3.5 长期记忆的最佳实践
实践一:记忆去重与更新
class DeduplicatedMemory:
"""去重记忆系统"""
def __init__(self, vectorstore, similarity_threshold: float = 0.95):
self.vectorstore = vectorstore
self.similarity_threshold = similarity_threshold
def save_memory(self, content: str, metadata: dict):
"""保存记忆(带去重)"""
# 检查是否已存在相似记忆
similar_docs = self.vectorstore.similarity_search_with_score(
content,
k=1
)
if similar_docs:
doc, score = similar_docs[0]
# 相似度高于阈值,认为是重复
if score > self.similarity_threshold:
# 更新元数据(如时间戳)
self._update_metadata(doc.id, metadata)
return False # 未添加新记忆
# 添加新记忆
self.vectorstore.add_texts([content], metadatas=[metadata])
return True # 添加成功
def _update_metadata(self, doc_id: str, new_metadata: dict):
"""更新元数据"""
# 实现取决于具体的向量数据库
pass
实践二:记忆遗忘机制
模拟人类遗忘,避免存储过多无用信息:
from datetime import datetime, timedelta
import numpy as np
class ForgettingMemory:
"""带遗忘机制的记忆"""
def __init__(self, vectorstore, half_life_days: int = 30):
self.vectorstore = vectorstore
self.half_life_days = half_life_days
def calculate_retention_score(
self,
timestamp: datetime,
access_count: int = 1,
importance: float = 1.0
) -> float:
"""
计算记忆保留分数
艾宾浩斯遗忘曲线变体:
R = importance * access_count * e^(-t/τ)
"""
# 时间衰减
days_passed = (datetime.now() - timestamp).days
time_decay = np.exp(-days_passed / self.half_life_days)
# 访问增强
access_boost = np.log1p(access_count)
# 最终分数
score = importance * access_boost * time_decay
return score
def should_forget(self, memory_metadata: dict) -> bool:
"""判断是否应该遗忘"""
score = self.calculate_retention_score(
timestamp=datetime.fromisoformat(memory_metadata["timestamp"]),
access_count=memory_metadata.get("access_count", 1),
importance=memory_metadata.get("importance", 1.0)
)
return score < 0.1 # 阈值
def cleanup_memories(self, user_id: str):
"""清理低价值记忆"""
# 检索用户所有记忆
all_memories = self.vectorstore.similarity_search(
"",
k=1000,
filter={"user_id": user_id}
)
# 识别需要遗忘的记忆
to_forget = []
for memory in all_memories:
if self.should_forget(memory.metadata):
to_forget.append(memory.id)
# 删除
if to_forget:
self.vectorstore.delete(to_forget)
print(f"已清理 {len(to_forget)} 条记忆")
实践三:记忆重要性评估
class ImportanceScorer:
"""记忆重要性评分"""
def __init__(self):
self.importance_keywords = {
"high": [
"重要", "紧急", "必须", "关键", "决定",
"偏好", "喜欢", "讨厌", "习惯"
],
"medium": [
"需要", "想要", "希望", "建议", "想法"
],
"low": [
"随便", "无所谓", "可能", "或许"
]
}
def score_importance(self, user_input: str, ai_output: str) -> float:
"""评估记忆重要性"""
score = 0.0
# 1. 关键词匹配
score += self._keyword_score(user_input)
# 2. 情感强度
score += self._sentiment_score(user_input)
# 3. 信息熵(信息量)
score += self._information_entropy_score(user_input)
# 4. 交互深度(追问、确认等)
score += self._interaction_depth_score(user_input)
return min(max(score, 0.0), 1.0) # 归一化到 [0, 1]
def _keyword_score(self, text: str) -> float:
"""关键词评分"""
for level, keywords in self.importance_keywords.items():
if any(kw in text for kw in keywords):
if level == "high":
return 0.4
elif level == "medium":
return 0.2
else:
return 0.1
return 0.05
def _sentiment_score(self, text: str) -> float:
"""情感强度评分"""
# 简化实现,实际可用情感分析模型
strong_emotions = ["非常", "特别", "极其", "超级"]
return 0.2 if any(e in text for e in strong_emotions) else 0.0
def _information_entropy_score(self, text: str) -> float:
"""信息熵评分"""
# 基于文本长度和信息密度
word_count = len(text.split())
if word_count < 5:
return 0.05
elif word_count < 20:
return 0.1
else:
return 0.2
def _interaction_depth_score(self, text: str) -> float:
"""交互深度评分"""
# 多问题、追问等表示重要
question_marks = text.count("?") + text.count("?")
return min(question_marks * 0.1, 0.2)
四、记忆系统集成实践
4.1 完整记忆系统架构
from typing import Optional, List, Dict
from datetime import datetime
from dataclasses import dataclass
from enum import Enum
class MemoryType(Enum):
"""记忆类型"""
WORKING = "working" # 工作记忆
SHORT_TERM = "short_term" # 短期记忆
LONG_TERM = "long_term" # 长期记忆
EPISODIC = "episodic" # 情景记忆
SEMANTIC = "semantic" # 语义记忆
@dataclass
class MemoryEntry:
"""记忆条目"""
id: str
content: str
memory_type: MemoryType
timestamp: datetime
importance: float
access_count: int
metadata: dict
class UnifiedMemorySystem:
"""统一记忆系统"""
def __init__(
self,
vectorstore,
embeddings,
llm,
user_id: str,
config: Optional[dict] = None
):
self.vectorstore = vectorstore
self.embeddings = embeddings
self.llm = llm
self.user_id = user_id
# 配置
self.config = config or {
"working_memory_limit": 10,
"short_term_limit": 20,
"long_term_retrieval_k": 5,
"importance_threshold": 0.7,
"consolidation_interval": 10
}
# 记忆存储
self.working_memory: List[MemoryEntry] = []
self.importance_scorer = ImportanceScorer()
self.forgetting_system = ForgettingMemory(vectorstore)
def add_memory(
self,
user_input: str,
ai_output: str,
force_long_term: bool = False
):
"""添加记忆"""
# 评估重要性
importance = self.importance_scorer.score_importance(
user_input, ai_output
)
# 创建记忆条目
entry = MemoryEntry(
id=self._generate_id(),
content=f"User: {user_input}\nAI: {ai_output}",
memory_type=MemoryType.WORKING,
timestamp=datetime.now(),
importance=importance,
access_count=1,
metadata={
"user_id": self.user_id,
"user_input": user_input,
"ai_output": ai_output
}
)
# 添加到工作记忆
self.working_memory.append(entry)
# 检查是否需要巩固到长期记忆
if (len(self.working_memory) >= self.config["working_memory_limit"] or
importance > self.config["importance_threshold"] or
force_long_term):
self._consolidate_memories()
def retrieve_memories(
self,
query: str,
memory_types: Optional[List[MemoryType]] = None
) -> List[MemoryEntry]:
"""检索记忆"""
memory_types = memory_types or [
MemoryType.WORKING,
MemoryType.SHORT_TERM,
MemoryType.LONG_TERM
]
results = []
# 1. 从工作记忆检索
if MemoryType.WORKING in memory_types:
results.extend(self.working_memory)
# 2. 从长期记忆检索
if MemoryType.LONG_TERM in memory_types:
long_term_results = self._retrieve_long_term(query)
results.extend(long_term_results)
# 3. 排序(按重要性和时间)
results.sort(key=lambda x: (x.importance, x.timestamp), reverse=True)
# 4. 更新访问计数
for entry in results:
entry.access_count += 1
return results[:self.config["long_term_retrieval_k"]]
def _consolidate_memories(self):
"""巩固记忆"""
if not self.working_memory:
return
# 批量存储到向量数据库
contents = [entry.content for entry in self.working_memory]
metadatas = [
{
**entry.metadata,
"timestamp": entry.timestamp.isoformat(),
"importance": entry.importance,
"memory_type": entry.memory_type.value
}
for entry in self.working_memory
]
self.vectorstore.add_texts(contents, metadatas=metadatas)
# 清空工作记忆
self.working_memory = []
def _retrieve_long_term(self, query: str) -> List[MemoryEntry]:
"""从长期记忆检索"""
docs = self.vectorstore.similarity_search(
query,
k=self.config["long_term_retrieval_k"],
filter={"user_id": self.user_id}
)
entries = []
for doc in docs:
entry = MemoryEntry(
id=doc.metadata.get("id", ""),
content=doc.page_content,
memory_type=MemoryType.LONG_TERM,
timestamp=datetime.fromisoformat(doc.metadata["timestamp"]),
importance=doc.metadata.get("importance", 0.5),
access_count=doc.metadata.get("access_count", 1),
metadata=doc.metadata
)
entries.append(entry)
return entries
def get_context_for_llm(
self,
query: str,
max_tokens: int = 4000
) -> str:
"""获取传递给 LLM 的上下文"""
# 检索相关记忆
memories = self.retrieve_memories(query)
# 构建上下文
context_parts = []
# 1. 重要事实
important_memories = [
m for m in memories if m.importance > 0.7
]
if important_memories:
facts_str = "\n".join([
f"- {m.metadata['user_input']}"
for m in important_memories
])
context_parts.append(f"【重要信息】\n{facts_str}")
# 2. 相关对话
relevant_str = "\n\n".join([
m.content for m in memories[:5]
])
context_parts.append(f"【相关历史】\n{relevant_str}")
context = "\n\n".join(context_parts)
# Token 检查
token_count = self._count_tokens(context)
if token_count > max_tokens:
context = self._truncate_context(context, max_tokens)
return context
def _generate_id(self) -> str:
"""生成唯一 ID"""
import uuid
return str(uuid.uuid4())
def _count_tokens(self, text: str) -> int:
"""计算 Token 数"""
# 简化实现
return len(text.split()) * 1.5
def _truncate_context(self, context: str, max_tokens: int) -> str:
"""截断上下文"""
# 简化实现
words = context.split()
return " ".join(words[:int(max_tokens / 1.5)])
def cleanup(self):
"""清理低价值记忆"""
self.forgetting_system.cleanup_memories(self.user_id)
4.2 与 LangChain 集成
from langchain.chains import ConversationChain
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
# 初始化记忆系统
memory_system = UnifiedMemorySystem(
vectorstore=Chroma(
embedding_function=OpenAIEmbeddings(),
persist_directory="./memory_db"
),
embeddings=OpenAIEmbeddings(),
llm=ChatOpenAI(model="gpt-4"),
user_id="user_123"
)
# 自定义 Prompt 模板
template = """
你是一个智能助手,拥有对用户的长期记忆。
{memory_context}
当前对话:
{input}
请基于记忆和当前输入回复:
"""
prompt = PromptTemplate(
input_variables=["memory_context", "input"],
template=template
)
# 对话函数
def chat_with_memory(user_input: str) -> str:
"""带记忆的对话"""
# 1. 检索相关记忆
memory_context = memory_system.get_context_for_llm(user_input)
# 2. 生成回复
llm = ChatOpenAI(model="gpt-4")
prompt_text = prompt.format(
memory_context=memory_context,
input=user_input
)
ai_output = llm.invoke(prompt_text).content
# 3. 保存记忆
memory_system.add_memory(user_input, ai_output)
return ai_output
# 使用示例
response1 = chat_with_memory("我叫张三,我喜欢 Python")
print(response1)
# 输出:你好张三!很高兴认识你。Python 是一门很棒的语言...
# ... 开启新对话
response2 = chat_with_memory("你还记得我的名字吗?")
print(response2)
# 输出:当然记得!你叫张三。而且我知道你喜欢 Python...
4.3 记忆系统评估
class MemoryEvaluator:
"""记忆系统评估器"""
def __init__(self, memory_system: UnifiedMemorySystem):
self.memory_system = memory_system
def evaluate_retrieval_quality(
self,
test_cases: List[dict]
) -> dict:
"""评估检索质量"""
results = {
"precision": [],
"recall": [],
"mrr": [] # Mean Reciprocal Rank
}
for case in test_cases:
query = case["query"]
expected_ids = set(case["relevant_memory_ids"])
# 检索
retrieved = self.memory_system.retrieve_memories(query)
retrieved_ids = set([m.id for m in retrieved])
# 计算指标
if retrieved_ids:
precision = len(retrieved_ids & expected_ids) / len(retrieved_ids)
results["precision"].append(precision)
if expected_ids:
recall = len(retrieved_ids & expected_ids) / len(expected_ids)
results["recall"].append(recall)
# MRR
for i, memory in enumerate(retrieved):
if memory.id in expected_ids:
results["mrr"].append(1.0 / (i + 1))
break
else:
results["mrr"].append(0.0)
# 平均值
return {
"avg_precision": np.mean(results["precision"]),
"avg_recall": np.mean(results["recall"]),
"avg_mrr": np.mean(results["mrr"])
}
def evaluate_memory_usage(self) -> dict:
"""评估记忆使用情况"""
# 统计各类记忆数量
all_memories = self.memory_system.vectorstore.similarity_search(
"",
k=10000,
filter={"user_id": self.memory_system.user_id}
)
# 按重要性分布
importance_dist = {
"high": 0,
"medium": 0,
"low": 0
}
for memory in all_memories:
importance = memory.metadata.get("importance", 0.5)
if importance > 0.7:
importance_dist["high"] += 1
elif importance > 0.3:
importance_dist["medium"] += 1
else:
importance_dist["low"] += 1
# Token 消耗估算
total_tokens = sum([
len(m.page_content.split()) * 1.5
for m in all_memories
])
return {
"total_memories": len(all_memories),
"importance_distribution": importance_dist,
"estimated_tokens": total_tokens,
"working_memory_size": len(self.memory_system.working_memory)
}
def benchmark_retrieval_speed(
self,
queries: List[str],
k: int = 5
) -> dict:
"""基准测试检索速度"""
import time
latencies = []
for query in queries:
start_time = time.time()
_ = self.memory_system.retrieve_memories(query)
latency = time.time() - start_time
latencies.append(latency)
return {
"avg_latency_ms": np.mean(latencies) * 1000,
"p50_latency_ms": np.percentile(latencies, 50) * 1000,
"p95_latency_ms": np.percentile(latencies, 95) * 1000,
"p99_latency_ms": np.percentile(latencies, 99) * 1000
}
五、高级主题
5.1 多用户记忆隔离
class MultiUserMemorySystem:
"""多用户记忆系统"""
def __init__(self, vectorstore, embeddings, llm):
self.vectorstore = vectorstore
self.embeddings = embeddings
self.llm = llm
# 用户记忆系统缓存
self.user_memories: Dict[str, UnifiedMemorySystem] = {}
def get_user_memory(self, user_id: str) -> UnifiedMemorySystem:
"""获取用户的记忆系统"""
if user_id not in self.user_memories:
self.user_memories[user_id] = UnifiedMemorySystem(
vectorstore=self.vectorstore,
embeddings=self.embeddings,
llm=self.llm,
user_id=user_id
)
return self.user_memories[user_id]
def chat(self, user_id: str, user_input: str) -> str:
"""用户对话"""
memory = self.get_user_memory(user_id)
# 获取上下文
context = memory.get_context_for_llm(user_input)
# 生成回复
prompt = f"{context}\n\n用户:{user_input}\n\n助手:"
response = self.llm.invoke(prompt).content
# 保存记忆
memory.add_memory(user_input, response)
return response
def share_memory_between_users(
self,
from_user: str,
to_user: str,
memory_ids: List[str]
):
"""在用户间共享记忆"""
# 检索源用户的记忆
from_memory = self.get_user_memory(from_user)
# 复制到目标用户
to_memory = self.get_user_memory(to_user)
for memory_id in memory_ids:
# 从向量数据库检索
# 然后添加到目标用户
pass
5.2 跨模态记忆
class MultiModalMemory:
"""多模态记忆"""
def __init__(self, vectorstore, text_embeddings, image_embeddings):
self.vectorstore = vectorstore
self.text_embeddings = text_embeddings
self.image_embeddings = image_embeddings
def save_text_memory(self, text: str, metadata: dict):
"""保存文本记忆"""
embedding = self.text_embeddings.embed_query(text)
self.vectorstore.add_embeddings(
[(text, embedding)],
metadatas=[{**metadata, "modality": "text"}]
)
def save_image_memory(self, image_path: str, description: str, metadata: dict):
"""保存图像记忆"""
# 提取图像特征
embedding = self.image_embeddings.embed_image(image_path)
self.vectorstore.add_embeddings(
[(description, embedding)],
metadatas=[{
**metadata,
"modality": "image",
"image_path": image_path
}]
)
def retrieve_multimodal(
self,
query: str,
modalities: List[str] = ["text", "image"]
) -> List[dict]:
"""多模态检索"""
# 文本查询嵌入
query_embedding = self.text_embeddings.embed_query(query)
# 检索
results = self.vectorstore.similarity_search_by_vector(
query_embedding,
k=10,
filter={"modality": {"$in": modalities}}
)
return results
5.3 记忆压缩与总结
class MemoryCompressor:
"""记忆压缩器"""
def __init__(self, llm):
self.llm = llm
def compress_memories(
self,
memories: List[MemoryEntry],
strategy: str = "summary"
) -> str:
"""压缩记忆"""
if strategy == "summary":
return self._summarize(memories)
elif strategy == "extract":
return self._extract_key_points(memories)
else:
raise ValueError(f"Unknown strategy: {strategy}")
def _summarize(self, memories: List[MemoryEntry]) -> str:
"""生成摘要"""
# 合并记忆内容
combined_text = "\n\n".join([
m.content for m in memories
])
# 使用 LLM 生成摘要
prompt = f"""
请将以下对话历史压缩成简洁的摘要,保留关键信息:
{combined_text}
摘要:
"""
summary = self.llm.invoke(prompt).content
return summary
def _extract_key_points(self, memories: List[MemoryEntry]) -> str:
"""提取关键点"""
combined_text = "\n\n".join([
m.content for m in memories
])
prompt = f"""
从以下对话中提取关键事实和信息点:
{combined_text}
关键信息:
-
"""
key_points = self.llm.invoke(prompt).content
return key_points
六、性能优化
6.1 检索优化
class OptimizedMemoryRetriever:
"""优化的记忆检索器"""
def __init__(
self,
vectorstore,
embeddings,
cache_size: int = 1000
):
self.vectorstore = vectorstore
self.embeddings = embeddings
# 查询缓存
self.query_cache = {}
self.cache_size = cache_size
def retrieve_with_cache(
self,
query: str,
k: int = 5,
use_cache: bool = True
) -> List:
"""带缓存的检索"""
# 检查缓存
cache_key = self._get_cache_key(query, k)
if use_cache and cache_key in self.query_cache:
return self.query_cache[cache_key]
# 执行检索
results = self.vectorstore.similarity_search(query, k=k)
# 更新缓存
if use_cache:
self._update_cache(cache_key, results)
return results
def _get_cache_key(self, query: str, k: int) -> str:
"""生成缓存键"""
import hashlib
return hashlib.md5(f"{query}_{k}".encode()).hexdigest()
def _update_cache(self, key: str, value: List):
"""更新缓存"""
# LRU 淘汰
if len(self.query_cache) >= self.cache_size:
# 删除最旧的
oldest_key = next(iter(self.query_cache))
del self.query_cache[oldest_key]
self.query_cache[key] = value
def batch_retrieve(
self,
queries: List[str],
k: int = 5
) -> List[List]:
"""批量检索"""
# 批量嵌入
query_embeddings = self.embeddings.embed_documents(queries)
# 批量检索
results = []
for embedding in query_embeddings:
docs = self.vectorstore.similarity_search_by_vector(
embedding,
k=k
)
results.append(docs)
return results
6.2 存储优化
class OptimizedMemoryStorage:
"""优化的记忆存储"""
def __init__(self, vectorstore, compression_threshold: int = 1000):
self.vectorstore = vectorstore
self.compression_threshold = compression_threshold
def save_with_compression(
self,
contents: List[str],
metadatas: List[dict]
):
"""带压缩的存储"""
# 如果内容过长,先压缩
compressed_contents = []
for content in contents:
if len(content) > self.compression_threshold:
content = self._compress_content(content)
compressed_contents.append(content)
# 存储
self.vectorstore.add_texts(compressed_contents, metadatas=metadatas)
def _compress_content(self, content: str) -> str:
"""压缩内容"""
# 移除多余空白
import re
content = re.sub(r'\s+', ' ', content)
# 截断
if len(content) > self.compression_threshold:
content = content[:self.compression_threshold] + "..."
return content
def optimize_storage(self):
"""优化存储"""
# 1. 合并相似记忆
# 2. 删除过期记忆
# 3. 更新索引
pass
七、实战案例
案例:个性化学习助手记忆系统
class PersonalizedLearningAssistant:
"""个性化学习助手"""
def __init__(self, user_id: str):
# 初始化记忆系统
self.memory_system = UnifiedMemorySystem(
vectorstore=Chroma(
embedding_function=OpenAIEmbeddings(),
persist_directory=f"./learning_assistant/{user_id}"
),
embeddings=OpenAIEmbeddings(),
llm=ChatOpenAI(model="gpt-4"),
user_id=user_id
)
# 学习档案
self.learning_profile = {
"knowledge_level": {}, # 各知识点的掌握程度
"learning_style": None, # 学习风格
"goals": [], # 学习目标
"weaknesses": [] # 薄弱环节
}
def learn(self, topic: str, content: str):
"""学习过程"""
# 1. 检索相关记忆
context = self.memory_system.get_context_for_llm(
f"关于 {topic} 的知识"
)
# 2. 生成学习内容
prompt = f"""
用户正在学习 {topic}。
历史学习记录:
{context}
当前学习内容:
{content}
请:
1. 讲解这个知识点
2. 关联之前的学习内容
3. 提供练习题
"""
response = self.memory_system.llm.invoke(prompt).content
# 3. 保存记忆
self.memory_system.add_memory(
f"学习 {topic}: {content}",
response,
force_long_term=True
)
# 4. 更新学习档案
self._update_profile(topic, content, response)
return response
def review(self, topic: str) -> str:
"""复习过程"""
# 检索该主题的所有相关记忆
memories = self.memory_system.retrieve_memories(
f"{topic} 知识点",
memory_types=[MemoryType.LONG_TERM]
)
# 生成复习内容
context = "\n\n".join([m.content for m in memories])
prompt = f"""
用户要复习 {topic}。
学习历史:
{context}
请生成一份复习总结,包括:
1. 核心概念回顾
2. 重点难点梳理
3. 常见错误提醒
4. 巩固练习
"""
return self.memory_system.llm.invoke(prompt).content
def _update_profile(self, topic: str, content: str, response: str):
"""更新学习档案"""
# 使用 LLM 提取知识点和掌握程度
extraction_prompt = f"""
分析以下学习内容,提取:
1. 知识点
2. 用户掌握程度(1-5)
3. 学习难点
学习内容:
{content}
AI 回复:
{response}
以 JSON 格式返回。
"""
import json
extraction = self.memory_system.llm.invoke(extraction_prompt).content
try:
data = json.loads(extraction)
# 更新知识点掌握程度
for kp in data.get("knowledge_points", []):
self.learning_profile["knowledge_level"][kp["name"]] = {
"level": kp["level"],
"last_review": datetime.now().isoformat()
}
# 更新薄弱环节
difficulties = data.get("difficulties", [])
self.learning_profile["weaknesses"].extend(difficulties)
self.learning_profile["weaknesses"] = list(set(
self.learning_profile["weaknesses"]
))
except Exception as e:
print(f"更新档案失败: {e}")
def get_personalized_recommendation(self) -> str:
"""获取个性化学习推荐"""
# 基于记忆和学习档案生成推荐
prompt = f"""
基于用户的学习档案:
{json.dumps(self.learning_profile, indent=2)}
请推荐:
1. 下一步应该学习什么
2. 需要复习哪些内容
3. 如何改进学习方法
"""
return self.memory_system.llm.invoke(prompt).content
# 使用示例
assistant = PersonalizedLearningAssistant(user_id="learner_123")
# 学习
response1 = assistant.learn(
"Python 基础",
"变量和数据类型"
)
print(response1)
# 继续学习
response2 = assistant.learn(
"Python 基础",
"条件和循环"
)
print(response2)
# 复习
review = assistant.review("Python 基础")
print(review)
# 获取推荐
recommendation = assistant.get_personalized_recommendation()
print(recommendation)
八、总结与最佳实践清单
记忆系统设计原则
- 分层原则:工作记忆 → 短期记忆 → 长期记忆
- 重要性优先:重要信息优先存储和检索
- 遗忘机制:定期清理低价值记忆
- 上下文相关:检索与当前上下文相关的记忆
- 用户隔离:多用户场景下的记忆隔离
最佳实践清单
短期记忆
- ✅ 根据场景选择合适的记忆类型(滑动窗口 vs 摘要)
- ✅ 监控 Token 消耗,避免超出 Context Window
- ✅ 提取并优先保留重要信息
- ✅ 考虑混合记忆策略(摘要 + 完整对话)
长期记忆
- ✅ 选择合适的向量数据库(考虑规模、性能、成本)
- ✅ 选择合适的嵌入模型(考虑语言、性能、成本)
- ✅ 实现记忆去重机制
- ✅ 实现遗忘机制,定期清理
- ✅ 为记忆添加丰富的元数据
- ✅ 考虑混合检索(向量 + 关键词)
性能优化
- ✅ 实现查询缓存
- ✅ 批量操作优化
- ✅ 内容压缩
- ✅ 异步处理
评估与监控
- ✅ 建立检索质量评估体系
- ✅ 监控记忆使用情况
- ✅ 基准测试检索速度
- ✅ 收集用户反馈
常见问题与解决方案
| 问题 | 解决方案 |
|---|---|
| Token 超限 | 滑动窗口 + 摘要记忆 |
| 检索不准确 | 混合检索 + 元数据过滤 |
| 记忆重复 | 去重机制 + 相似度阈值 |
| 检索慢 | 缓存 + 索引优化 |
| 存储爆炸 | 遗忘机制 + 压缩 |
| 跨用户泄露 | 用户 ID 过滤 + 权限控制 |
参考资料
论文
- MemGPT: Towards LLMs as Operating Systems
- Generative Agents: Interactive Simulacra of Human Behavior
- MemoryBank: Enhancing Large Language Models with Long-Term Memory
