链与LCEL
在之前的笔记中,我们已经能用LangChain调用LLM、构建提示词模板以及解析输出,其中偶尔会用到一个类似Bash管道符号的神秘|运算符,实际上,这是LangChain提供的LCEL(LangChain Expression Language)语法,它是LangChain中构建"链(Chain)"的推荐方式。LCEL的核心思想非常简单,将各个组件通过管道符|串联起来,前一个组件的输出自动成为下一个组件的输入,整个流程就能以非常简洁直观的形式表达出来。这篇笔记我们学习管道符号以及相关的接口和函数。
管道符与Runnable
LCEL能使用|连接组件,其实是因为LangChain框架中所有组件都派生自统一的Runnable基类,而Runnable重载了Python的__or__运算符,使得|的含义变为将左侧组件的输出传递给右侧组件,这和Linux Bash Shell中的管道符概念类似。LCEL中,|是一种简写形式,它等价于调用pipe()方法。
# 使用管道符(推荐写法)
chain = prompt_template | model | output_parser
# 等价的显式写法
chain = prompt_template.pipe(model).pipe(output_parser)
Runnable接口
所有Runnable对象都拥有同一套调用接口,主要方法如下:
invoke(input):阻塞式调用,输入一个值,返回一个结果。stream(input):同步流式调用,逐步返回输出片段。astream(input):异步流式调用,基于asyncio,适合高并发服务端场景。batch(inputs):批量调用,传入输入列表,并发处理后返回结果列表。abatch(inputs):批量调用的异步版本。
用|连接多个Runnable后得到的链本身也是一个Runnable,因此也支持以上所有方法。
下面例子中,我们阻塞调用了一个链,类似例子之前已经多次出现过。
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
model = ChatOpenAI(
model="qwen3:30b-a3b",
base_url="http://localhost:11434/v1/",
api_key="dummy",
temperature=1,
top_p=1,
max_tokens=16384,
timeout=120,
max_retries=6
)
prompt = PromptTemplate.from_template("用一句话介绍{topic}。")
chain = prompt | model | StrOutputParser()
result = chain.invoke({"topic": "LangChain"})
print(result)
RunnableLambda
有时我们需要在链中插入自定义的数据处理逻辑,例如格式化文本、过滤字段等,此时可以使用RunnableLambda将任意Python函数包装成Runnable对象,使其能够参与管道操作。
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
model = ChatOpenAI(
model="qwen3:30b-a3b",
base_url="http://localhost:11434/v1/",
api_key="dummy",
temperature=1,
top_p=1,
max_tokens=16384,
timeout=120,
max_retries=6
)
prompt = PromptTemplate.from_template("Who are you?")
def to_uppercase(text: str) -> str:
return text.upper()
uppercase_runnable = RunnableLambda(to_uppercase)
chain = prompt | model | StrOutputParser() | uppercase_runnable
result = chain.invoke({"topic": "LangChain"})
print(result)
RunnableLambda也支持直接传入lambda表达式,对于简单的单行逻辑来说更加简洁。
chain = prompt | model | StrOutputParser() | RunnableLambda(lambda x: x.strip())
RunnablePassthrough
RunnablePassthrough是一个“透传”组件,也可以说是个什么都不做的Runnable,它直接将输入原样传递给下一个组件。乍一看这似乎毫无意义,但在需要将原始输入和某些处理结果同时传递给下游组件时,它就非常有用了。
下面例子代码中,会输出original和modified两个结果。
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
runnable = RunnableParallel(
original=RunnablePassthrough(),
modified=lambda x: x["num"] + 1,
)
result = runnable.invoke({"num": 5})
print(result)
RunnablePassthrough还有一个assign()方法,可以在透传原始输入的同时,向输出字典中追加新的字段,这在RAG管道中有时会用到。下面例子中,我们向输出字典追加了word_count字段。
from langchain_core.runnables import RunnablePassthrough
chain = RunnablePassthrough.assign(
word_count=lambda inputs: len(inputs["text"].split())
)
result = chain.invoke({"text": "Hello world this is LangChain"})
print(result)
RunnableParallel
RunnableParallel允许我们并发运行多个Runnable,将同一份输入分发给多个分支,各分支并发执行,最终将各自的输出汇总到一个字典中返回,字典的键就是我们为每个分支指定的名称。
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI
model = ChatOpenAI(
model="qwen3:30b-a3b",
base_url="http://localhost:11434/v1/",
api_key="dummy",
temperature=1,
top_p=1,
max_tokens=16384,
timeout=120,
max_retries=6
)
chain_summary = (
PromptTemplate.from_template("用一句话总结{country}。")
| model
| StrOutputParser()
)
chain_capital = (
PromptTemplate.from_template("{country}的首都是哪里?")
| model
| StrOutputParser()
)
parallel_chain = RunnableParallel(
summary=chain_summary,
capital=chain_capital,
)
result = parallel_chain.invoke({"country": "美国"})
print(result["summary"])
print(result["capital"])
RunnableParallel会自动并发执行所有分支,相比串行执行能显著降低整体延迟。在构建RAG应用时,这个特性非常实用,例如同时从多个数据源检索上下文。
RunnableBranch
RunnableBranch的作用类似于if-elif-else条件判断,它根据条件将输入路由到不同的Runnable分支。构造时传入一组(条件函数, Runnable)元组,最后还需要加上一个不带条件的默认分支。执行时,LangChain会依次评估条件函数,第一个返回True的分支会被执行,如果所有条件都不满足则执行默认分支。
from langchain_core.runnables import RunnableBranch, RunnableLambda
branch = RunnableBranch(
(lambda x: "你好" in x, RunnableLambda(lambda x: "检测到中文问候!")),
(lambda x: "hello" in x.lower(), RunnableLambda(lambda x: "Detected English greeting!")),
RunnableLambda(lambda x: f"未知输入:{x}")
)
print(branch.invoke("你好,LangChain"))
print(branch.invoke("Hello, LangChain"))
print(branch.invoke("Bonjour"))
在实际开发中,RunnableBranch常用于根据用户输入的语言、意图或其他特征,将请求路由到不同的提示词模板或处理逻辑。