前言

在UNSW-NB15数据集上进行基准测试实验,看下效果如何,以便后续进行调整

测试内容(未使用Gsmote平衡训练集)

  1. 对预处理后的数据集使用CNN进行二分类
  2. 对预处理后的数据集使用DNN进行二分类
  3. 对比二者的分类效果

数据预处理

预处理内容

  • protoservicestate三个全是字符型值的列进行one-hot编码
  • 将训练集和测试集在同一标准下进行归一化
  • 删除attack_cat

预处理代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def load_data():
# Default values.
train_set = r"Training and Testing Sets/UNSW_NB15_training-set.csv"
test_set = r"Training and Testing Sets/UNSW_NB15_testing-set.csv"
train = pd.read_csv(train_set, index_col='id') # 指定“id”这一列数据作为行索引
test = pd.read_csv(test_set, index_col='id') # 指定“id”这一列数据作为行索引

# 二分类数据
training_label = train['label'].values # 将train的“label”这一列的值单独取出来
testing_label = test['label'].values # 将test的“label”这一列的值单独取出来
temp_train = training_label
temp_test = testing_label

# Creates new dummy columns from each unique string in a particular feature 创建新的虚拟列
unsw = pd.concat([train, test]) # 将train和test拼接在一起
unsw = pd.get_dummies(data=unsw, columns=['proto', 'service', 'state'])
# 将'proto', 'service', 'state'这三列使用one - hot - encoder转变
# Normalising all numerical features:
unsw.drop(['label', 'attack_cat'], axis=1,
inplace=True) # 删除'label', 'attack_cat'这两列,其中(inplace=True)是直接对原dataFrame进行操作

unsw_value = unsw.values

scaler = MinMaxScaler(feature_range=(0, 1)) # 初始化MinMaxScaler
unsw_value = scaler.fit_transform(unsw_value) # 将待处理数据矩阵进行归一化
train_set = unsw_value[:len(train), :] # 分离出train集
test_set = unsw_value[len(train):, :] # 分离出test集

return train_set, temp_train, test_set, temp_test

实验结果

CNN二分类

阈值为0.5时的实验结果如下

阈值为0.5,learning_rate=0.001,epochs=50,batch_size=128

效果不好,有很多正常样本误报为攻击样本,有11763个正常样本被判断为了攻击样本,假阳性很高;

CNN二分类(一)

第二次测试效果比第一次好很多,很奇怪

CNN二分类(二)

第三次又恢复原样了,炼丹太玄学了

CNN二分类(三)

第四次,结果又变成和第一次一样烂了

CNN二分类(四)

阈值为0.9时的实验结果如下

可以看到各项评估指标都好了一点,但是很明显假阴性的样本多了不少,阈值为0.5时假阴性在300~1200范围内浮动,但是阈值设为0.9时一下子变成5700了,虽然假阳性样本从10000多变成了1200多(因为概率低于0.9的都算正常样本,可见有很多正常样本的最终预测概率在0.5~0.9之间),因为假阳性样本下降的数量大于假阴性样本上升的数量,所以各项指标都变好了,但实际并不代表模型效果变好了

CNN二分类 阈值0.9(一)

阈值为0.9时第二次测试

CNN二分类 阈值0.9(二)

阈值为0.1时的实验结果如下

当阈值为0.1时,会导致很多正常样本被划分为攻击样本,即假阳性很高(15620个,比之前阈值为0.5的11000多了四千个),因为概率只要大于0.1就当作攻击样本,但是还有21380个正常样本实际预测概率实际上是小于0.1的,从这个角度看预测准度还是挺不错的

CNN二分类 阈值0.1(一)

DNN二分类

阈值为0.5时的实验结果如下

在UNSW-NB15使用CNN的效果好像比CNN要好一点

DNN二分类(一)

DNN二分类(二)

DNN二分类 (三)

第四次 算是比较好的结果了,偶尔能出一次这种结果,玄学

DNN二分类 (四)

阈值为0.9时实验结果如下

这个倒是和CNN差别不大,误差也挺小,但是阈值0.5差距蛮大

DNN二分类 阈值0.9 (一)

和第上一次结果差不多

DNN二分类 阈值0.9 (二)

第三次 和上两次大差不差

DNN二分类 阈值0.9 (三)

阈值选择问题

正常二分类都使用0.5作为阈值,但是可以根据需要自定义调整阈值,阈值不同最后的结果也不一样

在二分类问题中,预测模型输出的值通常是一个介于0和1之间的概率,表示样本属于正类的信心程度。阈值决定了将这个概率转换为具体类别的界限。如果概率大于0.5,预测为正类(1,即攻击样本);否则预测为负类(0,即正常样本)。这是一种常见的默认选择,适用于类别分布相对均衡的情况。

阈值等于0.5

在这个设置下,模型对正类和负类的识别相对平衡,但存在一定的假阳性(FAR较高),表明一些正常流量被误判为攻击

阈值大于0.5

在这个设置下,模型的精确率大幅提高,意味着大部分预测为攻击的样本确实是攻击。这降低了假阳性率(FAR显著降低),但假阴性(即真实攻击被误判为正常)增加

