Featured image of post Agent 记忆模块深度技术文档

Agent 记忆模块深度技术文档

深入解析智能体记忆系统的架构设计、实现方案与最佳实践,从短期记忆到长期记忆的完整技术栈详解

Agent 记忆模块深度技术文档

一、记忆模块概述

1.1 为什么 Agent 需要记忆?

在传统软件开发中,系统状态存储在数据库、缓存或文件系统中。但当系统从"规则驱动"转向"AI 驱动"时,记忆成为Agent 的核心能力之一:

维度传统系统Agent 系统
状态存储结构化数据库非结构化向量存储 + Context Window
知识获取硬编码规则从对话、文档中学习
上下文理解Session 管理记忆检索 + LLM 推理
个性化能力用户配置表长期记忆 + 用户画像

核心价值

  • 连续性:跨对话保持上下文,记住用户偏好
  • 学习能力:从历史交互中积累知识
  • 个性化:针对不同用户提供定制化服务
  • 效率优化:避免重复询问相同信息

1.2 记忆的分类维度

从不同维度理解记忆系统:

┌─────────────────────────────────────────────────────────┐
│                   记忆系统分类矩阵                        │
├─────────────────────────────────────────────────────────┤
│                                                         │
│   时效维度                存储维度                       │
│   ├─ 短期记忆             ├─ 内存存储                    │
│   ├─ 长期记忆             ├─ 向量数据库                  │
│   └─ 工作记忆             └─ 关系数据库                  │
│                                                         │
│   内容维度                访问维度                       │
│   ├─ 对话历史             ├─ 精确匹配                    │
│   ├─ 知识事实             ├─ 语义检索                    │
│   ├─ 用户画像             └─ 混合检索                    │
│   └─ 任务状态                                           │
│                                                         │
└─────────────────────────────────────────────────────────┘

二、短期记忆(Short-term Memory)

2.1 核心概念

短期记忆存储在 LLM 的 Context Window(上下文窗口)中,随对话结束而消失。

关键参数

  • Context Window 大小:不同模型的支持能力
    • GPT-4 Turbo:128K tokens
    • GPT-4:8K tokens
    • Claude 3 Opus:200K tokens
    • Claude 3.5 Sonnet:200K tokens

核心挑战

  1. Token 限制:长对话会超出限制导致截断
  2. 成本问题:每次请求都要传递完整历史
  3. 注意力稀释:上下文过长影响模型推理质量

2.2 实现方案对比

方案一:完整对话历史(ConversationBufferMemory)

原理:保留所有历史对话

from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain

# 初始化记忆
memory = ConversationBufferMemory()

# 对话过程
memory.save_context({"input": "你好"}, {"output": "你好!我是AI助手"})
memory.save_context({"input": "我叫张三"}, {"output": "你好张三!很高兴认识你"})

# 查看记忆内容
print(memory.load_memory_variables({}))
# 输出:{'history': 'Human: 你好\nAI: 你好!我是AI助手\nHuman: 我叫张三\nAI: 你好张三!很高兴认识你'}

优点

  • 实现简单
  • 不丢失任何信息

缺点

  • Token 消耗线性增长
  • 长对话会超出 Context Window

适用场景

  • 短对话(< 20 轮)
  • 需要完整上下文的场景

方案二:滑动窗口(ConversationBufferWindowMemory)

原理:只保留最近 K 轮对话

from langchain.memory import ConversationBufferWindowMemory

# 只保留最近 5 轮对话
memory = ConversationBufferWindowMemory(k=5)

# 模拟 10 轮对话
for i in range(10):
    memory.save_context(
        {"input": f"问题 {i+1}"}, 
        {"output": f"回答 {i+1}"}
    )

# 查看记忆(只保留最后 5 轮)
print(memory.load_memory_variables({}))
# 输出:最近 5 轮的对话内容

优点

  • Token 消耗可控
  • 实现简单

缺点

  • 会丢失早期对话
  • 无法回溯远期信息

适用场景

  • 中等长度对话
  • 更关注最近上下文的场景

Token 计算

def estimate_tokens_window(k: int, avg_tokens_per_turn: int = 50) -> int:
    """
    估算滑动窗口的 Token 消耗
    
    Args:
        k: 保留的对话轮数
        avg_tokens_per_turn: 平均每轮对话的 Token 数
    
    Returns:
        总 Token 数
    """
    return k * avg_tokens_per_turn

# 示例:5 轮对话,平均每轮 50 tokens
print(f"预估 Token: {estimate_tokens_window(5)}")  # 250 tokens

方案三:Token 限制截断(ConversationTokenBufferMemory)

原理:根据 Token 数量动态截断

from langchain.memory import ConversationTokenBufferMemory
from langchain_openai import OpenAI

llm = OpenAI()

# 限制最大 2000 tokens
memory = ConversationTokenBufferMemory(
    llm=llm,
    max_token_limit=2000
)

# 添加对话
memory.save_context({"input": "长文本..." * 100}, {"output": "回答..."})

# 自动截断超出部分
print(memory.load_memory_variables({}))

优点

  • 精确控制 Token 消耗
  • 最大化利用 Context Window

缺点

  • 截断位置可能不自然
  • 需要额外的 Token 计算

实现细节

from langchain_core.messages import BaseMessage, get_buffer_string

class CustomTokenBufferMemory:
    """自定义 Token 限制记忆"""
    
    def __init__(self, llm, max_token_limit: int = 4000):
        self.llm = llm
        self.max_token_limit = max_token_limit
        self.buffer: list[BaseMessage] = []
    
    def save_context(self, inputs: dict, outputs: dict):
        """保存对话上下文"""
        from langchain_core.messages import HumanMessage, AIMessage
        
        self.buffer.append(HumanMessage(content=inputs["input"]))
        self.buffer.append(AIMessage(content=outputs["output"]))
        
        # 检查并截断
        self._trim_buffer()
    
    def _trim_buffer(self):
        """截断超出 Token 限制的部分"""
        current_tokens = self._count_tokens()
        
        while current_tokens > self.max_token_limit and len(self.buffer) > 0:
            # 从最旧的对话开始删除
            removed = self.buffer.pop(0)
            current_tokens -= self._count_message_tokens(removed)
    
    def _count_tokens(self) -> int:
        """计算当前 Buffer 的 Token 数"""
        buffer_str = get_buffer_string(self.buffer)
        return len(self.llm.get_num_tokens(buffer_str))
    
    def _count_message_tokens(self, message: BaseMessage) -> int:
        """计算单条消息的 Token 数"""
        return len(self.llm.get_num_tokens(message.content))

方案四:摘要记忆(ConversationSummaryMemory)

原理:定期对历史对话生成摘要,用摘要替代原始对话

from langchain.memory import ConversationSummaryMemory
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4")

memory = ConversationSummaryMemory(llm=llm)

# 添加对话
memory.save_context(
    {"input": "我最近在学习 Python,有什么建议吗?"}, 
    {"output": "建议从基础语法开始,然后学习数据结构和算法..."}
)

memory.save_context(
    {"input": "有没有推荐的书籍?"}, 
    {"output": "推荐《Python编程:从入门到实践》和《流畅的Python》..."}
)

# 查看摘要
print(memory.load_memory_variables({}))
# 输出:{'history': '用户正在学习Python,我推荐了从基础语法开始学习,并建议了两本书籍...'}

优点

  • 大幅压缩 Token 消耗
  • 保留关键信息

缺点

  • 摘要过程需要额外的 LLM 调用(成本)
  • 可能丢失细节信息
  • 摘要质量依赖模型能力

成本分析

