# Emit a FutureWarning telling callers that SocketConnection is deprecated and
# that the BaseSocketConnection-derived classes should be used instead.
warnings.warn(
    "SocketConnection is deprecated and will be removed in a future version of Boofuzz. "
    "Use the classes derived from BaseSocketConnection instead.",
    FutureWarning,
)
def SocketConnection(
    host,
    port=None,
    proto="tcp",
    bind=None,
    send_timeout=5.0,
    recv_timeout=5.0,
    ethernet_proto=None,
    l2_dst=b"\xFF" * 6,
    udp_broadcast=False,
    server=False,
    sslcontext=None,
    server_hostname=None,
):
    """Deprecated factory that returns a protocol-specific socket connection.

    Emits a FutureWarning on every call, validates ``proto`` and ``port`` and then
    dispatches on ``proto`` to the matching connection class.

    Args:
        host: Passed through to the selected connection class.
        port: Passed through; required when ``proto`` is in ``_PROTOCOLS_PORT_REQUIRED``.
        proto: One of the names in ``_PROTOCOLS`` ("udp", "tcp", "ssl", "raw-l2" and
            "raw-l3" are handled below).
        bind: Passed to the UDP connection.
        send_timeout: Passed through to the selected connection class.
        recv_timeout: Passed through to the selected connection class.
        ethernet_proto: Defaults to ``raw_l3_socket_connection.ETH_P_IP`` for "raw-l3".
        l2_dst: Not used in the visible part of this function.
        udp_broadcast: Passed to the UDP connection.
        server: Passed to the UDP, TCP and SSL connections.
        sslcontext: Passed to the SSL connection.
        server_hostname: Passed to the SSL connection.

    Raises:
        exception.SullyRuntimeError: If ``proto`` is not in ``_PROTOCOLS``.
        ValueError: If the protocol requires a port and ``port`` is None.
    """
    warnings.warn(
        "SocketConnection is deprecated and will be removed in a future version of Boofuzz. "
        "Use the classes derived from BaseSocketConnection instead.",
        FutureWarning,
    )
    if proto not in _PROTOCOLS:
        raise exception.SullyRuntimeError("INVALID PROTOCOL SPECIFIED: %s" % proto)

    if proto in _PROTOCOLS_PORT_REQUIRED and port is None:
        raise ValueError("__init__() argument port required for protocol {0}".format(proto))

    # Dispatch on the protocol name; each branch forwards only the arguments
    # that the corresponding connection class takes.
    if proto == "udp":
        return udp_socket_connection.UDPSocketConnection(
            host, port, send_timeout, recv_timeout, server, bind, udp_broadcast
        )
    elif proto == "tcp":
        return tcp_socket_connection.TCPSocketConnection(host, port, send_timeout, recv_timeout, server)
    elif proto == "ssl":
        return ssl_socket_connection.SSLSocketConnection(
            host, port, send_timeout, recv_timeout, server, sslcontext, server_hostname
        )
    elif proto == "raw-l2":
        return raw_l2_socket_connection.RawL2SocketConnection(host, send_timeout, recv_timeout)
    elif proto == "raw-l3":
        if ethernet_proto is None:
            ethernet_proto = raw_l3_socket_connection.ETH_P_IP
        # NOTE(review): the excerpt ends here. The raw-l3 branch presumably goes on to
        # build and return a connection using ethernet_proto and l2_dst -- confirm
        # against the full source before relying on this function.
class TCPSocketConnection(base_socket_connection.BaseSocketConnection):
    """BaseSocketConnection implementation for use with TCP Sockets.

    .. versionadded:: 0.2.0

    Args:
        host (str): Hostname or IP address of target system.
        port (int): Port of target service.
        send_timeout (float): Seconds to wait for send before timing out. Default 5.0.
        recv_timeout (float): Seconds to wait for recv before timing out. Default 5.0.
        server (bool): Set to True to enable server side fuzzing.
    """
class BaseSocketConnection(with_metaclass(abc.ABCMeta, itarget_connection.ITargetConnection)):
    """This class serves as a base for a number of Connections over sockets.

    .. versionadded:: 0.2.0

    Args:
        send_timeout (float): Seconds to wait for send before timing out. Default 5.0.
        recv_timeout (float): Seconds to wait for recv before timing out. Default 5.0.
    """
def _connect_socket(self):
    """Establish the TCP connection, either by connecting out or by accepting one client.

    In client mode the socket connects to ``(self.host, self.port)``. In server mode the
    socket is bound to that address, kept as ``self._serverSock``, and ``self._sock`` is
    replaced by the single connection it accepts.

    Raises:
        exception.BoofuzzOutOfAvailableSockets: On EADDRINUSE while binding or connecting.
        exception.BoofuzzTargetConnectionFailedError: On EAGAIN while accepting, or on
            ECONNREFUSED / EINPROGRESS / ETIMEDOUT while connecting.
        socket.error: Any other socket error is re-raised unchanged.
    """
    endpoint = (self.host, self.port)

    if not self.server:
        # Client mode: actively connect to the target.
        try:
            self._sock.connect(endpoint)
        except socket.error as err:
            if err.errno == errno.EADDRINUSE:
                raise exception.BoofuzzOutOfAvailableSockets()
            if err.errno in [errno.ECONNREFUSED, errno.EINPROGRESS, errno.ETIMEDOUT]:
                raise exception.BoofuzzTargetConnectionFailedError(str(err))
            raise
        return

    # Server mode: bind, then wait for the target to connect to us.
    self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        self._sock.bind(endpoint)
    except socket.error as err:
        if err.errno == errno.EADDRINUSE:
            raise exception.BoofuzzOutOfAvailableSockets()
        raise

    self._serverSock = self._sock
    try:
        # Backlog of 1: only a single connection is accepted.
        self._serverSock.listen(1)
        self._sock, _peer_addr = self._serverSock.accept()
    except socket.error as err:
        # When connection timeout expires, tear down the server socket so we can re-open it again after
        # restarting the target.
        self.close()
        if err.errno in [errno.EAGAIN]:
            raise exception.BoofuzzTargetConnectionFailedError(str(err))
        raise
