项目地址

nfstream是一个根据数据包聚合流的项目。流的聚合通过一组共享特征(流密钥,例如,源IP地址、目的IP地址、传输协议、源端口、目的端口、VLAN标识符)将数据包聚合为流组成。流缓存维护每个流条目直到其终止(例如活动超时、非活动超时)。当条目出现在流缓存中时,基本计数器和几个指标将被更新。如果两个对在两个方向上生成流,则流缓存使用双向流定义,为两个方向添加计数器和度量。

1.总体框架:

  • NFStreamer: 驱动进程,它负责设置整个工作流,主要是并行计量进程的编排。
  • Meters: 是NFStream框架的核心部分。原始数据包被处理(例如,时间戳、解码、截断)并按特征分发。每个特征能够将包信息聚合到流中并计算所需的特性,直到触发流过期(主动超时、非主动超时)。

框架图

2.源码结构:

工作流程

  • context_cc.c 文件为胶水层,将ndpi的api进行封装

  • context.py 文件调用了.c文件

将数据包聚合到流,并提取特征,然后将数据包和流销毁。

3.提取数据包和流属性

3.1 总字典数据结构

序号 字典字段 类型 解释
1 flow_id int 流的数量标识
2 packet_num int 流中包含的数据包个数
3 all_data list 包含数据包、流,独自封装成字典,最后一个字典为流

3.2 数据包结构及特征

序号 数据包字段 类型 解释
1 type str 数据包及id
2 src_ip str 源IP地址
3 src_port int 传输层源端口
4 dst_ip str 目的IP地址
5 dst_port int 传输层目的端口
6 protocol int 协议标识字段(见下文)
7 ip_packet str 转换为十六进制的数据包内容
8 ip_packet_binary bytes IP头开始的原始内容
9 syn bool TCP SYN标志
10 cwr bool TCP CWR标志
11 ece bool TCP ECE标志
12 urg bool TCP URG标志
13 ack bool TCP ACK标志
14 psh bool TCP PSH标志
15 rst bool TCP RST标志
16 fin bool TCP FIN标志
17 ip_version int IP版本
18 vlan_id int 虚拟局域网标识符
19 time int 数据包时间戳(毫秒)
20 delta_time int 与上一个数据包的增量时间(毫秒)
21 direction int 数据包方向:src_to_dst为0,dst_to_src为1
22 raw_size int 链路层数据包大小
23 ip_size int IP包大小
24 transport_size int 传输层包大小
25 payload_size int 数据包有效负载大小

3.3 流结构及特征