def calculate_summary_cost(
    conversation_turns: int,
    avg_tokens_per_turn: int = 50,
    summary_interval: int = 10,
    compression_ratio: float = 0.2
) -> dict:
    """
    计算摘要记忆的成本
    
    Args:
        conversation_turns: 总对话轮数
        avg_tokens_per_turn: 平均每轮对话的 Token 数
        summary_interval: 每隔多少轮生成一次摘要
        compression_ratio: 摘要压缩比
    
    Returns:
        成本分析字典
    """
    # 原始对话的 Token 数
    original_tokens = conversation_turns * avg_tokens_per_turn
    
    # 摘要次数
    summary_count = conversation_turns // summary_interval
    
    # 摘要生成成本(输入 tokens + 输出 tokens)
    summary_generation_cost = summary_count * summary_interval * avg_tokens_per_turn
    
    # 最终保留的摘要 Token 数
    final_summary_tokens = original_tokens * compression_ratio
    
    # 最近未摘要的对话
    recent_unsummarized = (conversation_turns % summary_interval) * avg_tokens_per_turn
    
    # 总 Token 消耗(包括摘要生成 + 最终传递)
    total_tokens = summary_generation_cost + final_summary_tokens + recent_unsummarized
    
    return {
        "original_tokens": original_tokens,
        "total_tokens_consumed": total_tokens,
        "compression_rate": (1 - (final_summary_tokens + recent_unsummarized) / original_tokens),
        "summary_count": summary_count
    }

# 示例:100 轮对话
cost = calculate_summary_cost(
    conversation_turns=100,
    avg_tokens_per_turn=50,
    summary_interval=10,
    compression_ratio=0.2
)
print(f"压缩率: {cost['compression_rate']:.1%}")  # 压缩率: 72.0%

方案五:混合记忆(ConversationSummaryBufferMemory)

原理:结合摘要和完整对话,保留最近 K 轮 + 历史摘要

from langchain.memory import ConversationSummaryBufferMemory
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4")

# 保留最近 5 轮 + 历史摘要
memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000
)

# 添加 20 轮对话
for i in range(20):
    memory.save_context(
        {"input": f"问题 {i+1}"}, 
        {"output": f"回答 {i+1}"}
    )

# 查看记忆:早期对话会被摘要,最近几轮保持完整
print(memory.load_memory_variables({}))

优点

  • 平衡信息完整性和 Token 效率
  • 保留最近关键上下文

缺点

  • 实现复杂
  • 需要调优参数

架构图

┌─────────────────────────────────────────────────────────┐
│                 混合记忆结构                              │
├─────────────────────────────────────────────────────────┤
│                                                         │
│   [对话历史]                                            │
│   ├─ 第 1-10 轮  ──→ 摘要生成 ──→ 摘要 A (200 tokens)   │
│   ├─ 第 11-15 轮 ──→ 摘要生成 ──→ 摘要 B (200 tokens)   │
│   └─ 第 16-20 轮 ──→ 完整保留 ──→ 5 轮对话 (500 tokens)│
│                                                         │
│   [最终传递给 LLM]                                      │
│   └─ 摘要 A + 摘要 B + 最近 5 轮 = 900 tokens           │
│                                                         │
└─────────────────────────────────────────────────────────┘

2.3 短期记忆最佳实践

实践一:动态调整窗口大小

根据任务复杂度动态调整保留的对话轮数:

class DynamicWindowMemory:
    """动态调整窗口大小的记忆"""
    
    def __init__(self, initial_k: int = 5, max_k: int = 20):
        self.k = initial_k
        self.max_k = max_k
        self.buffer = []
        self.complexity_scores = []
    
    def save_context(self, inputs: dict, outputs: dict):
        """保存上下文并评估复杂度"""
        self.buffer.append((inputs, outputs))
        
        # 评估对话复杂度
        complexity = self._estimate_complexity(inputs["input"])
        self.complexity_scores.append(complexity)
        
        # 动态调整窗口
        self._adjust_window()
    
    def _estimate_complexity(self, text: str) -> float:
        """估算对话复杂度"""
        # 简单启发式:基于文本长度和关键词
        score = 0.0
        
        # 长度因素
        if len(text) > 200:
            score += 0.3
        
        # 关键词因素
        complex_keywords = ["详细", "解释", "为什么", "如何", "分析"]
        if any(kw in text for kw in complex_keywords):
            score += 0.3
        
        # 多问题因素
        if text.count("?") > 1 or text.count("?") > 1:
            score += 0.4
        
        return min(score, 1.0)
    
    def _adjust_window(self):
        """根据复杂度调整窗口大小"""
        if len(self.complexity_scores) < 5:
            return
        
        # 计算最近 5 轮的平均复杂度
        avg_complexity = sum(self.complexity_scores[-5:]) / 5
        
        # 动态调整 k
        if avg_complexity > 0.6:
            self.k = min(self.k + 2, self.max_k)  # 增加窗口
        elif avg_complexity < 0.3:
            self.k = max(self.k - 1, 3)  # 减小窗口

实践二:重要信息提取与保留

识别并优先保留重要信息:

from langchain_core.messages import HumanMessage, AIMessage
import re

class ImportanceAwareMemory:
    """重要信息感知的记忆"""
    
    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens
        self.buffer = []
        self.important_facts = []
    
    def save_context(self, inputs: dict, outputs: dict):
        """保存上下文并提取重要信息"""
        # 提取重要信息
        important_info = self._extract_important_info(inputs["input"], outputs["output"])
        
        if important_info:
            self.important_facts.extend(important_info)
        
        # 保存完整对话
        self.buffer.append((inputs, outputs))
        
        # 检查并压缩
        self._compress_if_needed()
    
    def _extract_important_info(self, user_input: str, ai_output: str) -> list:
        """提取重要信息"""
        important = []
        
        # 模式 1:用户偏好
        preference_patterns = [
            r"我喜欢(.+)",
            r"我偏好(.+)",
            r"我希望(.+)",
            r"我叫(.+)",
            r"我的(.+)是"
        ]
        
        for pattern in preference_patterns:
            matches = re.findall(pattern, user_input)
            important.extend(matches)
        
        # 模式 2:关键决策
        decision_keywords = ["决定", "确认", "选择", "同意"]
        if any(kw in user_input for kw in decision_keywords):
            important.append(f"决策: {user_input}")
        
        return important
    
    def _compress_if_needed(self):
        """如果超出限制则压缩"""
        current_tokens = self._count_tokens()
        
        if current_tokens > self.max_tokens:
            # 保留重要信息 + 最近对话
            recent_conversations = self.buffer[-5:]
            
            # 构建压缩后的记忆
            compressed = {
                "important_facts": self.important_facts,
                "recent_conversations": recent_conversations
            }
            
            # 替换 buffer
            self.buffer = compressed
    
    def load_memory(self) -> str:
        """加载记忆"""
        if isinstance(self.buffer, list):
            # 正常模式
            return self._format_conversations(self.buffer)
        else:
            # 压缩模式
            facts_str = "\n".join([f"- {fact}" for fact in self.buffer["important_facts"]])
            recent_str = self._format_conversations(self.buffer["recent_conversations"])
            return f"【重要信息】\n{facts_str}\n\n【最近对话】\n{recent_str}"

实践三:多级记忆架构

构建多级记忆系统,平衡效率和完整性:

class MultiLevelMemory:
    """多级记忆架构"""
    
    def __init__(self):
        # Level 1: 工作记忆(当前对话)
        self.working_memory = []
        
        # Level 2: 短期记忆(滑动窗口)
        self.short_term_memory = []
        self.short_term_limit = 10
        
        # Level 3: 摘要记忆
        self.summary_memory = ""
        
        # Level 4: 重要信息
        self.key_facts = []
    
    def add_message(self, role: str, content: str):
        """添加消息"""
        # 添加到工作记忆
        self.working_memory.append({"role": role, "content": content})
        
        # 如果工作记忆达到阈值,推送到短期记忆
        if len(self.working_memory) >= 2:
            self._push_to_short_term()
        
        # 如果短期记忆达到阈值,生成摘要
        if len(self.short_term_memory) >= self.short_term_limit:
            self._summarize_short_term()
    
    def _push_to_short_term(self):
        """推送到短期记忆"""
        # 提取重要信息
        self._extract_key_facts()
        
        # 移动到短期记忆
        self.short_term_memory.extend(self.working_memory)
        self.working_memory = []
    
    def _extract_key_facts(self):
        """提取关键事实"""
        # 简化版:实际应该用 LLM 提取
        for msg in self.working_memory:
            if "我叫" in msg["content"] or "我喜欢" in msg["content"]:
                self.key_facts.append(msg["content"])
    
    def _summarize_short_term(self):
        """摘要短期记忆"""
        # 生成摘要(实际应该调用 LLM)
        summary = f"过去 {len(self.short_term_memory)} 条消息的摘要..."
        
        # 更新摘要记忆
        if self.summary_memory:
            self.summary_memory += "\n" + summary
        else:
            self.summary_memory = summary
        
        # 清空短期记忆
        self.short_term_memory = []
    
    def get_context_for_llm(self, max_tokens: int = 4000) -> str:
        """获取传递给 LLM 的上下文"""
        context_parts = []
        
        # 1. 关键事实(优先级最高)
        if self.key_facts:
            context_parts.append("【用户信息】\n" + "\n".join(self.key_facts))
        
        # 2. 历史摘要
        if self.summary_memory:
            context_parts.append("【历史摘要】\n" + self.summary_memory)
        
        # 3. 短期记忆
        if self.short_term_memory:
            short_term_str = self._format_messages(self.short_term_memory)
            context_parts.append("【近期对话】\n" + short_term_str)
        
        # 4. 工作记忆(当前对话)
        if self.working_memory:
            working_str = self._format_messages(self.working_memory)
            context_parts.append("【当前对话】\n" + working_str)
        
        return "\n\n".join(context_parts)
    
    def _format_messages(self, messages: list) -> str:
        """格式化消息"""
        return "\n".join([
            f"{msg['role']}: {msg['content']}" 
            for msg in messages
        ])

三、长期记忆(Long-term Memory)

3.1 核心概念

长期记忆用于存储跨对话的信息,需要外部存储系统支持。

关键特性

  • 持久化:数据永久存储
  • 跨会话:不同对话可以访问相同记忆
  • 可扩展:理论上无限容量

核心技术

  • 向量数据库(Vector Database)
  • 嵌入模型(Embedding Model)
  • 检索算法(Retrieval Algorithm)

3.2 向量数据库选型

主流向量数据库对比

数据库类型特点适用场景性能
Pinecone云服务托管式,零运维生产环境,快速上手⭐⭐⭐⭐⭐
Milvus开源高性能,可扩展大规模生产环境⭐⭐⭐⭐⭐
Weaviate开源语义搜索强知识图谱场景⭐⭐⭐⭐
ChromaDB开源轻量级,易用开发测试,小规模应用⭐⭐⭐
Qdrant开源Rust 实现,高性能性能敏感场景⭐⭐⭐⭐⭐
FAISSMeta 开源,纯算法本地嵌入,极致性能⭐⭐⭐⭐⭐

选型决策树

需要长期记忆?
├─ 是
│   ├─ 团队有运维能力?
│   │   ├─ 是
│   │   │   ├─ 数据规模 > 1000万向量?
│   │   │   │   ├─ 是 → Milvus
│   │   │   │   └─ 否 → Qdrant
│   │   │   └─ 需要 RAG + 知识图谱?
│   │   │       ├─ 是 → Weaviate
│   │   │       └─ 否 → Qdrant
│   │   └─ 否(无运维能力)
│   │       └─ Pinecone(托管服务)
│   └─ 开发测试阶段?
│       └─ ChromaDB(最简单)
└─ 否(仅需短期记忆)
    └─ 使用内存存储

3.3 嵌入模型选型

主流嵌入模型对比

模型提供商维度性能成本
text-embedding-3-smallOpenAI1536⭐⭐⭐⭐$0.02/1M tokens
text-embedding-3-largeOpenAI3072⭐⭐⭐⭐⭐$0.13/1M tokens
text-embedding-ada-002OpenAI1536⭐⭐⭐$0.10/1M tokens
bge-large-zh-v1.5BGE1024⭐⭐⭐⭐免费(本地)
bge-m3BGE1024⭐⭐⭐⭐⭐免费(本地)
Cohere embed-v3Cohere1024⭐⭐⭐⭐⭐$0.10/1M tokens

选择建议

# 场景 1:追求最佳性能
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

# 场景 2:成本敏感
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# 场景 3:中文场景,本地部署
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
embeddings = HuggingFaceBgeEmbeddings(
    model_name="BAAI/bge-large-zh-v1.5",
    model_kwargs={'device': 'cuda'},
    encode_kwargs={'normalize_embeddings': True}
)

# 场景 4:多语言混合
from langchain_cohere import CohereEmbeddings
embeddings = CohereEmbeddings(model="embed-multilingual-v3.0")

3.4 实现方案详解

方案一:基于向量检索的记忆

架构图

┌─────────────────────────────────────────────────────────┐
│              向量检索记忆架构                            │
├─────────────────────────────────────────────────────────┤
│                                                         │
│   用户输入                                              │
│       ↓                                                 │
│   Embedding 模型                                        │
│       ↓                                                 │
│   查询向量                                              │
│       ↓                                                 │
│   向量数据库 ──→ 相似度检索 ──→ Top-K 记忆              │
│       ↓                                                 │
│   相关记忆 + 用户输入 ──→ LLM ──→ 回复                 │
│       ↓                                                 │
│   回复 + 输入 ──→ Embedding ──→ 存入向量数据库         │
│                                                         │
└─────────────────────────────────────────────────────────┘

实现代码

from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain.chains import ConversationChain
from langchain_openai import ChatOpenAI

# 1. 初始化向量数据库
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
    embedding_function=embeddings,
    persist_directory="./chroma_memory_db"
)

# 2. 创建检索器
retriever = vectorstore.as_retriever(
    search_kwargs={"k": 5}  # 检索最相关的 5 条记忆
)

# 3. 创建记忆
memory = VectorStoreRetrieverMemory(retriever=retriever)

# 4. 使用记忆
# 保存对话
memory.save_context(
    {"input": "我喜欢吃苹果"},
    {"output": "好的,我会记住你喜欢吃苹果"}
)

memory.save_context(
    {"input": "我叫张三"},
    {"output": "你好张三!很高兴认识你"}
)

# 5. 检索相关记忆
relevant_memories = memory.load_memory_variables({"prompt": "我的名字是什么?"})
print(relevant_memories)
# 输出:最相关的记忆,包含"我叫张三"的对话

高级实现:带元数据的记忆

from datetime import datetime
from typing import Optional
from langchain_core.documents import Document