send
向目标发送数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
def send(self, data):
    """Send data over the underlying socket.

    Args:
        data: Data handed to ``self._sock.send``.

    Returns:
        The value returned by ``self._sock.send`` (0 if nothing was sent).

    Raises:
        exception.BoofuzzTargetConnectionAborted: On ECONNABORTED.
        exception.BoofuzzTargetConnectionReset: On ECONNRESET, ENETRESET, ETIMEDOUT or EPIPE.
        socket.error: Any other socket error is re-raised unchanged.
    """
    num_sent = 0

    try:
        num_sent = self._sock.send(data)
    except socket.error as e:
        # raise_ re-raises with the original traceback so the socket failure stays visible.
        if e.errno == errno.ECONNABORTED:
            raise_(
                exception.BoofuzzTargetConnectionAborted(socket_errno=e.errno, socket_errmsg=e.strerror),
                None,
                sys.exc_info()[2],
            )
        elif e.errno in [errno.ECONNRESET, errno.ENETRESET, errno.ETIMEDOUT, errno.EPIPE]:
            raise_(exception.BoofuzzTargetConnectionReset(), None, sys.exc_info()[2])
        else:
            raise

    # The count was computed but never handed back to the caller in the original excerpt.
    return num_sent
def open(self):
    """
    Opens connection to the target. Make sure to call close!

    :return: None
    """
    # Log before and after the attempt so the fuzz data log brackets the connection setup.
    self._fuzz_data_logger.log_info("Opening target connection ({0})...".format(self._target_connection.info))
    self._target_connection.open()
    self._fuzz_data_logger.log_info("Connection opened.")


def send(self, data):
    """
    Send data to the target. Only valid after calling open!

    Args:
        data: Data to send.

    Returns:
        None
    """
    num_sent = 0

    if self._fuzz_data_logger is not None:
        repeat = ""
        if self.repeater is not None:
            # Suffix built from the repeater's own description of what it will do.
            repeat = ", " + self.repeater.log_message()
    # NOTE(review): the excerpt ends here; the code that logs and actually transmits
    # ``data`` is not visible in this view.
def set_fuzz_data_logger(self, fuzz_data_logger):
    """Replace the logger this object uses for sent and received fuzz data.

    Args:
        fuzz_data_logger (ifuzz_logger.IFuzzLogger): New logger.

    Returns:
        None
    """
    self._fuzz_data_logger = fuzz_data_logger
s_initialize会创建一个 request,我们需要提供一个 name 来标识这个 request,新建的 request 会被加到REQUESTS中,并设置为当前操作的 Request。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
def s_initialize(name):
    """Create a new request and make it the current one.

    All blocks / primitives generated after this call apply to the named request.
    Use s_switch() to jump between factories.

    Args:
        name (str): Name of request.

    Raises:
        exception.SullyRuntimeError: If a request with this name already exists.
    """
    if name in blocks.REQUESTS:
        raise exception.SullyRuntimeError("blocks.REQUESTS ALREADY EXISTS: %s" % name)

    # REQUESTS is the global registry; add the new request to it ...
    blocks.REQUESTS[name] = Request(name)
    # ... and select the registered entry as the request currently being built.
    current = blocks.REQUESTS[name]
    blocks.CURRENT = current
# ensure this gotten request is the new current. s_switch(name)
if name notin blocks.REQUESTS: raise exception.SullyRuntimeError("blocks.REQUESTS NOT FOUND: %s" % name)
return blocks.REQUESTS[name]
def s_switch(name):
    """Make the request registered under ``name`` the current request.

    Args:
        name (str): Name of request.

    Raises:
        exception.SullyRuntimeError: If no request with this name is registered.
    """
    known = name in blocks.REQUESTS
    if not known:
        raise exception.SullyRuntimeError("blocks.REQUESTS NOT FOUND: %s" % name)

    # Point CURRENT at the named request.
    selected = blocks.REQUESTS[name]
    blocks.CURRENT = selected
def connect(self, src, dst=None, callback=None):
    """Create an edge between two requests in the session graph and return it.

    If only ``src`` is given it is used as the destination and the edge starts at the
    root node. Either endpoint may be given by name, in which case it is looked up with
    ``find_node("name", ...)``. Endpoints not yet in the graph are added first.

    Args:
        src: Source request or its name (or the destination when ``dst`` is omitted).
        dst: Destination request or its name. Default None.
        callback: Passed to ``Connection`` as the edge callback. Default None.

    Returns:
        Connection: The edge that was created and added to the graph.
    """
    # if only a source was provided, then make it the destination and set the source to the root node.
    if dst is None:
        dst = src
        src = self.root

    # if source or destination is a name, resolve the actual node.
    if isinstance(src, six.string_types):
        src = self.find_node("name", src)

    # The destination gets the same treatment as the source; without this a
    # destination given by name would fail on ``dst.name`` below.
    if isinstance(dst, six.string_types):
        dst = self.find_node("name", dst)

    # if source or destination is not in the graph, add it.
    if src != self.root and self.find_node("name", src.name) is None:
        self.add_node(src)

    if self.find_node("name", dst.name) is None:
        self.add_node(dst)

    # create an edge between the two nodes and add it to the graph.
    edge = Connection(src.id, dst.id, callback)
    self.add_edge(edge)

    return edge
