Skip to content

LangChain表达式(LCEL)

1 定义

  • LCEL的全称是LangChain Expression Language。其实他的用处就是使用“|”运算符链接LangChain应用的各个组件。
  • LCEL是一种声明式的方法来链接LangChain组件。
  • LCEL从第一天起就被设计为支持将原型投入生产,无需代码更改,从最简单的“提示词+ 大型语言模型”链到最复杂的链

2 LCEL的各种语法

  • Runnable节点
  • 节点调用、批量、流式运行
  • 组合成chain
  • 并行调用运行
  • 合并输入和输出字典
  • 后备选项
  • 重复多次执行Runnable节点
  • 条件构建chain
  • map高阶处理
  • 打印chain图形
  • 生命周期管理

2.0 安装包

python
pip install langchain grandalf

2.1 Runnable节点

把函数封装成一个组件节点

python
from langchain_core.runnables import RunnableLambda

# 定义函数
def test1(x: int):
    return x + 10

# 把函数封装成一个组件节点
r1 = RunnableLambda(test1)
print(r1)
# RunnableLambda(test1)

2.2 调用

python
from langchain_core.runnables import RunnableLambda

# 定义函数
def test1(x: int):
    return x + 10

# 把函数封装成一个组件
r1 = RunnableLambda(test1)

# 1 单次调用
res = r1.invoke(4)
print(res) # 14


# 2 批量调用
res = r1.batch([4, 5])
print(res) # [14, 15]


# 3 流式调用
def test2(prompt: str):
    for item in prompt.split(' '):
        yield item

r2 = RunnableLambda(test2)
# 返回的res是一个生成器
res = r2.stream('This is a Dog.')
print(res) 
# <generator object RunnableLambda.transform at 0x0000024265E9C260>

for chunk in res:
    print(chunk)

'''
This
is
a
Dog.
'''

2.3 组合链-chain-串行

python
from langchain_core.runnables import RunnableLambda


def test1(x: int):
    return x + 10

# 4 组合链-chain
r1 = RunnableLambda(test1)
r2 = RunnableLambda(lambda x: x * 2)

# 串行
chain = r1 | r2
print(chain.invoke(2)) # 24

2.4 组合链-chain-并行

python
from langchain_core.runnables import RunnableLambda, RunnableParallel

def test1(x: int):
    return x + 10

r1 = RunnableLambda(test1)
r2 = RunnableLambda(lambda x: x * 2)


# 5 并行调用运行
chain2 = RunnableParallel(r1 = r1, r2 = r2)
# max_concurrency 最大并发数
print(chain2.invoke(2, config={'max_concurrency': 1}))
# {'r1': 12, 'r2': 4}

# 扩展第一种:组合新链
new_chain = (chain | chain2)
res = new_chain.invoke(2)
print(res) # {'r1': 34, 'r2': 48}

# 扩展第二种:组合新链
new_chain = (chain | chain2)
# 打印链的图像描述
new_chain.get_graph().print_ascii()
res = new_chain.invoke(2)
print(res) # {'r1': 34, 'r2': 48}

2.5 合并输入和输出字典

RunnablePassthrough: 允许传递输入数据,可以保持不变或添加额外的键。必须传入一个字典数据,还可以过滤

python
from langchain_core.runnables import RunnableLambda, RunnableParallel,RunnablePassthrough

# 合并输入,并处理中间数据
r1 = RunnableLambda(lambda x: {'key1': x})
r2 = RunnableLambda(lambda x: x['key1'] + 10)

# RunnablePassthrough.assign 将结果指定为字典(添加额外的键)
# new_key: 随意指定的,代表新输出的key
chain = r1 | RunnablePassthrough.assign(new_key=r2)
print(chain.invoke(2))
# {'key1': 2, 'new_key': 12}


# RunnablePassthrough() 不加参数,就表示结果保持不变
chain = r1 | RunnablePassthrough() | RunnablePassthrough.assign(new_key=r2)
print(chain.invoke(2))
# {'key1': 2, 'new_key': 12}


# 复杂示例,加入并行计算
chain = r1 | RunnableParallel(foo=RunnablePassthrough(), new_key=RunnablePassthrough.assign(key2=r2))
print(chain.invoke(2))
# {'foo': {'key1': 2}, 'new_key': {'key1': 2, 'key2': 12}}

# 加入过滤 pick
chain = r1 | RunnableParallel(foo=RunnablePassthrough(), new_key=RunnablePassthrough.assign(key2=r2)) | RunnablePassthrough().pick(['new_key'])
print(chain.invoke(2))
# {'new_key': {'key1': 2, 'key2': 12}}

2.6 后备选项

后备选项:后备选项是一种可以在紧急情况下使用的替代方案