class AdvancedVectorMemory:
    """高级向量记忆系统"""
    
    def __init__(self, vectorstore, embeddings, user_id: str):
        self.vectorstore = vectorstore
        self.embeddings = embeddings
        self.user_id = user_id
    
    def save_memory(
        self,
        user_input: str,
        ai_output: str,
        importance: str = "normal",  # high, normal, low
        category: Optional[str] = None
    ):
        """保存记忆"""
        
        # 构建文档
        document = Document(
            page_content=f"User: {user_input}\nAI: {ai_output}",
            metadata={
                "user_id": self.user_id,
                "timestamp": datetime.now().isoformat(),
                "importance": importance,
                "category": category or "general",
                "type": "conversation"
            }
        )
        
        # 添加到向量数据库
        self.vectorstore.add_documents([document])
    
    def save_fact(self, fact: str, category: str = "fact"):
        """保存事实性知识"""
        document = Document(
            page_content=fact,
            metadata={
                "user_id": self.user_id,
                "timestamp": datetime.now().isoformat(),
                "type": "fact",
                "category": category
            }
        )
        self.vectorstore.add_documents([document])
    
    def retrieve_relevant(
        self,
        query: str,
        k: int = 5,
        filters: Optional[dict] = None
    ) -> list:
        """检索相关记忆"""
        
        # 基础过滤条件
        base_filter = {"user_id": self.user_id}
        
        # 合并额外过滤条件
        if filters:
            base_filter.update(filters)
        
        # 检索
        results = self.vectorstore.similarity_search(
            query,
            k=k,
            filter=base_filter
        )
        
        return results
    
    def retrieve_by_time_range(
        self,
        start_time: datetime,
        end_time: datetime,
        k: int = 10
    ) -> list:
        """按时间范围检索"""
        # 注意:不同向量数据库的过滤语法不同
        # 这里以 Chroma 为例
        results = self.vectorstore.similarity_search(
            "",  # 空查询
            k=k,
            filter={
                "user_id": self.user_id,
                "timestamp": {
                    "$gte": start_time.isoformat(),
                    "$lte": end_time.isoformat()
                }
            }
        )
        return results
    
    def get_user_preferences(self) -> list:
        """获取用户偏好"""
        return self.retrieve_relevant(
            query="用户偏好",
            k=10,
            filters={"category": "preference"}
        )
    
    def get_important_memories(self) -> list:
        """获取重要记忆"""
        return self.retrieve_relevant(
            query="",
            k=10,
            filters={"importance": "high"}
        )

方案二:混合检索记忆

结合向量检索和关键词检索,提高召回率:

from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from langchain_community.vectorstores import Chroma

class HybridMemory:
    """混合检索记忆"""
    
    def __init__(self, vectorstore, documents):
        # 向量检索器
        self.vector_retriever = vectorstore.as_retriever(
            search_kwargs={"k": 5}
        )
        
        # 关键词检索器(BM25)
        self.keyword_retriever = BM25Retriever.from_documents(documents)
        self.keyword_retriever.k = 5
        
        # 混合检索器
        self.ensemble_retriever = EnsembleRetriever(
            retrievers=[self.vector_retriever, self.keyword_retriever],
            weights=[0.6, 0.4]  # 向量检索权重更高
        )
    
    def retrieve(self, query: str) -> list:
        """混合检索"""
        return self.ensemble_retriever.invoke(query)

混合检索的优势

查询:"Python 异常处理"

┌─────────────────────────────────────────────────────────┐
│                 向量检索结果                             │
├─────────────────────────────────────────────────────────┤
│ 1. 如何捕获 Python 异常?                               │
│ 2. try-except 语句的使用                                │
│ 3. Python 错误处理最佳实践                              │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│                 关键词检索结果                           │
├─────────────────────────────────────────────────────────┤
│ 1. Python 异常类型列表                                  │
│ 2. 异常处理的代码示例                                   │
│ 3. raise 语句的用法                                     │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│                 混合检索结果                             │
├─────────────────────────────────────────────────────────┤
│ 去重 + 加权排序 → 更全面的记忆召回                      │
└─────────────────────────────────────────────────────────┘

方案三:知识图谱增强记忆

将记忆组织成知识图谱,支持复杂推理:

from langchain.graphs import Neo4jGraph
from langchain.chains import GraphQAChain

class KnowledgeGraphMemory:
    """知识图谱记忆"""
    
    def __init__(self, neo4j_url: str, neo4j_user: str, neo4j_password: str):
        self.graph = Neo4jGraph(
            url=neo4j_url,
            username=neo4j_user,
            password=neo4j_password
        )
    
    def save_entity(self, entity_type: str, entity_name: str, properties: dict):
        """保存实体"""
        props_str = ", ".join([f"{k}: {v}" for k, v in properties.items()])
        
        query = f"""
        MERGE (e:{entity_type} {{name: $name}})
        SET e += {{{props_str}}}
        """
        
        self.graph.query(query, params={"name": entity_name})
    
    def save_relation(self, from_entity: str, relation: str, to_entity: str):
        """保存关系"""
        query = f"""
        MATCH (a {{name: $from_name}})
        MATCH (b {{name: $to_name}})
        MERGE (a)-[r:{relation}]->(b)
        """
        
        self.graph.query(query, params={
            "from_name": from_entity,
            "to_name": to_entity
        })
    
    def save_conversation_memory(
        self,
        user_id: str,
        entities: list,
        relations: list,
        conversation_id: str
    ):
        """保存对话记忆到知识图谱"""
        
        # 保存实体
        for entity in entities:
            self.save_entity(
                entity_type=entity["type"],
                entity_name=entity["name"],
                properties=entity.get("properties", {})
            )
        
        # 保存关系
        for relation in relations:
            self.save_relation(
                from_entity=relation["from"],
                relation=relation["type"],
                to_entity=relation["to"]
            )
        
        # 关联到对话
        query = """
        MATCH (u:User {id: $user_id})
        MATCH (c:Conversation {id: $conv_id})
        MERGE (u)-[:HAD]->(c)
        """
        self.graph.query(query, params={
            "user_id": user_id,
            "conv_id": conversation_id
        })
    
    def query_related_knowledge(self, entity_name: str, depth: int = 2) -> list:
        """查询相关知识"""
        query = f"""
        MATCH path = (e {{name: $name}})-[*1..{depth}]-(related)
        RETURN path
        """
        
        return self.graph.query(query, params={"name": entity_name})

知识图谱记忆示例

用户说:"我和李四在讨论 Python 项目"

知识图谱存储:
┌─────────────────────────────────────────────────────────┐
│                                                         │
│   (User:张三) ──[:DISCUSSED_WITH]──→ (User:李四)       │
│        │                                                │
│        └──[:DISCUSSED_ABOUT]──→ (Topic:Python)         │
│                                      │                  │
│                                      └──[:RELATED_TO]   │
│                                             ↓           │
│                                       (Project:XXX)     │
│                                                         │
└─────────────────────────────────────────────────────────┘

下次用户问:"上次我和谁讨论 Python?"
→ 查询图谱 → 返回"李四"

方案四:层级化长期记忆

模拟人类记忆的层级结构:

class HierarchicalLongTermMemory:
    """层级化长期记忆"""
    
    def __init__(self):
        # Level 1: 工作记忆(当前会话)
        self.working_memory = []
        
        # Level 2: 情景记忆(Episodic Memory)
        # 存储具体事件和经历
        self.episodic_store = {}  # {session_id: [events]}
        
        # Level 3: 语义记忆(Semantic Memory)
        # 存储事实和知识
        self.semantic_store = {}  # {concept: facts}
        
        # Level 4: 程序记忆(Procedural Memory)
        # 存储技能和规则
        self.procedural_store = {}  # {skill: rules}
    
    def add_to_working_memory(self, event: dict):
        """添加到工作记忆"""
        self.working_memory.append(event)
    
    def consolidate_to_episodic(self, session_id: str):
        """将工作记忆巩固到情景记忆"""
        if session_id not in self.episodic_store:
            self.episodic_store[session_id] = []
        
        self.episodic_store[session_id].extend(self.working_memory)
        self.working_memory = []
    
    def extract_to_semantic(self, concept: str, facts: list):
        """提取事实到语义记忆"""
        if concept not in self.semantic_store:
            self.semantic_store[concept] = []
        
        self.semantic_store[concept].extend(facts)
    
    def learn_procedure(self, skill: str, rules: list):
        """学习程序性知识"""
        self.procedural_store[skill] = rules
    
    def retrieve(self, query: str, context: dict) -> dict:
        """检索记忆"""
        results = {
            "episodic": self._search_episodic(query),
            "semantic": self._search_semantic(query),
            "procedural": self._search_procedural(context)
        }
        return results
    
    def _search_episodic(self, query: str) -> list:
        """搜索情景记忆"""
        # 实际应该用向量检索
        results = []
        for session_id, events in self.episodic_store.items():
            for event in events:
                if query.lower() in str(event).lower():
                    results.append(event)
        return results
    
    def _search_semantic(self, query: str) -> list:
        """搜索语义记忆"""
        results = []
        for concept, facts in self.semantic_store.items():
            if query.lower() in concept.lower():
                results.extend(facts)
        return results
    
    def _search_procedural(self, context: dict) -> list:
        """搜索程序记忆"""
        # 根据上下文匹配相关技能
        results = []
        for skill, rules in self.procedural_store.items():
            if self._is_skill_relevant(skill, context):
                results.extend(rules)
        return results
    
    def _is_skill_relevant(self, skill: str, context: dict) -> bool:
        """判断技能是否相关"""
        # 简化实现
        return skill in str(context)