高阈值的优缺点

  • 高阈值的优点:提高了精确率,减少了假阳性(FAR显著降低),适合对误报敏感的场景。

  • 高阈值的缺点:可能导致更高的假阴性数量,错过一些实际的攻击。因为概率为0.9时才当作是攻击样本,概率低于0.9都当作正常样本,所以导致会错过比较多的实际攻击,假阴性更高了

模型代码

CNN代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, f1_score


def load_data():
# Default values.
train_set = r"Training and Testing Sets/UNSW_NB15_training-set.csv"
test_set = r"Training and Testing Sets/UNSW_NB15_testing-set.csv"
train = pd.read_csv(train_set, index_col='id') # 指定“id”这一列数据作为行索引
test = pd.read_csv(test_set, index_col='id') # 指定“id”这一列数据作为行索引

# 二分类数据
training_label = train['label'].values # 将train的“label”这一列的值单独取出来
testing_label = test['label'].values # 将test的“label”这一列的值单独取出来
temp_train = training_label
temp_test = testing_label

# Creates new dummy columns from each unique string in a particular feature 创建新的虚拟列
unsw = pd.concat([train, test]) # 将train和test拼接在一起
unsw = pd.get_dummies(data=unsw, columns=['proto', 'service', 'state'])
# 将'proto', 'service', 'state'这三列使用one - hot - encoder转变
# Normalising all numerical features:
unsw.drop(['label', 'attack_cat'], axis=1,
inplace=True) # 删除'label', 'attack_cat'这两列,其中(inplace=True)是直接对原dataFrame进行操作

unsw_value = unsw.values

scaler = MinMaxScaler(feature_range=(0, 1)) # 初始化MinMaxScaler
unsw_value = scaler.fit_transform(unsw_value) # 将待处理数据矩阵进行归一化
train_set = unsw_value[:len(train), :] # 分离出train集
test_set = unsw_value[len(train):, :] # 分离出test集

return train_set, temp_train, test_set, temp_test


if __name__ == '__main__':
train_set, temp_train, test_set, temp_test = load_data()
train_set = np.expand_dims(train_set, axis=2)
test_set = np.expand_dims(test_set, axis=2)

model = Sequential()
model.add(Conv1D(filters=32, kernel_size=3, activation='relu', input_shape=(train_set.shape[1], 1)))
model.add(MaxPooling1D(pool_size=2))
model.add(Flatten())
model.add(Dense(64, activation='relu'))
model.add(Dense(1, activation='sigmoid'))

model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss='binary_crossentropy', metrics=['accuracy'])
model.fit(train_set, temp_train, epochs=50, batch_size=128, validation_split=0.2)

# 评估模型
loss, accuracy = model.evaluate(test_set, temp_test)

# 预测
predictions = model.predict(test_set)
predictions = (predictions > 0.5).astype(int) # 使用0.5作为阈值

# 计算评估指标
cm = confusion_matrix(temp_test, predictions)
accuracy = accuracy_score(temp_test, predictions)
precision = precision_score(temp_test, predictions)
f1 = f1_score(temp_test, predictions)
far = cm[0, 1] / (cm[0, 0] + cm[0, 1])

# 输出结果
print("Confusion Matrix:")
print(cm)
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"False Acceptance Rate (FAR): {far:.4f}")

DNN代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, f1_score

def load_data():
train_set = r"Training and Testing Sets/UNSW_NB15_training-set.csv"
test_set = r"Training and Testing Sets/UNSW_NB15_testing-set.csv"
train = pd.read_csv(train_set, index_col='id')
test = pd.read_csv(test_set, index_col='id')

training_label = train['label'].values
testing_label = test['label'].values
temp_train = training_label
temp_test = testing_label

unsw = pd.concat([train, test])
unsw = pd.get_dummies(data=unsw, columns=['proto', 'service', 'state'])
unsw.drop(['label', 'attack_cat'], axis=1, inplace=True)

unsw_value = unsw.values

scaler = MinMaxScaler(feature_range=(0, 1))
unsw_value = scaler.fit_transform(unsw_value)
train_set = unsw_value[:len(train), :]
test_set = unsw_value[len(train):, :]

return train_set, temp_train, test_set, temp_test

if __name__ == '__main__':
train_set, temp_train, test_set, temp_test = load_data()

model = Sequential()
model.add(Dense(64, activation='relu', input_shape=(train_set.shape[1],)))
model.add(Dense(32, activation='relu'))
model.add(Dense(1, activation='sigmoid'))

model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss='binary_crossentropy', metrics=['accuracy'])
model.fit(train_set, temp_train, epochs=50, batch_size=128, validation_split=0.2)

loss, accuracy = model.evaluate(test_set, temp_test)

predictions = model.predict(test_set)
predictions = (predictions > 0.5).astype(int)

cm = confusion_matrix(temp_test, predictions)
accuracy = accuracy_score(temp_test, predictions)
precision = precision_score(temp_test, predictions)
f1 = f1_score(temp_test, predictions)
far = cm[0, 1] / (cm[0, 0] + cm[0, 1])

print("Confusion Matrix:")
print(cm)
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"False Acceptance Rate (FAR): {far:.4f}")