深度残差收缩网络的完整PyTorch代码
1、基础理论
深度残差收缩网络是建立在三个部分的基础之上的,包括残差网络、注意力机制和软阈值化。
其功能特色包括:
1)由于软阈值化是信号降噪算法的常用步骤,所以深度残差收缩网络比较适合强噪、高冗余数据。同时,软阈值化的梯度要么为0,要么为1,这与ReLU激活函数是相似/一致的。
2)由于软阈值化的阈值是通过类似于SENet的注意力机制自适应地进行设置的,深度残差收缩网络能够根据每个样本的情况,为每个样本单独地设置阈值,因此适用于每个样本内噪声含量不同的情况。
3)当数据噪声很弱、没有噪声时,深度残差收缩网络可能也是适用的。其前提是阈值可以被训练成非常接近于0的值,从而软阈值化就相当于不存在了。
4)值得注意的是,软阈值函数的阈值不能太大,否则会导致所有的输出都是0。所以深度残差收缩网络的注意力模块是经过专门设计的,与一般的SENet是存在明显区别的。
该方法的文献来源:
M. Zhao, S. Zhong, X. Fu, B. Tang, M. Pecht, Deep residual shrinkage networks for fault diagnosis, IEEE Transactions on Industrial Informatics, vol. 16, no. 7, pp. 4681-4690, 2020. (https://ieeexplore.ieee.org/d... )
2、PyTorch代码
本文的PyTorch代码是在这份代码(https://github.com/weiaicunza... )的基础上修改得到的,所以要下载这份代码到本地。主要是修改了models/resnet.py(https://github.com/weiaicunza... )和utils.py(https://github.com/weiaicunza... )的代码。
另一方面,残差收缩网络的核心代码,则是来源于知乎上最前线创作的一篇文章《用于故障诊断的残差收缩网络》(https://zhuanlan.zhihu.com/p/... )。
具体地,将resnet.py文件的名称,改为了rsnet.py,意思是residual shrinkage network。修改后的rsnet.py代码如下:
import torch import torch.nn as nn class BasicBlock(nn.Module): expansion = 1 def __init__(self, in_channels, out_channels, stride=1): super().__init__() self.shrinkage = Shrinkage(out_channels, gap_size=(1, 1)) #residual function self.residual_function = nn.Sequential( nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False), nn.BatchNorm2d(out_channels), nn.ReLU(inplace=True), nn.Conv2d(out_channels, out_channels * BasicBlock.expansion, kernel_size=3, padding=1, bias=False), nn.BatchNorm2d(out_channels * BasicBlock.expansion), self.shrinkage ) #shortcut self.shortcut = nn.Sequential() #the shortcut output dimension is not the same with residual function #use 1*1 convolution to match the dimension if stride != 1 or in_channels != BasicBlock.expansion * out_channels: self.shortcut = nn.Sequential( nn.Conv2d(in_channels, out_channels * BasicBlock.expansion, kernel_size=1, stride=stride, bias=False), nn.BatchNorm2d(out_channels * BasicBlock.expansion) ) def forward(self, x): return nn.ReLU(inplace=True)(self.residual_function(x) + self.shortcut(x)) class Shrinkage(nn.Module): def __init__(self, channel, gap_size): super(Shrinkage, self).__init__() self.gap = nn.AdaptiveAvgPool2d(gap_size) self.fc = nn.Sequential( nn.Linear(channel, channel), nn.BatchNorm1d(channel), nn.ReLU(inplace=True), nn.Linear(channel, channel), nn.Sigmoid(), ) def forward(self, x): x_raw = x x = torch.abs(x) x_abs = x x = self.gap(x) x = torch.flatten(x, 1) # average = torch.mean(x, dim=1, keepdim=True) average = x x = self.fc(x) x = torch.mul(average, x) x = x.unsqueeze(2).unsqueeze(2) # soft thresholding sub = x_abs - x zeros = sub - sub n_sub = torch.max(sub, zeros) x = torch.mul(torch.sign(x_raw), n_sub) return x class RSNet(nn.Module): def __init__(self, block, num_block, num_classes=100): super().__init__() self.in_channels = 64 self.conv1 = nn.Sequential( nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False), nn.BatchNorm2d(64), nn.ReLU(inplace=True)) #we use a different inputsize than the original paper #so conv2_x's stride is 1 self.conv2_x = self._make_layer(block, 64, num_block[0], 1) self.conv3_x = self._make_layer(block, 128, num_block[1], 2) self.conv4_x = self._make_layer(block, 256, num_block[2], 2) self.conv5_x = self._make_layer(block, 512, num_block[3], 2) self.avg_pool = nn.AdaptiveAvgPool2d((1, 1)) self.fc = nn.Linear(512 * block.expansion, num_classes) def _make_layer(self, block, out_channels, num_blocks, stride): """make rsnet layers(by layer i didnt mean this 'layer' was the same as a neuron netowork layer, ex. conv layer), one layer may contain more than one residual shrinkage block Args: block: block type, basic block or bottle neck block out_channels: output depth channel number of this layer num_blocks: how many blocks per layer stride: the stride of the first block of this layer Return: return a rsnet layer """ # we have num_block blocks per layer, the first block # could be 1 or 2, other blocks would always be 1 strides = [stride] + [1] * (num_blocks - 1) layers = [] for stride in strides: layers.append(block(self.in_channels, out_channels, stride)) self.in_channels = out_channels * block.expansion return nn.Sequential(*layers) def forward(self, x): output = self.conv1(x) output = self.conv2_x(output) output = self.conv3_x(output) output = self.conv4_x(output) output = self.conv5_x(output) output = self.avg_pool(output) output = output.view(output.size(0), -1) output = self.fc(output) return output def rsnet18(): """ return a RSNet 18 object """ return RSNet(BasicBlock, [2, 2, 2, 2]) def rsnet34(): """ return a RSNet 34 object """ return RSNet(BasicBlock, [3, 4, 6, 3])
然后,将utils.py文件中的第62-64行:
elif args.net == 'resnet18': from models.resnet import resnet18 net = resnet18()
修改为:
elif args.net == 'rsnet18': from models.rsnet import rsnet18 net = rsnet18()
然后在运行窗口输入:
python train.py -net rsnet18 -gpu
就可以运行程序了。
3、其他代码
论文原作者在GitHub上提供了TFLearn和Keras代码,见链接:https://github.com/zhao62/Dee...
也有网友编写了TensorFlow 2.0的代码:
https://blog.csdn.net/qq_3675...
莉莉
0 条评论
莉莉
宣传栏
目录
1、基础理论
深度残差收缩网络是建立在三个部分的基础之上的,包括残差网络、注意力机制和软阈值化。
其功能特色包括:
1)由于软阈值化是信号降噪算法的常用步骤,所以深度残差收缩网络比较适合强噪、高冗余数据。同时,软阈值化的梯度要么为0,要么为1,这与ReLU激活函数是相似/一致的。
2)由于软阈值化的阈值是通过类似于SENet的注意力机制自适应地进行设置的,深度残差收缩网络能够根据每个样本的情况,为每个样本单独地设置阈值,因此适用于每个样本内噪声含量不同的情况。
3)当数据噪声很弱、没有噪声时,深度残差收缩网络可能也是适用的。其前提是阈值可以被训练成非常接近于0的值,从而软阈值化就相当于不存在了。
4)值得注意的是,软阈值函数的阈值不能太大,否则会导致所有的输出都是0。所以深度残差收缩网络的注意力模块是经过专门设计的,与一般的SENet是存在明显区别的。
该方法的文献来源:
M. Zhao, S. Zhong, X. Fu, B. Tang, M. Pecht, Deep residual shrinkage networks for fault diagnosis, IEEE Transactions on Industrial Informatics, vol. 16, no. 7, pp. 4681-4690, 2020. (https://ieeexplore.ieee.org/d... )
2、PyTorch代码
本文的PyTorch代码是在这份代码(https://github.com/weiaicunza... )的基础上修改得到的,所以要下载这份代码到本地。主要是修改了models/resnet.py(https://github.com/weiaicunza... )和utils.py(https://github.com/weiaicunza... )的代码。
另一方面,残差收缩网络的核心代码,则是来源于知乎上最前线创作的一篇文章《用于故障诊断的残差收缩网络》(https://zhuanlan.zhihu.com/p/... )。
具体地,将resnet.py文件的名称,改为了rsnet.py,意思是residual shrinkage network。修改后的rsnet.py代码如下:
import torch import torch.nn as nn class BasicBlock(nn.Module): expansion = 1 def __init__(self, in_channels, out_channels, stride=1): super().__init__() self.shrinkage = Shrinkage(out_channels, gap_size=(1, 1)) #residual function self.residual_function = nn.Sequential( nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False), nn.BatchNorm2d(out_channels), nn.ReLU(inplace=True), nn.Conv2d(out_channels, out_channels * BasicBlock.expansion, kernel_size=3, padding=1, bias=False), nn.BatchNorm2d(out_channels * BasicBlock.expansion), self.shrinkage ) #shortcut self.shortcut = nn.Sequential() #the shortcut output dimension is not the same with residual function #use 1*1 convolution to match the dimension if stride != 1 or in_channels != BasicBlock.expansion * out_channels: self.shortcut = nn.Sequential( nn.Conv2d(in_channels, out_channels * BasicBlock.expansion, kernel_size=1, stride=stride, bias=False), nn.BatchNorm2d(out_channels * BasicBlock.expansion) ) def forward(self, x): return nn.ReLU(inplace=True)(self.residual_function(x) + self.shortcut(x)) class Shrinkage(nn.Module): def __init__(self, channel, gap_size): super(Shrinkage, self).__init__() self.gap = nn.AdaptiveAvgPool2d(gap_size) self.fc = nn.Sequential( nn.Linear(channel, channel), nn.BatchNorm1d(channel), nn.ReLU(inplace=True), nn.Linear(channel, channel), nn.Sigmoid(), ) def forward(self, x): x_raw = x x = torch.abs(x) x_abs = x x = self.gap(x) x = torch.flatten(x, 1) # average = torch.mean(x, dim=1, keepdim=True) average = x x = self.fc(x) x = torch.mul(average, x) x = x.unsqueeze(2).unsqueeze(2) # soft thresholding sub = x_abs - x zeros = sub - sub n_sub = torch.max(sub, zeros) x = torch.mul(torch.sign(x_raw), n_sub) return x class RSNet(nn.Module): def __init__(self, block, num_block, num_classes=100): super().__init__() self.in_channels = 64 self.conv1 = nn.Sequential( nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False), nn.BatchNorm2d(64), nn.ReLU(inplace=True)) #we use a different inputsize than the original paper #so conv2_x's stride is 1 self.conv2_x = self._make_layer(block, 64, num_block[0], 1) self.conv3_x = self._make_layer(block, 128, num_block[1], 2) self.conv4_x = self._make_layer(block, 256, num_block[2], 2) self.conv5_x = self._make_layer(block, 512, num_block[3], 2) self.avg_pool = nn.AdaptiveAvgPool2d((1, 1)) self.fc = nn.Linear(512 * block.expansion, num_classes) def _make_layer(self, block, out_channels, num_blocks, stride): """make rsnet layers(by layer i didnt mean this 'layer' was the same as a neuron netowork layer, ex. conv layer), one layer may contain more than one residual shrinkage block Args: block: block type, basic block or bottle neck block out_channels: output depth channel number of this layer num_blocks: how many blocks per layer stride: the stride of the first block of this layer Return: return a rsnet layer """ # we have num_block blocks per layer, the first block # could be 1 or 2, other blocks would always be 1 strides = [stride] + [1] * (num_blocks - 1) layers = [] for stride in strides: layers.append(block(self.in_channels, out_channels, stride)) self.in_channels = out_channels * block.expansion return nn.Sequential(*layers) def forward(self, x): output = self.conv1(x) output = self.conv2_x(output) output = self.conv3_x(output) output = self.conv4_x(output) output = self.conv5_x(output) output = self.avg_pool(output) output = output.view(output.size(0), -1) output = self.fc(output) return output def rsnet18(): """ return a RSNet 18 object """ return RSNet(BasicBlock, [2, 2, 2, 2]) def rsnet34(): """ return a RSNet 34 object """ return RSNet(BasicBlock, [3, 4, 6, 3])
然后,将utils.py文件中的第62-64行:
elif args.net == 'resnet18': from models.resnet import resnet18 net = resnet18()
修改为:
elif args.net == 'rsnet18': from models.rsnet import rsnet18 net = rsnet18()
然后在运行窗口输入:
python train.py -net rsnet18 -gpu
就可以运行程序了。
3、其他代码
论文原作者在GitHub上提供了TFLearn和Keras代码,见链接:https://github.com/zhao62/Dee...
也有网友编写了TensorFlow 2.0的代码:
https://blog.csdn.net/qq_3675...