def add_node(self, node):
    """
    Add a pgraph node to the graph. We overload this routine to automatically generate and assign an ID whenever a
    node is added.

    Args:
        node (pgraph.Node): Node to add to session graph
    """
    # NOTE(review): only the docstring is visible in this excerpt; the body is not shown.
def push(self, item):
    """
    Push an item into the block structure. If no block is open, the item goes onto the request stack. otherwise,
    the item goes onto the last open blocks stack.

    What this method does:
    1. Sets context_path for each pushed FuzzableWrapper.
    2. Sets request for each FuzzableWrapper
    3. Checks for duplicate qualified_name items
    4. Adds item to self.names map (based on qualified_name)
    5. Adds the item to self.stack, or to the stack of the currently opened block.

    Also: Manages block_stack, mostly an implementation detail to help static protocol definition

    @type item: BasePrimitive | Block | Request | Size | Repeat
    @param item: Some primitive/block/request/etc.

    @raise exception.SullyRuntimeError: If an item with the same qualified_name was already pushed.
    """
    item.context_path = self._generate_context_path(self.block_stack)
    item.request = self
    # ensure the name doesn't already exist.
    # Test membership on the mapping itself instead of copying its keys into a list first.
    if item.qualified_name in self.names:
        raise exception.SullyRuntimeError("BLOCK NAME ALREADY EXISTS: %s" % item.qualified_name)

    self.names[item.qualified_name] = item

    # if there are no open blocks, the item gets pushed onto the request stack.
    # otherwise, the pushed item goes onto the stack of the last opened block.
    if not self.block_stack:
        self.stack.append(item)
    else:
        self.block_stack[-1].push(item)

    # add the opened block to the block stack.
    if isinstance(item, (Block, Aligned)):  # TODO generic check here
        self.block_stack.append(item)


def _generate_context_path(self, block_stack):
    """Build the dot-delimited path made of this request's name and the names of the open blocks.

    Empty components are dropped, so a request with no open blocks yields just its name.
    """
    context_path = ".".join(x.name for x in block_stack)  # TODO put in method
    context_path = ".".join(filter(None, (self.name, context_path)))
    return context_path
block
根据文档,有两种插入 block 的方式:startend 和 with 模式。
startend方式
s_block_start 初始化一个 block,并将其 push。
s_block_end 关闭这个 block,说明数据已经填充完毕了。
1 2 3 4 5
if s_block_start("header"): s_static("\x00\x01") if s_block_start("body"): ... s_block_end()
def __enter__(self):
    """Enter the "with" statement: hand the wrapped block to the ``as`` target."""
    return self.block
def __exit__(self, type, value, traceback):
    """Leave the "with" statement.

    The exception arguments are ignored; the block is closed by calling ``s_block_end()``.
    Nothing is returned, so an exception raised in the body is not suppressed.
    """
    # Automagically close the block when exiting the "with" statement
    s_block_end()
def fuzz(self, name=None, max_depth=None):
    """Fuzz the entire protocol tree, or only the request / test case called ``name``.

    Resets ``self.total_mutant_index`` and recomputes ``self.total_num_mutations`` before
    starting. Without a name, the main fuzz loop is driven by an indefinitely deepening
    mutation generator; with a name, ``fuzz_by_name`` is used instead.

    If you want the web server to be available, your program must persist after calling
    this method. helpers.pause_for_signal() is available to this end.

    Args:
        name (str): Pass in a Request name to fuzz only a single request message. Pass in a
            test case name to fuzz only a single test case.
        max_depth (int): Maximum combinatorial depth; set to 1 for "simple" fuzzing.

    Returns:
        None
    """
    self.total_mutant_index = 0
    self.total_num_mutations = self.num_mutations(max_depth=max_depth)

    if name is not None and name != "":
        self.fuzz_by_name(name=name)
        return

    all_cases = self._generate_mutations_indefinitely(max_depth=max_depth)
    self._main_fuzz_loop(all_cases)