序号 流字段 类型 解释
NFLOW核心功能
1 type str 流及id
2 id int 流标识符
3 expiration_id int 流量到期触发器的标识符。对于idle_timeout可以为0,对于active_timeout可以为1,对于自定义到期可以为-1。
4 src_ip str 源IP地址
5 src_ip_is_private bool 源IP地址类型(如果为私有,则为1,否则为0)
6 src_port int 传输层源端口
7 dst_ip str 目的IP地址
8 dst_ip_is_private bool 目的IP地址类型(如果为私有,则为1,否则为0)
9 dst_port int 传输层目的端口
10 protocol int 协议标识字段(见下文)
11 ip_version int IP版本
12 vlan_id int 虚拟局域网标识符
13 bidirectional_first_seen_ms int 第一个流双向数据包上的时间戳(毫秒)
14 bidirectional_last_seen_ms int 最后一个流双向数据包的时间戳(毫秒)
15 bidirectional_duration_ms int 流双向持续时间(毫秒)
16 bidirectional_packets int 流双向数据包累加器
17 bidirectional_bytes int 流双向字节累加器(取决于accounting_mode)
18 src2dst_first_seen_ms int 第一个流src2dst数据包上的时间戳(毫秒)
19 src2dst_last_seen_ms int 最后一个流src2dst数据包的时间戳(毫秒)
20 src2dst_duration_ms int 流src2dst持续时间(毫秒)
21 src2dst_packets int 流src2dst数据包累加器
22 src2dst_bytes int 流src2dst字节累加器
23 dst2src_first_seen_ms int 第一个流dst2src数据包上的时间戳(毫秒)
24 dst2src_last_seen_ms int 最后一个流dst2src数据包的时间戳(毫秒)
25 dst2src_duration_ms int 流dst2src的持续时间(毫秒)
26 dst2src_packets int 流dst2src数据包累加器
27 dst2src_bytes int 流dst2src字节累加器(取决于accounting_mode)
事后统计特征 (STATISTICAL_ANALYSIS = TRUE)
28 bidirectional_min_ps int 流双向最小数据包大小(取决于accounting_mode)
29 bidirectional_mean_ps float 流双向平均数据包大小(取决于accounting_mode)
30 bidirectional_stddev_ps float 流双向数据包大小样本标准差(取决于accounting_mode)
31 bidirectional_max_ps int 流双向最大包大小(取决于accounting_mode)
32 src2dst_min_ps int 流src2dst的最小数据包大小(取决于accounting_mode)
33 src2dst_mean_ps float 流src2dst的平均数据包大小(取决于accounting_mode)
34 src2dst_stddev_ps float 流src2dst数据包大小样本标准差(取决于accounting_mode)
35 src2dst_max_ps int 流src2dst的最大数据包大小(取决于accounting_mode)
36 dst2src_min_ps int 流dst2src的最小数据包大小(取决于accounting_mode)
37 dst2src_mean_ps float 流dst2src的平均数据包大小(取决于accounting_mode)
38 dst2src_stddev_ps float 流dst2src数据包大小样本标准差(取决于accounting_mode)
39 dst2src_max_ps int 流dst2src的最大数据包大小(取决于accounting_mode)
40 bidirectional_min_piat_ms int 流双向最小数据包到达时间(毫秒)
41 bidirectional_mean_piat_ms float 流双向平均数据包到达时间(毫秒)
42 bidirectional_stddev_piat_ms float 流双向报文到达时间采样标准偏差(毫秒)
43 bidirectional_max_piat_ms int 流双向最大数据包到达时间(毫秒)
44 src2dst_min_piat_ms int 流src2dst最小数据包到达时间
45 src2dst_mean_piat_ms float 流src2dst平均数据包到达时间
46 src2dst_stddev_piat_ms float 流src2dst数据包到达时间间隔采样标准偏差
47 src2dst_max_piat_ms int 流src2dst最大数据包到达时间
48 dst2src_min_piat_ms int 流dst2src最小数据包的到达时间
49 dst2src_mean_piat_ms float 流dst2src平均数据包的到达时间
50 dst2src_stddev_piat_ms float 流dst2src数据包到达时间间隔采样标准偏差
51 dst2src_max_piat_ms int 流dst2src最大数据包到达时间
52 bidirectional_syn_packets int 流双向SYN数据包累加器
53 bidirectional_cwr_packets int 流双向CWR数据包累加器
54 bidirectional_ece_packets int 流双向ECE数据包累加器
55 bidirectional_urg_packets int 流双向URG数据包累加器
56 bidirectional_ack_packets int 流双向ACK数据包累加器
57 bidirectional_psh_packets int 流双向PSH数据包累加器
58 bidirectional_rst_packets int 流双向RST数据包累加器
59 bidirectional_fin_packets int 流双向FIN数据包累加器
60 src2dst_syn_packets int 流src2dst SYN数据包累加器
61 src2dst_cwr_packets int 流src2dst CWR数据包累加器
62 src2dst_ece_packets int 流src2dst ECE数据包累加器
63 src2dst_urg_packets int 流src2dst URG数据包累加器
64 src2dst_ack_packets int 流src2dst ACK数据包累加器
65 src2dst_psh_packets int 流src2dst PSH数据包累加器
66 src2dst_rst_packets int 流src2dst RST数据包累加器
67 src2dst_fin_packets int 流src2dst FIN数据包累加器
68 dst2src_syn_packets int 流dst2src SYN数据包累加器
69 dst2src_cwr_packets int 流dst2src CWR数据包累加器
70 dst2src_ece_packets int 流dst2src ECE数据包累加器
71 dst2src_urg_packets int 流dst2src URG数据包累加器
72 dst2src_ack_packets int 流dst2src ACK数据包累加器
73 dst2src_psh_packets int 流dst2src PSH数据包累加器
74 dst2src_rst_packets int 流dst2src RST数据包累加器
75 dst2src_fin_packets int 流dst2src FIN数据包累加器
早期统计特征 (SPLT_ANALYSIS> 0)
76 splt_direction list N(splt_analysis = N)个第一数据包方向的列表(0:src2dst, 1:dst2src, -1:无数据包)
77 splt_ps list N(splt_analysis = N)个第一数据包大小的列表(取决于accounting_mode, 当没有包时为-1)
78 splt_piat_ms list N(splt_analysis = N)个第一数据包到达时间的列表(对于第一个数据包始终为0, 在没有数据包时始终为-1)
NFLOW第7层可见性功能 (N_DISSECTIONS> 0)
79 application_name str nDPI检测到应用程序名称
80 application_category_name str nDPI检测到应用程序类别名称
81 application_is_guessed int 指示检测结果是基于纯解剖还是基于端口的猜测
82 requested_server_name str 请求的服务器名称(SSL/TLS, DNS, HTTP)
83 client_fingerprint str 客户指纹(DHCP为DHCP指纹, JA3为SSL/TLS, HASSH为SSH)
84 server_fingerprint str 服务器指纹(JA3为SSL/TLS, HASSH为SSH)
85 user_agent str 提取用于HTTP的用户代理或用于QUIC的用户代理标识符
86 content_type str 提取的HTTP内容类型
87 _C class <class ‘_cffi_backend._CDataBase’> <cdata ‘nf_flow_t *’ 0x2956d20>唯一标识流的对象