python
from langchain_core.runnables import RunnableLambda, RunnableParallel,RunnablePassthrough

def test1(x: int):
    return x + 10

# 7 后备选项:后备选项是一种可以在紧急情况下使用的替代方案。
r1 = RunnableLambda(test1)
r2 = RunnableLambda(lambda x: int(x) + 20)

# 在加法计算中的后备选项
# 在r1报错的情况下,r2 是 r1的后备方案
chain = r1.with_fallbacks([r2])
res = chain.invoke('2')
print(res) # 22

2.7 重复多次执行Runnable节点

报错了才会重试

python
from langchain_core.runnables import RunnableLambda

# 8 重复多次执行Runnable节点
counter = 1

def test3(x):
    global counter
    counter += 1
    print(f'执行了 {counter} 次')
    return x / counter

r1 = RunnableLambda(test3).with_retry(stop_after_attempt=4)
print(r1.invoke(2))
# 执行了 2 次
# 1.0

2.8 根据条件构建动态的chain

python
from langchain_core.runnables import RunnableLambda, RunnableParallel,RunnablePassthrough

def test1(x: int):
    return x + 10

# 9 根据条件构建动态的chain
r1 = RunnableLambda(test1)
r2 = RunnableLambda(lambda x: [x] * 2)


# 根据r1的输出结果,判断是否要执行r2 (判断本身也必须是一个节点)
chain = r1 | RunnableLambda(lambda x: r2 if x > 12 else RunnableLambda(lambda x: x))
res = chain.invoke(5)
print(res) # [15, 15]

res = chain.invoke(1)
print(res) # 11

2.9 生命周期管理

with_listeners 监听生命周期

python
import time
from langchain_core.runnables import RunnableLambda, RunnableParallel,RunnablePassthrough
from langchain_core.tracers import Run

# 生命周期管理

def test4(n: int):
    time.sleep(n)
    return n * 2

r1 = RunnableLambda(test4)

def on_start(run_obj: Run):
    """ 当 r1节点启动的时候,自动调用"""
    print('r1启动的时间:', run_obj.start_time)

def on_end(run_obj: Run):
    """ 当 r1节点已经运行结束的时候,自动调用"""
    print('r1结束的时间:', run_obj.end_time)

chain = r1.with_listeners(on_start=on_start, on_end=on_end)
res = chain.invoke(2)
print(res)

3 LCEL案例

3.1 案例1-简单

提示词 > llm > 文本---提示词2 > llm > 评分

python
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI

from env_utils import LOCAL_BASE_URL

#  提示词  > llm > 文本---提示词2 > llm > 评分

# 调用本地的私有化大模型
llm = ChatOpenAI(
    model='qwen3-8b',
    temperature=0.8,
    api_key='xxx',
    base_url=LOCAL_BASE_URL,

    # qwen3特有的参数: enable_thinking 表示是否开启深度思考
    extra_body={'chat_template_kwargs': {'enable_thinking: true'}}
)

prompt1 = PromptTemplate.from_template('给我写一篇关于{key_word}{type},字数不超过{count}')

prompt2 = PromptTemplate.from_template('请简单评价一下这篇短文,如果总分是10分,请给这篇短文打分: {text_content}')


# 整个需求的第一段,组成一个chain
chain1 = prompt1 | llm | StrOutputParser()

# 第一种:问题-没有输出中间的text_content结果
# chain2 = {'text_content': chain1} | prompt2 | llm | StrOutputParser()

# 第二种:处理中间结果
def print_chian1(input):
    print(input)
    print('--' * 30)
    return {'text_content': input}

chain2 = chain1 | RunnableLambda(print_chian1) | prompt2 | llm | StrOutputParser()

resp = chain2.invoke({'key_word': '青春', 'type': '散文', 'count': 400})
print(resp)

3.2 案例2-复杂

用户输入选餐厅的要求 》 整理用户的需求 》 挑选3个餐厅 》 总结每个餐厅的推荐理由 》 预定位置

python
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import  ChatPromptTemplate
from langchain_openai import ChatOpenAI

from env_utils import LOCAL_BASE_URL

# 调用本地的私有化大模型
llm = ChatOpenAI(
    model='qwen3-8b',
    temperature=0.8,
    api_key='xxx',
    base_url=LOCAL_BASE_URL,

    # qwen3特有的参数: enable_thinking 表示是否开启深度思考
    extra_body={'chat_template_kwargs': {'enable_thinking: true'}}
)

# 步骤1:定义需求
gather_preferences_prompt = ChatPromptTemplate.from_template(
    "用户输入了一些餐厅偏好: {input1}\n"
    "请将用户的偏好总结为清晰的需求:"
)

