import binascii

from nfstream import NFStreamer, NFPlugin
'''
Extends the nfstream framework: subclassing NFPlugin is all that is needed.
It provides three main hooks: on_init (flow created), on_update (flow updated)
and on_expire (flow destroyed).

This program collects every flow, together with all of the packets it
contains, into dictionaries for the next step of the experiment.
flow._C is used as the unique flow identifier.

Structure: im_every_flow = {_C: id, _C: id}
           im_flow_data  = {1: {pkt, pkt, ..., flow}, 2: {pkt, pkt, ..., flow}}
Output:    {'flow_id': 1, 'packet_num': 2, 'all_data': [{pkt}, {pkt}, {flow}]}
'''
im_every_flow = {} im_flow_id = -1 im_flow_data = {}
class FlowSlicer(NFPlugin):
def decodeLoad(self, data): """ 二进制转换成字符串(pip install adafruit-circuitpython-binascii) # b'E\x00\x004\xd5\x17@\x004\x06\xe38\x170\xc9\x08\xac\x13\x02(\x00P\x # 45 00 00 34 d5 17 40 00 34 06 e3 38 17 30 c9 08 ac 13 02 28 00 50 :param data:二进制data :return: 字符串data """ str = binascii.b2a_hex(data).decode() if str == '00': return None newLoad = '' i = 0 for j in range(0, len(str), 2): newLoad += str[j:j + 2] + ' ' newLoad = newLoad[:-1] newLoad += '\n' return newLoad
def on_init(self, packet, flow): ''' 提取初始数据包的数据 :param packet: 数据包 :param flow: 流 ''' global im_flow_id global im_flow_data global im_every_flow
im_flow_id += 1 im_every_flow.setdefault(str(flow._C), im_flow_id) tmp_ip_packet = self.decodeLoad(packet.ip_packet) tmp_dict = { 'type': 'packet1', 'src_ip': packet.src_ip, 'src_port': packet.src_port, 'dst_ip': packet.dst_ip, 'dst_port': packet.dst_port, 'protocol': packet.protocol, 'ip_packet': tmp_ip_packet, 'ip_packet_binary': packet.ip_packet, 'syn': packet.syn, 'cwr': packet.cwr, 'ece': packet.ece, 'urg': packet.urg, 'ack': packet.ack, 'psh': packet.psh, 'rst': packet.rst, 'fin': packet.fin, 'ip_version': packet.ip_version, 'vlan_id': packet.vlan_id, 'time': packet.time, 'delta_time': packet.delta_time, 'direction': packet.direction, 'raw_size': packet.raw_size, 'ip_size': packet.ip_size, 'transport_size': packet.transport_size, 'payload_size': packet.payload_size } tmp_list = [] tmp_list.append(tmp_dict) im_flow_data.setdefault(im_flow_id, tmp_list)
def on_update(self, packet, flow): ''' 将数据包的数据追加到流 :param packet: 数据包 :param flow: 流 ''' global im_flow_id global im_flow_data global im_every_flow try: tmp_flow_C = str(flow._C) tmp_list_id = int(im_every_flow.get(tmp_flow_C)) tmp_ip_packet = self.decodeLoad(packet.ip_packet) tmp_dict = { 'type': 'packet' + str(len(im_flow_data[tmp_list_id]) + 1), 'src_ip': packet.src_ip, 'src_port': packet.src_port, 'dst_ip': packet.dst_ip, 'dst_port': packet.dst_port, 'protocol': packet.protocol, 'ip_packet': tmp_ip_packet, 'ip_packet_binary': packet.ip_packet, 'syn': packet.syn, 'cwr': packet.cwr, 'ece': packet.ece, 'urg': packet.urg, 'ack': packet.ack, 'psh': packet.psh, 'rst': packet.rst, 'fin': packet.fin, 'ip_version': packet.ip_version, 'vlan_id': packet.vlan_id, 'time': packet.time, 'delta_time': packet.delta_time, 'direction': packet.direction, 'raw_size': packet.raw_size, 'ip_size': packet.ip_size, 'transport_size': packet.transport_size, 'payload_size': packet.payload_size } im_flow_data[tmp_list_id].append(tmp_dict) except Exception as e: print("error:", e)
def on_expire(self, flow): ''' 将合成的流追加到字典中,形成最终数据 :param flow: 流 ''' global im_flow_id global im_flow_data global im_every_flow
try: tmp_flow_C = str(flow._C) tmp_list_id = int(im_every_flow.get(str(tmp_flow_C))) tmp_flow_dict = { 'type': 'flow'+str(tmp_list_id) } tmp_flow_dict.update(dict(zip(flow.keys(), flow.values())))
im_flow_data[tmp_list_id].append(tmp_flow_dict) tmp_all_data = im_flow_data[tmp_list_id] tmp_flow_all = { 'flow_id': tmp_list_id, 'packet_num': len(tmp_all_data)-1, 'all_data': tmp_all_data } print(tmp_flow_all) im_every_flow.pop(tmp_flow_C) im_flow_data.pop(tmp_list_id) print("fin_rest_num:", len(im_every_flow), len(im_flow_data)) with open('data.txt', 'a+') as f: f.write(str(tmp_flow_all)) f.write('\n')
except Exception as e: print("error:", e)
my_streamer = NFStreamer(source="ens33", decode_tunnels=True, bpf_filter=None, promiscuous_mode=True, snapshot_length=1536, idle_timeout=15, active_timeout=1800, accounting_mode=0, udps=FlowSlicer(), n_dissections=20, statistical_analysis=True, splt_analysis=28, n_meters=0, performance_report=0)
for flow in my_streamer: print("ok")