3.4 协议标识符字段

协议名 协议标识号
ICMP 1
IP 4
TCP 6
IGRP 9
UDP 17
IPV6 41
GREE 47
EIGRP 88
OSPF 89
PIM 103
VRRP 112
L2TP 115
ISIS 124

4.参数说明及插件扩展

4.1 自定义扩展样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
from nfstream import NFStreamer, NFPlugin
import binascii

'''
扩展nfstream框架,只需要继承NFPlugin
其提供了on_init(初始化流),on_update(更新流),on_expire(销毁流)的三大方法
本程序将所有流和流包含的数据包全部继承于字典中,方便下一步实验
我们使用flow._C作为流唯一标识
结构:im_every_flow={_C:id, _C:id}
im_flow_data={1:{包,包,...,流}, 2:{包,包,...,流},}
返回 {'flow_id': 1, 'packet_num': 2, 'all_data':[{包}, {包}, {流}] }

'''

im_every_flow = {} # 存储 _C:id 对应关系
im_flow_id = -1 # 数量统计
im_flow_data = {} # 存储所有包和流


class FlowSlicer(NFPlugin): # 扩展插件继承NFPlugin

def decodeLoad(self, data):
"""
二进制转换成字符串(pip install adafruit-circuitpython-binascii)
# b'E\x00\x004\xd5\x17@\x004\x06\xe38\x170\xc9\x08\xac\x13\x02(\x00P\x
# 45 00 00 34 d5 17 40 00 34 06 e3 38 17 30 c9 08 ac 13 02 28 00 50
:param data:二进制data
:return: 字符串data
"""
str = binascii.b2a_hex(data).decode()
if str == '00':
return None
newLoad = ''
i = 0
for j in range(0, len(str), 2):
newLoad += str[j:j + 2] + ' '
newLoad = newLoad[:-1]
newLoad += '\n'
return newLoad

def on_init(self, packet, flow):
'''
提取初始数据包的数据
:param packet: 数据包
:param flow: 流
'''
global im_flow_id
global im_flow_data
global im_every_flow

im_flow_id += 1
im_every_flow.setdefault(str(flow._C), im_flow_id)
tmp_ip_packet = self.decodeLoad(packet.ip_packet)
# 提取对应字段
tmp_dict = {
'type': 'packet1',
'src_ip': packet.src_ip,
'src_port': packet.src_port,
'dst_ip': packet.dst_ip,
'dst_port': packet.dst_port,
'protocol': packet.protocol,
'ip_packet': tmp_ip_packet,
'ip_packet_binary': packet.ip_packet,
'syn': packet.syn,
'cwr': packet.cwr,
'ece': packet.ece,
'urg': packet.urg,
'ack': packet.ack,
'psh': packet.psh,
'rst': packet.rst,
'fin': packet.fin,
'ip_version': packet.ip_version,
'vlan_id': packet.vlan_id,
'time': packet.time,
'delta_time': packet.delta_time,
'direction': packet.direction,
'raw_size': packet.raw_size,
'ip_size': packet.ip_size,
'transport_size': packet.transport_size,
'payload_size': packet.payload_size
}
tmp_list = []
tmp_list.append(tmp_dict)
im_flow_data.setdefault(im_flow_id, tmp_list)
# print(im_flow_data)


def on_update(self, packet, flow):
'''
将数据包的数据追加到流
:param packet: 数据包
:param flow: 流
'''
global im_flow_id
global im_flow_data
global im_every_flow

try:
tmp_flow_C = str(flow._C)
tmp_list_id = int(im_every_flow.get(tmp_flow_C))
tmp_ip_packet = self.decodeLoad(packet.ip_packet)
# 提取对应字段
tmp_dict = {
'type': 'packet' + str(len(im_flow_data[tmp_list_id]) + 1),
'src_ip': packet.src_ip,
'src_port': packet.src_port,
'dst_ip': packet.dst_ip,
'dst_port': packet.dst_port,
'protocol': packet.protocol,
'ip_packet': tmp_ip_packet,
'ip_packet_binary': packet.ip_packet,
'syn': packet.syn,
'cwr': packet.cwr,
'ece': packet.ece,
'urg': packet.urg,
'ack': packet.ack,
'psh': packet.psh,
'rst': packet.rst,
'fin': packet.fin,
'ip_version': packet.ip_version,
'vlan_id': packet.vlan_id,
'time': packet.time,
'delta_time': packet.delta_time,
'direction': packet.direction,
'raw_size': packet.raw_size,
'ip_size': packet.ip_size,
'transport_size': packet.transport_size,
'payload_size': packet.payload_size
}
im_flow_data[tmp_list_id].append(tmp_dict)
except Exception as e:
print("error:", e)