这里还有个选项是 _reuse_target_connection,即重用连接。开启这个选项后,整个大循环中只会在这里 open 一次连接,如果不开这个选项,每次 fuzz 都会重新 open 一次连接。
1 2 3 4 5 6 7 8 9 10 11 12
def_main_fuzz_loop(self, fuzz_case_iterator): """Execute main fuzz logic; takes an iterator of test cases. Preconditions: `self.total_mutant_index` and `self.total_num_mutations` are set properly. Args: fuzz_case_iterator (Iterable): An iterator that walks through fuzz cases and yields MutationContext objec See _iterate_single_node() for details. Returns: None """ #这里是创建线程开启一个boofuzz的可视化web端口 if self.web_port isnotNone: self.server_init()
try: self._start_target(self.targets[0]) if self._reuse_target_connection: self.targets[0].open() self.num_cases_actually_fuzzed = 0 #记录fuzz开始时间 self.start_time = time.time() for mutation_context in fuzz_case_iterator: if self.total_mutant_index < self._index_start: continue # Check restart interval if ( self.num_cases_actually_fuzzed and self.restart_interval and self.num_cases_actually_fuzzed % self.restart_interval == 0 ): self._fuzz_data_logger.open_test_step("restart interval of %d reached" % self.restart_interval) self._restart_target(self.targets[0]) #这里开始fuzz这次 self._fuzz_current_case(mutation_context) #这里是记录实际进行fuzz的次数 self.num_cases_actually_fuzzed += 1 if self._index_end isnotNoneand self.total_mutant_index >= self._index_end: break if self._reuse_target_connection: self.targets[0].close() if self._keep_web_open and self.web_port isnotNone: self.end_time = time.time() print( "\nFuzzing session completed. Keeping webinterface up on localhost:{}".format(self.web_port), "\nPress ENTER to close webinterface", ) input()
def _start_target(self, target):
    """Ask the target's monitors to start the target, then run their post-start hooks.

    Monitors are tried in order until one reports success from ``start_target()``. Only if
    one did is ``post_start_target`` called on every monitor of the target.

    Args:
        target: Target whose monitors are consulted.
    """
    # any() stops at the first monitor that reports a successful start.
    if not any(mon.start_target() for mon in target.monitors):
        return

    for mon in target.monitors:
        mon.post_start_target(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self)
def _open_connection_keep_trying(self, target):
    """Open connection and if it fails, keep retrying.

    Does nothing when ``self._reuse_target_connection`` is set. Otherwise ``target.open()``
    is retried until it succeeds, the restart threshold or restart timeout is reached, or
    the out-of-sockets condition has been seen 50 times.

    Args:
        target (Target): Target to open.

    Raises:
        exception.BoofuzzTargetConnectionFailedError: Re-raised once the restart threshold
            or the restart timeout is reached.
        exception.BoofuzzError: After 50 out-of-available-sockets failures.
    """
    # The connection is only opened here when _reuse_target_connection is off.
    if not self._reuse_target_connection:
        out_of_available_sockets_count = 0
        unable_to_connect_count = 0
        initial_time = time.time()
        while True:
            try:
                # Delegates to the target's own open().
                target.open()
                break  # break if no exception
            except exception.BoofuzzTargetConnectionFailedError:
                if self.restart_threshold and unable_to_connect_count >= self.restart_threshold:
                    self._fuzz_data_logger.log_info(
                        "Unable to reconnect to target: Reached threshold of {0} retries. Ending fuzzing.".format(
                            self.restart_threshold
                        )
                    )
                    # Locally added code acting as a network-state monitor: before giving up,
                    # write the current test case name and the last payload sent to a file
                    # named after crash_filename and the number of cases fuzzed so far.
                    with open(self.crash_filename + "_" + str(self.num_cases_actually_fuzzed),"wb") as fp:
                        fp.write((self.current_test_case_name+"\n").encode())
                        fp.write(self.last_send)
                    pass
                    raise
                elif self.restart_timeout and time.time() >= initial_time + self.restart_timeout:
                    self._fuzz_data_logger.log_info(
                        "Unable to reconnect to target: Reached restart timeout of {0}s. Ending fuzzing.".format(
                            self.restart_timeout
                        )
                    )
                    raise
                else:
                    # Neither limit reached yet: restart the target and try again.
                    self._fuzz_data_logger.log_info(constants.WARN_CONN_FAILED_TERMINAL)
                    self._restart_target(target)
                    unable_to_connect_count += 1
            except exception.BoofuzzOutOfAvailableSockets:
                out_of_available_sockets_count += 1
                if out_of_available_sockets_count == 50:
                    raise exception.BoofuzzError("There are no available sockets. Ending fuzzing.")
                self._fuzz_data_logger.log_info("There are no available sockets. Waiting for another 5 seconds.")
                time.sleep(5)
_pre_send
依次调用 target 的 monitor 中的回调函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
def _pre_send(self, target):
    """Run the ``pre_send`` hook of every monitor attached to ``target``.

    The order of events is as follows::

        pre_send() - req - callback ... req - callback - post_send()

    When fuzzing RPC for example, register this method to establish the RPC bind.

    A hook that raises does not stop the others: the failure is logged through
    ``log_error`` together with the traceback, and the loop moves on.

    Args:
        target (session.target): Target we are sending data to
    """
    for mon in target.monitors:
        try:
            step_title = "Monitor {}.pre_send()".format(str(mon))
            self._fuzz_data_logger.open_test_step(step_title)
            mon.pre_send(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self)
        except Exception:
            failed_call = "{}.pre_send()".format(str(mon))
            details = constants.ERR_CALLBACK_FUNC.format(func_name=failed_call) + traceback.format_exc()
            self._fuzz_data_logger.log_error(details)
