2022 Brain-Computer Interface Algorithm Challenge: Brainprint Recognition Competition Baseline
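1. Background
In today's digital information society, personal authentication technology is an indispensable tool in many personal and enterprise security systems. Biometric recognition is a form of personal authentication that uses biological measurements, including physical, physiological, or behavioral characteristics. Unlike traditional methods such as fingerprint, voiceprint, and face recognition, brainprint recognition has distinct advantages: it cannot be stolen, cannot be forged, is not easily damaged, and inherently requires a living subject. It can therefore provide a more secure form of biometric identification, and has been called the most secure next-generation password.
However, brainprint recognition is still at an exploratory stage and suffers from a series of limitations, such as small sample sizes, single-session testing, and single recording paradigms. At the same time, the accuracy, stability, and generalizability of brainprint recognition systems remain a challenge. There is also no public EEG-based biometric competition, and the lack of a unified benchmark and platform has held the field back. To address this, we collected the M3CV dataset (a multi-subject, multi-session, and multi-task database for studying commonality and individuality in EEG) to launch an EEG-based brainprint recognition competition.
PS: The background above is reproduced from "2022 Brain-Computer Interface Algorithm Challenge: Brainprint Recognition".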
2. Data Visualization
!cd data/data151025/ && unzip -q Calibration.zip -d val
# On the first run, uncomment the next two lines: the code below reads from train/ and test/.
# !cd data/data151025/ && unzip -q Enrollment.zip -d train
# !cd data/data151025/ && unzip -q Testing.zip -d test
!pip install PyWavelets
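The specific brainwave waveforms produced by the human brain are called a "brainprint". When different individuals view a particular image, their brains produce a targeted brainwave response that is unique to each person. Recording these person-specific EEG signals makes it possible to build a brainprint reference database. When identity verification is needed, the person simply views the same image again, and the newly acquired brainprint is compared against the database to quickly identify them, with accuracy reportedly as high as 100%.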
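3. Building the Baseline
The baseline proposes a two-branch network. The brainprint signal is passed through a discrete wavelet transform, which decomposes it into two sets of coefficients, and each set is fed into its own network branch (in the code in section 3.3, the transform is applied right after the first convolution). The overall design borrows from the structure of bilinear convolutional neural networks and applies a divide-and-conquer approach to the brainprint recognition problem. The overall network structure is shown in the figure above.
3.1 Discrete Wavelet Transform
As shown in the figure above, after the discrete wavelet transform the brainprint signal is split into two different sets of coefficients.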
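The value of wavelet decomposition is that it can decompose a signal at different scales, and the choice of scales can be made according to the goal at hand. For many signals, the low-frequency content is the most important part, since it often carries the signal's characteristic features, while the high-frequency content supplies detail or nuance. If the high-frequency components of a human voice are removed, it sounds different from before but the words can still be understood; if enough of the low-frequency components are removed, what remains is meaningless sound. Wavelet analysis therefore speaks of approximations and details. The approximation represents the high-scale, that is, low-frequency, information of the signal; the detail represents the low-scale, that is, high-frequency, information. The original signal thus passes through two complementary filters and yields two signals.
Reference: Discrete Wavelet Transform (DWT)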
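To make the approximation/detail split concrete, here is a minimal pure-Python sketch of a single-level Haar transform, the wavelet family named in the baseline's `pywt.dwt(..., 'haar')` call. The function names `haar_dwt` and `haar_idwt` and the sample values are illustrative assumptions, not part of the baseline, and the sign convention of the detail coefficients can differ between libraries.

```python
import math


def haar_dwt(signal):
    """Single-level Haar DWT of an even-length sequence (illustrative helper).

    Returns (approximation, detail); each list is half as long as the input.
    """
    if len(signal) % 2 != 0:
        raise ValueError("this sketch expects an even number of samples")
    scale = 1.0 / math.sqrt(2.0)
    approx, detail = [], []
    for k in range(0, len(signal), 2):
        a, b = signal[k], signal[k + 1]
        approx.append((a + b) * scale)  # low-pass: scaled pairwise sum
        detail.append((a - b) * scale)  # high-pass: scaled pairwise difference
    return approx, detail


def haar_idwt(approx, detail):
    """Invert haar_dwt, showing that the two halves keep all the information."""
    scale = 1.0 / math.sqrt(2.0)
    signal = []
    for a, d in zip(approx, detail):
        signal.append((a + d) * scale)
        signal.append((a - d) * scale)
    return signal


# Hypothetical samples: a slow trend with small sample-to-sample changes.
samples = [4.0, 6.0, 10.0, 12.0, 8.0, 6.0, 5.0, 5.0]
approx, detail = haar_dwt(samples)
restored = haar_idwt(approx, detail)
print(approx)
print(detail)
```

The approximation follows the slow trend of the samples, while the detail is zero wherever two neighbouring samples are equal. Each output is half the input length, which is why the sequence length drops from 500 to 250 at the transform step in the network summary.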
3.2 Data Loading
import scipy.io as scio
from sklearn.utils import shuffle
import os
import pandas as pd
import numpy as np
from PIL import Image
import paddle
import paddle.nn as nn
from paddle.io import Dataset
import paddle.vision.transforms as T
import paddle.nn.functional as F
from paddle.metric import Accuracy
from sklearn.preprocessing import LabelEncoder
from paddle.optimizer.lr import LinearWarmup
from paddle.optimizer.lr import CosineAnnealingDecay
import warnings
warnings.filterwarnings("ignore")
# Read the info files
train_images = pd.read_csv('data/data151025/Enrollment_Info.csv')
val_images = pd.read_csv('data/data151025/Calibration_Info.csv')
train_images = shuffle(train_images, random_state=0)
val_images = shuffle(val_images)
# Split into training and validation sets
train_image_list = train_images
val_image_list = val_images
df = train_image_list
train_image_path_list = train_image_list['EpochID'].values
train_label_list = train_image_list['SubjectID'].values
val_image_path_list = val_image_list['EpochID'].values
val_label_list = val_image_list['SubjectID'].values
# Define the dataset class
class MyDataset(paddle.io.Dataset):
    """
    Step 1: subclass paddle.io.Dataset.
    """

    def __init__(self, train_img_list, val_img_list, train_label_list, val_label_list, mode='train'):
        """
        Step 2: implement the constructor, define how the data is read,
        and select the training or validation split.
        """
        super(MyDataset, self).__init__()
        self.img = []
        self.label = []
        # EpochID and SubjectID columns taken from the CSV files read with pandas
        self.train_images = train_img_list
        self.test_images = val_img_list
        self.train_label = train_label_list
        self.test_label = val_label_list
        if mode == 'train':
            # Build file paths and labels for the training split
            for img, la in zip(self.train_images, self.train_label):
                self.img.append('data/data151025/train/' + img + '.mat')
                # Drop the 4-character SubjectID prefix and shift to a 0-based class index
                self.label.append(paddle.to_tensor(int(la[4:]) - 1, dtype='int64'))
        else:
            # Build file paths and labels for the validation split
            for img, la in zip(self.test_images, self.test_label):
                self.img.append('data/data151025/val/' + img + '.mat')
                self.label.append(paddle.to_tensor(int(la[4:]) - 1, dtype='int64'))

    def load_eeg(self, eeg_path):
        data = scio.loadmat(eeg_path)
        return data['epoch_data']

    def __getitem__(self, index):
        """
        Step 3: implement __getitem__, which defines how to fetch the item at
        a given index and returns one sample (EEG data, label).
        """
        eeg_data = self.load_eeg(self.img[index])
        eeg_label = self.label[index]
        # label = paddle.to_tensor(label)
        return eeg_data, eeg_label

    def __len__(self):
        """
        Step 4: implement __len__, which returns the number of samples.
        """
        return len(self.img)
#train_loader
train_dataset = MyDataset(train_img_list=train_image_path_list, val_img_list=val_image_path_list, train_label_list=train_label_list, val_label_list=val_label_list, mode='train')
train_loader = paddle.io.DataLoader(train_dataset, places=paddle.CPUPlace(), batch_size=256, shuffle=True, num_workers=0)
#val_loader
val_dataset = MyDataset(train_img_list=train_image_path_list, val_img_list=val_image_path_list, train_label_list=train_label_list, val_label_list=val_label_list, mode='test')
val_loader = paddle.io.DataLoader(val_dataset, places=paddle.CPUPlace(), batch_size=256, shuffle=True, num_workers=0)
3.3 Network Construction and Visualization
import math
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
import pywt
from paddle.nn import Linear, Dropout, ReLU
from paddle.nn import Conv1D, MaxPool1D
from paddle.nn.initializer import Uniform
from paddle import ParamAttr  # public alias; avoids the legacy paddle.fluid import path
from paddle.utils.download import get_weights_path_from_url
class MyNet_dwt(nn.Layer):
    def __init__(self, num_classes=1000):
        super(MyNet_dwt, self).__init__()
        self.num_classes = num_classes
        # Shared stem: 65 input channels -> 128 feature channels
        self._conv1 = Conv1D(65, 128, 3, stride=2, padding=1)
        self.bn1 = nn.BatchNorm1D(128)
        # Branch 1: takes the first DWT output
        self._conv2_1 = Conv1D(128, 256, 3, stride=2, padding=1)
        self.bn2_1 = nn.BatchNorm1D(256)
        self._conv3_1 = Conv1D(256, 512, 3, stride=2, padding=1)
        self.bn3_1 = nn.BatchNorm1D(512)
        self._conv4_1 = Conv1D(512, 256, 3, stride=2, padding=1)
        self.bn4_1 = nn.BatchNorm1D(256)
        # Branch 2: takes the second DWT output
        self._conv2_2 = Conv1D(128, 256, 3, stride=2, padding=1)
        self.bn2_2 = nn.BatchNorm1D(256)
        self._conv3_2 = Conv1D(256, 512, 3, stride=2, padding=1)
        self.bn3_2 = nn.BatchNorm1D(512)
        self._conv4_2 = Conv1D(512, 256, 3, stride=2, padding=1)
        self.bn4_2 = nn.BatchNorm1D(256)
        # Note: the BatchNorm1D layers above are defined but never called in forward().
        self._fc8 = Linear(in_features=16384, out_features=num_classes)

    def forward(self, inputs):
        x = self._conv1(inputs)
        # Single-level Haar DWT via PyWavelets; pywt.dwt returns (approximation, detail).
        # The NumPy round trip leaves the autograd graph, so no gradient reaches _conv1.
        x = paddle.to_tensor(pywt.dwt(x.numpy(), 'haar'), dtype='float32')
        x1, x2 = x.split(2)
        x1 = x1.squeeze(axis=0)
        x2 = x2.squeeze(axis=0)
        # Branch 1
        x1 = self._conv2_1(x1)
        x1 = self._conv3_1(x1)
        x1 = F.relu(x1)
        x1 = self._conv4_1(x1)
        x1 = F.relu(x1)
        # Branch 2
        x2 = self._conv2_2(x2)
        x2 = self._conv3_2(x2)
        x2 = F.relu(x2)
        x2 = self._conv4_2(x2)
        x2 = F.relu(x2)
        # Concatenate both branches along the length axis, then classify
        x = paddle.concat(x=[x1, x2], axis=2)
        x = paddle.flatten(x, start_axis=1, stop_axis=-1)
        x = self._fc8(x)
        return x
model_res = MyNet_dwt(num_classes=95)
paddle.summary(model_res,(512,65,1000))
W0707 12:40:23.169551 756 gpu_context.cc:278] Please NOTE: device: 0, GPU Compute Capability: 7.0, Driver API Version: 11.2, Runtime API Version: 10.1
W0707 12:40:23.173269 756 gpu_context.cc:306] device: 0, cuDNN Version: 7.6.
---------------------------------------------------------------------------
Layer (type) Input Shape Output Shape Param #
===========================================================================
Conv1D-1 [[512, 65, 1000]] [512, 128, 500] 25,088
Conv1D-2 [[512, 128, 250]] [512, 256, 125] 98,560
Conv1D-3 [[512, 256, 125]] [512, 512, 63] 393,728
Conv1D-4 [[512, 512, 63]] [512, 256, 32] 393,472
Conv1D-5 [[512, 128, 250]] [512, 256, 125] 98,560
Conv1D-6 [[512, 256, 125]] [512, 512, 63] 393,728
Conv1D-7 [[512, 512, 63]] [512, 256, 32] 393,472
Linear-1 [[512, 16384]] [512, 95] 1,556,575
===========================================================================
Total params: 3,353,183
Trainable params: 3,353,183
Non-trainable params: 0
---------------------------------------------------------------------------
Input size (MB): 126.95
Forward/backward pass size (MB): 816.37
Params size (MB): 12.79
Estimated Total Size (MB): 956.12
---------------------------------------------------------------------------
{'total_params': 3353183, 'trainable_params': 3353183}
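The shapes and parameter counts in the summary above can be checked by hand. The following sketch recomputes them with the standard 1-D convolution output-length formula, assuming kernel size 3, stride 2, padding 1 (as in the layer definitions) and a transform step that halves the sequence length. The helper names `conv1d_out_len` and `conv1d_params` are illustrative only.

```python
def conv1d_out_len(length, kernel=3, stride=2, padding=1):
    """Output length of a 1-D convolution with dilation 1."""
    return (length + 2 * padding - kernel) // stride + 1


def conv1d_params(in_ch, out_ch, kernel=3):
    """Weight count plus one bias per output channel."""
    return in_ch * out_ch * kernel + out_ch


after_conv1 = conv1d_out_len(1000)   # _conv1: 1000 -> 500
after_dwt = after_conv1 // 2         # single-level DWT halves an even length: 500 -> 250

# Each branch applies three stride-2 convolutions.
branch_lengths = []
length = after_dwt
for _ in range(3):
    length = conv1d_out_len(length)
    branch_lengths.append(length)

# Two branches of 256 channels are concatenated along the length axis, then flattened.
fc_in = 256 * (2 * branch_lengths[-1])

branch_params = (conv1d_params(128, 256)
                 + conv1d_params(256, 512)
                 + conv1d_params(512, 256))
total_params = conv1d_params(65, 128) + 2 * branch_params + (fc_in * 95 + 95)

print(after_conv1, after_dwt, branch_lengths, fc_in, total_params)
```

This explains the `in_features=16384` of the final linear layer: 256 channels times 64 positions (32 from each branch). The total also matches the summary only when the BatchNorm layers are left out, consistent with them not being called in `forward()`.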
3.4 Model Configuration and Training
# Wrap the network with the high-level API
model = paddle.Model(model_res)
# Define the learning-rate schedule and optimizer
class Cosine(CosineAnnealingDecay):
    """
    Cosine learning rate decay
    lr = 0.05 * (math.cos(epoch * (math.pi / epochs)) + 1)
    Args:
        lr(float): initial learning rate
        step_each_epoch(int): steps each epoch
        epochs(int): total training epochs
    """

    def __init__(self, lr, step_each_epoch, epochs, **kwargs):
        super(Cosine, self).__init__(
            learning_rate=lr,
            T_max=step_each_epoch * epochs, )
        self.update_specified = False


class CosineWarmup(LinearWarmup):
    """
    Cosine learning rate decay with warmup
    [0, warmup_epoch): linear warmup
    [warmup_epoch, epochs): cosine decay
    Args:
        lr(float): initial learning rate
        step_each_epoch(int): steps each epoch
        epochs(int): total training epochs
        warmup_epoch(int): epoch num of warmup
    """

    def __init__(self, lr, step_each_epoch, epochs, warmup_epoch=5, **kwargs):
        assert epochs > warmup_epoch, "total epoch({}) should be larger than warmup_epoch({}) in CosineWarmup.".format(
            epochs, warmup_epoch)
        warmup_step = warmup_epoch * step_each_epoch
        start_lr = 0.0
        end_lr = lr
        lr_sch = Cosine(lr, step_each_epoch, epochs - warmup_epoch)
        super(CosineWarmup, self).__init__(
            learning_rate=lr_sch,
            warmup_steps=warmup_step,
            start_lr=start_lr,
            end_lr=end_lr)
        self.update_specified = False
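# CosineWarmup only uses lr, step_each_epoch, epochs and warmup_epoch; any other keyword
# argument is absorbed by **kwargs and ignored, so the warmup length is set explicitly here.
scheduler = CosineWarmup(
    lr=0.00125, step_each_epoch=226, epochs=24, warmup_epoch=5)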
optim = paddle.optimizer.Adam(learning_rate=scheduler, parameters=model.parameters())
# Configure the model
model.prepare(
optim,
paddle.nn.CrossEntropyLoss(),
Accuracy()
)
callback = paddle.callbacks.VisualDL(log_dir='visualdl_log_dir_alexdwt_bn')
# Train and evaluate the model
model.fit(train_loader,
val_loader,
log_freq=1,
epochs=24,
callbacks=callback,
verbose=1,
)
The loss value printed in the log is the current step, and the metric is the average value of previous steps.
Epoch 1/24
step 226/226 [==============================] - loss: 6.9227 - acc: 0.0568 - 2s/step
Eval begin...
step 24/24 [==============================] - loss: 8.7896 - acc: 0.0152 - 992ms/step
Eval samples: 6070
model.save('Hapi_MyCNN_dwt_bn', True) # save
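For intuition about the schedule used in this section, the sketch below evaluates a closed-form linear-warmup plus cosine-decay curve in plain Python. It is an approximation for illustration, not Paddle's implementation: `warmup_cosine_lr` is a hypothetical helper, the 5 warmup epochs are the default of the `CosineWarmup` class above, and Paddle's own schedulers may differ by a step at the boundaries.

```python
import math


def warmup_cosine_lr(step, base_lr, steps_per_epoch, epochs, warmup_epochs):
    """Learning rate at a given global step (closed-form sketch)."""
    warmup_steps = warmup_epochs * steps_per_epoch
    if step < warmup_steps:
        # Linear ramp from 0 up to base_lr
        return base_lr * step / warmup_steps
    # Cosine decay from base_lr down to 0 over the remaining epochs
    decay_steps = (epochs - warmup_epochs) * steps_per_epoch
    progress = min(step - warmup_steps, decay_steps) / decay_steps
    return 0.5 * base_lr * (1.0 + math.cos(math.pi * progress))


# Values taken from the training configuration above.
BASE_LR, STEPS_PER_EPOCH, EPOCHS, WARMUP_EPOCHS = 0.00125, 226, 24, 5

for epoch in (0, 2, 5, 12, 24):
    lr = warmup_cosine_lr(epoch * STEPS_PER_EPOCH, BASE_LR,
                          STEPS_PER_EPOCH, EPOCHS, WARMUP_EPOCHS)
    print("epoch %2d: lr = %.6f" % (epoch, lr))
```

The learning rate climbs for the first 5 x 226 = 1130 steps, peaks at 0.00125, and then decays towards zero by the end of epoch 24.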
3.5 Prediction and Result Submission
# Run prediction and generate the submission file
import os, time
import matplotlib.pyplot as plt
import paddle
from PIL import Image
import numpy as np
import pandas as pd
import scipy.io as scio
use_gpu = True
paddle.set_device('gpu:0') if use_gpu else paddle.set_device('cpu')
param_state_dict = paddle.load( "Hapi_MyCNN_dwt_bn.pdparams")
model_res.set_dict(param_state_dict)
model_res.eval()  # evaluation mode
test_image = pd.read_csv('data/data151025/Testing_Info.csv')
test_image_path_list = test_image['EpochID'].values
eeg_list = list()
labeled_img_list = []
for img in test_image_path_list:
    eeg_list.append('data/data151025/test/' + img + '.mat')
    labeled_img_list.append(img)

def load_eeg(eeg_path):
    # Read the data
    data = scio.loadmat(eeg_path)
    return data['epoch_data']

pre_list = []
for i in range(len(eeg_list)):
    data = load_eeg(eeg_path=eeg_list[i])
    dy_x_data = np.array(data).astype('float32')
    dy_x_data = dy_x_data[np.newaxis, :, :]  # add a batch dimension
    eeg = paddle.to_tensor(dy_x_data)
    out = model_res(eeg)
    res = paddle.nn.functional.softmax(out)[0]  # Not needed if the model already includes softmax.
    lab = np.argmax(out.numpy())  # argmax(): index of the largest value
    pre_list.append(int(lab) + 1)  # back to 1-based subject numbers
img_test = pd.DataFrame(labeled_img_list)
img_pre = pd.DataFrame(labeled_img_list)
img_test = img_test.rename(columns = {0:"EpochID"})
img_pre['SubjectID'] = pre_list
pre_info = img_pre['SubjectID'].values
test_info = test_image['SubjectID'].values
result_cnn = list()
for i, j in zip(test_info, pre_info):
    if i == 'None':
        result_cnn.append(j)
    elif int(i[4:]) == j:
        print(i[4:])
        result_cnn.append(int(1))
    else:
        result_cnn.append(int(0))
img_test['Prediction'] = result_cnn
img_test.to_csv('result_dwt_bn.csv', index=False)
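The rule that fills the Prediction column can be isolated into a small function for clarity. This is a sketch of the logic in the loop above, not an official description of the competition's scoring format. The name `prediction_entry` and the example IDs such as 'sub_007' are hypothetical; the only assumption carried over from the baseline is that the subject number follows a 4-character prefix.

```python
def prediction_entry(claimed_id, predicted_number):
    """Value written to the Prediction column for one test epoch.

    claimed_id: SubjectID string from the info file, or the string 'None'.
    predicted_number: 1-based subject number predicted by the model.
    """
    if claimed_id == 'None':
        # No claimed identity: report the predicted subject number itself.
        return predicted_number
    # Claimed identity present: report 1 if the prediction matches it, else 0.
    claimed_number = int(claimed_id[4:])
    return 1 if claimed_number == predicted_number else 0


# Hypothetical rows: (SubjectID from the info file, model prediction)
rows = [('None', 12), ('sub_007', 7), ('sub_007', 31)]
entries = [prediction_entry(claimed, predicted) for claimed, predicted in rows]
print(entries)
```

Keeping the rule in one function makes it easy to unit-test before writing the CSV file.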
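Summary
This project walks through the full pipeline with the proposed network structure, covering data preparation, model training and saving, prediction, and submission of the results. For further tuning, consider the following:
1. Preprocess the dataset and apply label smoothing to the labels.
2. Model choice: increase the network depth, add residual connections, and so on.
3. Hyperparameters such as the learning rate and batch_size.
4. Training strategy, including the choice of optimizer and warmup method.
If you have any questions about the project, feel free to leave a comment so we can discuss and improve together!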
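As an illustration of the first suggestion, label smoothing replaces a hard one-hot target with a slightly softened distribution. The sketch below uses the common formulation in which a fraction epsilon of the probability mass is spread uniformly over all classes. The helper name `smooth_one_hot` and the value epsilon = 0.1 are assumptions for illustration, not settings from the baseline.

```python
def smooth_one_hot(label, num_classes, epsilon=0.1):
    """Smoothed target distribution for a 0-based class index."""
    uniform_share = epsilon / num_classes
    target = [uniform_share] * num_classes
    # The true class keeps 1 - epsilon plus its share of the uniform mass.
    target[label] = 1.0 - epsilon + uniform_share
    return target


# 95 classes, as in MyNet_dwt(num_classes=95)
target = smooth_one_hot(label=3, num_classes=95)
print(round(target[3], 4), round(target[0], 6), round(sum(target), 6))
```

The resulting targets still sum to 1, but the model is no longer pushed towards fully saturated predictions, which can help when each subject has relatively few training epochs.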