记忆层级示意

┌─────────────────────────────────────────────────────────┐
│                   记忆层级结构                           │
├─────────────────────────────────────────────────────────┤
│                                                         │
│   Level 1: 工作记忆(秒级)                             │
│   └─ 当前对话中的最近几轮                               │
│                                                         │
│   Level 2: 情景记忆(小时-天)                          │
│   └─ 具体事件:"昨天讨论了 Python 项目"                 │
│                                                         │
│   Level 3: 语义记忆(永久)                             │
│   └─ 事实知识:"用户喜欢 Python"、"用户叫张三"         │
│                                                         │
│   Level 4: 程序记忆(永久)                             │
│   └─ 技能规则:"遇到代码问题 → 先检查语法错误"         │
│                                                         │
└─────────────────────────────────────────────────────────┘

3.5 长期记忆的最佳实践

实践一:记忆去重与更新

class DeduplicatedMemory:
    """去重记忆系统"""
    
    def __init__(self, vectorstore, similarity_threshold: float = 0.95):
        self.vectorstore = vectorstore
        self.similarity_threshold = similarity_threshold
    
    def save_memory(self, content: str, metadata: dict):
        """保存记忆(带去重)"""
        
        # 检查是否已存在相似记忆
        similar_docs = self.vectorstore.similarity_search_with_score(
            content,
            k=1
        )
        
        if similar_docs:
            doc, score = similar_docs[0]
            # 相似度高于阈值,认为是重复
            if score > self.similarity_threshold:
                # 更新元数据(如时间戳)
                self._update_metadata(doc.id, metadata)
                return False  # 未添加新记忆
        
        # 添加新记忆
        self.vectorstore.add_texts([content], metadatas=[metadata])
        return True  # 添加成功
    
    def _update_metadata(self, doc_id: str, new_metadata: dict):
        """更新元数据"""
        # 实现取决于具体的向量数据库
        pass

实践二:记忆遗忘机制

模拟人类遗忘,避免存储过多无用信息:

from datetime import datetime, timedelta
import numpy as np

class ForgettingMemory:
    """带遗忘机制的记忆"""
    
    def __init__(self, vectorstore, half_life_days: int = 30):
        self.vectorstore = vectorstore
        self.half_life_days = half_life_days
    
    def calculate_retention_score(
        self,
        timestamp: datetime,
        access_count: int = 1,
        importance: float = 1.0
    ) -> float:
        """
        计算记忆保留分数
        
        艾宾浩斯遗忘曲线变体:
        R = importance * access_count * e^(-t/τ)
        """
        # 时间衰减
        days_passed = (datetime.now() - timestamp).days
        time_decay = np.exp(-days_passed / self.half_life_days)
        
        # 访问增强
        access_boost = np.log1p(access_count)
        
        # 最终分数
        score = importance * access_boost * time_decay
        
        return score
    
    def should_forget(self, memory_metadata: dict) -> bool:
        """判断是否应该遗忘"""
        score = self.calculate_retention_score(
            timestamp=datetime.fromisoformat(memory_metadata["timestamp"]),
            access_count=memory_metadata.get("access_count", 1),
            importance=memory_metadata.get("importance", 1.0)
        )
        
        return score < 0.1  # 阈值
    
    def cleanup_memories(self, user_id: str):
        """清理低价值记忆"""
        # 检索用户所有记忆
        all_memories = self.vectorstore.similarity_search(
            "",
            k=1000,
            filter={"user_id": user_id}
        )
        
        # 识别需要遗忘的记忆
        to_forget = []
        for memory in all_memories:
            if self.should_forget(memory.metadata):
                to_forget.append(memory.id)
        
        # 删除
        if to_forget:
            self.vectorstore.delete(to_forget)
            print(f"已清理 {len(to_forget)} 条记忆")

实践三:记忆重要性评估

class ImportanceScorer:
    """记忆重要性评分"""
    
    def __init__(self):
        self.importance_keywords = {
            "high": [
                "重要", "紧急", "必须", "关键", "决定",
                "偏好", "喜欢", "讨厌", "习惯"
            ],
            "medium": [
                "需要", "想要", "希望", "建议", "想法"
            ],
            "low": [
                "随便", "无所谓", "可能", "或许"
            ]
        }
    
    def score_importance(self, user_input: str, ai_output: str) -> float:
        """评估记忆重要性"""
        score = 0.0
        
        # 1. 关键词匹配
        score += self._keyword_score(user_input)
        
        # 2. 情感强度
        score += self._sentiment_score(user_input)
        
        # 3. 信息熵(信息量)
        score += self._information_entropy_score(user_input)
        
        # 4. 交互深度(追问、确认等)
        score += self._interaction_depth_score(user_input)
        
        return min(max(score, 0.0), 1.0)  # 归一化到 [0, 1]
    
    def _keyword_score(self, text: str) -> float:
        """关键词评分"""
        for level, keywords in self.importance_keywords.items():
            if any(kw in text for kw in keywords):
                if level == "high":
                    return 0.4
                elif level == "medium":
                    return 0.2
                else:
                    return 0.1
        return 0.05
    
    def _sentiment_score(self, text: str) -> float:
        """情感强度评分"""
        # 简化实现,实际可用情感分析模型
        strong_emotions = ["非常", "特别", "极其", "超级"]
        return 0.2 if any(e in text for e in strong_emotions) else 0.0
    
    def _information_entropy_score(self, text: str) -> float:
        """信息熵评分"""
        # 基于文本长度和信息密度
        word_count = len(text.split())
        
        if word_count < 5:
            return 0.05
        elif word_count < 20:
            return 0.1
        else:
            return 0.2
    
    def _interaction_depth_score(self, text: str) -> float:
        """交互深度评分"""
        # 多问题、追问等表示重要
        question_marks = text.count("?") + text.count("?")
        return min(question_marks * 0.1, 0.2)

四、记忆系统集成实践

4.1 完整记忆系统架构

from typing import Optional, List, Dict
from datetime import datetime
from dataclasses import dataclass
from enum import Enum

class MemoryType(Enum):
    """记忆类型"""
    WORKING = "working"          # 工作记忆
    SHORT_TERM = "short_term"    # 短期记忆
    LONG_TERM = "long_term"      # 长期记忆
    EPISODIC = "episodic"        # 情景记忆
    SEMANTIC = "semantic"        # 语义记忆

@dataclass
class MemoryEntry:
    """记忆条目"""
    id: str
    content: str
    memory_type: MemoryType
    timestamp: datetime
    importance: float
    access_count: int
    metadata: dict

