Appearance
LangChain表达式(LCEL)
1 定义
- LCEL的全称是LangChain Expression Language。其实他的用处就是使用“|”运算符链接LangChain应用的各个组件。
- LCEL是一种声明式的方法来链接LangChain组件。
- LCEL从第一天起就被设计为支持将原型投入生产,无需代码更改,从最简单的“提示词+ 大型语言模型”链到最复杂的链
2 LCEL的各种语法
- Runnable节点
- 节点调用、批量、流式运行
- 组合成chain
- 并行调用运行
- 合并输入和输出字典
- 后备选项
- 重复多次执行Runnable节点
- 条件构建chain
- map高阶处理
- 打印chain图形
- 生命周期管理
2.0 安装包
python
pip install langchain grandalf2.1 Runnable节点
把函数封装成一个组件节点
python
from langchain_core.runnables import RunnableLambda
# 定义函数
def test1(x: int):
return x + 10
# 把函数封装成一个组件节点
r1 = RunnableLambda(test1)
print(r1)
# RunnableLambda(test1)2.2 调用
python
from langchain_core.runnables import RunnableLambda
# 定义函数
def test1(x: int):
return x + 10
# 把函数封装成一个组件
r1 = RunnableLambda(test1)
# 1 单次调用
res = r1.invoke(4)
print(res) # 14
# 2 批量调用
res = r1.batch([4, 5])
print(res) # [14, 15]
# 3 流式调用
def test2(prompt: str):
for item in prompt.split(' '):
yield item
r2 = RunnableLambda(test2)
# 返回的res是一个生成器
res = r2.stream('This is a Dog.')
print(res)
# <generator object RunnableLambda.transform at 0x0000024265E9C260>
for chunk in res:
print(chunk)
'''
This
is
a
Dog.
'''2.3 组合链-chain-串行
python
from langchain_core.runnables import RunnableLambda
def test1(x: int):
return x + 10
# 4 组合链-chain
r1 = RunnableLambda(test1)
r2 = RunnableLambda(lambda x: x * 2)
# 串行
chain = r1 | r2
print(chain.invoke(2)) # 242.4 组合链-chain-并行
python
from langchain_core.runnables import RunnableLambda, RunnableParallel
def test1(x: int):
return x + 10
r1 = RunnableLambda(test1)
r2 = RunnableLambda(lambda x: x * 2)
# 5 并行调用运行
chain2 = RunnableParallel(r1 = r1, r2 = r2)
# max_concurrency 最大并发数
print(chain2.invoke(2, config={'max_concurrency': 1}))
# {'r1': 12, 'r2': 4}
# 扩展第一种:组合新链
new_chain = (chain | chain2)
res = new_chain.invoke(2)
print(res) # {'r1': 34, 'r2': 48}
# 扩展第二种:组合新链
new_chain = (chain | chain2)
# 打印链的图像描述
new_chain.get_graph().print_ascii()
res = new_chain.invoke(2)
print(res) # {'r1': 34, 'r2': 48}2.5 合并输入和输出字典
RunnablePassthrough: 允许传递输入数据,可以保持不变或添加额外的键。必须传入一个字典数据,还可以过滤
python
from langchain_core.runnables import RunnableLambda, RunnableParallel,RunnablePassthrough
# 合并输入,并处理中间数据
r1 = RunnableLambda(lambda x: {'key1': x})
r2 = RunnableLambda(lambda x: x['key1'] + 10)
# RunnablePassthrough.assign 将结果指定为字典(添加额外的键)
# new_key: 随意指定的,代表新输出的key
chain = r1 | RunnablePassthrough.assign(new_key=r2)
print(chain.invoke(2))
# {'key1': 2, 'new_key': 12}
# RunnablePassthrough() 不加参数,就表示结果保持不变
chain = r1 | RunnablePassthrough() | RunnablePassthrough.assign(new_key=r2)
print(chain.invoke(2))
# {'key1': 2, 'new_key': 12}
# 复杂示例,加入并行计算
chain = r1 | RunnableParallel(foo=RunnablePassthrough(), new_key=RunnablePassthrough.assign(key2=r2))
print(chain.invoke(2))
# {'foo': {'key1': 2}, 'new_key': {'key1': 2, 'key2': 12}}
# 加入过滤 pick
chain = r1 | RunnableParallel(foo=RunnablePassthrough(), new_key=RunnablePassthrough.assign(key2=r2)) | RunnablePassthrough().pick(['new_key'])
print(chain.invoke(2))
# {'new_key': {'key1': 2, 'key2': 12}}2.6 后备选项
后备选项:后备选项是一种可以在紧急情况下使用的替代方案
python
from langchain_core.runnables import RunnableLambda, RunnableParallel,RunnablePassthrough
def test1(x: int):
return x + 10
# 7 后备选项:后备选项是一种可以在紧急情况下使用的替代方案。
r1 = RunnableLambda(test1)
r2 = RunnableLambda(lambda x: int(x) + 20)
# 在加法计算中的后备选项
# 在r1报错的情况下,r2 是 r1的后备方案
chain = r1.with_fallbacks([r2])
res = chain.invoke('2')
print(res) # 222.7 重复多次执行Runnable节点
报错了才会重试
python
from langchain_core.runnables import RunnableLambda
# 8 重复多次执行Runnable节点
counter = 1
def test3(x):
global counter
counter += 1
print(f'执行了 {counter} 次')
return x / counter
r1 = RunnableLambda(test3).with_retry(stop_after_attempt=4)
print(r1.invoke(2))
# 执行了 2 次
# 1.02.8 根据条件构建动态的chain
python
from langchain_core.runnables import RunnableLambda, RunnableParallel,RunnablePassthrough
def test1(x: int):
return x + 10
# 9 根据条件构建动态的chain
r1 = RunnableLambda(test1)
r2 = RunnableLambda(lambda x: [x] * 2)
# 根据r1的输出结果,判断是否要执行r2 (判断本身也必须是一个节点)
chain = r1 | RunnableLambda(lambda x: r2 if x > 12 else RunnableLambda(lambda x: x))
res = chain.invoke(5)
print(res) # [15, 15]
res = chain.invoke(1)
print(res) # 112.9 生命周期管理
with_listeners 监听生命周期
python
import time
from langchain_core.runnables import RunnableLambda, RunnableParallel,RunnablePassthrough
from langchain_core.tracers import Run
# 生命周期管理
def test4(n: int):
time.sleep(n)
return n * 2
r1 = RunnableLambda(test4)
def on_start(run_obj: Run):
""" 当 r1节点启动的时候,自动调用"""
print('r1启动的时间:', run_obj.start_time)
def on_end(run_obj: Run):
""" 当 r1节点已经运行结束的时候,自动调用"""
print('r1结束的时间:', run_obj.end_time)
chain = r1.with_listeners(on_start=on_start, on_end=on_end)
res = chain.invoke(2)
print(res)3 LCEL案例
3.1 案例1-简单
提示词 > llm > 文本---提示词2 > llm > 评分
python
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
from env_utils import LOCAL_BASE_URL
# 提示词 > llm > 文本---提示词2 > llm > 评分
# 调用本地的私有化大模型
llm = ChatOpenAI(
model='qwen3-8b',
temperature=0.8,
api_key='xxx',
base_url=LOCAL_BASE_URL,
# qwen3特有的参数: enable_thinking 表示是否开启深度思考
extra_body={'chat_template_kwargs': {'enable_thinking: true'}}
)
prompt1 = PromptTemplate.from_template('给我写一篇关于{key_word}的{type},字数不超过{count}')
prompt2 = PromptTemplate.from_template('请简单评价一下这篇短文,如果总分是10分,请给这篇短文打分: {text_content}')
# 整个需求的第一段,组成一个chain
chain1 = prompt1 | llm | StrOutputParser()
# 第一种:问题-没有输出中间的text_content结果
# chain2 = {'text_content': chain1} | prompt2 | llm | StrOutputParser()
# 第二种:处理中间结果
def print_chian1(input):
print(input)
print('--' * 30)
return {'text_content': input}
chain2 = chain1 | RunnableLambda(print_chian1) | prompt2 | llm | StrOutputParser()
resp = chain2.invoke({'key_word': '青春', 'type': '散文', 'count': 400})
print(resp)3.2 案例2-复杂
用户输入选餐厅的要求 》 整理用户的需求 》 挑选3个餐厅 》 总结每个餐厅的推荐理由 》 预定位置
python
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from env_utils import LOCAL_BASE_URL
# 调用本地的私有化大模型
llm = ChatOpenAI(
model='qwen3-8b',
temperature=0.8,
api_key='xxx',
base_url=LOCAL_BASE_URL,
# qwen3特有的参数: enable_thinking 表示是否开启深度思考
extra_body={'chat_template_kwargs': {'enable_thinking: true'}}
)
# 步骤1:定义需求
gather_preferences_prompt = ChatPromptTemplate.from_template(
"用户输入了一些餐厅偏好: {input1}\n"
"请将用户的偏好总结为清晰的需求:"
)
# 步骤2:定义推荐理由
recommend_restaurants_prompt = ChatPromptTemplate.from_template(
"基于用户需求:{input2}\n"
"请推荐 3 家适合的餐厅,并说明推荐理由:"
)
# 步骤3:总结推荐内容供用户快速参考
summarize_recommendations_prompt = ChatPromptTemplate.from_template(
"以下是餐厅推荐和推荐理由:\n{input3}\n"
"请总结成 2-3 句话,供用户快速参考: "
)
chain = gather_preferences_prompt | llm | recommend_restaurants_prompt | llm |summarize_recommendations_prompt | llm | StrOutputParser()
resp = chain.invoke({'input1': '我喜欢安静的地方,有素食的餐厅更好,而且价格也不贵'})
print(resp)3.3 案例3-复杂
需求:用户会问到各种领域(数学,物理,历史等)的问题,根据不同的领域,定义不同的提示词模板。动态的选择合适的任务调度
python
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda, RouterRunnable, RunnableSequence
from langchain_openai import ChatOpenAI
from env_utils import LOCAL_BASE_URL
# 需求:用户会问到各种领域(数学,物理,历史等)的问题,根据不同的领域,定义不同的提示词模板。动态的选择合适的任务调度
# 调用本地的私有化大模型
llm = ChatOpenAI(
model='qwen3-8b',
temperature=0.8,
api_key='xxx',
base_url=LOCAL_BASE_URL,
# qwen3特有的参数: enable_thinking 表示是否开启深度思考
extra_body={'chat_template_kwargs': {'enable_thinking: true'}}
)
# 定义物理任销模板
physics_template = ChatPromptTemplate.from_template(
"你是一位物理学教授,擅长用简洁易懂的方式回答物理问题。以下是问题内容: {input}"
)
# 定义数学任务模板
math_template = ChatPromptTemplate.from_template(
"你是一位数学家,擅长分步骤解决数学问题,并提供详细的解决过程。以下是问题内容: {input}"
)
# 定义历史任务模板
history_template = ChatPromptTemplate.from_template(
"你是一位历史学家,对历史事件和背景有深入研究。以下是问题内容: {input}"
)
# 定义计算机科学任务模板
computerscience_template = ChatPromptTemplate.from_template(
"你是一位计算机科学专家,擅长算法、数据结构和编程问题。以下是问题内容: {input}"
)
# 定义默认任务模版
default_template = ChatPromptTemplate.from_template(
"输入内容无法归类,请直接回答:{input}"
)
# 定义模版chain
default_chain = default_template | llm
physics_chain = physics_template | llm
math_chain = math_template | llm
history_chain = history_template | llm
computerscience_chain = computerscience_template | llm
# 动态路由的chain
def route(input):
"""根据大侯型第一次处理只有的输出来: 动态判断各种领域的任务"""
if '物理' in input['type']:
print('1号')
return {'key': 'physics', 'input': input['input']}
elif '数学' in input['type']:
print('2号')
return {'key': 'math', 'input': input['input']}
elif '历史' in input['type']:
print('3号')
return {'key': 'history', 'input': input['input']}
elif '计算机' in input['type']:
print('4号')
return {'key': 'computer_science', 'input': input['input']}
else:
print('5号')
return {'key': 'default', 'input': input['input']}
# 创建一个路由节点
route_runnable = RunnableLambda(route)
# 路由调度器 runnables: 所有各个领域对应的任务字典
router = RouterRunnable(runnables={
'physics': physics_chain,
'math': math_chain,
'history': history_chain,
'computer_science': computerscience_chain,
'default': default_chain,
})
# 第一个提示词模版
first_prompt = ChatPromptTemplate.from_template(
"不要回答下面用户的问题,只要根据用户的输入来判断分类,一共有[物理,历史,计算机,数学,其他]5种类别。\n\n \
用户的输入: {input} \n\n \
最后的输出包含分类的类别和用户输入的内容,输出格式为json. 其中,类别的key为type,用户输入内容的key为input"
)
chain1 = first_prompt | llm | JsonOutputParser()
# chain2 = chain1 | route_runnable | router
# 上下等价
chain2 = RunnableSequence(chain1, route_runnable, router)
inputs = [
{"input": "什么是罢体辐射?"}, # 物理问题
{"input": "计算 2+2 的结果。"}, # 数学问题
{"input": "介绍一次世界大战的背景。"}, # 历史问题
{"input": "如何实现快速排序算法?"} # 计算机科学问题
]
for inp in inputs:
result = chain2.invoke(inp)
print(f'问题:{inp} \n 回答:{result} \n')