# 步骤2:定义推荐理由
recommend_restaurants_prompt = ChatPromptTemplate.from_template(
    "基于用户需求:{input2}\n"
    "请推荐 3 家适合的餐厅,并说明推荐理由:"
)

# 步骤3:总结推荐内容供用户快速参考
summarize_recommendations_prompt = ChatPromptTemplate.from_template(
    "以下是餐厅推荐和推荐理由:\n{input3}\n"
    "请总结成 2-3 句话,供用户快速参考: "
)

chain = gather_preferences_prompt | llm | recommend_restaurants_prompt | llm |summarize_recommendations_prompt | llm | StrOutputParser()

resp = chain.invoke({'input1': '我喜欢安静的地方,有素食的餐厅更好,而且价格也不贵'})
print(resp)

3.3 案例3-复杂

需求:用户会问到各种领域(数学,物理,历史等)的问题,根据不同的领域,定义不同的提示词模板。动态的选择合适的任务调度

python
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda, RouterRunnable, RunnableSequence
from langchain_openai import ChatOpenAI

from env_utils import LOCAL_BASE_URL

# 需求:用户会问到各种领域(数学,物理,历史等)的问题,根据不同的领域,定义不同的提示词模板。动态的选择合适的任务调度


# 调用本地的私有化大模型
llm = ChatOpenAI(
    model='qwen3-8b',
    temperature=0.8,
    api_key='xxx',
    base_url=LOCAL_BASE_URL,

    # qwen3特有的参数: enable_thinking 表示是否开启深度思考
    extra_body={'chat_template_kwargs': {'enable_thinking: true'}}
)

# 定义物理任销模板
physics_template = ChatPromptTemplate.from_template(
    "你是一位物理学教授,擅长用简洁易懂的方式回答物理问题。以下是问题内容: {input}"
)
# 定义数学任务模板
math_template = ChatPromptTemplate.from_template(
    "你是一位数学家,擅长分步骤解决数学问题,并提供详细的解决过程。以下是问题内容: {input}"
)

# 定义历史任务模板
history_template = ChatPromptTemplate.from_template(
    "你是一位历史学家,对历史事件和背景有深入研究。以下是问题内容: {input}"
)
# 定义计算机科学任务模板
computerscience_template = ChatPromptTemplate.from_template(
    "你是一位计算机科学专家,擅长算法、数据结构和编程问题。以下是问题内容: {input}"
)

# 定义默认任务模版
default_template = ChatPromptTemplate.from_template(
    "输入内容无法归类,请直接回答:{input}"
)

# 定义模版chain
default_chain = default_template | llm
physics_chain = physics_template | llm
math_chain = math_template | llm
history_chain = history_template | llm
computerscience_chain = computerscience_template | llm


# 动态路由的chain
def route(input):
    """根据大侯型第一次处理只有的输出来: 动态判断各种领域的任务"""
    if '物理' in input['type']:
        print('1号')
        return {'key': 'physics', 'input': input['input']}
    elif '数学' in input['type']:
        print('2号')
        return {'key': 'math', 'input': input['input']}
    elif '历史' in input['type']:
        print('3号')
        return {'key': 'history', 'input': input['input']}
    elif '计算机' in input['type']:
        print('4号')
        return {'key': 'computer_science', 'input': input['input']}
    else:
        print('5号')
        return {'key': 'default', 'input': input['input']}


# 创建一个路由节点
route_runnable = RunnableLambda(route)

# 路由调度器  runnables: 所有各个领域对应的任务字典
router = RouterRunnable(runnables={
    'physics': physics_chain,
    'math': math_chain,
    'history': history_chain,
    'computer_science': computerscience_chain,
    'default': default_chain,
})

# 第一个提示词模版
first_prompt = ChatPromptTemplate.from_template(
    "不要回答下面用户的问题,只要根据用户的输入来判断分类,一共有[物理,历史,计算机,数学,其他]5种类别。\n\n \
    用户的输入: {input} \n\n \
    最后的输出包含分类的类别和用户输入的内容,输出格式为json. 其中,类别的key为type,用户输入内容的key为input"
)

chain1 = first_prompt | llm | JsonOutputParser()

# chain2 = chain1 | route_runnable | router
# 上下等价
chain2 = RunnableSequence(chain1, route_runnable, router)

inputs = [
    {"input": "什么是罢体辐射?"},  # 物理问题
    {"input": "计算 2+2 的结果。"},  # 数学问题
    {"input": "介绍一次世界大战的背景。"},  # 历史问题
    {"input": "如何实现快速排序算法?"}  # 计算机科学问题
]

for inp in inputs:
    result = chain2.invoke(inp)
    print(f'问题:{inp} \n 回答:{result} \n')

Released under the MIT License.