class UnifiedMemorySystem:
    """统一记忆系统"""
    
    def __init__(
        self,
        vectorstore,
        embeddings,
        llm,
        user_id: str,
        config: Optional[dict] = None
    ):
        self.vectorstore = vectorstore
        self.embeddings = embeddings
        self.llm = llm
        self.user_id = user_id
        
        # 配置
        self.config = config or {
            "working_memory_limit": 10,
            "short_term_limit": 20,
            "long_term_retrieval_k": 5,
            "importance_threshold": 0.7,
            "consolidation_interval": 10
        }
        
        # 记忆存储
        self.working_memory: List[MemoryEntry] = []
        self.importance_scorer = ImportanceScorer()
        self.forgetting_system = ForgettingMemory(vectorstore)
    
    def add_memory(
        self,
        user_input: str,
        ai_output: str,
        force_long_term: bool = False
    ):
        """添加记忆"""
        
        # 评估重要性
        importance = self.importance_scorer.score_importance(
            user_input, ai_output
        )
        
        # 创建记忆条目
        entry = MemoryEntry(
            id=self._generate_id(),
            content=f"User: {user_input}\nAI: {ai_output}",
            memory_type=MemoryType.WORKING,
            timestamp=datetime.now(),
            importance=importance,
            access_count=1,
            metadata={
                "user_id": self.user_id,
                "user_input": user_input,
                "ai_output": ai_output
            }
        )
        
        # 添加到工作记忆
        self.working_memory.append(entry)
        
        # 检查是否需要巩固到长期记忆
        if (len(self.working_memory) >= self.config["working_memory_limit"] or
            importance > self.config["importance_threshold"] or
            force_long_term):
            self._consolidate_memories()
    
    def retrieve_memories(
        self,
        query: str,
        memory_types: Optional[List[MemoryType]] = None
    ) -> List[MemoryEntry]:
        """检索记忆"""
        
        memory_types = memory_types or [
            MemoryType.WORKING,
            MemoryType.SHORT_TERM,
            MemoryType.LONG_TERM
        ]
        
        results = []
        
        # 1. 从工作记忆检索
        if MemoryType.WORKING in memory_types:
            results.extend(self.working_memory)
        
        # 2. 从长期记忆检索
        if MemoryType.LONG_TERM in memory_types:
            long_term_results = self._retrieve_long_term(query)
            results.extend(long_term_results)
        
        # 3. 排序(按重要性和时间)
        results.sort(key=lambda x: (x.importance, x.timestamp), reverse=True)
        
        # 4. 更新访问计数
        for entry in results:
            entry.access_count += 1
        
        return results[:self.config["long_term_retrieval_k"]]
    
    def _consolidate_memories(self):
        """巩固记忆"""
        
        if not self.working_memory:
            return
        
        # 批量存储到向量数据库
        contents = [entry.content for entry in self.working_memory]
        metadatas = [
            {
                **entry.metadata,
                "timestamp": entry.timestamp.isoformat(),
                "importance": entry.importance,
                "memory_type": entry.memory_type.value
            }
            for entry in self.working_memory
        ]
        
        self.vectorstore.add_texts(contents, metadatas=metadatas)
        
        # 清空工作记忆
        self.working_memory = []
    
    def _retrieve_long_term(self, query: str) -> List[MemoryEntry]:
        """从长期记忆检索"""
        
        docs = self.vectorstore.similarity_search(
            query,
            k=self.config["long_term_retrieval_k"],
            filter={"user_id": self.user_id}
        )
        
        entries = []
        for doc in docs:
            entry = MemoryEntry(
                id=doc.metadata.get("id", ""),
                content=doc.page_content,
                memory_type=MemoryType.LONG_TERM,
                timestamp=datetime.fromisoformat(doc.metadata["timestamp"]),
                importance=doc.metadata.get("importance", 0.5),
                access_count=doc.metadata.get("access_count", 1),
                metadata=doc.metadata
            )
            entries.append(entry)
        
        return entries
    
    def get_context_for_llm(
        self,
        query: str,
        max_tokens: int = 4000
    ) -> str:
        """获取传递给 LLM 的上下文"""
        
        # 检索相关记忆
        memories = self.retrieve_memories(query)
        
        # 构建上下文
        context_parts = []
        
        # 1. 重要事实
        important_memories = [
            m for m in memories if m.importance > 0.7
        ]
        if important_memories:
            facts_str = "\n".join([
                f"- {m.metadata['user_input']}" 
                for m in important_memories
            ])
            context_parts.append(f"【重要信息】\n{facts_str}")
        
        # 2. 相关对话
        relevant_str = "\n\n".join([
            m.content for m in memories[:5]
        ])
        context_parts.append(f"【相关历史】\n{relevant_str}")
        
        context = "\n\n".join(context_parts)
        
        # Token 检查
        token_count = self._count_tokens(context)
        if token_count > max_tokens:
            context = self._truncate_context(context, max_tokens)
        
        return context
    
    def _generate_id(self) -> str:
        """生成唯一 ID"""
        import uuid
        return str(uuid.uuid4())
    
    def _count_tokens(self, text: str) -> int:
        """计算 Token 数"""
        # 简化实现
        return len(text.split()) * 1.5
    
    def _truncate_context(self, context: str, max_tokens: int) -> str:
        """截断上下文"""
        # 简化实现
        words = context.split()
        return " ".join(words[:int(max_tokens / 1.5)])
    
    def cleanup(self):
        """清理低价值记忆"""
        self.forgetting_system.cleanup_memories(self.user_id)

4.2 与 LangChain 集成

from langchain.chains import ConversationChain
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate

# 初始化记忆系统
memory_system = UnifiedMemorySystem(
    vectorstore=Chroma(
        embedding_function=OpenAIEmbeddings(),
        persist_directory="./memory_db"
    ),
    embeddings=OpenAIEmbeddings(),
    llm=ChatOpenAI(model="gpt-4"),
    user_id="user_123"
)

# 自定义 Prompt 模板
template = """
你是一个智能助手,拥有对用户的长期记忆。

{memory_context}

当前对话:
{input}

请基于记忆和当前输入回复:
"""

prompt = PromptTemplate(
    input_variables=["memory_context", "input"],
    template=template
)

# 对话函数
def chat_with_memory(user_input: str) -> str:
    """带记忆的对话"""
    
    # 1. 检索相关记忆
    memory_context = memory_system.get_context_for_llm(user_input)
    
    # 2. 生成回复
    llm = ChatOpenAI(model="gpt-4")
    prompt_text = prompt.format(
        memory_context=memory_context,
        input=user_input
    )
    ai_output = llm.invoke(prompt_text).content
    
    # 3. 保存记忆
    memory_system.add_memory(user_input, ai_output)
    
    return ai_output

# 使用示例
response1 = chat_with_memory("我叫张三,我喜欢 Python")
print(response1)
# 输出:你好张三!很高兴认识你。Python 是一门很棒的语言...

# ... 开启新对话

response2 = chat_with_memory("你还记得我的名字吗?")
print(response2)
# 输出:当然记得!你叫张三。而且我知道你喜欢 Python...

4.3 记忆系统评估