def _callback_current_node(self, node, edge, test_case_context):
    """Execute callback preceding current node.

    Args:
        test_case_context (ProtocolSession): Context for test case-scoped data.
        node (pgraph.node.node (Node), optional): Current Request/Node
        edge (pgraph.edge.edge (pgraph.edge), optional): Edge along the current fuzz path from "node" to next node.

    Returns:
        bytes: Data rendered by current node if any; otherwise None.
    """
    data = None

    # if the edge has a callback, process it. the callback has the option to render the node, modify it and return.
    # The callback runs before the node is sent; whatever it returns is handed back to the caller.
    if edge.callback:
        self._fuzz_data_logger.open_test_step("Callback function '{0}'".format(edge.callback.__name__))
        data = edge.callback(
            self.targets[0],
            self._fuzz_data_logger,
            session=self,
            node=node,
            edge=edge,
            test_case_context=test_case_context,
        )

    # The documented return value was computed but never returned in the original excerpt.
    return data
def _check_for_passively_detected_failures(self, target, failure_already_detected=False):
    """Check for and log passively detected failures. Return True if any found.

    Args:
        target (Target): Target to be checked for failures.
        failure_already_detected (bool): If a failure was already detected.

    Returns:
        bool: True if failures were found. False otherwise.
    """
    has_crashed = False
    if len(target.monitors) > 0:
        self._fuzz_data_logger.open_test_step("Contact target monitors")
        # So, we need to run through the array two times. First, we check
        # if any of the monitors reported a failure and
        # if so, we need to
        # gather a crash synopsis from them. We don't know whether
        # a monitor can provide a crash synopsis, but in any case, we'll
        # check. In the second run, we try to get crash synopsis from the
        # monitors that did not detect a crash as supplemental information.
        finished_monitors = []
        # Call post_send() on each monitor in turn; a falsy result is treated as a detected crash.
        for monitor in target.monitors:
            if not monitor.post_send(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self):
                has_crashed = True
                self._fuzz_data_logger.log_fail(
                    "{0} detected crash on test case #{1}: {2}".format(
                        str(monitor), self.total_mutant_index, monitor.get_crash_synopsis()
                    )
                )
                finished_monitors.append(monitor)
    # NOTE(review): the excerpt ends here; the second pass described in the comment above
    # and the final return are not visible in this view.
def transmit_fuzz(self, sock, node, edge, callback_data, mutation_context):
    """Render and transmit a fuzzed node, process callbacks accordingly.

    The payload is ``callback_data`` when the edge callback returned something truthy,
    otherwise the current fuzz node rendered under ``mutation_context``. The payload is
    stored in ``self.last_send`` after a successful send, and whatever was received (or
    ``b""``) is stored in ``self.last_recv``.

    Args:
        sock (Target, optional): Socket-like object on which to transmit node
        node (pgraph.node.node (Node), optional): Request/Node to transmit
        edge (pgraph.edge.edge (pgraph.edge), optional): Edge along the current fuzz path from "node" to next node.
        callback_data (bytes): Data from previous callback.
        mutation_context (MutationContext): Current mutation context.

    Raises:
        BoofuzzFailure: On a connection reset / abort / SSL error that the session is not
            configured to ignore.
    """
    # The edge callback runs before transmission: data it returned wins over the
    # rendered mutation.
    data = callback_data if callback_data else self.fuzz_node.render(mutation_context)

    try:  # send
        self.targets[0].send(data)
        # Remember what was sent for later inspection.
        self.last_send = data
    except exception.BoofuzzTargetConnectionReset:
        if not self._ignore_connection_issues_when_sending_fuzz_data:
            raise BoofuzzFailure(message=constants.ERR_CONN_RESET)
        self._fuzz_data_logger.log_info(constants.ERR_CONN_RESET)
    except exception.BoofuzzTargetConnectionAborted as err:
        msg = constants.ERR_CONN_ABORTED.format(socket_errno=err.socket_errno, socket_errmsg=err.socket_errmsg)
        if not self._ignore_connection_issues_when_sending_fuzz_data:
            raise BoofuzzFailure(msg)
        self._fuzz_data_logger.log_info(msg)
    except exception.BoofuzzSSLError as err:
        if not self._ignore_connection_ssl_errors:
            raise BoofuzzFailure(str(err))
        self._fuzz_data_logger.log_info(str(err))

    received = b""
    try:  # recv
        if self._receive_data_after_fuzz:
            received = self.targets[0].recv()
    except exception.BoofuzzTargetConnectionReset:
        if self._check_data_received_each_request:
            raise BoofuzzFailure(message=constants.ERR_CONN_RESET)
        self._fuzz_data_logger.log_info(constants.ERR_CONN_RESET)
    except exception.BoofuzzTargetConnectionAborted as err:
        msg = constants.ERR_CONN_ABORTED.format(socket_errno=err.socket_errno, socket_errmsg=err.socket_errmsg)
        if self._check_data_received_each_request:
            raise BoofuzzFailure(msg)
        self._fuzz_data_logger.log_info(msg)
    except exception.BoofuzzSSLError as err:
        if not self._ignore_connection_ssl_errors:
            self._fuzz_data_logger.log_fail(str(err))
            raise BoofuzzFailure(str(err))
        self._fuzz_data_logger.log_info(str(err))

    # Keep the data received for this case.
    self.last_recv = received
def post_send(self):
    """
    This routine is called after the fuzzer transmits a test case and returns the status of the target.

    Returns:
        bool: True if the target is still active, False otherwise.
    """
    if self.is_alive():
        return True
    else:
        # Target is not alive: append the process monitor's last synopsis to its crash file.
        with open(self.process_monitor.crash_filename, "a") as rec_file:
            rec_file.write(self.process_monitor.last_synopsis)

        if self.process_monitor.coredump_dir is not None:
            # Destination is named after the current test number inside the coredump directory.
            dest = os.path.join(self.process_monitor.coredump_dir, str(self.process_monitor.test_number))
            src = _get_coredump_path()
            # NOTE(review): the excerpt ends here; what is done with src/dest and the
            # value returned on this path are not visible in this view.
