链与LCEL

在之前的笔记中,我们已经能用LangChain调用LLM、构建提示词模板以及解析输出,其中偶尔会用到一个类似Bash管道符号的神秘|运算符,实际上,这是LangChain提供的LCEL(LangChain Expression Language)语法,它是LangChain中构建"链(Chain)"的推荐方式。LCEL的核心思想非常简单,将各个组件通过管道符|串联起来,前一个组件的输出自动成为下一个组件的输入,整个流程就能以非常简洁直观的形式表达出来。这篇笔记我们学习管道符号以及相关的接口和函数。

管道符与Runnable

LCEL能使用|连接组件,其实是因为LangChain框架中所有组件都派生自统一的Runnable基类,而Runnable重载了Python的__or__运算符,使得|的含义变为将左侧组件的输出传递给右侧组件,这和Linux Bash Shell中的管道符概念类似。LCEL中,|是一种简写形式,它等价于调用pipe()方法。

# 使用管道符(推荐写法)
chain = prompt_template | model | output_parser

# 等价的显式写法
chain = prompt_template.pipe(model).pipe(output_parser)

Runnable接口

所有Runnable对象都拥有同一套调用接口,主要方法如下:

  • invoke(input):阻塞式调用,输入一个值,返回一个结果。
  • stream(input):同步流式调用,逐步返回输出片段。
  • astream(input):异步流式调用,基于asyncio,适合高并发服务端场景。
  • batch(inputs):批量调用,传入输入列表,并发处理后返回结果列表。
  • abatch(inputs):批量调用的异步版本。

|连接多个Runnable后得到的链本身也是一个Runnable,因此也支持以上所有方法。

下面例子中,我们阻塞调用了一个链,类似例子之前已经多次出现过。

from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

model = ChatOpenAI(
    model="qwen3:30b-a3b",
    base_url="http://localhost:11434/v1/",
    api_key="dummy",
    temperature=1,
    top_p=1,
    max_tokens=16384,
    timeout=120,
    max_retries=6
)

prompt = PromptTemplate.from_template("用一句话介绍{topic}。")

chain = prompt | model | StrOutputParser()

result = chain.invoke({"topic": "LangChain"})
print(result)

RunnableLambda

有时我们需要在链中插入自定义的数据处理逻辑,例如格式化文本、过滤字段等,此时可以使用RunnableLambda将任意Python函数包装成Runnable对象,使其能够参与管道操作。

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI

model = ChatOpenAI(
    model="qwen3:30b-a3b",
    base_url="http://localhost:11434/v1/",
    api_key="dummy",
    temperature=1,
    top_p=1,
    max_tokens=16384,
    timeout=120,
    max_retries=6
)

prompt = PromptTemplate.from_template("Who are you?")


def to_uppercase(text: str) -> str:
    return text.upper()


uppercase_runnable = RunnableLambda(to_uppercase)

chain = prompt | model | StrOutputParser() | uppercase_runnable

result = chain.invoke({"topic": "LangChain"})
print(result)

RunnableLambda也支持直接传入lambda表达式,对于简单的单行逻辑来说更加简洁。

chain = prompt | model | StrOutputParser() | RunnableLambda(lambda x: x.strip())

RunnablePassthrough

RunnablePassthrough是一个“透传”组件,也可以说是个什么都不做的Runnable,它直接将输入原样传递给下一个组件。乍一看这似乎毫无意义,但在需要将原始输入和某些处理结果同时传递给下游组件时,它就非常有用了。

下面例子代码中,会输出originalmodified两个结果。

from langchain_core.runnables import RunnablePassthrough, RunnableParallel

runnable = RunnableParallel(
    original=RunnablePassthrough(),
    modified=lambda x: x["num"] + 1,
)

result = runnable.invoke({"num": 5})
print(result)

RunnablePassthrough还有一个assign()方法,可以在透传原始输入的同时,向输出字典中追加新的字段,这在RAG管道中有时会用到。下面例子中,我们向输出字典追加了word_count字段。

from langchain_core.runnables import RunnablePassthrough

chain = RunnablePassthrough.assign(
    word_count=lambda inputs: len(inputs["text"].split())
)

result = chain.invoke({"text": "Hello world this is LangChain"})
print(result)

RunnableParallel

RunnableParallel允许我们并发运行多个Runnable,将同一份输入分发给多个分支,各分支并发执行,最终将各自的输出汇总到一个字典中返回,字典的键就是我们为每个分支指定的名称。

from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI

model = ChatOpenAI(
    model="qwen3:30b-a3b",
    base_url="http://localhost:11434/v1/",
    api_key="dummy",
    temperature=1,
    top_p=1,
    max_tokens=16384,
    timeout=120,
    max_retries=6
)

chain_summary = (
        PromptTemplate.from_template("用一句话总结{country}。")
        | model
        | StrOutputParser()
)

chain_capital = (
        PromptTemplate.from_template("{country}的首都是哪里?")
        | model
        | StrOutputParser()
)

parallel_chain = RunnableParallel(
    summary=chain_summary,
    capital=chain_capital,
)

result = parallel_chain.invoke({"country": "美国"})
print(result["summary"])
print(result["capital"])

RunnableParallel会自动并发执行所有分支,相比串行执行能显著降低整体延迟。在构建RAG应用时,这个特性非常实用,例如同时从多个数据源检索上下文。

RunnableBranch

RunnableBranch的作用类似于if-elif-else条件判断,它根据条件将输入路由到不同的Runnable分支。构造时传入一组(条件函数, Runnable)元组,最后还需要加上一个不带条件的默认分支。执行时,LangChain会依次评估条件函数,第一个返回True的分支会被执行,如果所有条件都不满足则执行默认分支。

from langchain_core.runnables import RunnableBranch, RunnableLambda

branch = RunnableBranch(
    (lambda x: "你好" in x, RunnableLambda(lambda x: "检测到中文问候!")),
    (lambda x: "hello" in x.lower(), RunnableLambda(lambda x: "Detected English greeting!")),
    RunnableLambda(lambda x: f"未知输入:{x}")
)

print(branch.invoke("你好,LangChain"))
print(branch.invoke("Hello, LangChain"))
print(branch.invoke("Bonjour"))

在实际开发中,RunnableBranch常用于根据用户输入的语言、意图或其他特征,将请求路由到不同的提示词模板或处理逻辑。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。