def on_expire(self, flow):
'''
将合成的流追加到字典中,形成最终数据
:param flow: 流
'''
global im_flow_id
global im_flow_data
global im_every_flow

try:
# print(im_every_flow)
tmp_flow_C = str(flow._C)
tmp_list_id = int(im_every_flow.get(str(tmp_flow_C)))
# 提取流中字段
tmp_flow_dict = {
'type': 'flow'+str(tmp_list_id)
}
tmp_flow_dict.update(dict(zip(flow.keys(), flow.values())))

im_flow_data[tmp_list_id].append(tmp_flow_dict)
tmp_all_data = im_flow_data[tmp_list_id]
# 合成新的字典,存入.txt文件的一行
tmp_flow_all = {
'flow_id': tmp_list_id,
'packet_num': len(tmp_all_data)-1,
'all_data': tmp_all_data
}
print(tmp_flow_all)
im_every_flow.pop(tmp_flow_C)
im_flow_data.pop(tmp_list_id)
print("fin_rest_num:", len(im_every_flow), len(im_flow_data))
# print("expire:", flow)
# 存入文件
with open('data.txt', 'a+') as f:
f.write(str(tmp_flow_all))
f.write('\n')

except Exception as e:
print("error:", e)


# 调用框架,添加对应参数
my_streamer = NFStreamer(source="ens33",
decode_tunnels=True,
bpf_filter=None,
promiscuous_mode=True,
snapshot_length=1536,
idle_timeout=15,
active_timeout=1800,
accounting_mode=0,
udps=FlowSlicer(),
n_dissections=20,
statistical_analysis=True,
splt_analysis=28,
n_meters=0,
performance_report=0)

for flow in my_streamer:
# print(flow)
print("ok")

# 从'data.txt'文件中,一行一行读取数据,使用eval()转换成字典,使用数据
# with open('data.txt', 'r+') as f:
# for each_line in f:
# change_dict = eval(each_line)
# print(type(change_dict))
# print(change_dict['all_data'][-1])

4.2 NFStreamer()方法参数

属性 默认值 解释
source [default=None] 数据包捕获源:pcap文件路径或网络接口名称。
decode_tunnels [default=True] 启用/禁用 GTP/TZSP 隧道解码。
bpf_filter [default=None] 指定一个BPF筛选器以筛选定的流量。
promiscuous_mode [default=True] 启用/禁用混杂捕获模式。
snapshot_length [default=1536] 控制数据包切片大小(截断),以字节为单位。
idle_timeout [default=15] 闲置(未接收到数据包)的流超过此值(以秒为单位)的流将过期。
active_timeout [default=1800] 活动时间超过此值(以秒为单位)的流将过期。
accounting_mode [default=0] 指定将用于报告字节相关功能的计费模式(0:链路层, 1:IP层, 2:传输层, 3:有效负载)。
udps [default=None] 指定用于扩展NFStreamer的用户定义的NFPlugins。
n_dissections [default=20] L7可见性功能要分析的每个流数据包的数量。设置为0时,将禁用L7可见性功能。
statistical_analysis [default=False] 启用/禁用事后流量统计分析。
splt_analysis [default=0] 指定第一个数据包长度的顺序,以便进行早期统计分析。设置为0时,禁用splt_analysis。
n_meters [default=0] 指定并行计量过程的数量。设置为0时,NFStreamer将根据正在运行的主机上的可用物理内核自动缩放计数。
performance_report [default=0] 效果报告间隔(以秒为单位)。设置为0时禁用。忽略脱机捕获。

原文链接

4.3 机器学习模型:培训和部署

1
2
3
4
5
6
7
8
from nfstream import NFPlugin, NFStreamer
from sklearn.ensemble import RandomForestClassifier

df = NFStreamer(source="training_traffic.pcap").to_pandas()
X = df[["bidirectional_packets", "bidirectional_bytes"]]
y = df["application_category_name"].apply(lambda x: 1 if 'SocialNetwork' in x else 0)
model = RandomForestClassifier()
model.fit(X, y)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import numpy

class ModelPrediction(NFPlugin):
def on_init(self, packet, flow):
flow.udps.model_prediction = 0
def on_expire(self, flow):
# 您可以在on_update入口点中执行相同的操作,并使用自定义id强制过期。
to_predict = numpy.array([flow.bidirectional_packets,
flow.bidirectional_bytes]).reshape((1,-1))
flow.udps.model_prediction = self.my_model.predict(to_predict)

ml_streamer = NFStreamer(source="eth0", udps=ModelPrediction(my_model=model))
for flow in ml_streamer:
print(flow.udps.model_prediction)

感谢观看,如有不足或错误,恳请大佬指正 (^_^)#