在 Langchain 中，当你运行一个 LLM、Chain、Agent 或 Tool 时，会经历一系列的步骤（例如，开始调用 LLM，LLM 返回结果，Chain 开始执行，Tool 被调用等）。事件系统（通过回调处理器 CallbackHandler 实现）允许你在这些步骤发生时执行自定义代码。这对于日志记录、监控、调试、数据收集、UI 更新等都非常有用。

## 1. 准备工作

In [10]:
# 安装必要的库 (如果还没安装)
!pip install langchain langchain-openai pandas langchain-community # pandas 仅用于某些回调数据结构示例
# !pip install langsmith # 如果你想体验 LangSmith

Collecting langchain-community
  Downloading langchain_community-0.3.23-py3-none-any.whl.metadata (2.5 kB)
Collecting aiohttp<4.0.0,>=3.8.3 (from langchain-community)
  Using cached aiohttp-3.11.18-cp311-cp311-macosx_11_0_arm64.whl.metadata (7.7 kB)
Collecting pydantic-settings<3.0.0,>=2.4.0 (from langchain-community)
  Using cached pydantic_settings-2.9.1-py3-none-any.whl.metadata (3.8 kB)
Collecting httpx-sse<1.0.0,>=0.4.0 (from langchain-community)
  Using cached httpx_sse-0.4.0-py3-none-any.whl.metadata (9.0 kB)
Collecting aiohappyeyeballs>=2.3.0 (from aiohttp<4.0.0,>=3.8.3->langchain-community)
  Using cached aiohappyeyeballs-2.6.1-py3-none-any.whl.metadata (5.9 kB)
Collecting aiosignal>=1.1.2 (from aiohttp<4.0.0,>=3.8.3->langchain-community)
  Using cached aiosignal-1.3.2-py2.py3-none-any.whl.metadata (3.8 kB)
Collecting frozenlist>=1.1.1 (from aiohttp<4.0.0,>=3.8.3->langchain-community)
  Using cached frozenlist-1.6.0-cp311-cp311-macosx_11_0_arm64.whl.metadata (16 kB)
Collecting

In [1]:
import os
from typing import Any, Dict, List, Union, Optional, Sequence
from uuid import UUID

# 设置你的 OpenAI API 密钥 (或者你选择的其他 LLM 提供商的密钥)
# 强烈建议从环境变量中读取


#这里我使用阿里云的百炼平台

api_key = os.getenv("DASHSCOPE_API_KEY")


from langchain_openai import ChatOpenAI
from langchain_core.callbacks.base import BaseCallbackHandler
from langchain_core.callbacks.manager import CallbackManager, AsyncCallbackManager
from langchain_core.outputs import LLMResult, ChatResult, GenerationChunk
from langchain_core.messages import BaseMessage
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate
from langchain_core.agents import AgentAction, AgentFinish
from langchain.agents import Tool, initialize_agent, AgentType


## 2. 使用内置的StdOutCallbackHandler

当你运行下面的代码时，你会看到详细的输出，包括 on_llm_start, on_llm_end 等事件，以及它们携带的数据（如 prompt, response）。

In [None]:
from langchain.callbacks import StdOutCallbackHandler

# 初始化 LLM
llm = ChatOpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    model="qwen-plus",  # 此处以qwen-plus为例，您可按需更换模型名称。模型列表：https://help.aliyun.com/zh/model-studio/getting-started/models
    # other params...
)

# 创建一个 StdOutCallbackHandler 实例
stdout_handler = StdOutCallbackHandler()

# 使用回调处理器运行 LLM
print("--- LLM Call with StdOutCallbackHandler ---")
# 方法1: 在构造时传入
llm_with_stdout = ChatOpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    model="qwen-plus",  # 此处以qwen-plus为例，您可按需更换模型名称。模型列表：https://help.aliyun.com/zh/model-studio/getting-started/models
    temperature=0,
    callbacks=[stdout_handler] # 可以传入一个列表
)
response = llm_with_stdout.invoke("Hello, how are you today?")
print("\nLLM Response:")
print(response.content)