def _generate_mutations_indefinitely(self, max_depth=None, path=None):
    """Yield MutationContext with n mutations per message over all messages, with n increasing indefinitely.

    Starts at depth 1 and goes one level deeper each round. Stops once ``max_depth`` is
    exceeded (when given) or as soon as a depth produces no case at all.
    """
    depth = 1
    while True:
        if max_depth is not None and depth > max_depth:
            return

        produced_any = False
        for case in self._generate_n_mutations(depth=depth, path=path):
            produced_any = True
            yield case

        if not produced_any:
            return
        depth += 1
_generate_n_mutations
这里会先得到 path 再从 path 里得到要 fuzz 的 node。
1 2 3 4 5 6 7 8 9 10
def _generate_n_mutations(self, depth, path):
    """Yield MutationContext with n mutations per message over all messages.

    Each message path produced by ``_iterate_protocol_message_paths`` is expanded into the
    mutation contexts that ``_generate_n_mutations_for_path`` builds for it at ``depth``.
    """
    # First obtain the paths to fuzz; the node to fuzz is derived from each path.
    message_paths = self._iterate_protocol_message_paths(path=path)
    for message_path in message_paths:
        contexts = self._generate_n_mutations_for_path(message_path, depth=depth)
        for context in contexts:
            yield context
def _iterate_protocol_message_paths(self, path=None):
    """Iterate over the protocol and yield a path (list of Connection) leading to a given message.

    When a specific path is supplied only that path is yielded; otherwise every path
    reachable from the root node is produced by the recursive helper.

    Args:
        path (list of Connection): Provide a specific path to yield only that specific path.

    Yields:
        list of Connection: List of edges along the path to the current one being fuzzed.

    Raises:
        exception.SulleyRuntimeError: If no requests defined or no targets specified
    """
    # we can't fuzz if we don't have at least one target and one request.
    if not self.targets:
        raise exception.SullyRuntimeError("No targets specified in session")

    if not self.edges_from(self.root.id):
        raise exception.SullyRuntimeError("No requests specified in session")

    if path is not None:
        yield path
        return

    every_path = self._iterate_protocol_message_paths_recursive(this_node=self.root, path=[])
    for found in every_path:
        yield found
def _iterate_protocol_message_paths_recursive(self, this_node, path):
    """Recursive helper for _iterate_protocol.

    Walks the edges leaving ``this_node`` and recurses to collect message paths.

    Args:
        this_node (node.Node): Current node that is being fuzzed.
        path (list of Connection): List of edges along the path to the current one being fuzzed.

    Yields:
        list of Connection: List of edges along the path to the current one being fuzzed.
    """
    # step through every edge from the current node.
    for edge in self.edges_from(this_node.id):
        # keep track of the path as we fuzz through it, don't count the root node.
        # we keep track of edges as opposed to nodes because if there is more then one path through a set of
        # given nodes we don't want any ambiguity.
        path.append(edge)

        # NOTE(review): the excerpt appears to skip statements here -- nothing visible
        # updates self.fuzz_node or yields ``path`` itself before recursing. Confirm
        # against the full source.

        # recursively fuzz the remainder of the nodes in the session graph.
        for x in self._iterate_protocol_message_paths_recursive(self.fuzz_node, path):
            yield x

    # finished with the last node on the path, pop it off the path stack.
    # NOTE(review): nesting was lost in extraction; this pop is assumed to sit at
    # function level, after the loop -- confirm.
    if path:
        path.pop()
def _generate_n_mutations_for_path(self, path, depth):
    """Yield MutationContext with n mutations for a specific message.

    Mutation sets flagged by ``_mutations_contain_duplicate`` are skipped. For every set
    that is kept, ``self.total_mutant_index`` is incremented before the context is yielded.

    Args:
        path (list of Connection): Nodes (Requests) along the path to the current one being fuzzed.
        depth (int): Yield sets of depth mutations.

    Yields:
        MutationContext: A MutationContext keyed by the qualified name of each mutation.
    """
    candidate_sets = self._generate_n_mutations_for_path_recursive(path, depth=depth)
    for mutations in candidate_sets:
        if self._mutations_contain_duplicate(mutations):
            continue
        self.total_mutant_index += 1
        by_name = {n.qualified_name: n for n in mutations}
        yield MutationContext(message_path=path, mutations=by_name)
def _generate_n_mutations_for_path_recursive(self, path, depth, skip_elements=None):
    """Yield lists of ``depth`` mutations for the message at the end of ``path``.

    At depth 0 a single empty list is yielded. Otherwise each mutation list for the
    request is combined with every list produced one level down, where the qualified
    names already used at this level are added to the set of elements to skip.

    Args:
        path (list of Connection): Path to the message being fuzzed.
        depth (int): Number of recursion levels still to combine.
        skip_elements (set of str): Qualified names to skip. Default: empty set.
    """
    if skip_elements is None:
        skip_elements = set()

    if depth == 0:
        yield []
        return

    # Grows as this level advances; the caller's set is left untouched.
    used_names = skip_elements.copy()
    for head in self._generate_mutations_for_request(path=path, skip_elements=skip_elements):
        used_names.update(m.qualified_name for m in head)
        deeper = self._generate_n_mutations_for_path_recursive(path, depth=depth - 1, skip_elements=used_names)
        for tail in deeper:
            yield head + tail