class MemoryEvaluator:
    """记忆系统评估器"""
    
    def __init__(self, memory_system: UnifiedMemorySystem):
        self.memory_system = memory_system
    
    def evaluate_retrieval_quality(
        self,
        test_cases: List[dict]
    ) -> dict:
        """评估检索质量"""
        
        results = {
            "precision": [],
            "recall": [],
            "mrr": []  # Mean Reciprocal Rank
        }
        
        for case in test_cases:
            query = case["query"]
            expected_ids = set(case["relevant_memory_ids"])
            
            # 检索
            retrieved = self.memory_system.retrieve_memories(query)
            retrieved_ids = set([m.id for m in retrieved])
            
            # 计算指标
            if retrieved_ids:
                precision = len(retrieved_ids & expected_ids) / len(retrieved_ids)
                results["precision"].append(precision)
            
            if expected_ids:
                recall = len(retrieved_ids & expected_ids) / len(expected_ids)
                results["recall"].append(recall)
            
            # MRR
            for i, memory in enumerate(retrieved):
                if memory.id in expected_ids:
                    results["mrr"].append(1.0 / (i + 1))
                    break
            else:
                results["mrr"].append(0.0)
        
        # 平均值
        return {
            "avg_precision": np.mean(results["precision"]),
            "avg_recall": np.mean(results["recall"]),
            "avg_mrr": np.mean(results["mrr"])
        }
    
    def evaluate_memory_usage(self) -> dict:
        """评估记忆使用情况"""
        
        # 统计各类记忆数量
        all_memories = self.memory_system.vectorstore.similarity_search(
            "",
            k=10000,
            filter={"user_id": self.memory_system.user_id}
        )
        
        # 按重要性分布
        importance_dist = {
            "high": 0,
            "medium": 0,
            "low": 0
        }
        
        for memory in all_memories:
            importance = memory.metadata.get("importance", 0.5)
            if importance > 0.7:
                importance_dist["high"] += 1
            elif importance > 0.3:
                importance_dist["medium"] += 1
            else:
                importance_dist["low"] += 1
        
        # Token 消耗估算
        total_tokens = sum([
            len(m.page_content.split()) * 1.5 
            for m in all_memories
        ])
        
        return {
            "total_memories": len(all_memories),
            "importance_distribution": importance_dist,
            "estimated_tokens": total_tokens,
            "working_memory_size": len(self.memory_system.working_memory)
        }
    
    def benchmark_retrieval_speed(
        self,
        queries: List[str],
        k: int = 5
    ) -> dict:
        """基准测试检索速度"""
        
        import time
        
        latencies = []
        
        for query in queries:
            start_time = time.time()
            _ = self.memory_system.retrieve_memories(query)
            latency = time.time() - start_time
            latencies.append(latency)
        
        return {
            "avg_latency_ms": np.mean(latencies) * 1000,
            "p50_latency_ms": np.percentile(latencies, 50) * 1000,
            "p95_latency_ms": np.percentile(latencies, 95) * 1000,
            "p99_latency_ms": np.percentile(latencies, 99) * 1000
        }

五、高级主题

5.1 多用户记忆隔离

class MultiUserMemorySystem:
    """多用户记忆系统"""
    
    def __init__(self, vectorstore, embeddings, llm):
        self.vectorstore = vectorstore
        self.embeddings = embeddings
        self.llm = llm
        
        # 用户记忆系统缓存
        self.user_memories: Dict[str, UnifiedMemorySystem] = {}
    
    def get_user_memory(self, user_id: str) -> UnifiedMemorySystem:
        """获取用户的记忆系统"""
        
        if user_id not in self.user_memories:
            self.user_memories[user_id] = UnifiedMemorySystem(
                vectorstore=self.vectorstore,
                embeddings=self.embeddings,
                llm=self.llm,
                user_id=user_id
            )
        
        return self.user_memories[user_id]
    
    def chat(self, user_id: str, user_input: str) -> str:
        """用户对话"""
        memory = self.get_user_memory(user_id)
        
        # 获取上下文
        context = memory.get_context_for_llm(user_input)
        
        # 生成回复
        prompt = f"{context}\n\n用户:{user_input}\n\n助手:"
        response = self.llm.invoke(prompt).content
        
        # 保存记忆
        memory.add_memory(user_input, response)
        
        return response
    
    def share_memory_between_users(
        self,
        from_user: str,
        to_user: str,
        memory_ids: List[str]
    ):
        """在用户间共享记忆"""
        
        # 检索源用户的记忆
        from_memory = self.get_user_memory(from_user)
        
        # 复制到目标用户
        to_memory = self.get_user_memory(to_user)
        
        for memory_id in memory_ids:
            # 从向量数据库检索
            # 然后添加到目标用户
            pass

5.2 跨模态记忆

class MultiModalMemory:
    """多模态记忆"""
    
    def __init__(self, vectorstore, text_embeddings, image_embeddings):
        self.vectorstore = vectorstore
        self.text_embeddings = text_embeddings
        self.image_embeddings = image_embeddings
    
    def save_text_memory(self, text: str, metadata: dict):
        """保存文本记忆"""
        embedding = self.text_embeddings.embed_query(text)
        
        self.vectorstore.add_embeddings(
            [(text, embedding)],
            metadatas=[{**metadata, "modality": "text"}]
        )
    
    def save_image_memory(self, image_path: str, description: str, metadata: dict):
        """保存图像记忆"""
        # 提取图像特征
        embedding = self.image_embeddings.embed_image(image_path)
        
        self.vectorstore.add_embeddings(
            [(description, embedding)],
            metadatas=[{
                **metadata,
                "modality": "image",
                "image_path": image_path
            }]
        )
    
    def retrieve_multimodal(
        self,
        query: str,
        modalities: List[str] = ["text", "image"]
    ) -> List[dict]:
        """多模态检索"""
        
        # 文本查询嵌入
        query_embedding = self.text_embeddings.embed_query(query)
        
        # 检索
        results = self.vectorstore.similarity_search_by_vector(
            query_embedding,
            k=10,
            filter={"modality": {"$in": modalities}}
        )
        
        return results

5.3 记忆压缩与总结

class MemoryCompressor:
    """记忆压缩器"""
    
    def __init__(self, llm):
        self.llm = llm
    
    def compress_memories(
        self,
        memories: List[MemoryEntry],
        strategy: str = "summary"
    ) -> str:
        """压缩记忆"""
        
        if strategy == "summary":
            return self._summarize(memories)
        elif strategy == "extract":
            return self._extract_key_points(memories)
        else:
            raise ValueError(f"Unknown strategy: {strategy}")
    
    def _summarize(self, memories: List[MemoryEntry]) -> str:
        """生成摘要"""
        
        # 合并记忆内容
        combined_text = "\n\n".join([
            m.content for m in memories
        ])
        
        # 使用 LLM 生成摘要
        prompt = f"""
请将以下对话历史压缩成简洁的摘要,保留关键信息:

{combined_text}

摘要:
"""
        
        summary = self.llm.invoke(prompt).content
        return summary
    
    def _extract_key_points(self, memories: List[MemoryEntry]) -> str:
        """提取关键点"""
        
        combined_text = "\n\n".join([
            m.content for m in memories
        ])
        
        prompt = f"""
从以下对话中提取关键事实和信息点:

{combined_text}

关键信息:
- 
"""
        
        key_points = self.llm.invoke(prompt).content
        return key_points

六、性能优化

6.1 检索优化

class OptimizedMemoryRetriever:
    """优化的记忆检索器"""
    
    def __init__(
        self,
        vectorstore,
        embeddings,
        cache_size: int = 1000
    ):
        self.vectorstore = vectorstore
        self.embeddings = embeddings
        
        # 查询缓存
        self.query_cache = {}
        self.cache_size = cache_size
    
    def retrieve_with_cache(
        self,
        query: str,
        k: int = 5,
        use_cache: bool = True
    ) -> List:
        """带缓存的检索"""
        
        # 检查缓存
        cache_key = self._get_cache_key(query, k)
        
        if use_cache and cache_key in self.query_cache:
            return self.query_cache[cache_key]
        
        # 执行检索
        results = self.vectorstore.similarity_search(query, k=k)
        
        # 更新缓存
        if use_cache:
            self._update_cache(cache_key, results)
        
        return results
    
    def _get_cache_key(self, query: str, k: int) -> str:
        """生成缓存键"""
        import hashlib
        return hashlib.md5(f"{query}_{k}".encode()).hexdigest()
    
    def _update_cache(self, key: str, value: List):
        """更新缓存"""
        # LRU 淘汰
        if len(self.query_cache) >= self.cache_size:
            # 删除最旧的
            oldest_key = next(iter(self.query_cache))
            del self.query_cache[oldest_key]
        
        self.query_cache[key] = value
    
    def batch_retrieve(
        self,
        queries: List[str],
        k: int = 5
    ) -> List[List]:
        """批量检索"""
        
        # 批量嵌入
        query_embeddings = self.embeddings.embed_documents(queries)
        
        # 批量检索
        results = []
        for embedding in query_embeddings:
            docs = self.vectorstore.similarity_search_by_vector(
                embedding,
                k=k
            )
            results.append(docs)
        
        return results

