简介

模糊测试框架 boofuzz 基于现已不再更新维护的 Sulley 框架开发,是一个基于生成的模糊测试框架。除了修复大量的错误外,boofuzz 还基于 Sulley 框架的数据生成方式更新了一系列功能,使得数据生成简单方便,同时增强了系统的兼容性和可扩展性。boofuzz 的主要特性在于最新且仍在更新和维护、安装简单、支持任意通信媒介、数据生成速度快、可扩展性强、可以实时监测并记录测试数据,因此 boofuzz 是一个理想的模糊测试工具。

boofuzz样例

以一个 TFTP Fuzzer 为例,大概看一下 boofuzz 的用法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from boofuzz import *
import time


def main():
    """Fuzz the file-name field of a TFTP write request sent over UDP to 127.0.0.1:69."""
    # UDPSocketConnection is what the deprecated SocketConnection(..., proto="udp")
    # factory returns; constructing it directly avoids the FutureWarning.
    session = Session(
        sleep_time=1,
        target=Target(connection=UDPSocketConnection("127.0.0.1", 69)),
    )

    s_initialize("write")
    # Single backslashes: these must be the raw bytes 0x00 0x02 (TFTP opcode 2, WRQ),
    # not the eight-character text "\x00\x02".
    s_static("\x00\x02")
    s_string("filename")
    s_static("\x00")
    s_static("netascii")
    s_static("\x00")

    session.connect(s_get('write'))
    session.fuzz()


if __name__ == '__main__':
    main()

在源码中可以看到 SocketConnection 会在未来版本中移除,现在应该使用 BaseSocketConnection 的派生类(如 TCPSocketConnection、UDPSocketConnection):

1
2
3
4
5
warnings.warn(
"SocketConnection is deprecated and will be removed in a future version of Boofuzz. "
"Use the classes derived from BaseSocketConnection instead.",
FutureWarning,
)

boofuzz 依次从连接(Connection)、目标(Target)和会话(Session)三部分来创建模糊器,其模糊测试的流程如图所示:

Connection

顾名思义,Connection 对象是网络连接的代表,boofuzz 支持各种基于 socket 的连接,从源码部分就可以看到,常用的还是 TCP 和 UDP,这里还支持网络协议栈中2层和3层的原生 socket。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def SocketConnection(
    host,
    port=None,
    proto="tcp",
    bind=None,
    send_timeout=5.0,
    recv_timeout=5.0,
    ethernet_proto=None,
    l2_dst=b"\xFF" * 6,
    udp_broadcast=False,
    server=False,
    sslcontext=None,
    server_hostname=None,
):
    """Deprecated factory: build the connection object that matches ``proto``.

    Emits a FutureWarning on every call, validates ``proto`` and ``port``, then
    forwards the relevant arguments to the constructor of the matching class.

    Args:
        host: Passed through as the first argument of every connection class.
        port: Required when ``proto`` is listed in ``_PROTOCOLS_PORT_REQUIRED``.
        proto: One of the branches below: "udp", "tcp", "ssl", "raw-l2", "raw-l3".
        bind: Forwarded to the UDP connection only.
        send_timeout: Forwarded to every connection class.
        recv_timeout: Forwarded to every connection class.
        ethernet_proto: Raw-L3 only; falls back to ``ETH_P_IP`` when None.
        l2_dst: Raw-L3 only.
        udp_broadcast: UDP only.
        server: Forwarded to the UDP, TCP and SSL connections.
        sslcontext: SSL only.
        server_hostname: SSL only.

    Returns:
        The newly constructed connection object.

    Raises:
        exception.SullyRuntimeError: ``proto`` is not in ``_PROTOCOLS``.
        ValueError: ``proto`` needs a port and ``port`` is None.
    """
    warnings.warn(
        "SocketConnection is deprecated and will be removed in a future version of Boofuzz. "
        "Use the classes derived from BaseSocketConnection instead.",
        FutureWarning,
    )
    if proto not in _PROTOCOLS:
        raise exception.SullyRuntimeError("INVALID PROTOCOL SPECIFIED: %s" % proto)

    if proto in _PROTOCOLS_PORT_REQUIRED and port is None:
        raise ValueError("__init__() argument port required for protocol {0}".format(proto))

    if proto == "udp":
        return udp_socket_connection.UDPSocketConnection(
            host, port, send_timeout, recv_timeout, server, bind, udp_broadcast
        )
    elif proto == "tcp":
        return tcp_socket_connection.TCPSocketConnection(host, port, send_timeout, recv_timeout, server)
    elif proto == "ssl":
        return ssl_socket_connection.SSLSocketConnection(
            host, port, send_timeout, recv_timeout, server, sslcontext, server_hostname
        )
    elif proto == "raw-l2":
        return raw_l2_socket_connection.RawL2SocketConnection(host, send_timeout, recv_timeout)
    elif proto == "raw-l3":
        if ethernet_proto is None:
            ethernet_proto = raw_l3_socket_connection.ETH_P_IP

        return raw_l3_socket_connection.RawL3SocketConnection(host, send_timeout, recv_timeout, ethernet_proto, l2_dst)

SocketConnection 函数中根据我们传入的 proto 参数来调用相应 Connection 类的构造函数。

以 TCPSocketConnection 为例,其继承自 BaseSocketConnection:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class TCPSocketConnection(base_socket_connection.BaseSocketConnection):
    """BaseSocketConnection implementation for use with TCP Sockets.

    .. versionadded:: 0.2.0

    Args:
        host (str): Hostname or IP address of target system.
        port (int): Port of target service.
        send_timeout (float): Seconds to wait for send before timing out. Default 5.0.
        recv_timeout (float): Seconds to wait for recv before timing out. Default 5.0.
        server (bool): Set to True to enable server side fuzzing.

    """

    def __init__(self, host, port, send_timeout=5.0, recv_timeout=5.0, server=False):
        # The base class stores both timeouts and initialises self._sock to None.
        super(TCPSocketConnection, self).__init__(send_timeout, recv_timeout)

        self.host = host
        self.port = port
        self.server = server
        # Listening socket; only assigned in server mode.
        self._serverSock = None

BaseSocketConnection 是一个继承了 itarget_connection.ITargetConnection 接口的抽象基类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class BaseSocketConnection(with_metaclass(abc.ABCMeta, itarget_connection.ITargetConnection)):
    """This class serves as a base for a number of Connections over sockets.

    .. versionadded:: 0.2.0

    Args:
        send_timeout (float): Seconds to wait for send before timing out. Default 5.0.
        recv_timeout (float): Seconds to wait for recv before timing out. Default 5.0.
    """

    def __init__(self, send_timeout, recv_timeout):
        self._send_timeout = send_timeout
        self._recv_timeout = recv_timeout

        # Subclasses are expected to assign the real socket before calling open().
        self._sock = None

    def close(self):
        """
        Close connection to the target.

        Returns:
            None
        """
        self._sock.close()

    @abc.abstractmethod
    def open(self):
        """
        Opens connection to the target. Make sure to call close!

        Abstract, but carries a body: subclasses call it through super() after
        creating self._sock so that both timeouts are applied as socket options.

        Returns:
            None
        """
        self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, _seconds_to_sockopt_format(self._send_timeout))
        self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, _seconds_to_sockopt_format(self._recv_timeout))

python元类编程

抽象类和接口都是面向对象里面的概念,抽象类是指一类不可直接实例化,只可被继承的类,接口则定义了继承接口的类必须实现的方法。python 是没有这两个概念相关的关键字的,在 python 中,抽象类是以抽象基类的方式实现的(Abstract Base Classes, ABC)。

ABC 中提供了 @abstractmethod 装饰器来指定抽象方法,下面代码中定义了一个抽象类 C,并且定义了三个抽象方法,D 类则是继承抽象类 C 然后实现了它的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import abc


class C(metaclass=abc.ABCMeta):
    """Abstract base class declaring an abstract property, class method and static method."""

    # @abstractmethod is the innermost decorator in each stack below.
    @property
    @abc.abstractmethod
    def a(self): pass

    @classmethod
    @abc.abstractmethod
    def clsa(cls): pass

    @staticmethod
    @abc.abstractmethod
    def stca(): pass


class D(C):
    """Concrete subclass: overriding all three abstract members makes it instantiable."""

    @property
    def a(self):
        print('property: a')

    @classmethod
    def clsa(cls):
        print('classmethod clsa')

    @staticmethod
    def stca():
        print('staticmethod stca')


d = D()
d.a
# property: a
d.clsa()
# classmethod clsa
d.stca()
# staticmethod stca

这里 BaseSocketConnection 的定义中用到了 with_metaclass 来创建这个类。

Python Metaclass : Understanding the ‘with_metaclass()’ - Stack Overflow

这里引入 with_metaclass 是为了兼容 python2 和 python3,在我的 python3.8 上 with_metaclass 如下:

1
2
3
4
5
6
7
8
9
def with_metaclass(meta, *bases):
    """Return a throw-away base class so ``class X(with_metaclass(M, B))`` works on Python 2 and 3.

    Args:
        meta: The metaclass the final class should be created with.
        *bases: The real base classes of the final class.

    Returns:
        A temporary class whose metaclass is the helper ``metaclass`` below.
        Subclassing it yields ``meta(name, bases, d)``; the temporary class
        itself is not among the bases of the result.
    """
    class metaclass(meta):
        __call__ = type.__call__
        __init__ = type.__init__

        def __new__(cls, name, this_bases, d):
            # First call (from the return statement below): build the temporary class.
            if this_bases is None:
                return type.__new__(cls, name, (), d)
            # Later call (when the temporary class is subclassed): discard
            # this_bases and create the real class from meta and the real bases.
            return meta(name, bases, d)
    return metaclass('temporary_class', None, {})

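根据 BaseSocketConnection 传入的参数,这里 meta 是 ABCMeta,bases 是 ITargetConnection。with_metaclass 定义了一个继承自 ABCMeta 的临时元类 metaclass,并重写了其 __new__ 方法。最后的 return 语句调用 metaclass('temporary_class', None, {}),此时 this_bases 为 None,走 type.__new__ 分支,得到一个没有基类的临时类 temporary_class,它的元类就是 metaclass。当 BaseSocketConnection 以这个临时类为基类进行定义时,Python 会用基类的元类 metaclass 来创建它,于是再次进入 __new__;这一次 this_bases 是 (temporary_class,) 而不是 None,所以执行 meta(name, bases, d),也就是直接用 ABCMeta 和真正的基类 ITargetConnection 来创建类。因此最终得到的 BaseSocketConnection 是一个元类为 ABCMeta、基类为 ITargetConnection 的抽象类,临时类不会出现在它的继承链中。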

总之这里 BaseSocketConnection 继承了 ITargetConnection 的接口,并且定义了一些基本方法,如 open、send、recv、close 以及关于延时的变量等。

TCPSocketConnection

具体的 TCPSocket 的实现。

open

1
2
3
def open(self):
    """Create the socket, then connect it (or accept a peer in server mode)."""
    self._open_socket()
    self._connect_socket()

open_socket函数

就是创建一个 socket。

1
2
3
4
5
def _open_socket(self):
    """Create the IPv4 TCP socket and let the base class apply the timeouts."""
    self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # call superclass to set timeout sockopt
    super(TCPSocketConnection, self).open()

connect_socket函数

这里可以看到 boofuzz 支持 server 被连接的模式,server 只会接受一个连接。如果是多连接的场景,需要自己修改对应逻辑,主动连接同理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def _connect_socket(self):
    """Establish the TCP connection, as a listener or as a client.

    Server mode binds to (host, port), listens and blocks until one peer is
    accepted; afterwards self._sock is the accepted socket and self._serverSock
    the listening one. Client mode connects self._sock to (host, port).

    Raises:
        exception.BoofuzzOutOfAvailableSockets: bind/connect failed with EADDRINUSE.
        exception.BoofuzzTargetConnectionFailedError: accept failed with EAGAIN, or
            connect failed with ECONNREFUSED, EINPROGRESS or ETIMEDOUT.
        socket.error: any other socket failure is re-raised unchanged.
    """
    # Server mode: wait for the target to connect to us.
    if self.server:
        self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            self._sock.bind((self.host, self.port))
        except socket.error as e:
            if e.errno == errno.EADDRINUSE:
                raise exception.BoofuzzOutOfAvailableSockets()
            else:
                raise

        self._serverSock = self._sock
        try:
            # Backlog of 1 and a single accept(): only one peer is served.
            self._serverSock.listen(1)
            self._sock, addr = self._serverSock.accept()
        except socket.error as e:
            # When connection timeout expires, tear down the server socket so we can re-open it again after
            # restarting the target.
            self.close()
            if e.errno == errno.EAGAIN:
                raise exception.BoofuzzTargetConnectionFailedError(str(e))
            else:
                raise
    # Client mode: actively connect to the target.
    else:
        try:
            self._sock.connect((self.host, self.port))
        except socket.error as e:
            if e.errno == errno.EADDRINUSE:
                raise exception.BoofuzzOutOfAvailableSockets()
            elif e.errno in [errno.ECONNREFUSED, errno.EINPROGRESS, errno.ETIMEDOUT]:
                raise exception.BoofuzzTargetConnectionFailedError(str(e))
            else:
                raise

send

向目标发送数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def send(self, data):
    """Send ``data`` on the connected socket.

    Args:
        data: Bytes to transmit.

    Returns:
        The value returned by ``self._sock.send`` (number of bytes sent).

    Raises:
        exception.BoofuzzTargetConnectionAborted: the socket reported ECONNABORTED.
        exception.BoofuzzTargetConnectionReset: the socket reported ECONNRESET,
            ENETRESET, ETIMEDOUT or EPIPE.
        socket.error: any other socket failure is re-raised unchanged.
    """
    num_sent = 0

    try:
        num_sent = self._sock.send(data)
    except socket.error as e:
        if e.errno == errno.ECONNABORTED:
            # raise_ re-raises with the original traceback attached.
            raise_(
                exception.BoofuzzTargetConnectionAborted(socket_errno=e.errno, socket_errmsg=e.strerror),
                None,
                sys.exc_info()[2],
            )
        elif e.errno in [errno.ECONNRESET, errno.ENETRESET, errno.ETIMEDOUT, errno.EPIPE]:
            raise_(exception.BoofuzzTargetConnectionReset(), None, sys.exc_info()[2])
        else:
            raise

    return num_sent

recv

这里可以设定最大接受字节数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def recv(self, max_bytes):
    """Receive up to ``max_bytes`` bytes from the connected socket.

    Args:
        max_bytes: Upper bound passed to ``self._sock.recv``.

    Returns:
        The received bytes, or ``b""`` when the receive timed out
        (``socket.timeout`` or errno EWOULDBLOCK).

    Raises:
        exception.BoofuzzTargetConnectionAborted: the socket reported ECONNABORTED.
        exception.BoofuzzTargetConnectionReset: the socket reported ECONNRESET,
            ENETRESET or ETIMEDOUT.
        socket.error: any other socket failure is re-raised unchanged.
    """
    data = b""

    try:
        data = self._sock.recv(max_bytes)
    except socket.timeout:
        data = b""
    except socket.error as e:
        if e.errno == errno.ECONNABORTED:
            raise_(
                exception.BoofuzzTargetConnectionAborted(socket_errno=e.errno, socket_errmsg=e.strerror),
                None,
                sys.exc_info()[2],
            )
        elif (e.errno == errno.ECONNRESET) or (e.errno == errno.ENETRESET) or (e.errno == errno.ETIMEDOUT):
            raise_(exception.BoofuzzTargetConnectionReset(), None, sys.exc_info()[2])
        elif e.errno == errno.EWOULDBLOCK:  # timeout condition if using SO_RCVTIMEO or SO_SNDTIMEO
            data = b""
        else:
            raise

    return data

可以看出 Connection 这层就已经实现了连接的建立以及数据的收发相关的功能。

Target

1
2
3
4
5
6
class Target(object):
"""Target descriptor container.

Takes an ITargetConnection and wraps send/recv with appropriate
FuzzDataLogger calls.
"""

Target 注释部分也说了,Target 对象主要是在Connection的接口上 wrap 上 log。可以看到 Target 的 send 函数,除了加的 repeater 功能,基本上就是添加了 log。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def open(self):
    """
    Opens connection to the target. Make sure to call close!

    Logs an info line before and after delegating to the wrapped connection.

    :return: None
    """
    self._fuzz_data_logger.log_info("Opening target connection ({0})...".format(self._target_connection.info))
    self._target_connection.open()
    self._fuzz_data_logger.log_info("Connection opened.")

def send(self, data):
    """
    Send data to the target. Only valid after calling open!

    When a repeater is configured the same data is sent once per iteration the
    repeater allows. Logging is skipped entirely when no logger is set.

    Args:
        data: Data to send.

    Returns:
        None
    """
    num_sent = 0
    if self._fuzz_data_logger is not None:
        repeat = ""
        if self.repeater is not None:
            repeat = ", " + self.repeater.log_message()

        self._fuzz_data_logger.log_info("Sending {0} bytes{1}...".format(len(data), repeat))

    if self.repeater is not None:
        self.repeater.start()
        while self.repeater.repeat():
            num_sent = self._target_connection.send(data=data)
        self.repeater.reset()
    else:
        num_sent = self._target_connection.send(data=data)

    if self._fuzz_data_logger is not None:
        # Log only the prefix the (last) send call reported as transmitted.
        self._fuzz_data_logger.log_send(data[:num_sent])

另外 Target 中还有一些 Monitor 的初始化工作,后面再说,Target 中提供了设置 Logger 的接口函数。

1
2
3
4
5
6
7
8
9
10
def set_fuzz_data_logger(self, fuzz_data_logger):
    """
    Set this object's fuzz data logger -- for sent and received fuzz data.

    :param fuzz_data_logger: New logger.
    :type fuzz_data_logger: ifuzz_logger.IFuzzLogger

    :return: None
    """
    self._fuzz_data_logger = fuzz_data_logger

在 session 中会调用这个函数来添加 logger,默认是FuzzLoggerText

Session Logger部分

Session 对象可以看成 fuzzer 的后端对象,其参数基本上就是涉及到 fuzz 控制的各种粒度,其函数基本上就是 fuzz 过程了。fuzz 过程后面再分析,这里先看下上面余留的 logger 部分。

1
2
3
4
5
6
7
8
9
class Session(pgraph.Graph):
def __init__(args):
if fuzz_loggers is None:
fuzz_loggers = []
if self.console_gui and os.name != "nt":
fuzz_loggers.append(fuzz_logger_curses.FuzzLoggerCurses(web_port=self.web_port))
self._keep_web_open = False
else:
fuzz_loggers = [fuzz_logger_text.FuzzLoggerText()]

如果 fuzz_loggers 没指定的话,这里就设置成 FuzzLoggerText,而 FuzzLoggerText 默认会设置为标准输出,因此就形成了打印到终端,所以如果想输出到文件,就可以 set 自己 new 的 FuzzLoggerText,并设置其 file_handle 为文件句柄。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class FuzzLoggerText(ifuzz_logger_backend.IFuzzLoggerBackend):
    """Logger backend that prints formatted log messages to a file handle."""

    def __init__(self, file_handle=sys.stdout, bytes_to_str=DEFAULT_HEX_TO_STR):
        """
        :type file_handle: io.BinaryIO
        :param file_handle: Open file handle for logging. Defaults to sys.stdout.

        :type bytes_to_str: function
        :param bytes_to_str: Function that converts sent/received bytes data to string for logging.
        """
        self._file_handle = file_handle
        self._format_raw_bytes = bytes_to_str

    def _print_log_msg(self, msg_type, msg=None, data=None):
        """Format one log entry and print it to the configured file handle."""
        print(
            helpers.format_log_msg(msg_type=msg_type, description=msg, data=data, indent_size=self.INDENT_SIZE),
            file=self._file_handle,
        )

数据格式

boofuzz 是基于格式的,因此在开始 fuzz 前需要先定义目标数据格式。boofuzz 有两种数据定义的方式:Static Protocol Definition(old) 和 Protocol Definition(new) 。这两种数据定义的方式只是接口不同,其内部存储的格式是类似的,而且每种基本都够用了,所以这里只分析下 Static Protocol Definition。

Static Protocol Definition — boofuzz 0.4.0 documentation

Protocol Definition — boofuzz 0.4.0 documentation

数据分成三个层次,Requests 是发出的 message,Blocks 来组成 message,Primitives(原语)是组成 block 的元素(字节、字符串、数字、校验和等)。

s_initialize

s_initialize会创建一个 request,我们需要提供一个 name 来标识这个 request,新建的 request 会被加到REQUESTS中,并设置为当前操作的 Request。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def s_initialize(name):
    """
    Initialize a new block request. All blocks / primitives generated after this call apply to the named request.
    Use s_switch() to jump between factories.

    :type  name: str
    :param name: Name of request

    :raises exception.SullyRuntimeError: a request with this name already exists.
    """
    if name in blocks.REQUESTS:
        raise exception.SullyRuntimeError("blocks.REQUESTS ALREADY EXISTS: %s" % name)
    # REQUESTS is a module-level dict; register the new request under its name.
    blocks.REQUESTS[name] = Request(name)
    # Make the new request the one subsequent s_* calls operate on.
    blocks.CURRENT = blocks.REQUESTS[name]

Session.__init__

Session 中创建根结点,用户可以指定一些 initial requests:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#对图初始化,新建一个root节点
self.root = pgraph.Node()
self.root.label = "__ROOT_NODE__"
self.root.name = self.root.label
self.last_recv = None
self.last_send = None

self.add_node(self.root)
#把传进来的target加到target数组中
if target is not None:

def apply_options(monitor):
monitor.set_options(crash_filename=self._crash_filename)

return

target.monitor_alive.append(apply_options)

try:
self.add_target(target)
except exception.BoofuzzRpcError as e:
self._fuzz_data_logger.log_error(str(e))
raise

s_get、s_switch

网络协议一般是各种 Request 的状态转移图,boofuzz 也支持建立这种图。我们可以再次调用 s_initialize 来创建一个新的 Request,通过 s_get 或 s_switch 可以在不同的 Request 之间切换,从而改变当前操作的对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def s_get(name=None):
    """
    Return the request with the specified name, or the current request if name is not given.

    Fetching a request by name also makes it the current one.

    :type  name: str
    :param name: (Optional, def=None) Name of request to return

    :return: The named request, or blocks.CURRENT when name is falsy.
    :raises exception.SullyRuntimeError: raised by s_switch() when no request has that name.
    """
    if not name:
        return blocks.CURRENT

    # ensure this gotten request is the new current.
    # s_switch() already raises for an unknown name, so no second lookup check is needed.
    s_switch(name)

    return blocks.REQUESTS[name]

def s_switch(name):
    """
    Change the current request to the one specified by "name".

    :type  name: str
    :param name: Name of request

    :raises exception.SullyRuntimeError: no request with this name exists.
    """

    if name not in blocks.REQUESTS:
        raise exception.SullyRuntimeError("blocks.REQUESTS NOT FOUND: %s" % name)
    # Make the named request the current one.
    blocks.CURRENT = blocks.REQUESTS[name]

connect

connect 是连边,即在两个 Node(Request) 上连边。只填一个参数的话,就是默认把提供的参数 node 连到 root 上,node 就是 Request 对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def connect(self, src, dst=None, callback=None):
    """Create an edge between two nodes (requests) of the session graph.

    Args:
        src: Source node or its name. When ``dst`` is omitted this is the
            destination instead and the root node becomes the source.
        dst: Destination node or its name.
        callback: Stored on the created edge.

    Returns:
        The newly created Connection edge, already added to the graph.
    """
    # if only a source was provided, then make it the destination and set the source to the root node.
    if dst is None:
        dst = src
        src = self.root

    # if source or destination is a name, resolve the actual node.
    if isinstance(src, six.string_types):
        src = self.find_node("name", src)

    if isinstance(dst, six.string_types):
        dst = self.find_node("name", dst)

    # if source or destination is not in the graph, add it.
    if src != self.root and self.find_node("name", src.name) is None:
        self.add_node(src)

    if self.find_node("name", dst.name) is None:
        self.add_node(dst)

    # create an edge between the two nodes and add it to the graph.
    edge = Connection(src.id, dst.id, callback)
    self.add_edge(edge)

    return edge

def add_node(self, node):
    """
    Add a pgraph node to the graph. We overload this routine to automatically generate and assign an ID whenever a
    node is added.

    Args:
        node (pgraph.Node): Node to add to session graph

    Returns:
        This object, so calls can be chained.
    """

    # Both the number and the id are the count of nodes present before this insertion.
    node.number = len(self.nodes)
    node.id = len(self.nodes)

    if node.id not in self.nodes:
        self.nodes[node.id] = node

    return self

这个 Connection 就是继承自最朴素的 Edge(边),只不过其提供了一个 callback 参数,这个会在状态转移的时候调用,因此可以添加一些自定义的功能。

1
2
3
4
5
class Connection(pgraph.Edge):
    """Graph edge that additionally carries an optional callback."""

    def __init__(self, src, dst, callback=None):
        super(Connection, self).__init__(src, dst)

        self.callback = callback

状态图案例:

创建完 Request 之后,接下来就是向里面添加 Primitives,根据数据类型划分出多个添加函数(如 s_string、s_bit_field 等),这里以 s_bit_field 为例。

s_bit_field

函数中新建 BitField 对象后,通过 Request 的 push 函数填充到当前 block 栈中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def s_bit_field(
    value=0,
    width=8,
    endian=LITTLE_ENDIAN,
    output_format="binary",
    signed=False,
    full_range=False,
    fuzzable=True,
    name=None,
    fuzz_values=None,
):
    """Build a BitField from the arguments and push it onto the current request.

    Every argument is forwarded to ``BitField`` under the same keyword, except
    ``value``, which is passed as ``default_value``.
    """
    blocks.CURRENT.push(
        BitField(
            name=name,
            default_value=value,
            width=width,
            endian=endian,
            output_format=output_format,
            signed=signed,
            full_range=full_range,
            fuzzable=fuzzable,
            fuzz_values=fuzz_values,
        )
    )

Request.push

1.首先给传进来的 item 也就是 Primitive 添加一些环境信息:

(1)context_path: 调用 _generate_context_path 产生的字符串,_generate_context_path 是将 block_stack 中的字符串全部拼接在一起产生路径字符串,用于标记item的位置;

(2)设置 item 的 request 为当前 request。

2.检查 item 的 qualified_name 是否重复,判断是否将 item 加入到 names map 中;

3.如果当前 request 还没有 block,block_stack 就为空,就将 item 插入到 request 的 stack 中;如果 block_stack 不为空,就相当于现在还在组建 block,就把 item 插入到栈顶的 block 中;

4.如果 item 是 block,会先把 block 插入到 stack 中,然后插入到 block_stack 中作为当前的 open_block,接下来的 primitive 都会插入到该 block 里面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def push(self, item):
    """
    Push an item into the block structure. If no block is open, the item goes onto the request stack. otherwise,
    the item goes onto the last open blocks stack.

    What this method does:
    1. Sets context_path for each pushed FuzzableWrapper.
    2. Sets request for each FuzzableWrapper
    3. Checks for duplicate qualified_name items
    4. Adds item to self.names map (based on qualified_name)
    5. Adds the item to self.stack, or to the stack of the currently opened block.

    Also: Manages block_stack, mostly an implementation detail to help static protocol definition

    @type item: BasePrimitive | Block | Request | Size | Repeat
    @param item: Some primitive/block/request/etc.

    @raise exception.SullyRuntimeError: an item with the same qualified_name was already pushed.
    """
    item.context_path = self._generate_context_path(self.block_stack)
    item.request = self
    # ensure the name doesn't already exist.
    # Membership is tested on the mapping itself; no intermediate key list is built.
    if item.qualified_name in self.names:
        raise exception.SullyRuntimeError("BLOCK NAME ALREADY EXISTS: %s" % item.qualified_name)

    self.names[item.qualified_name] = item

    # if there are no open blocks, the item gets pushed onto the request stack.
    # otherwise, the pushed item goes onto the stack of the last opened block.
    if not self.block_stack:
        self.stack.append(item)
    else:
        self.block_stack[-1].push(item)

    # add the opened block to the block stack.
    if isinstance(item, (Block, Aligned)):  # TODO generic check here
        self.block_stack.append(item)

def _generate_context_path(self, block_stack):
    """Return the dotted path made of this request's name followed by the names of the open blocks.

    Empty parts (a falsy request name, or an empty block stack) are left out.
    """
    context_path = ".".join(x.name for x in block_stack)  # TODO put in method
    context_path = ".".join(filter(None, (self.name, context_path)))
    return context_path

block

根据文档,有两种插入 block 的方式:start/end 模式和 with 模式。

start/end 方式

s_block_start 初始化一个 block,并将其 push。

s_block_end 关闭这个 block,说明数据已经填充完毕了。

1
2
3
4
5
if s_block_start("header"):
s_static("\x00\x01")
if s_block_start("body"):
...
s_block_end()

s_block_start 函数:

1
2
3
4
5
def s_block_start(name=None, *args, **kwargs):
    """Create a Block, push it onto the current request and return it.

    Extra positional and keyword arguments are forwarded to ``Block``. The
    returned block is an object, so ``if s_block_start(...):`` serves as a
    purely visual grouping for the primitives that follow.
    """
    block = Block(name=name, *args, **kwargs)
    blocks.CURRENT.push(block)

    return block

s_block_end 函数:

1
2
def s_block_end(name=None):
    """Close the most recently opened block of the current request.

    ``name`` is accepted but not used by this function.
    """
    blocks.CURRENT.pop()

with方式

1
2
3
with s_block("header"):
s_static("\x00\x01")
if s_block_start("body"):

with 方式能用是因为 s_block 中调用的是 s_block_start 插入 block,但是返回的是个 ScopedBlock 对象,这个对象实现了 __enter__ 和 __exit__ 方法。

当 with 范围结束时,就会调用 s_block_end 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def s_block(name=None, group=None, encoder=None, dep=None, dep_value=None, dep_values=None, dep_compare="=="):
    """Open a new block on the current request for use as a ``with`` context.

    The block is created and pushed immediately through s_block_start(); the
    returned wrapper closes it with s_block_end() when the ``with`` body exits.
    All keyword arguments are forwarded to s_block_start().

    Returns:
        A context manager whose ``__enter__`` yields the new block.
    """

    class ScopedBlock(object):
        def __init__(self, block):
            self.block = block

        def __enter__(self):
            """
            Setup before entering the "with" statement body
            """
            return self.block

        # Parameters renamed so the builtin ``type`` is not shadowed; the
        # interpreter passes them positionally.
        def __exit__(self, exc_type, exc_value, exc_traceback):
            """
            Cleanup after executing the "with" statement body
            """
            # Automagically close the block when exiting the "with" statement
            s_block_end()

    block = s_block_start(
        name,
        request=blocks.CURRENT,
        group=group,
        encoder=encoder,
        dep=dep,
        dep_value=dep_value,
        dep_values=dep_values,
        dep_compare=dep_compare,
    )

    return ScopedBlock(block)

start fuzz

fuzz

fuzz 开始于 fuzz 函数,传入一个 request 的 name 的话就只会 fuzz 这个 request,不传就会按建立的图去遍历着 fuzz。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def fuzz(self, name=None, max_depth=None):
    """Fuzz the entire protocol tree.

    Iterates through and fuzzes all fuzz cases, skipping according to
    self.skip and restarting based on self.restart_interval.

    If you want the web server to be available, your program must persist
    after calling this method. helpers.pause_for_signal() is
    available to this end.

    Args:
        name (str): Pass in a Request name to fuzz only a single request message. Pass in a test case name to fuzz
            only a single test case.
        max_depth (int): Maximum combinatorial depth; set to 1 for "simple" fuzzing.

    Returns:
        None
    """
    # Reset the counters before either fuzzing mode starts.
    self.total_mutant_index = 0
    self.total_num_mutations = self.num_mutations(max_depth=max_depth)

    if name is None or name == "":
        self._main_fuzz_loop(self._generate_mutations_indefinitely(max_depth=max_depth))
    else:
        self.fuzz_by_name(name=name)

_main_fuzz_loop

1.首先会开启一个 boofuzz 的可视化 web server;

2.调用 _start_target 启动 target,一般测试服务器的时候,是我们手动启动目标服务器,所以用不到这个,但是配合 ProcMonitor 我们可以设置自启动目标(Windows平台);

3.记录 fuzz 开始时间;

4.开始 fuzz 大循环,每次循环调用 _fuzz_current_case 进行 fuzz;

5.num_cases_actually_fuzzed + 1,如果 _index_end 参数不为空且 total_mutant_index >= _index_end 的话就结束 fuzz;

6.记录 fuzz 结束时间。

这里还有个选项是 _reuse_target_connection,即重用连接。开启这个选项后,整个大循环中只会在这里 open 一次连接,如果不开这个选项,每次 fuzz 都会重新 open 一次连接。

1
2
3
4
5
6
7
8
9
10
11
12
def _main_fuzz_loop(self, fuzz_case_iterator):
"""Execute main fuzz logic; takes an iterator of test cases.
Preconditions: `self.total_mutant_index` and `self.total_num_mutations` are set properly.
Args:
fuzz_case_iterator (Iterable): An iterator that walks through fuzz cases and yields MutationContext objec
See _iterate_single_node() for details.
Returns:
None
"""
#这里是创建线程开启一个boofuzz的可视化web端口
if self.web_port is not None:
self.server_init()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
try:
self._start_target(self.targets[0])
if self._reuse_target_connection:
self.targets[0].open()
self.num_cases_actually_fuzzed = 0
#记录fuzz开始时间
self.start_time = time.time()
for mutation_context in fuzz_case_iterator:
if self.total_mutant_index < self._index_start:
continue
# Check restart interval
if (
self.num_cases_actually_fuzzed
and self.restart_interval
and self.num_cases_actually_fuzzed % self.restart_interval == 0
):
self._fuzz_data_logger.open_test_step("restart interval of %d reached" % self.restart_interval)
self._restart_target(self.targets[0])
#这里开始fuzz这次
self._fuzz_current_case(mutation_context)
#这里是记录实际进行fuzz的次数
self.num_cases_actually_fuzzed += 1
if self._index_end is not None and self.total_mutant_index >= self._index_end:
break
if self._reuse_target_connection:
self.targets[0].close()
if self._keep_web_open and self.web_port is not None:
self.end_time = time.time()
print(
"\nFuzzing session completed. Keeping webinterface up on localhost:{}".format(self.web_port),
"\nPress ENTER to close webinterface",
)
input()

_start_target

内部调用 monitor 的 start_target 来启动目标,目标启动后,调用 monitor 的 post_start_target 回调函数。

1
2
3
4
5
6
7
8
9
def _start_target(self, target):
    """Ask the target's monitors to start it, then notify all monitors.

    Monitors are tried in order until one reports that it started the target;
    only then is post_start_target() invoked on every monitor.
    """
    # any() stops at the first monitor whose start_target() returns a truthy value.
    started = any(monitor.start_target() for monitor in target.monitors)
    if started:
        for monitor in target.monitors:
            monitor.post_start_target(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self)

_fuzz_current_case

1.打印一些信息;

2.调用 _open_connection_keep_trying 打开连接,在这里可以实现自定义的网络状态 monitor;

3.调用 _pre_send 函数,这里会调用 monitor 中的 pre_send 回调函数(Session 处填的 pre_send_callback 会复制到 CallbackMonitor 的 on_pre_send 中,这里 pre_send 就会调用它们),具体可以看后面单独对 CallbackMonitor 的分析;

4.调用 edge 的 callback 函数,产生 callback 数据;

5.调用 transmit_fuzz 进行测试数据的收发;

6.调用 _check_for_passively_detected_failures 函数检查是否发生了 crash。

根据设置的 sleep_time 参数暂停。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def _fuzz_current_case(self, mutation_context):
"""
Fuzzes the current test case. Current test case is controlled by
fuzz_case_iterator().
Args:
mutation_context (MutationContext): Current mutation context.
"""
target = self.targets[0]
self._pause_if_pause_flag_is_set()
test_case_name = self._test_case_name(mutation_context)
self.current_test_case_name = test_case_name
self._fuzz_data_logger.open_test_case(
"{0}: {1}".format(self.total_mutant_index, test_case_name),
name=test_case_name,
index=self.total_mutant_index,
num_mutations=self.total_num_mutations,
current_index=self.mutant_index,
current_num_mutations=self.fuzz_node.get_num_mutations(),
)
if self.total_num_mutations is not None:
self._fuzz_data_logger.log_info(
"Type: {0}. Case {1} of {2} overall.".format(
type(self.fuzz_node.mutant).__name__,
self.total_mutant_index,
self.total_num_mutations,
)
)
else:
self._fuzz_data_logger.log_info(
"Type: {0}".format(
type(self.fuzz_node.mutant).__name__,
)
)
try:
#打开连接
self._open_connection_keep_trying(target)
#_pre_send被调用
self._pre_send(target)
#这里是正常发送message_path最后一条路径前面的路径数据,这里就可以看出boofuzz是按node进行fuzz的
for e in mutation_context.message_path[:-1]:
prev_node = self.nodes[e.src]
node = self.nodes[e.dst]
protocol_session = ProtocolSession(
previous_message=prev_node,
current_message=node,
)
mutation_context.protocol_session = protocol_session
callback_data = self._callback_current_node(node=node, edge=e, test_case_context=protocol_session)
self._fuzz_data_logger.open_test_step("Transmit Prep Node '{0}'".format(node.name))
self.transmit_normal(target, node, e, callback_data=callback_data, mutation_context=mutation_context)
prev_node = self.nodes[mutation_context.message_path[-1].src]
node = self.nodes[mutation_context.message_path[-1].dst]
protocol_session = ProtocolSession(
previous_message=prev_node,
current_message=node,
)
mutation_context.protocol_session = protocol_session
#这里会调用callback,同时返回一个callback数据
callback_data = self._callback_current_node(
node=self.fuzz_node, edge=mutation_context.message_path[-1], test_case_context=protocol_session
)
self._fuzz_data_logger.open_test_step("Fuzzing Node '{0}'".format(self.fuzz_node.name))
#进行实际的数据发送
self.transmit_fuzz(
target,
self.fuzz_node,
mutation_context.message_path[-1],
callback_data=callback_data,
mutation_context=mutation_context,
)
#检查是否发生了crash
self._check_for_passively_detected_failures(target=target)
if not self._reuse_target_connection:
target.close()
#这里也提供了接口来睡眠
if self.sleep_time > 0:
self._fuzz_data_logger.open_test_step("Sleep between tests.")
self._sleep(self.sleep_time)

_open_connection_keep_trying

在不开启 _reuse_target_connection 的情况下调用 target 的 open 函数,代码中已经实现了自定义的网络状态 monitor。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def _open_connection_keep_trying(self, target):
    """Open connection and if it fails, keep retrying.

    Does nothing when the target connection is reused across test cases.
    On a connection failure the target is restarted and the open retried until
    the restart threshold or restart timeout is reached; the original exception
    is then re-raised. When sockets are exhausted it waits 5 seconds between
    attempts and gives up on the 50th failure.

    Args:
        target (Target): Target to open.

    Raises:
        exception.BoofuzzTargetConnectionFailedError: retries were exhausted.
        exception.BoofuzzError: no socket became available.
    """
    # A connection is only opened here when connections are not reused.
    if not self._reuse_target_connection:
        out_of_available_sockets_count = 0
        unable_to_connect_count = 0
        initial_time = time.time()
        while True:
            try:
                target.open()
                break  # break if no exception
            except exception.BoofuzzTargetConnectionFailedError:
                if self.restart_threshold and unable_to_connect_count >= self.restart_threshold:
                    self._fuzz_data_logger.log_info(
                        "Unable to reconnect to target: Reached threshold of {0} retries. Ending fuzzing.".format(
                            self.restart_threshold
                        )
                    )
                    # Custom addition acting as a network-state monitor: dump the name of the
                    # current test case and the last payload sent, as a crash sample.
                    # The attribute is _crash_filename, the same one Session.__init__
                    # hands to the monitors.
                    with open(self._crash_filename + "_" + str(self.num_cases_actually_fuzzed), "wb") as fp:
                        fp.write((self.current_test_case_name + "\n").encode())
                        # last_send is still None if nothing was transmitted yet.
                        if self.last_send is not None:
                            fp.write(self.last_send)
                    raise
                elif self.restart_timeout and time.time() >= initial_time + self.restart_timeout:
                    self._fuzz_data_logger.log_info(
                        "Unable to reconnect to target: Reached restart timeout of {0}s. Ending fuzzing.".format(
                            self.restart_timeout
                        )
                    )
                    raise
                else:
                    self._fuzz_data_logger.log_info(constants.WARN_CONN_FAILED_TERMINAL)
                    self._restart_target(target)
                    unable_to_connect_count += 1
            except exception.BoofuzzOutOfAvailableSockets:
                out_of_available_sockets_count += 1
                if out_of_available_sockets_count == 50:
                    raise exception.BoofuzzError("There are no available sockets. Ending fuzzing.")
                self._fuzz_data_logger.log_info("There are no available sockets. Waiting for another 5 seconds.")
                time.sleep(5)

_pre_send

依次调用 target 的 monitor 中的回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def _pre_send(self, target):
    """
    Execute custom methods to run prior to each fuzz request. The order of events is as follows::

        pre_send() - req - callback ... req - callback - post_send()

    When fuzzing RPC for example, register this method to establish the RPC bind.

    An exception raised by one monitor is logged as an error and does not stop
    the remaining monitors from running.

    Args:
        target (session.target): Target we are sending data to
    """
    for monitor in target.monitors:
        try:
            self._fuzz_data_logger.open_test_step("Monitor {}.pre_send()".format(str(monitor)))
            monitor.pre_send(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self)
        except Exception:
            self._fuzz_data_logger.log_error(
                constants.ERR_CALLBACK_FUNC.format(func_name="{}.pre_send()".format(str(monitor)))
                + traceback.format_exc()
            )

_callback_current_node

调用当前边 edge 的 callback 函数,并返回 callback 数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def _callback_current_node(self, node, edge, test_case_context):
    """Execute callback preceding current node.

    Args:
        test_case_context (ProtocolSession): Context for test case-scoped data.
        node (pgraph.node.node (Node), optional): Current Request/Node
        edge (pgraph.edge.edge (pgraph.edge), optional): Edge along the current fuzz path from "node" to next node.

    Returns:
        bytes: Data rendered by current node if any; otherwise None.
    """
    data = None

    # if the edge has a callback, process it. the callback has the option to render the node, modify it and return.
    if edge.callback:
        self._fuzz_data_logger.open_test_step("Callback function '{0}'".format(edge.callback.__name__))
        # Whatever the callback returns is handed back to the caller as-is.
        data = edge.callback(
            self.targets[0],
            self._fuzz_data_logger,
            session=self,
            node=node,
            edge=edge,
            test_case_context=test_case_context,
        )

    return data

_check_for_passively_detected_failures

依次调用 monitor 的 post_send 函数来获取是否发生了 crash,如果发生了 crash 就继续调用 get_crash_synopsis 函数来获取 crash 概要。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def _check_for_passively_detected_failures(self, target, failure_already_detected=False):
    """Check for and log passively detected failures. Return True if any found.

    Args:
        target (Target): Target to be checked for failures.
        failure_already_detected (bool): If a failure was already detected.

    Returns:
        bool: True if failures were found. False otherwise.
    """
    has_crashed = False
    if len(target.monitors) > 0:
        self._fuzz_data_logger.open_test_step("Contact target monitors")
        # So, we need to run through the array two times. First, we check
        # if any of the monitors reported a failure and
        # if so, we need to
        # gather a crash synopsis from them. We don't know whether
        # a monitor can provide a crash synopsis, but in any case, we'll
        # check. In the second run, we try to get crash synopsis from the
        # monitors that did not detect a crash as supplemental information.
        finished_monitors = []
        # First pass: a falsy post_send() result means that monitor detected a failure.
        for monitor in target.monitors:
            if not monitor.post_send(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self):
                has_crashed = True
                self._fuzz_data_logger.log_fail(
                    "{0} detected crash on test case #{1}: {2}".format(
                        str(monitor), self.total_mutant_index, monitor.get_crash_synopsis()
                    )
                )
                finished_monitors.append(monitor)

        if not has_crashed and not failure_already_detected:
            self._fuzz_data_logger.log_pass("No crash detected.")
        else:
            # Second pass: monitors that did not report the failure themselves.
            for monitor in set(target.monitors) - set(finished_monitors):
                synopsis = monitor.get_crash_synopsis()
                if len(synopsis) > 0:
                    # Reuse the synopsis fetched above instead of asking the monitor again.
                    self._fuzz_data_logger.log_fail(
                        "{0} provided additional information for crash on #{1}: {2}".format(
                            str(monitor), self.total_mutant_index, synopsis
                        )
                    )
    return has_crashed

transmit_fuzz

进行实际的数据收发:

1.判断是否传入了 callback 数据,如果有 callback 数据就使用 callback 数据,否则调用 render 来产生变异数据;

2.发送数据,并将发送的数据保存在 last_send 中;

3.接收数据,并将接收到的数据保存在 last_recv 中。

last_send 和 last_recv 都非常重要:last_send 可以在监测到 crash 时 dump 出来作为 crash 样本,last_recv 则可以在边回调中决定状态机的走向,以及产生 callback 数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def transmit_fuzz(self, sock, node, edge, callback_data, mutation_context):
    """Render and transmit a fuzzed node, process callbacks accordingly.

    Note that the body sends through ``self.targets[0]`` and renders ``self.fuzz_node``;
    the ``sock``, ``node`` and ``edge`` arguments are not read here.

    Args:
        sock (Target, optional): Socket-like object on which to transmit node
        node (pgraph.node.node (Node), optional): Request/Node to transmit
        edge (pgraph.edge.edge (pgraph.edge), optional): Edge along the current fuzz path from "node" to next node.
        callback_data (bytes): Data from previous callback.
        mutation_context (MutationContext): Current mutation context.

    Raises:
        BoofuzzFailure: On a connection reset/abort or SSL error that the session is not
            configured to ignore.
    """
    # The edge callback runs before the data is sent. If it returned custom data, that data is
    # sent as-is; if it returned nothing, the mutated node is rendered and sent instead.
    if callback_data:
        data = callback_data
    else:
        data = self.fuzz_node.render(mutation_context)

    try:  # send
        # Send the data and remember it in last_send.
        self.targets[0].send(data)
        self.last_send = data
    except exception.BoofuzzTargetConnectionReset:
        if self._ignore_connection_issues_when_sending_fuzz_data:
            self._fuzz_data_logger.log_info(constants.ERR_CONN_RESET)
        else:
            raise BoofuzzFailure(message=constants.ERR_CONN_RESET)
    except exception.BoofuzzTargetConnectionAborted as e:
        msg = constants.ERR_CONN_ABORTED.format(socket_errno=e.socket_errno, socket_errmsg=e.socket_errmsg)
        if self._ignore_connection_issues_when_sending_fuzz_data:
            self._fuzz_data_logger.log_info(msg)
        else:
            raise BoofuzzFailure(msg)
    except exception.BoofuzzSSLError as e:
        if self._ignore_connection_ssl_errors:
            self._fuzz_data_logger.log_info(str(e))
        else:
            raise BoofuzzFailure(str(e))

    received = b""
    try:  # recv
        if self._receive_data_after_fuzz:
            received = self.targets[0].recv()
    except exception.BoofuzzTargetConnectionReset:
        if self._check_data_received_each_request:
            raise BoofuzzFailure(message=constants.ERR_CONN_RESET)
        else:
            self._fuzz_data_logger.log_info(constants.ERR_CONN_RESET)
    except exception.BoofuzzTargetConnectionAborted as e:
        msg = constants.ERR_CONN_ABORTED.format(socket_errno=e.socket_errno, socket_errmsg=e.socket_errmsg)
        if self._check_data_received_each_request:
            raise BoofuzzFailure(msg)
        else:
            self._fuzz_data_logger.log_info(msg)
    except exception.BoofuzzSSLError as e:
        if self._ignore_connection_ssl_errors:
            self._fuzz_data_logger.log_info(str(e))
        else:
            self._fuzz_data_logger.log_fail(str(e))
            raise BoofuzzFailure(str(e))
    # Remember what was received for this test case (b"" if nothing was read).
    self.last_recv = received

crash dump

上面只介绍到监测 crash 而没说哪里 dump crash,实际上 crash 的 dump 在各个 Monitor 中(在 _fuzz_current_case 函数中是没有的)。

以 Procmon 的 DebugThread 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def post_send(self):
    """
    This routine is called after the fuzzer transmits a test case and returns the status of the target.

    When the target is no longer alive, the process monitor's last synopsis is appended to its
    crash file and, if a core dump directory is configured and a core dump is found, the dump is
    moved there under the current test number.

    Returns:
        bool: True if the target is still active, False otherwise.
    """
    if self.is_alive():
        return True

    # Target died: append the synopsis to the crash log ("a" keeps earlier crashes).
    with open(self.process_monitor.crash_filename, "a") as rec_file:
        rec_file.write(self.process_monitor.last_synopsis)

    if self.process_monitor.coredump_dir is not None:
        dest = os.path.join(self.process_monitor.coredump_dir, str(self.process_monitor.test_number))
        src = _get_coredump_path()

        if src is not None:
            self.log("moving core dump %s -> %s" % (src, dest))
            os.rename(src, dest)
    return False

到此为止,数据的收发流程基本就了解了,剩下需要看下数据是怎么从 request 中产生并变异的。

数据变异

_generate_mutations_indefinitely 这里会产生一个 iterator,迭代产生 mutation_context

1
2
if name is None or name == "":
self._main_fuzz_loop(self._generate_mutations_indefinitely(max_depth=max_depth))

流程如图所示:

_generate_mutations_indefinitely

这里 max_depth 默认传进来是 None(path 也是 None),此时循环不设深度上限。

调用 _generate_n_mutations 来产生 mutation_context

depth 是在一次 fuzz_case 中,产生几个变异体:depth 为1,那就是一次就变异一个 primitive。

1
2
3
4
5
6
7
8
9
10
11
12
def _generate_mutations_indefinitely(self, max_depth=None, path=None):
    """Yield MutationContext with n mutations per message over all messages, with n increasing indefinitely.

    Args:
        max_depth (int, optional): Highest depth to generate; None means no upper bound.
        path (list of Connection, optional): Passed through to _generate_n_mutations.

    Yields:
        Whatever _generate_n_mutations yields for depth 1, then depth 2, and so on.
    """
    depth = 1
    while max_depth is None or depth <= max_depth:
        valid_case_found_at_this_depth = False
        for m in self._generate_n_mutations(depth=depth, path=path):
            valid_case_found_at_this_depth = True
            yield m
        # A depth that produced nothing means deeper levels cannot produce anything either;
        # this is also what terminates the loop when max_depth is None.
        if not valid_case_found_at_this_depth:
            break
        depth += 1

_generate_n_mutations

这里会先得到 path 再从 path 里得到要 fuzz 的 node。

1
2
3
4
5
6
7
8
9
10
def _generate_n_mutations(self, depth, path):
    """Yield MutationContext with n mutations per message over all messages.

    Args:
        depth (int): Number of mutations per yielded context.
        path (list of Connection): Specific path to restrict fuzzing to, or None for all paths.
    """
    # Debugging tip: to see which paths are visited, iterate
    # self._iterate_protocol_message_paths(path=path) and print self._message_path_to_str(...).
    #
    # First obtain each path to fuzz; the node to fuzz is then taken from that path.
    for message_path in self._iterate_protocol_message_paths(path=path):
        # _generate_n_mutations_for_path builds the MutationContext objects for this path.
        for m in self._generate_n_mutations_for_path(message_path, depth=depth):
            yield m

_iterate_protocol_message_paths

先检查下是否有 target 以及从 root 发出的边。

如果指定了 path,就直接返回指定的 path,但是默认为空。

调用 _iterate_protocol_message_paths_recursive 遍历 path。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def _iterate_protocol_message_paths(self, path=None):
    """
    Iterates over protocol and yields a path (list of Connection) leading to a given message).

    If a specific path is supplied, only that path is yielded; otherwise every path reachable
    from the root node is enumerated.

    Args:
        path (list of Connection): Provide a specific path to yield only that specific path.

    Yields:
        list of Connection: List of edges along the path to the current one being fuzzed.

    Raises:
        exception.SullyRuntimeError: If no requests defined or no targets specified
    """
    # we can't fuzz if we don't have at least one target and one request.
    if not self.targets:
        raise exception.SullyRuntimeError("No targets specified in session")

    if not self.edges_from(self.root.id):
        raise exception.SullyRuntimeError("No requests specified in session")

    if path is not None:
        yield path
    else:
        for x in self._iterate_protocol_message_paths_recursive(this_node=self.root, path=[]):
            yield x

_iterate_protocol_message_paths_recursive

这里遍历 path 的方法使用的是 DFS,只不过用 yield 实现的,看着有些别扭,最终会产生从 root 出发的所有路径。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def _iterate_protocol_message_paths_recursive(self, this_node, path):
    """Recursive helper for _iterate_protocol.

    Walks the graph depth-first from this_node and yields the path after each edge is added, so
    every path starting at the initial node is produced. The same list object is yielded (and
    mutated) throughout; callers that need to keep a path must copy it.

    Args:
        this_node (node.Node): Current node that is being fuzzed.
        path (list of Connection): List of edges along the path to the current one being fuzzed.

    Yields:
        list of Connection: List of edges along the path to the current one being fuzzed.
    """
    # step through every edge from the current node.
    for edge in self.edges_from(this_node.id):
        # keep track of the path as we fuzz through it, don't count the root node.
        # we keep track of edges as opposed to nodes because if there is more then one path through a set of
        # given nodes we don't want any ambiguity.
        path.append(edge)

        message_path = self._message_path_to_str(path)
        logging.debug("fuzzing: %s", message_path)
        # Keep the destination node in a local: self.fuzz_node is shared state that the consumer
        # may reassign while this generator is suspended at the yield below.
        next_node = self.nodes[edge.dst]
        self.fuzz_node = next_node

        yield path

        # recursively fuzz the remainder of the nodes in the session graph.
        for x in self._iterate_protocol_message_paths_recursive(next_node, path):
            yield x

        # finished with the last node on the path, pop it off the path stack.
        if path:
            path.pop()

_generate_n_mutations_for_path

知道 path 如何产生的之后,再回去看 MutationContext 是怎么产生的。_generate_n_mutations_for_path 函数对传进来的 path 产生 mutation_context;mutation_context 就是代表这次 case 的变异体上下文,depth 标识一个 fuzz_case 使用几个变异体,默认为1。

那么 MutationContext 的 mutations 就只有一个 qualified_name

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def _generate_n_mutations_for_path(self, path, depth):
    """Yield MutationContext with n mutations for a specific message.

    Args:
        path (list of Connection): Nodes (Requests) along the path to the current one being fuzzed.
        depth (int): Yield sets of depth mutations.

    Yields:
        MutationContext: A MutationContext whose ``mutations`` dict maps each mutation's
        qualified_name to the mutation, for one set of mutations produced at this depth.
    """
    for mutations in self._generate_n_mutations_for_path_recursive(path, depth=depth):
        # Sets flagged by _mutations_contain_duplicate are skipped and do not count as a test case.
        if self._mutations_contain_duplicate(mutations):
            continue
        self.total_mutant_index += 1
        yield MutationContext(message_path=path, mutations={n.qualified_name: n for n in mutations})

_generate_n_mutations_for_path_recursive

调用 _generate_n_mutations_for_path_recursive 产生 mutation 集合。

mutations 由两个部分组成:一个是 _generate_mutations_for_request 函数产生的,另一个是以 depth - 1 递归调用 _generate_n_mutations_for_path_recursive 产生的,两者拼接后一起 yield。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def _generate_n_mutations_for_path_recursive(self, path, depth, skip_elements=None):
    """Yield lists of ``depth`` mutations for the last message in path.

    Args:
        path (list of Connection): Edges along the path to the message being fuzzed.
        depth (int): Number of mutations to combine; 0 yields a single empty list.
        skip_elements (set of str, optional): Qualified names that must not be mutated again.

    Yields:
        list of Mutation: One mutation list from _generate_mutations_for_request, extended by a
        list produced recursively at depth - 1.
    """
    if skip_elements is None:
        skip_elements = set()

    if depth == 0:
        yield []
        return
    new_skip = skip_elements.copy()
    # Debugging tip: iterate self._generate_mutations_for_request(path=path,
    # skip_elements=skip_elements) and print each item to inspect what is generated here.
    for mutations in self._generate_mutations_for_request(path=path, skip_elements=skip_elements):
        # Elements mutated at this level are excluded from the deeper levels.
        new_skip.update(m.qualified_name for m in mutations)
        for ms in self._generate_n_mutations_for_path_recursive(path, depth=depth - 1, skip_elements=new_skip):
            yield mutations + ms

_generate_mutations_for_request

设置 fuzz_node 为当前路径上的最后一个 node,之后调用 fuzz_node 的 get_mutations。fuzz_node 是一个 request 对象,所以这里调用的是 request 的 get_mutations 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def _generate_mutations_for_request(self, path, skip_elements=None):
    """Yield each mutation for a specific message (the last message in path).

    Args:
        path (list of Connection): Nodes (Requests) along the path to the current one being fuzzed.
        skip_elements (iter of str): Qualified names of elements to skip while fuzzing.

    Yields:
        Mutation: Mutation object describing a single mutation.
    """
    if skip_elements is None:
        skip_elements = []
    # The node to fuzz is the destination of the last edge on the path.
    self.fuzz_node = self.nodes[path[-1].dst]
    self.mutant_index = 0
    # Debugging tip: list(self.fuzz_node.get_mutations(skip_elements=skip_elements)) shows
    # everything this loop will produce.
    #
    # Enumerate the mutations produced by the items inside the node.
    for mutations in self.fuzz_node.get_mutations(skip_elements=skip_elements):
        # Count the mutations yielded for this request (the counter was reset to 0 above).
        self.mutant_index += 1
        yield mutations

        # The flags below may have been set by the consumer while this generator was suspended.
        if self._skip_current_node_after_current_test_case:
            self._skip_current_node_after_current_test_case = False
            break
        elif self._skip_current_element_after_current_test_case:
            self.fuzz_node.mutant.stop_mutations()
            self._skip_current_element_after_current_test_case = False

request.get_mutations

Request 继承自 FuzzableBlock,mutations 是 FuzzableBlock 的方法。

1
2
def get_mutations(self, default_value=None, skip_elements=None):
    """Return the result of self.mutations() called with the same arguments."""
    return self.mutations(default_value=default_value, skip_elements=skip_elements)

FuzzableBlock.mutations

遍历当前 request 的 stack 中的 item,即插入的 block 和 primitive,再调用他们的 get_mutations 函数得到 mutation。

primitives 都继承自 fuzzable,所以这里调用的是 fuzzable 的 get_mutations

1
2
3
4
5
6
7
8
9
10
def mutations(self, default_value, skip_elements=None):
    """Yield the mutations of every item on this block's stack, in stack order.

    Args:
        default_value: Not read by this implementation.
        skip_elements (iter of str, optional): Qualified names of items to leave out.

    Yields:
        Each value produced by an item's get_mutations().
    """
    if skip_elements is None:
        skip_elements = []
    # Walk the items on the stack.
    for item in self.stack:
        if item.qualified_name in skip_elements:
            continue
        # Record on the request which item is currently being mutated.
        self.request.mutant = item
        for mutation in item.get_mutations():
            yield mutation

fuzzable.get_mutations

这个函数就是对当前 item 进行变异,并将变异的值传到生成的 Mutation 里面。

Mutation 的构造这里就能看到,是由一个值 value,一个所属 item的qualified_name,以及变异计数 index 组成的。

这里终止变异用的是 _halt_mutations 标志,而 stop_mutations 函数是提供的接口,其内部就是设置 _halt_mutations 为 true。

itertools.chain 的功能就是把多个可迭代对象依次串联起来,所以这里值的来源就是 self.mutations(self.original_value()) 和 self._fuzz_values。

其中 _fuzz_values 在 Fuzzable 的 init 函数中是可以作为构造参数传入的,但是 String(继承自 Fuzzable)的构造函数里并没有这个参数,所以就没找到接口设置这个值(除了手动赋值),所以这里总是空列表。

下面会以数值型为例,分析其 mutations 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def get_mutations(self):
    """Iterate mutations. Used by boofuzz framework.

    Values come from self.mutations(self.original_value()) followed by self._fuzz_values. A list
    is yielded unchanged, a Mutation is wrapped in a one-element list, and any other value is
    wrapped in a new Mutation carrying this element's qualified_name and a running index.

    Yields:
        list of Mutation: Mutations

    """
    try:
        if not self.fuzzable:
            return
        index = 0
        for value in itertools.chain(self.mutations(self.original_value()), self._fuzz_values):
            # stop_mutations() sets this flag; honour it before producing the next value.
            if self._halt_mutations:
                self._halt_mutations = False
                return
            if isinstance(value, list):
                yield value
            elif isinstance(value, Mutation):
                yield [value]
            else:
                yield [Mutation(value=value, qualified_name=self.qualified_name, index=index)]
                index += 1
    finally:
        self._halt_mutations = False  # in case stop_mutations is called when mutations were exhausted anyway

stop_mutations

1
2
3
4
5
6
7
8
9
def stop_mutations(self):
    """Stop yielding mutations on the currently running :py:meth:`mutations` call.

    Used by boofuzz to stop fuzzing an element when it's already caused several failures.
    This only raises the ``_halt_mutations`` flag.

    Returns:
        NoneType: None
    """
    self._halt_mutations = True

BitField.mutation

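数值型在默认情况下(full_range 为 False)只使用一种变异策略:通过 _iterate_fuzz_lib 在各个感兴趣的边界值附近取值(代码中为 range(-10, 10),即偏移 -10 到 +9),并且只保留落在 [0, max_num) 范围内的数;如果 full_range 为 True,则直接遍历 0 到 max_num - 1。其中 max_num 默认为 None,如果没有传入具体值,则通过 binary_string_to_int 将其设为 2 的 width 次方(默认 width 为8,即 256)。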

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
self.max_num = max_num
if not self.max_num:
self.max_num = binary_string_to_int("1" + "0" * width)

def binary_string_to_int(binary):
    """
    Convert a binary string to a decimal number.

    Args:
        binary (str): Binary string, e.g. "100" for 4.

    Returns:
        int: Converted bit string
    """

    return int(binary, 2)

def mutations(self, default_value):
    """Yield every value produced by _iterate_fuzz_lib(); default_value is not read here."""
    for val in self._iterate_fuzz_lib():
        yield val

def _iterate_fuzz_lib(self):
    """Yield the integer fuzz values for this field.

    With full_range set, every integer in [0, max_num) is yielded. Otherwise only values around
    a fixed list of boundaries derived from max_num are yielded, as produced by
    _yield_integer_boundaries for each boundary in turn.
    """
    if self.full_range:
        for i in range(0, self.max_num):
            yield i
    else:
        # try only "smart" values.
        interesting_boundaries = [
            0,
            self.max_num // 2,
            self.max_num // 3,
            self.max_num // 4,
            self.max_num // 8,
            self.max_num // 16,
            self.max_num // 32,
            self.max_num,
        ]
        for boundary in interesting_boundaries:
            for v in self._yield_integer_boundaries(boundary):
                yield v
    # TODO Add a way to inject a list of fuzz values
    # elif isinstance(default_value, (list, tuple)):
    #     for val in iter(default_value):
    #         yield val

    # TODO: Add injectable arbitrary bit fields

def _yield_integer_boundaries(self, integer):
    """
    Yield the values around the supplied integer that fit in this field.

    The window is ``range(-10, 10)``, i.e. offsets -10 through +9; only results in
    [0, max_num) are yielded.

    Args:
        integer (int): Boundary value to generate neighbours for.

    Yields:
        int: integer + offset for each offset whose result is within range.
    """
    for i in range(-10, 10):
        case = integer + i
        if 0 <= case < self.max_num:
            # some day: if case not in self._user_provided_values
            yield case

总结数据变异流程

一次 fuzz_case 所用的变异数据来自于 mutation_context,mutation_context 由 message_path 和 mutations 组成。

mutations 产生于 primitive,对 primitive 的一次变异产生一个 mutation,mutation 中包含变异值和所属的 primitive 的 qualified_name

根据传入的 depth 的数值,mutation_context 可以包含多个 mutation,只不过默认值 depth 为1,因此 mutation_context 一般就包含一个 mutation。

1
2
3
4
5
6
7
8
9
10
11
#session._main_fuzz_loop()
for mutation_context in fuzz_case_iterator:
if self.total_mutant_index < self._index_start:
continue

#session._generate_n_mutations_for_path()
self.total_mutant_index += 1
yield MutationContext(message_path=path, mutations={n.qualified_name: n for n in mutations})

#Fuzzable.get_mutations()
yield [Mutation(value=value, qualified_name=self.qualified_name, index=index)]

变量 total_mutant_index 标记产生了多少 mutation_context,也就等同于 fuzz_case 的次数。

变量 mutant_index 标记当前 request 已经产生了多少 mutant(mutation),它在每次进入 _generate_mutations_for_request 时会被重置为 0;因此只有在 depth 为1且只 fuzz 了一个 request 的情况下,mutant_index 才等于 total_mutant_index。

而 mutation 的产生则是首先会遍历出状态图的所有 path,然后对 path 中最后一个 node 中的 item 进行变异。

其他细节

这里再分析下刚才没提到的一些细节,其实整体框架和流程已经分析完了,但是对这些小细节也比较清楚的话,能更好的了解 boofuzz。

qualified_name

我们创建 primitive 时一般只给个default_value,这样在 Fuzzable 里,就会默认赋值个 name,格式是类型加计数,例如 String1,String2。

primitive 的 context_path 是在 push 的时候赋予的,标记着的是其在 request 中的位置。

最后 qualified_name 的产生是将 context_path 和 name 拼接在一起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#Fuzzable._init()
if self._name is None:
Fuzzable.name_counter += 1
self._name = "{0}{1}".format(type(self).__name__, Fuzzable.name_counter)

#Request.push()
item.context_path = self._generate_context_path(self.block_stack)
def _generate_context_path(self, block_stack):
    """Build the dot-delimited context path for an item being pushed.

    Args:
        block_stack (list): Currently open blocks; each must expose ``name``.

    Returns:
        str: This object's name followed by the joined block names, with empty parts dropped.
    """
    context_path = ".".join(x.name for x in block_stack)  # TODO put in method
    # filter(None, ...) drops a missing/empty name or an empty block path so no stray dots appear.
    context_path = ".".join(filter(None, (self.name, context_path)))
    return context_path
#Fuzzable.qualified_name
@property
def qualified_name(self):
    """Dot-delimited name that describes the request name and the path to the element within the request.

    Built by joining ``_context_path`` and ``name`` with a dot, skipping empty strings.

    Example: "request1.block1.block2.node1"

    """
    return ".".join(s for s in (self._context_path, self.name) if s != "")

path 数据发送

前面已经介绍了变异值数据产生和数据发送,但是实际上数据产生和数据发送间还有一些细节没分析。

为了贴合网络协议,boofuzz 在发送变异数据前,会先把其 path 上的正常数据先都发送过去,变异 mutation 都是在 path 的最后一个 node 上。

1
2
3
4
5
6
7
8
9
10
11
12
13
#session._fuzz_current_case()
#这里是正常发送message_path最后一条路径前面的路径数据,这里就可以看出boofuzz是按node Fuzz的
for e in mutation_context.message_path[:-1]:
prev_node = self.nodes[e.src]
node = self.nodes[e.dst]
protocol_session = ProtocolSession(
previous_message=prev_node,
current_message=node,
)
mutation_context.protocol_session = protocol_session
callback_data = self._callback_current_node(node=node, edge=e, test_case_context=protocol_session)
self._fuzz_data_logger.open_test_step("Transmit Prep Node '{0}'".format(node.name))
self.transmit_normal(target, node, e, callback_data=callback_data,mutation_context=mutation_context)

transmit_normal

如果 callback_data 不为空就发送 callback_data,否则发送 render(mutation_context)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def transmit_normal(self, sock, node, edge, callback_data, mutation_context):
"""Render and transmit a non-fuzzed node, process callbacks accordingly.

Args:
sock (Target, optional): Socket-like object on which to transmit node
node (pgraph.node.node (Node), optional): Request/Node to transmit
edge (pgraph.edge.edge (pgraph.edge), optional): Edge along the current fuzz path from "node" to next node.
callback_data (bytes): Data from previous callback.
mutation_context (MutationContext): active mutation context
"""
if callback_data:
data = callback_data
else:
data = node.render(mutation_context=mutation_context)

try: # send
self.targets[0].send(data)
self.last_send = data
if self._receive_data_after_each_request:
self.last_recv = self.targets[0].recv()

request.render

这个函数流程前面也没分析。

1
2
3
4
5
def render(self, mutation_context=None):
    """Render this request by returning its child data.

    Args:
        mutation_context (MutationContext, optional): Passed through to get_child_data.

    Raises:
        exception.SullyRuntimeError: If a block is still open on block_stack.
    """
    if self.block_stack:
        raise exception.SullyRuntimeError("UNCLOSED BLOCK: %s" % self.block_stack[-1].qualified_name)

    return self.get_child_data(mutation_context=mutation_context)

request.get_child_data

这个函数遍历 request 中的 item,来拼接出数据,item 基本都继承自 Fuzzable(除了 Block)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def get_child_data(self, mutation_context):
    """Get child or referenced data for this node.

    For blocks that reference other data from the message structure (e.g. size, checksum, blocks). See
    FuzzableBlock for an example.

    Args:
        mutation_context (MutationContext): Mutation context.

    Returns:
        bytes: Child data.
    """
    # Join once instead of repeatedly extending a bytes object inside the loop.
    return b"".join(item.render(mutation_context=mutation_context) for item in self.stack)

Fuzzable.render

调用 get_value 获取值。

1
2
3
4
5
6
7
8
def render(self, mutation_context=None):
    """Render after applying mutation, if applicable.

    The value from get_value() is passed through encode() and returned.

    :type mutation_context: MutationContext
    """
    return self.encode(value=self.get_value(mutation_context=mutation_context), mutation_context=mutation_context)

def encode(self, value, mutation_context):
    """Return value unchanged; mutation_context is not read by this implementation."""
    return value

Fuzzable.get_value

如果当前 item 的 qualified_name 在 mutation_context 的 mutations 中,就返回变异值,否则就返回初始值 _default_value。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def get_value(self, mutation_context=None):
    """Helper method to get the currently applicable value.

    This is either the default value, or the active mutation value as dictated by mutation_context.

    Args:
        mutation_context (MutationContext): Active mutation context; a new empty one is created
            when None.

    Returns:
        The mutation's value if this element's qualified_name is in mutation_context.mutations
        (a callable mutation value is called with the original value and its result returned);
        otherwise the original value.
    """
    if mutation_context is None:
        mutation_context = MutationContext()

    if self.qualified_name not in mutation_context.mutations:
        return self.original_value(test_case_context=mutation_context.protocol_session)

    mutation = mutation_context.mutations[self.qualified_name]
    if callable(mutation.value):
        return mutation.value(self.original_value(test_case_context=mutation_context.protocol_session))
    return mutation.value

original_value

这里走哪个分支取决于 _default_value 的类型:只有当 _default_value 是 ProtocolSessionReference 时才会进入 if 分支;一般情况下 _default_value 只是普通的默认值,所以走 else 分支直接返回 _default_value。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def original_value(self, test_case_context=None):
    """Original, non-mutated value of element.

    Args:
        test_case_context (ProtocolSession): Used to resolve ReferenceValueTestCaseSession type default values.

    Returns:
        The default value. When the default is a ProtocolSessionReference, it is resolved to
        ``test_case_context.session_variables[<reference name>]``, or to the reference's own
        ``default_value`` when no test_case_context is given.
    """
    # Ordinary defaults are returned as-is; only a ProtocolSessionReference needs resolving.
    if not isinstance(self._default_value, ProtocolSessionReference):
        return self._default_value

    if test_case_context is None:
        return self._default_value.default_value
    return test_case_context.session_variables[self._default_value.name]

继续上面数据发送的位置,path 上的正常数据发送完之后才会发送变异数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
prev_node = self.nodes[mutation_context.message_path[-1].src]
node = self.nodes[mutation_context.message_path[-1].dst]
protocol_session = ProtocolSession(
previous_message=prev_node,
current_message=node,
)
mutation_context.protocol_session = protocol_session
#这里会调用callback,同时返回一个callback数据
callback_data = self._callback_current_node(
node=self.fuzz_node, edge=mutation_context.message_path[-1], test_case_context=protocol_session
)
self._fuzz_data_logger.open_test_step("Fuzzing Node '{0}'".format(self.fuzz_node.name))
#进行实际的变异数据发送
self.transmit_fuzz(
target,
self.fuzz_node,
mutation_context.message_path[-1],
callback_data=callback_data,
mutation_context=mutation_context,
)

Monitor

boofuzz 只提供了三种 monitor:

ProcessMonitor 大概是和 Procmon 进行 rpc 通讯来监控;

NetworkMonitor 具体用法不太清楚,看文档里说用了 wireshark;

CallbackMonitor 是默认的 Monitor,提供回调函数的功能。

我们一般需要一个监控连接状态的 Monitor,如果连接失败则判定为发生了 crash,保存 crash 样本,前面代码中有我实现的简陋的方案。

CallbackMonitor

这个 Monitor 是以 Monitor 的形式提供几种 callback,在 session 的 init 函数中,是把传进来的 callback 赋值给 CallbackMonitor,这个 Monitor 也会被默认添加到 target 的 monitors 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
if pre_send_callbacks is None:
pre_send_methods = []
else:
pre_send_methods = pre_send_callbacks

if post_test_case_callbacks is None:
post_test_case_methods = []
else:
post_test_case_methods = post_test_case_callbacks

if post_start_target_callbacks is None:
post_start_target_methods = []
else:
post_start_target_methods = post_start_target_callbacks

if restart_callbacks is None:
restart_methods = []
else:
restart_methods = restart_callbacks


self._callback_monitor = CallbackMonitor(
on_pre_send=pre_send_methods,
on_post_send=post_test_case_methods,
on_restart_target=restart_methods,
on_post_start_target=post_start_target_methods,
)


if target is not None:

def apply_options(monitor):
monitor.set_options(crash_filename=self._crash_filename)

return

target.monitor_alive.append(apply_options)

try:
self.add_target(target)
except exception.BoofuzzRpcError as e:
self._fuzz_data_logger.log_error(str(e))
raise

前面回调函数都是用的 target 的 monitor 的回调,在 session 的 init 中首先设置了 _callback_monitor 为刚才创建的 CallbackMonitor,其给 target 设置的有些隐蔽,是在 add_target 中设置的。

add_target

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def add_target(self, target):
    """
    Add a target to the session. Multiple targets can be added for parallel fuzzing.

    Args:
        target (Target): Target to add to session
    """

    # pass specified target parameters to the PED-RPC server.
    target.monitors_alive()
    target.set_fuzz_data_logger(fuzz_data_logger=self._fuzz_data_logger)

    # This is where the session's CallbackMonitor gets attached to the target (at most once).
    if self._callback_monitor not in target.monitors:
        target.monitors.append(self._callback_monitor)

    # add target to internal list.
    self.targets.append(target)

pre_send

以 CallbackMonitor 的 pre_send 为例,可以看到其遍历 on_pre_send 中的函数并依次调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
def pre_send(self, target=None, fuzz_data_logger=None, session=None):
    """This method iterates over all supplied pre send callbacks and executes them.
    Their return values are discarded, exceptions are caught and logged, but otherwise
    discarded.

    The try block wraps the whole loop, so once a callback raises, the callbacks after it are
    not run for this call.
    """
    try:
        for f in self.on_pre_send:
            fuzz_data_logger.open_test_step('Pre_Send callback: "{0}"'.format(f.__name__))
            # The target is passed both as ``target`` and as ``sock``.
            f(target=target, fuzz_data_logger=fuzz_data_logger, session=session, sock=target)
    except Exception:
        fuzz_data_logger.log_error(
            constants.ERR_CALLBACK_FUNC.format(func_name="pre_send") + traceback.format_exc()
        )

Refs

Boofuzz分析

boofuzz 0.4.0 documentation

IoT设备固件分析之网络协议 fuzz (seebug.org)