def _generate_mutations_for_request(self, path, skip_elements=None):
    """Yield each mutation for a specific message (the last message in path).

    Sets ``self.fuzz_node`` to the destination node of the last edge in ``path`` and
    resets ``self.mutant_index``, which is then incremented once per yielded mutation.

    Args:
        path (list of Connection): Nodes (Requests) along the path to the current one being fuzzed.
        skip_elements (iter of str): Qualified names of elements to skip while fuzzing.

    Yields:
        Mutation: Mutation object describing a single mutation.
    """
    if skip_elements is None:
        skip_elements = []

    # The node being fuzzed is the destination of the last edge on the path.
    last_edge = path[-1]
    self.fuzz_node = self.nodes[last_edge.dst]
    self.mutant_index = 0

    # Enumerate the mutations produced by the items of that node.
    for produced in self.fuzz_node.get_mutations(skip_elements=skip_elements):
        # Count the mutations generated so far for this request.
        self.mutant_index += 1
        yield produced
def mutations(self, default_value, skip_elements=None):
    """Yield the mutations of every child item that is not being skipped.

    Before an item's mutations are produced, the item is recorded as ``self.request.mutant``.

    Args:
        default_value: Not used by this implementation.
        skip_elements (iter of str): Qualified names of items to leave out. Default: none.
    """
    if skip_elements is None:
        skip_elements = []

    # Walk the items on this block's stack.
    for child in self.stack:
        if child.qualified_name not in skip_elements:
            self.request.mutant = child
            for produced in child.get_mutations():
                yield produced
fuzzable.get_mutations
这个函数就是对当前 item 进行变异,并将变异的值传到生成的 Mutation 里面。
Mutation 的构造这里就能看到,是由一个值 value,一个所属 item的qualified_name,以及变异计数 index 组成的。
def get_mutations(self):
    """Iterate mutations. Used by boofuzz framework.

    Values come from ``self.mutations(...)`` followed by ``self._fuzz_values``. Lists are
    yielded as they are, Mutation objects are wrapped in a list, and any other value is
    wrapped in a new Mutation carrying this element's qualified name and a running index.

    Yields:
        list of Mutation: Mutations
    """
    try:
        if not self.fuzzable:
            return
        index = 0
        for value in itertools.chain(self.mutations(self.original_value()), self._fuzz_values):
            # stop_mutations() sets this flag; honour it once, then clear it.
            if self._halt_mutations:
                self._halt_mutations = False
                return
            if isinstance(value, list):
                yield value
            elif isinstance(value, Mutation):
                yield [value]
            else:
                yield [Mutation(value=value, qualified_name=self.qualified_name, index=index)]
                # NOTE(review): nesting was lost in extraction; the increment is assumed to
                # belong to this branch (only plain values consume an index) -- confirm.
                index += 1
    finally:
        self._halt_mutations = False  # in case stop_mutations is called when mutations were exhausted anyway
stop_mutations
1 2 3 4 5 6 7 8 9
def stop_mutations(self):
    """Request that the currently running :py:meth:`mutations` iteration stop yielding.

    Used by boofuzz to stop fuzzing an element when it's already caused several failures.
    Only sets the ``_halt_mutations`` flag on this element.

    Returns:
        NoneType: None
    """
    self._halt_mutations = True
def binary_string_to_int(binary):
    """
    Convert a binary string to a decimal number.

    @type  binary: str
    @param binary: Binary string, parsed as base 2

    @rtype:  int
    @return: Converted bit string
    """
    as_number = int(binary, 2)
    return as_number
def mutations(self, default_value):
    """Yield every value of the integer fuzz library; ``default_value`` is not used."""
    for candidate in self._iterate_fuzz_lib():
        yield candidate