6.2 存储优化

class OptimizedMemoryStorage:
    """优化的记忆存储"""
    
    def __init__(self, vectorstore, compression_threshold: int = 1000):
        self.vectorstore = vectorstore
        self.compression_threshold = compression_threshold
    
    def save_with_compression(
        self,
        contents: List[str],
        metadatas: List[dict]
    ):
        """带压缩的存储"""
        
        # 如果内容过长,先压缩
        compressed_contents = []
        for content in contents:
            if len(content) > self.compression_threshold:
                content = self._compress_content(content)
            compressed_contents.append(content)
        
        # 存储
        self.vectorstore.add_texts(compressed_contents, metadatas=metadatas)
    
    def _compress_content(self, content: str) -> str:
        """压缩内容"""
        # 移除多余空白
        import re
        content = re.sub(r'\s+', ' ', content)
        
        # 截断
        if len(content) > self.compression_threshold:
            content = content[:self.compression_threshold] + "..."
        
        return content
    
    def optimize_storage(self):
        """优化存储"""
        # 1. 合并相似记忆
        # 2. 删除过期记忆
        # 3. 更新索引
        pass

七、实战案例

案例:个性化学习助手记忆系统

class PersonalizedLearningAssistant:
    """个性化学习助手"""
    
    def __init__(self, user_id: str):
        # 初始化记忆系统
        self.memory_system = UnifiedMemorySystem(
            vectorstore=Chroma(
                embedding_function=OpenAIEmbeddings(),
                persist_directory=f"./learning_assistant/{user_id}"
            ),
            embeddings=OpenAIEmbeddings(),
            llm=ChatOpenAI(model="gpt-4"),
            user_id=user_id
        )
        
        # 学习档案
        self.learning_profile = {
            "knowledge_level": {},  # 各知识点的掌握程度
            "learning_style": None,  # 学习风格
            "goals": [],            # 学习目标
            "weaknesses": []        # 薄弱环节
        }
    
    def learn(self, topic: str, content: str):
        """学习过程"""
        
        # 1. 检索相关记忆
        context = self.memory_system.get_context_for_llm(
            f"关于 {topic} 的知识"
        )
        
        # 2. 生成学习内容
        prompt = f"""
用户正在学习 {topic}
历史学习记录:
{context}

当前学习内容:
{content}

请:
1. 讲解这个知识点
2. 关联之前的学习内容
3. 提供练习题
"""
        
        response = self.memory_system.llm.invoke(prompt).content
        
        # 3. 保存记忆
        self.memory_system.add_memory(
            f"学习 {topic}: {content}",
            response,
            force_long_term=True
        )
        
        # 4. 更新学习档案
        self._update_profile(topic, content, response)
        
        return response
    
    def review(self, topic: str) -> str:
        """复习过程"""
        
        # 检索该主题的所有相关记忆
        memories = self.memory_system.retrieve_memories(
            f"{topic} 知识点",
            memory_types=[MemoryType.LONG_TERM]
        )
        
        # 生成复习内容
        context = "\n\n".join([m.content for m in memories])
        
        prompt = f"""
用户要复习 {topic}
学习历史:
{context}

请生成一份复习总结,包括:
1. 核心概念回顾
2. 重点难点梳理
3. 常见错误提醒
4. 巩固练习
"""
        
        return self.memory_system.llm.invoke(prompt).content
    
    def _update_profile(self, topic: str, content: str, response: str):
        """更新学习档案"""
        
        # 使用 LLM 提取知识点和掌握程度
        extraction_prompt = f"""
分析以下学习内容,提取:
1. 知识点
2. 用户掌握程度(1-5)
3. 学习难点

学习内容:
{content}

AI 回复:
{response}

以 JSON 格式返回。
"""
        
        import json
        extraction = self.memory_system.llm.invoke(extraction_prompt).content
        
        try:
            data = json.loads(extraction)
            
            # 更新知识点掌握程度
            for kp in data.get("knowledge_points", []):
                self.learning_profile["knowledge_level"][kp["name"]] = {
                    "level": kp["level"],
                    "last_review": datetime.now().isoformat()
                }
            
            # 更新薄弱环节
            difficulties = data.get("difficulties", [])
            self.learning_profile["weaknesses"].extend(difficulties)
            self.learning_profile["weaknesses"] = list(set(
                self.learning_profile["weaknesses"]
            ))
            
        except Exception as e:
            print(f"更新档案失败: {e}")
    
    def get_personalized_recommendation(self) -> str:
        """获取个性化学习推荐"""
        
        # 基于记忆和学习档案生成推荐
        prompt = f"""
基于用户的学习档案:

{json.dumps(self.learning_profile, indent=2)}

请推荐:
1. 下一步应该学习什么
2. 需要复习哪些内容
3. 如何改进学习方法
"""
        
        return self.memory_system.llm.invoke(prompt).content

# 使用示例
assistant = PersonalizedLearningAssistant(user_id="learner_123")

# 学习
response1 = assistant.learn(
    "Python 基础",
    "变量和数据类型"
)
print(response1)

# 继续学习
response2 = assistant.learn(
    "Python 基础",
    "条件和循环"
)
print(response2)

# 复习
review = assistant.review("Python 基础")
print(review)

# 获取推荐
recommendation = assistant.get_personalized_recommendation()
print(recommendation)

八、总结与最佳实践清单

记忆系统设计原则

  1. 分层原则:工作记忆 → 短期记忆 → 长期记忆
  2. 重要性优先:重要信息优先存储和检索
  3. 遗忘机制:定期清理低价值记忆
  4. 上下文相关:检索与当前上下文相关的记忆
  5. 用户隔离:多用户场景下的记忆隔离

最佳实践清单

短期记忆

  • ✅ 根据场景选择合适的记忆类型(滑动窗口 vs 摘要)
  • ✅ 监控 Token 消耗,避免超出 Context Window
  • ✅ 提取并优先保留重要信息
  • ✅ 考虑混合记忆策略(摘要 + 完整对话)

长期记忆

  • ✅ 选择合适的向量数据库(考虑规模、性能、成本)
  • ✅ 选择合适的嵌入模型(考虑语言、性能、成本)
  • ✅ 实现记忆去重机制
  • ✅ 实现遗忘机制,定期清理
  • ✅ 为记忆添加丰富的元数据
  • ✅ 考虑混合检索(向量 + 关键词)

性能优化

  • ✅ 实现查询缓存
  • ✅ 批量操作优化
  • ✅ 内容压缩
  • ✅ 异步处理

评估与监控

  • ✅ 建立检索质量评估体系
  • ✅ 监控记忆使用情况
  • ✅ 基准测试检索速度
  • ✅ 收集用户反馈

常见问题与解决方案

问题解决方案
Token 超限滑动窗口 + 摘要记忆
检索不准确混合检索 + 元数据过滤
记忆重复去重机制 + 相似度阈值
检索慢缓存 + 索引优化
存储爆炸遗忘机制 + 压缩
跨用户泄露用户 ID 过滤 + 权限控制

参考资料

论文

开源项目

向量数据库

教程

使用 Hugo 构建
主题 StackJimmy 设计