print("\n--- LLM Call using 'config' (preferred for LCEL) ---")
# 方法2: 使用 invoke/stream/batch 的 config 参数 (更推荐，特别是对于 LCEL)
response_config = llm.invoke(
    "Tell me a short joke.",
    config={"callbacks": [stdout_handler]}
)
print("\nLLM Response (config):")
print(response_config.content)

# 使用回调处理器运行 Chain
prompt = PromptTemplate.from_template("What is the capital of {country}?")
chain = LLMChain(llm=llm, prompt=prompt) # 旧版 Chain，目前先演示旧版，后面有新版

print("\n--- Chain Call with StdOutCallbackHandler (using config) ---")
chain_response = chain.invoke(
    {"country": "France"},
    config={"callbacks": [stdout_handler]}
)
print("\nChain Response:")
print(chain_response)

--- LLM Call with StdOutCallbackHandler ---

LLM Response:
Hello! I'm just a program, so I don't have feelings, but thanks for asking! How can I assist you today?

--- LLM Call using 'config' (preferred for LCEL) ---

LLM Response (config):
Why don't skeletons fight each other?

Because they don't have the guts!

--- Chain Call with StdOutCallbackHandler (using config) ---


[1m> Entering new RunnableSequence chain...[0m


[1m> Entering new PromptTemplate chain...[0m

[1m> Finished chain.[0m

[1m> Finished chain.[0m

Chain Response:
content='The capital of France is **Paris**. It is the political, cultural, and economic center of the country, known for its iconic landmarks such as the Eiffel Tower, Louvre Museum, and Notre-Dame Cathedral.' additional_kwargs={'refusal': None} response_metadata={'token_usage': {'completion_tokens': 46, 'prompt_tokens': 15, 'total_tokens': 61, 'completion_tokens_details': None, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 0}}, 

## 3. 创建自定义回调处理器

通过继承 BaseCallbackHandler，你可以创建自己的处理器来精确控制如何响应特定的事件。

In [5]:
from langchain_core.outputs import GenerationChunk, ChatGenerationChunk

class MyCustomHandler(BaseCallbackHandler):
    def __init__(self, description: str = "MyHandler"):
        self.description = description
        print(f"[{self.description}] Handler Initialized")

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> Any:
        """Run when LLM starts running."""
        print(f"\n[{self.description}] LLM Start:")
        print(f"  Serialized: {serialized}") # 描述 LLM 的信息
        print(f"  Prompts: {prompts}")

    def on_chat_model_start(
        self,
        serialized: Dict[str, Any],
        messages: List[List[BaseMessage]],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Run when a Chat Model starts running."""
        print(f"\n[{self.description}] Chat Model Start (ID: {run_id}):")
        # print(f"  Serialized: {serialized}") # 通常与 on_llm_start 类似
        for i, msg_list in enumerate(messages):
            print(f"  Message Group {i+1}:")
            for msg in msg_list:
                print(f"    - {msg.type}: {msg.content[:80]}...") # 打印消息类型和部分内容

    def on_llm_new_token(
        self,
        token: str,
        *,
        chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None, # 注意这里的类型提示变化
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Run on new LLM token. Only available when streaming is enabled."""
        # print(f"[{self.description}] New Token: '{token}' (Run ID: {run_id})")
        # 为了避免过多输出，streaming token 通常只在需要时打印
        pass


    def on_llm_end(self, response: LLMResult, *, run_id: UUID, **kwargs: Any) -> Any:
        """Run when LLM ends running."""
        print(f"\n[{self.description}] LLM End (ID: {run_id}):")
        for i, generation_list in enumerate(response.generations):
            print(f"  Generation Group {i+1}:")
            for generation in generation_list:
                # ChatGeneration 通常有 'message' 属性，Generation 有 'text'
                if hasattr(generation, 'message') and generation.message:
                    print(f"    - Text: {generation.message.content[:100]}...")
                    if generation.message.additional_kwargs:
                         print(f"    - Tool Calls: {generation.message.additional_kwargs.get('tool_calls')}")
                elif hasattr(generation, 'text'):
                    print(f"    - Text: {generation.text[:100]}...")
        if response.llm_output:
            print(f"  LLM Output (Token Usage, etc.): {response.llm_output}")

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when LLM errors."""
        print(f"\n[{self.description}] LLM Error: {error}")

    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> Any:
        """Run when chain starts running."""
        print(f"\n[{self.description}] Chain Start:")
        print(f"  Serialized: {serialized}") # 描述 Chain 的信息
        print(f"  Inputs: {inputs}")

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> Any:
        """Run when chain ends running."""
        print(f"\n[{self.description}] Chain End:")
        print(f"  Outputs: {outputs}")

    def on_chain_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when chain errors."""
        print(f"\n[{self.description}] Chain Error: {error}")

    def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
    ) -> Any:
        """Run when tool starts running."""
        print(f"\n[{self.description}] Tool Start:")
        print(f"  Serialized: {serialized}") # 描述 Tool 的信息
        print(f"  Input: {input_str}")

    def on_tool_end(self, output: str, **kwargs: Any) -> Any: # output 类型可能因工具而异
        """Run when tool ends running."""
        print(f"\n[{self.description}] Tool End:")
        print(f"  Output: {output}")

    def on_tool_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when tool errors."""
        print(f"\n[{self.description}] Tool Error: {error}")

    def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
        """Run on agent action."""
        print(f"\n[{self.description}] Agent Action:")
        print(f"  Tool: {action.tool}")
        print(f"  Tool Input: {action.tool_input}")
        print(f"  Log: {action.log}")

    def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> Any:
        """Run on agent end."""
        print(f"\n[{self.description}] Agent Finish:")
        print(f"  Return Values: {finish.return_values}")
        print(f"  Log: {finish.log}")

# 实例化你的自定义处理器
my_handler = MyCustomHandler(description="LoggerV1")

# 再次运行 LLM，这次使用自定义处理器
print("\n--- LLM Call with MyCustomHandler ---")
llm_with_custom_handler  = ChatOpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    model="qwen-plus",  # 此处以qwen-plus为例，您可按需更换模型名称。模型列表：https://help.aliyun.com/zh/model-studio/getting-started/models
    temperature=0,
    callbacks=[stdout_handler] # 可以传入一个列表
)
response_custom = llm_with_custom_handler.invoke("What are the benefits of learning Langchain?")
# response_custom = llm.invoke("What are the benefits of learning Langchain?", config={"callbacks": [my_handler]})
print("\nLLM Response (Custom Handler):")
print(response_custom.content)

# 使用自定义处理器运行 Chain
print("\n--- Chain Call with MyCustomHandler ---")
chain_response_custom = chain.invoke(
    {"country": "Canada"},
    config={"callbacks": [my_handler]} # 推荐方式
)
print("\nChain Response (Custom Handler):")
print(chain_response_custom)

[LoggerV1] Handler Initialized

--- LLM Call with MyCustomHandler ---

LLM Response (Custom Handler):
Learning **Langchain** offers several benefits, especially for developers and data scientists interested in building applications that leverage large language models (LLMs) like OpenAI's GPT, Anthropic's Claude, or other models. Langchain is a framework designed to simplify the integration of LLMs into applications by providing tools, abstractions, and pre-built components. Here are some key benefits:

### 1. **Simplified Integration with LLMs**
   - Langchain abstracts away much of the complexity involved in integrating LLMs into applications. Instead of manually handling API calls, input/output formatting, and model management, Langchain provides ready-to-use components that streamline these processes.
   - This allows developers to focus on higher-level tasks, such as designing user experiences or optimizing workflows.

### 2. **Modular Components**
   - Langchain is built around mo

## 4. Langchain Expression Language (LCEL) 与回调

LCEL 是构建 Chain 的现代方式。回调系统与 LCEL 无缝集成，主要通过在 invoke, stream, batch, ainvoke 等方法的 config 参数中传递回调。

在 stream 的例子中，streaming_token_handler 会在 LLM 逐个生成 token 时被调用，而 my_handler 则会记录整个 LLM 调用和 Chain 调用的开始和结束事件。

In [8]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# LCEL Chain
prompt_lcel = ChatPromptTemplate.from_template("Tell me a fun fact about {topic}.")


# 可以不在这里传入callbacks
llm_lcel =  ChatOpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    model="qwen-plus",  # 此处以qwen-plus为例，您可按需更换模型名称。模型列表：https://help.aliyun.com/zh/model-studio/getting-started/models
    temperature=0,
)


parser = StrOutputParser()

chain_lcel = prompt_lcel | llm_lcel | parser

print("\n--- LCEL Chain Call with MyCustomHandler ---")
lcel_response = chain_lcel.invoke(
    {"topic": "the moon"},
    config={"callbacks": [my_handler, stdout_handler]} # 可以传递多个处理器
)
print("\nLCEL Chain Response:")
print(lcel_response)

# 演示 streaming 和 on_llm_new_token
print("\n--- LCEL Chain Stream with MyCustomHandler ---")
# 为了看到 on_llm_new_token, 我们需要修改 MyCustomHandler 或使用一个专门的 handler
class StreamingTokenHandler(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        print(f"Streamed Token: '{token}'", end="", flush=True)

streaming_token_handler = StreamingTokenHandler()
llm_streaming = ChatOpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    model="qwen-plus",
    temperature=0,
    streaming=True, # 启用 streaming
    # callbacks=[streaming_token_handler] # 也可以在这里全局设置
)
chain_streaming = prompt_lcel | llm_streaming | parser

full_response_streamed = ""
for chunk in chain_streaming.stream({"topic": "black holes"}, config={"callbacks": [streaming_token_handler, my_handler]}):
    # chunk 是 StrOutputParser 输出的字符串片段
    # streaming_token_handler 会在 LLM 层面捕获 token
    # my_handler 会捕获 chain 和 llm 的 start/end 事件
    full_response_streamed += chunk
    # print(f"Chain Stream Chunk: {chunk}") # 这是 chain 级别的输出 chunk
print("\n\nLCEL Streamed Full Response:")
print(full_response_streamed)


--- LCEL Chain Call with MyCustomHandler ---

[LoggerV1] Chain Start:
  Serialized: None
  Inputs: {'topic': 'the moon'}


[1m> Entering new RunnableSequence chain...[0m

[LoggerV1] Chain Start:
  Serialized: {'lc': 1, 'type': 'constructor', 'id': ['langchain', 'prompts', 'chat', 'ChatPromptTemplate'], 'kwargs': {'input_variables': ['topic'], 'messages': [{'lc': 1, 'type': 'constructor', 'id': ['langchain', 'prompts', 'chat', 'HumanMessagePromptTemplate'], 'kwargs': {'prompt': {'lc': 1, 'type': 'constructor', 'id': ['langchain', 'prompts', 'prompt', 'PromptTemplate'], 'kwargs': {'input_variables': ['topic'], 'template': 'Tell me a fun fact about {topic}.', 'template_format': 'f-string'}, 'name': 'PromptTemplate'}}}]}, 'name': 'ChatPromptTemplate'}
  Inputs: {'topic': 'the moon'}


[1m> Entering new ChatPromptTemplate chain...[0m

[LoggerV1] Chain End:
  Outputs: messages=[HumanMessage(content='Tell me a fun fact about the moon.', additional_kwargs={}, response_metadata={})]

[1m>

5. Agent 和 Tool 事件

当使用 Agent 和 Tool 时，会有额外的事件如 on_tool_start, on_tool_end, on_agent_action, on_agent_finish。


In [12]:
!pip install duckduckgo-search

Collecting duckduckgo-search
  Downloading duckduckgo_search-8.0.1-py3-none-any.whl.metadata (16 kB)
Collecting primp>=0.15.0 (from duckduckgo-search)
  Downloading primp-0.15.0-cp38-abi3-macosx_11_0_arm64.whl.metadata (13 kB)
Downloading duckduckgo_search-8.0.1-py3-none-any.whl (18 kB)
Downloading primp-0.15.0-cp38-abi3-macosx_11_0_arm64.whl (3.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: primp, duckduckgo-search
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2/2[0m [duckduckgo-search]
[1A[2KSuccessfully installed duckduckgo-search-8.0.1 primp-0.15.0


In [13]:
from langchain.tools import DuckDuckGoSearchRun

# 定义一个简单的工具
search_tool = Tool(
    name="DuckDuckGo Search",
    func=DuckDuckGoSearchRun().run,
    description="Useful for when you need to answer questions about current events or general knowledge.",
)

# 初始化 Agent (使用旧版 agent executor 来清晰演示)
# 注意：新版 agents 推荐使用 `create_openai_functions_agent` 等配合 `AgentExecutor`
# 但回调机制是通用的
try:
    # 使用一个支持函数调用的模型
    agent_llm = ChatOpenAI(
        api_key=os.getenv("DASHSCOPE_API_KEY"),
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
        model="qwen-plus",  # 此处以qwen-plus为例，您可按需更换模型名称。模型列表：https://help.aliyun.com/zh/model-studio/getting-started/models
        temperature=0,
    )
    # 旧版 initialize_agent 仍然可以工作，但请注意 Langchain 的发展方向
    # verbose=True 本身也使用了一个 Callback Handler
    agent_executor = initialize_agent(
        tools=[search_tool],
        llm=agent_llm,
        agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, # 简单的 Agent 类型
        verbose=False, # 设置为 False 避免与我们的 Handler 输出混淆
        handle_parsing_errors=True
    )

    print("\n--- Agent Call with MyCustomHandler ---")
    agent_response = agent_executor.invoke(
        {"input": "What is the latest news about Mars rovers?"},
        config={"callbacks": [my_handler, stdout_handler]} # 使用我们的自定义处理器和 stdout
    )
    print("\nAgent Final Response:")
    print(agent_response["output"])

except Exception as e:
    print(f"Error initializing or running agent: {e}")
    print("Skipping agent example. Ensure you have 'duckduckgo-search' installed if using DuckDuckGoSearchRun.")
    print("pip install duckduckgo-search")

  agent_executor = initialize_agent(



--- Agent Call with MyCustomHandler ---

[LoggerV1] Chain Start:
  Serialized: None
  Inputs: {'input': 'What is the latest news about Mars rovers?'}


[1m> Entering new AgentExecutor chain...[0m

[LoggerV1] Chain Start:
  Serialized: None
  Inputs: {'input': 'What is the latest news about Mars rovers?', 'agent_scratchpad': '', 'stop': ['\nObservation:', '\n\tObservation:']}


[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3mAnswer the following questions as best you can. You have access to the following tools:

DuckDuckGo Search(tool_input: 'Union[str, dict[str, Any]]', verbose: 'Optional[bool]' = None, start_color: 'Optional[str]' = 'green', color: 'Optional[str]' = 'green', callbacks: 'Callbacks' = None, *, tags: 'Optional[list[str]]' = None, metadata: 'Optional[dict[str, Any]]' = None, run_name: 'Optional[str]' = None, run_id: 'Optional[uuid.UUID]' = None, config: 'Optional[RunnableConfig]' = None, tool_call_id: 'Optional[str]' = None, **kwargs: 'An

## 6. 异步回调处理器 (AsyncCallbackHandler)

如果你的应用主要使用 Langchain 的异步功能（如 ainvoke, astream），你应该使用 AsyncCallbackHandler。方法签名将是 async def。

In [15]:
from langchain_core.callbacks.base import AsyncCallbackHandler

class MyAsyncCustomHandler(AsyncCallbackHandler):
    def __init__(self, description: str = "MyAsyncHandler"):
        self.description = description
        print(f"[{self.description}] Async Handler Initialized")

    async def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        print(f"\n[{self.description}] (Async) LLM Start:")
        # print(f"  Serialized: {serialized}")
        print(f"  Prompts: {prompts}")

    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        print(f"\n[{self.description}] (Async) LLM End:")
        # print(f"  Response: {response.generations[0][0].text[:80]}...")

    # ... 你可以实现其他 on_... 方法的异步版本
    # 例如: async def on_chain_start(...) etc.

# 实例化异步处理器
my_async_handler = MyAsyncCustomHandler()

# 使用异步处理器运行 LLM
print("\n--- (Async) LLM Call with MyAsyncCustomHandler ---")
# llm_async = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, callbacks=[my_async_handler]) # 不推荐这样用于异步
# response_async = await llm_async.ainvoke("Tell me about asynchronous programming.")

# 推荐使用 config
llm_for_async = ChatOpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    model="qwen-plus",  # 此处以qwen-plus为例，您可按需更换模型名称。模型列表：https://help.aliyun.com/zh/model-studio/getting-started/models
    temperature=0,
)
response_async = await llm_for_async.ainvoke(
    "Tell me about asynchronous programming.",
    config={"callbacks": [my_async_handler, stdout_handler]}
)
print("\nAsync LLM Response:")
print(response_async.content)

# 异步 LCEL Chain
print("\n--- (Async) LCEL Chain Call with MyAsyncCustomHandler ---")
chain_lcel_async = prompt_lcel | llm_for_async | parser
async_lcel_response = await chain_lcel_async.ainvoke(
    {"topic": "quantum computing"},
    config={"callbacks": [my_async_handler, stdout_handler]}
)
print("\nAsync LCEL Chain Response:")
print(async_lcel_response)

[MyAsyncHandler] Async Handler Initialized

--- (Async) LLM Call with MyAsyncCustomHandler ---

[MyAsyncHandler] (Async) LLM Start:
  Prompts: ['Human: Tell me about asynchronous programming.']

[MyAsyncHandler] (Async) LLM End:

Async LLM Response:
Asynchronous programming is a technique used in software development to allow programs to perform multiple operations concurrently without blocking the execution of other tasks. This approach is particularly useful when dealing with I/O-bound or network-bound operations, where tasks may involve waiting for external resources such as databases, file systems, or web services.

### Key Concepts of Asynchronous Programming

1. **Blocking vs Non-Blocking**:
   - In synchronous (blocking) programming, the program waits for an operation to complete before moving on to the next task. For example, if you're reading data from a file, the program will pause until the read operation finishes.
   - In asynchronous (non-blocking) programming, the program

要运行包含 await 的异步代码，你需要在一个异步上下文中执行它，例如在一个 async def main(): 函数中，然后用 asyncio.run(main()) 来运行。

In [18]:
import asyncio

async def main():
    class MyAsyncCustomHandler(AsyncCallbackHandler):
        def __init__(self, description: str = "MyAsyncHandler"):
            self.description = description
            print(f"[{self.description}] Async Handler Initialized")

        async def on_llm_start(
            self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
        ) -> None:
            print(f"\n[{self.description}] (Async) LLM Start:")
            # print(f"  Serialized: {serialized}")
            print(f"  Prompts: {prompts}")

        async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
            print(f"\n[{self.description}] (Async) LLM End:")

    # 实例化异步处理器
    my_async_handler = MyAsyncCustomHandler()
    stdout_async_handler = StdOutCallbackHandler() # StdOutCallbackHandler 也能在异步场景下工作

    # 使用异步处理器运行 LLM
    print("\n--- (Async) LLM Call with MyAsyncCustomHandler ---")
    llm_for_async = ChatOpenAI(
        api_key=os.getenv("DASHSCOPE_API_KEY"),
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
        model="qwen-plus",  # 此处以qwen-plus为例，您可按需更换模型名称。模型列表：https://help.aliyun.com/zh/model-studio/getting-started/models
        temperature=0,
    )
    response_async = await llm_for_async.ainvoke(
        "Tell me about asynchronous programming.",
        config={"callbacks": [my_async_handler, stdout_async_handler]}
    )
    print("\nAsync LLM Response:")
    print(response_async.content)

    # 异步 LCEL Chain
    print("\n--- (Async) LCEL Chain Call with MyAsyncCustomHandler ---")
    prompt_lcel_async = ChatPromptTemplate.from_template("Explain {concept} in simple terms.")
    chain_lcel_async = prompt_lcel_async | llm_for_async | StrOutputParser()
    async_lcel_response = await chain_lcel_async.ainvoke(
        {"concept": "black holes"},
        config={"callbacks": [my_async_handler, stdout_async_handler]}
    )
    print("\nAsync LCEL Chain Response:")
    print(async_lcel_response)

    print("\n--- (Async) LCEL Chain Stream with MyAsyncCustomHandler ---")
    # 为了看到 on_llm_new_token, 我们需要一个异步的 streaming handler
    class AsyncStreamingTokenHandler(AsyncCallbackHandler):
        async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
            print(f"Async Streamed Token: '{token}'", end="", flush=True)

    async_streaming_token_handler = AsyncStreamingTokenHandler()
    llm_async_streaming = ChatOpenAI(
        api_key=os.getenv("DASHSCOPE_API_KEY"),
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
        model="qwen-plus",  # 此处以qwen-plus为例，您可按需更换模型名称。模型列表：https://help.aliyun.com/zh/model-studio/getting-started/models
        temperature=0,
        streaming=True,
    )
    chain_async_streaming = prompt_lcel_async | llm_async_streaming | StrOutputParser()

    full_async_response_streamed = ""
    async for chunk in chain_async_streaming.astream(
        {"concept": "neural networks"},
        config={"callbacks": [async_streaming_token_handler, my_async_handler]}
    ):
        full_async_response_streamed += chunk
    print("\n\nAsync LCEL Streamed Full Response:")
    print(full_async_response_streamed)


# 如果你在 Jupyter Notebook 或类似环境中，可以直接 await main()
await main()
# 否则，使用 asyncio.run()
if __name__ == "__main__":
    # 注意：直接在脚本顶层使用 await 需要 Python 3.8+ 的交互式解释器或特定运行方式
    # 在普通 .py 文件中，标准做法是：
    try:
        asyncio.run(main())
    except OSError as e:
        if "Cannot run the event loop while another loop is running" in str(e):
            print("\nSkipping asyncio.run(main()) as an event loop is already running (e.g., in Jupyter).")
            print("You can typically 'await main()' directly in such environments.")
        else:
            raise e
    except Exception as e:
        print(f"An error occurred: {e}")

[MyAsyncHandler] Async Handler Initialized

--- (Async) LLM Call with MyAsyncCustomHandler ---

[MyAsyncHandler] (Async) LLM Start:
  Prompts: ['Human: Tell me about asynchronous programming.']

[MyAsyncHandler] (Async) LLM End:

Async LLM Response:
Asynchronous programming is a technique that allows a program to perform other tasks while waiting for certain operations to complete. This approach contrasts with synchronous programming, where the program execution halts and waits until the current operation finishes before moving on to the next one.

### Key Concepts of Asynchronous Programming:

1. **Non-Blocking Operations**: In asynchronous programming, when an operation (such as reading from a file, making a network request, or querying a database) starts, the program doesn't wait for it to finish. Instead, it continues executing other parts of the code, improving efficiency and responsiveness.

2. **Concurrency**: Asynchronous programming enables concurrency, meaning multiple tasks 

  print(f"An error occurred: {e}")


## 7. aevents() (实验性/较新特性)

Langchain 引入了 aevents API，它提供了一种通过异步生成器流式传输事件的更现代方式。这允许你以编程方式迭代运行过程中的事件。

In [None]:
# (确保在异步上下文中运行，例如上面的 main() 函数)

async def demonstrate_aevents():
    print("\n--- Demonstrating aevents API ---")
    llm = ChatOpenAI(
        api_key=os.getenv("DASHSCOPE_API_KEY"),
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
        model="qwen-plus",  # 此处以qwen-plus为例，您可按需更换模型名称。模型列表：https://help.aliyun.com/zh/model-studio/getting-started/models
        temperature=0,
    )
    prompt = ChatPromptTemplate.from_template("What are three key features of Python?")
    chain = prompt | llm | StrOutputParser()

    # aevents 返回一个异步生成器
    event_stream = chain.aevents(
        {"input": "What are three key features of Python?"}, # LCEL chain.invoke takes a dict
        # 如果你的 chain 输入 key 不是 "input", 请相应修改
        # 例如, 如果 prompt_lcel = ChatPromptTemplate.from_template("Tell me a fun fact about {topic}.")
        # 则 chain.aevents({"topic": "the moon"})
        config={"tags": ["aevents_demo"]} # 可以添加标签
    )

    async for event in event_stream:
        event_name = event["name"] # 事件的名称，如 on_llm_start
        run_id = event["run_id"]   # 运行 ID
        print(f"Event: {event_name} (Run ID: {run_id})")
        # print(f"  Data: {event['data']}") # 事件携带的数据
        if event_name == "on_chat_model_stream": # 对于 streaming
            print(f"    Chunk: {event['data']['chunk']}")
        elif event_name == "on_chat_model_end":
            if event['data'].get('output') and hasattr(event['data']['output'], 'generations'):
                 print(f"    LLM Output: {event['data']['output'].generations[0][0].message.content[:50]}...")
        # 你可以根据 event_name 检查不同的事件类型并处理其 data
        # 例如 event["name"] 可以是 "on_llm_start", "on_llm_stream", "on_llm_end",
        # "on_chain_start", "on_chain_end", etc.
        # event["data"] 包含该事件的具体信息

# 在 main 函数中调用:
# await demonstrate_aevents()
# 然后运行 asyncio.run(main())

将 demonstrate_aevents 添加到 main 函数中并运行，你将看到每个事件被异步地打印出来。aevents 非常强大，因为它允许你以编程方式访问和处理事件流，而不仅仅是通过回调函数。

## 8. LangSmith 和事件

LangSmith 是 Langchain 的官方可观测性和调试平台。当你配置好 LangSmith（通常通过设置环境变量 LANGCHAIN_API_KEY, LANGCHAIN_TRACING_V2="true", LANGCHAIN_PROJECT）后，Langchain 会自动将所有这些内部事件（回调数据）发送到 LangSmith 服务器。

你不需要显式地添加回调处理器来与 LangSmith 集成（除非你想在发送到 LangSmith 之前 进行某些自定义本地处理）。LangSmith 内部就利用了这个事件系统来捕获所有运行的详细信息。

总结

Langchain 的事件/回调系统是其核心功能之一，提供了强大的可观测性和灵活性。

使用内置的 StdOutCallbackHandler 进行快速调试。
创建自定义的 BaseCallbackHandler 或 AsyncCallbackHandler 来实现特定的日志记录、监控或交互逻辑。
通过 config={"callbacks": [...]} 参数将处理器传递给 LLMs, Chains (特别是 LCEL), 和 Agents 的 invoke, stream, batch (以及对应的 ainvoke, astream, abatch) 方法。
利用 aevents() API 以异步流的方式消费事件。
理解 LangSmith 在后台利用此事件系统来提供丰富的追踪和调试体验。