def _iterate_fuzz_lib(self):
    """Yield the integers to try for this field.

    With ``full_range`` set, every integer from 0 up to (not including) ``max_num`` is
    produced. Otherwise only values around a handful of boundaries derived from
    ``max_num`` are produced.
    """
    if self.full_range:
        for number in range(0, self.max_num):
            yield number
        return

    # try only "smart" values.
    divisors = (2, 3, 4, 8, 16, 32)
    interesting_boundaries = [0] + [self.max_num // d for d in divisors] + [self.max_num]
    for boundary in interesting_boundaries:
        for near in self._yield_integer_boundaries(boundary):
            yield near
    # TODO Add a way to inject a list of fuzz values
    # elif isinstance(default_value, (list, tuple)):
    #    for val in iter(default_value):
    #        yield val


# TODO: Add injectable arbitrary bit fields
def _yield_integer_boundaries(self, integer):
    """
    Yield the supplied integer and its border cases (10 below through 9 above) that fall in [0, max_num).

    @type  integer: int
    @param integer: int around which to generate fuzz heuristics
    """
    for offset in range(-10, 10):
        candidate = integer + offset
        if candidate < 0 or candidate >= self.max_num:
            continue
        # some day: if case not in self._user_provided_values
        yield candidate
#session._main_fuzz_loop() for mutation_context in fuzz_case_iterator: if self.total_mutant_index < self._index_start: continue
#session._generate_n_mutations_for_path() self.total_mutant_index += 1 yield MutationContext(message_path=path, mutations={n.qualified_name: n for n in mutations})
#Fuzzable._init() if self._name isNone: Fuzzable.name_counter += 1 self._name = "{0}{1}".format(type(self).__name__, Fuzzable.name_counter) #Request.push() item.context_path = self._generate_context_path(self.block_stack) def_generate_context_path(self, block_stack): context_path = ".".join(x.name for x in block_stack) # TODO put in method context_path = ".".join(filter(None, (self.name, context_path))) return context_path #Fuzzable.qualified_name @property defqualified_name(self): """Dot-delimited name that describes the request name and the path to the element within the request. Example: "request1.block1.block2.node1" """ return".".join(s for s in (self._context_path, self.name) if s != "")
deftransmit_normal(self, sock, node, edge, callback_data, mutation_context): """Render and transmit a non-fuzzed node, process callbacks accordingly. Args: sock (Target, optional): Socket-like object on which to transmit node node (pgraph.node.node (Node), optional): Request/Node to transmit edge (pgraph.edge.edge (pgraph.edge), optional): Edge along the current fuzz path from "node" to next node. callback_data (bytes): Data from previous callback. mutation_context (MutationContext): active mutation context """ if callback_data: data = callback_data else: data = node.render(mutation_context=mutation_context)
try: # send self.targets[0].send(data) self.last_send = data if self._receive_data_after_each_request: self.last_recv = self.targets[0].recv()
request.render
这个函数流程前面也没分析。
1 2 3 4 5
def render(self, mutation_context=None):
    """Render the request; refuses to render while a block is still open.

    Raises:
        exception.SullyRuntimeError: If ``self.block_stack`` is not empty, naming the
            innermost unclosed block.
    """
    if self.block_stack:
        raise exception.SullyRuntimeError("UNCLOSED BLOCK: %s" % self.block_stack[-1].qualified_name)
    # NOTE(review): the excerpt ends here; the code that produces the rendered value is
    # not visible in this view.
def get_child_data(self, mutation_context):
    """Get child or referenced data for this node.

    For blocks that reference other data from the message structure (e.g. size, checksum, blocks). See
    FuzzableBlock for an example.

    Args:
        mutation_context (MutationContext): Mutation context.

    Returns:
        bytes: Child data; the rendered items of ``self.stack`` concatenated in order
        (``b""`` when the stack is empty).
    """
    # Join once instead of growing a bytes object item by item, which copies the
    # accumulated data on every step.
    return b"".join(item.render(mutation_context=mutation_context) for item in self.stack)
Fuzzable.render
调用 get_value 获取值。
1 2 3 4 5 6 7 8
def render(self, mutation_context=None):
    """Render after applying mutation, if applicable.

    The applicable value is obtained from ``get_value`` and then passed through ``encode``.

    :type mutation_context: MutationContext
    """
    current_value = self.get_value(mutation_context=mutation_context)
    return self.encode(value=current_value, mutation_context=mutation_context)
def encode(self, value, mutation_context):
    """Return ``value`` unchanged; ``mutation_context`` is not used by this implementation."""
    return value
def get_value(self, mutation_context=None):
    """Helper method to get the currently applicable value.

    This is either the default value, or the active mutation value as dictated by mutation_context.
    A mutation whose value is callable is applied to the original value; any other
    mutation value is used as it is.

    Args:
        mutation_context (MutationContext): Context holding the active mutations, keyed by
            qualified name. A fresh, empty context is used when None.

    Returns:
        The value this element should currently render with.
    """
    if mutation_context is None:
        mutation_context = MutationContext()

    if self.qualified_name in mutation_context.mutations:
        mutation = mutation_context.mutations[self.qualified_name]
        if callable(mutation.value):
            value = mutation.value(self.original_value(test_case_context=mutation_context.protocol_session))
        else:
            value = mutation.value
    else:
        value = self.original_value(test_case_context=mutation_context.protocol_session)

    # The selected value was computed but never returned in the original excerpt.
    return value
def add_target(self, target):
    """
    Add a target to the session. Multiple targets can be added for parallel fuzzing.

    The target's monitors are checked via ``monitors_alive()``, the target is given this
    session's fuzz data logger, and the session's callback monitor is attached to it if it
    is not already among its monitors.

    Args:
        target (Target): Target to add to session
    """
    # pass specified target parameters to the PED-RPC server.
    target.monitors_alive()
    target.set_fuzz_data_logger(fuzz_data_logger=self._fuzz_data_logger)

    already_attached = self._callback_monitor in target.monitors
    if not already_attached:
        target.monitors.append(self._callback_monitor)

    # add target to internal list.
    self.targets.append(target)
def pre_send(self, target=None, fuzz_data_logger=None, session=None):
    """Run every registered pre-send callback in order.

    Return values of the callbacks are discarded. An exception raised by a callback is
    caught and logged with its traceback; because the whole loop sits inside one ``try``,
    the callbacks after the failing one are not run.
    """
    try:
        for callback in self.on_pre_send:
            step_title = 'Pre_Send callback: "{0}"'.format(callback.__name__)
            fuzz_data_logger.open_test_step(step_title)
            callback(target=target, fuzz_data_logger=fuzz_data_logger, session=session, sock=target)
    except Exception:
        report = constants.ERR_CALLBACK_FUNC.format(func_name="pre_send") + traceback.format_exc()
        fuzz_data_logger.log_error(report)