Implementing ResNet in TensorFlow 2.0

Posted by Fangjuntao on 2020-03-29

Reference: https://blog.csdn.net/abc13526222160/article/details/90057121

  1. The basic building block of ResNet:

There are two forms of residual module. The regular residual block is made up of two 3×3 convolution layers; as the network grows deeper, however, this structure turns out not to be very effective in practice. To address this, the "bottleneck residual block" works better: it stacks 1×1, 3×3 and 1×1 convolution layers in sequence, where the 1×1 convolutions reduce or restore the number of channels, so that the 3×3 convolution operates on a relatively low-dimensional input, which improves computational efficiency. A sketch of such a bottleneck block is given below.
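The rest of this post only implements the regular block, but for comparison, a minimal sketch of the bottleneck variant could look as follows. This class is only an illustration and is not used by the code later; it assumes the same imports as the full listing in section 6 (tensorflow as tf, and layers, Sequential from tensorflow.keras), and the expansion factor of 4 and the always-projected shortcut are simplifying assumptions:

class BottleneckBlock(layers.Layer):
    # hypothetical 1x1 -> 3x3 -> 1x1 residual block; the output has
    # filter_num * 4 channels (an assumed expansion factor of 4)
    def __init__(self, filter_num, strides=1):
        super(BottleneckBlock, self).__init__()
        # 1x1 convolution: reduce the number of channels
        self.conv1 = layers.Conv2D(filter_num, (1, 1), strides=1, padding='same')
        self.bn1 = layers.BatchNormalization()
        # 3x3 convolution: works on the reduced, lower-dimensional input
        self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=strides, padding='same')
        self.bn2 = layers.BatchNormalization()
        # 1x1 convolution: expand the channels back
        self.conv3 = layers.Conv2D(filter_num * 4, (1, 1), strides=1, padding='same')
        self.bn3 = layers.BatchNormalization()
        # project the shortcut so its shape matches the main branch
        self.downsample = Sequential([layers.Conv2D(filter_num * 4, (1, 1), strides=strides)])

    def call(self, inputs, training=None):
        out = tf.nn.relu(self.bn1(self.conv1(inputs), training=training))
        out = tf.nn.relu(self.bn2(self.conv2(out), training=training))
        out = self.bn3(self.conv3(out), training=training)
        identity = self.downsample(inputs)
        return tf.nn.relu(out + identity)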

  2. How to implement the basic BasicBlock?
    Here we implement the regular block (two 3×3 convolutions) described above:

class BasicBlock(layers.Layer):
    # filter_num: number of output channels (filters) of the convolutions
    # strides=1 means the feature map is not downsampled
    def __init__(self, filter_num, strides=1):
        # call the parent class initializer
        super(BasicBlock, self).__init__()
        # filter_num: number of filters; (3, 3): kernel size
        # With padding='same' and stride 1 the output has the same spatial
        # size as the input; with stride s the input is padded so that the
        # output size is ceil(input_size / s), e.g. a 32x32 input with
        # stride 2 gives a 16x16 output.
        self.conv1 = layers.Conv2D(filter_num, (3, 3), strides=strides, padding='same')
        self.bn1 = layers.BatchNormalization()
        # non-linear activation
        self.relu = layers.Activation('relu')

        # the second convolution always uses stride 1, so it keeps the shape
        self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1, padding='same')
        self.bn2 = layers.BatchNormalization()

        if strides != 1:
            # downsample the shortcut branch so it matches the output shape
            self.downsample = Sequential()
            self.downsample.add(layers.Conv2D(filter_num, (1, 1), strides=strides))
        else:
            self.downsample = lambda x: x

    def call(self, inputs, training=None):
        # inputs: [b, h, w, c]
        out = self.conv1(inputs)
        out = self.bn1(out, training=training)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out, training=training)

        # shortcut (identity) branch
        identity = self.downsample(inputs)
        output = layers.add([out, identity])
        # apply the final ReLU via the functional API
        output = tf.nn.relu(output)

        return output
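A quick way to see what the block does to a tensor is to run it on random data; the shapes below are illustrative values only, and the imports are the same as in the full listing in section 6:

block = BasicBlock(64, strides=1)
x = tf.random.normal([4, 32, 32, 64])
print(block(x).shape)        # (4, 32, 32, 64): shape preserved

down_block = BasicBlock(128, strides=2)
print(down_block(x).shape)   # (4, 16, 16, 128): spatial size halved, channels changed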
  3. How to implement the basic ResBlock
  • The above is only a single BasicBlock. In ResNet the basic unit is not one BasicBlock: several BasicBlocks are stacked together, and the whole stack is called a ResBlock.
  • Creating a ResBlock
    Several BasicBlocks are stacked together (a shape-check sketch follows the code):
def build_resblock(self, filter_num, blocks, strides=1):
    res_blocks = Sequential()
    # the first BasicBlock may downsample (strides can be greater than 1)
    res_blocks.add(BasicBlock(filter_num, strides))
    # the remaining BasicBlocks must not downsample, so only the first
    # block in the stack changes the spatial size
    for _ in range(1, blocks):
        res_blocks.add(BasicBlock(filter_num, strides=1))
    return res_blocks
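As a shape-check sketch (example values only), two stacked BasicBlocks built this way behave like one ResBlock: only the first block downsamples.

res_block = Sequential([BasicBlock(128, strides=2),   # downsamples once
                        BasicBlock(128, strides=1)])  # keeps the shape
x = tf.random.normal([4, 32, 32, 64])
print(res_block(x).shape)    # (4, 16, 16, 128)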
  4. A generic ResNet implementation:
class ResNet(keras.Model):
    def __init__(self, layer_dims, num_classes=10):
        # layer_dims: for resnet18 this is [2, 2, 2, 2], i.e. four ResBlocks,
        # each entry giving the number of BasicBlocks in that ResBlock
        # (a ResBlock of resnet18 contains two BasicBlocks)
        # num_classes=10: number of output classes
        super(ResNet, self).__init__()

        # preprocessing (stem) layer
        self.stem = Sequential([layers.Conv2D(64, (3, 3), strides=(1, 1)),
                                layers.BatchNormalization(),
                                layers.Activation('relu'),
                                layers.MaxPool2D(pool_size=(2, 2), strides=(1, 1), padding='same')
                                ])
        # create the four ResBlocks; the number of BasicBlocks in each
        # comes from layer_dims, e.g. [2, 2, 2, 2]
        # strides=2 makes the feature map progressively smaller
        self.layer1 = self.build_resblock(64, layer_dims[0])
        self.layer2 = self.build_resblock(128, layer_dims[1], strides=2)
        self.layer3 = self.build_resblock(256, layer_dims[2], strides=2)
        self.layer4 = self.build_resblock(512, layer_dims[3], strides=2)

        # out: [b, h, w, 512]; h and w are not known here, so
        # GlobalAveragePooling2D is used: it averages each channel over
        # its full spatial extent, whatever the height and width are
        self.avgpool = layers.GlobalAveragePooling2D()
        # fully connected layer for classification, output size num_classes
        self.fc = layers.Dense(num_classes)

    def call(self, inputs, training=None):
        # forward pass
        x = self.stem(inputs, training=training)
        x = self.layer1(x, training=training)
        x = self.layer2(x, training=training)
        x = self.layer3(x, training=training)
        x = self.layer4(x, training=training)
        # after global average pooling the shape is already [b, c],
        # so no reshape is needed
        x = self.avgpool(x)
        # output: [b, num_classes]
        x = self.fc(x)

        return x

    def build_resblock(self, filter_num, blocks, strides=1):
        res_blocks = Sequential()
        # the first BasicBlock may downsample (strides can be greater than 1)
        res_blocks.add(BasicBlock(filter_num, strides))
        # the remaining BasicBlocks do not downsample, so only the first
        # block in the stack changes the spatial size
        for _ in range(1, blocks):
            res_blocks.add(BasicBlock(filter_num, strides=1))
        return res_blocks

  5. ResNet configuration functions:

def resnet18():
    return ResNet([2, 2, 2, 2])
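Before wiring the network into a training loop, it can be built once to inspect its sublayers and parameter count; the input shape below matches the MNIST images used in the next section:

model = resnet18()
model.build(input_shape=(None, 28, 28, 1))
model.summary()   # lists the stem, the four ResBlocks, the pooling and the Dense layer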
  6. Using ResNet18 to recognise the MNIST dataset:
  • resnet2.py

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Sequential


class BasicBlock(layers.Layer):
    # filter_num: number of output channels (filters) of the convolutions
    # strides=1 means the feature map is not downsampled
    def __init__(self, filter_num, strides=1):
        # call the parent class initializer
        super(BasicBlock, self).__init__()
        # filter_num: number of filters; (3, 3): kernel size
        # With padding='same' and stride 1 the output has the same spatial
        # size as the input; with stride s the input is padded so that the
        # output size is ceil(input_size / s), e.g. 32x32 with stride 2
        # gives 16x16.
        self.conv1 = layers.Conv2D(filter_num, (3, 3), strides=strides, padding='same')
        self.bn1 = layers.BatchNormalization()
        # non-linear activation
        self.relu = layers.Activation('relu')

        # the second convolution always uses stride 1, so it keeps the shape
        self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1, padding='same')
        self.bn2 = layers.BatchNormalization()

        if strides != 1:
            # downsample the shortcut branch so it matches the output shape
            self.downsample = Sequential()
            self.downsample.add(layers.Conv2D(filter_num, (1, 1), strides=strides))
        else:
            self.downsample = lambda x: x

    def call(self, inputs, training=None):  # forward pass
        # inputs: [b, h, w, c]
        out = self.conv1(inputs)
        out = self.bn1(out, training=training)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out, training=training)

        # shortcut (identity) branch
        identity = self.downsample(inputs)
        output = layers.add([out, identity])
        # apply the final ReLU via the functional API
        output = tf.nn.relu(output)

        return output


class ResNet(keras.Model):
    def __init__(self, layer_dims, num_classes=10):
        # layer_dims: for resnet18 this is [2, 2, 2, 2], i.e. four ResBlocks,
        # each entry giving the number of BasicBlocks in that ResBlock
        # (a ResBlock of resnet18 contains two BasicBlocks)
        # num_classes=10: number of output classes
        super(ResNet, self).__init__()

        # preprocessing (stem) layer
        self.stem = Sequential([layers.Conv2D(64, (3, 3), strides=(1, 1)),
                                layers.BatchNormalization(),
                                layers.Activation('relu'),
                                layers.MaxPool2D(pool_size=(2, 2), strides=(1, 1), padding='same')
                                ])
        # create the four ResBlocks; the number of BasicBlocks in each
        # comes from layer_dims, e.g. [2, 2, 2, 2]
        # strides=2 makes the feature map progressively smaller
        self.layer1 = self.build_resblock(64, layer_dims[0])
        self.layer2 = self.build_resblock(128, layer_dims[1], strides=2)
        self.layer3 = self.build_resblock(256, layer_dims[2], strides=2)
        self.layer4 = self.build_resblock(512, layer_dims[3], strides=2)

        # out: [b, h, w, 512]; h and w are not known here, so
        # GlobalAveragePooling2D is used: it averages each channel over
        # its full spatial extent, whatever the height and width are
        self.avgpool = layers.GlobalAveragePooling2D()
        # fully connected layer for classification, output size num_classes
        self.fc = layers.Dense(num_classes)

    def call(self, inputs, training=None):
        # forward pass
        x = self.stem(inputs, training=training)
        x = self.layer1(x, training=training)
        x = self.layer2(x, training=training)
        x = self.layer3(x, training=training)
        x = self.layer4(x, training=training)
        # after global average pooling the shape is already [b, c],
        # so no reshape is needed
        x = self.avgpool(x)
        # output: [b, num_classes]
        x = self.fc(x)

        return x

    def build_resblock(self, filter_num, blocks, strides=1):
        res_blocks = Sequential()
        # the first BasicBlock may downsample (strides can be greater than 1)
        res_blocks.add(BasicBlock(filter_num, strides))
        # the remaining BasicBlocks do not downsample, so only the first
        # block in the stack changes the spatial size
        for _ in range(1, blocks):
            res_blocks.add(BasicBlock(filter_num, strides=1))
        return res_blocks


def resnet18():
    return ResNet([2, 2, 2, 2])


def resnet34():
    return ResNet([3, 4, 6, 3])
  • resnet18_train2.py

import tensorflow as tf
from tensorflow.keras import layers, optimizers, datasets, Sequential
import os
from resnet2 import resnet18

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
tf.random.set_seed(2345)


# preprocessing: scale the images to [-1, 1]
def preprocess(x, y):
    x = 2 * tf.cast(x, dtype=tf.float32) / 255. - 1
    y = tf.cast(y, dtype=tf.int32)
    return x, y


(x, y), (x_test, y_test) = datasets.mnist.load_data()
print(x.shape, y.shape, x_test.shape, y_test.shape)

# add the channel dimension: [b, 28, 28] => [b, 28, 28, 1]
x, x_test = x.reshape(60000, 28, 28, 1), x_test.reshape(10000, 28, 28, 1)
print(x.shape, y.shape, x_test.shape, y_test.shape)

train_db = tf.data.Dataset.from_tensor_slices((x, y))
train_db = train_db.shuffle(1000).map(preprocess).batch(128)

test_db = tf.data.Dataset.from_tensor_slices((x_test, y_test))
test_db = test_db.map(preprocess).batch(64)

sample = next(iter(train_db))
print('sample:', sample[0].shape, sample[1].shape,
      tf.reduce_min(sample[0]), tf.reduce_max(sample[0]))


def main():

    model = resnet18()
    model.build(input_shape=(None, 28, 28, 1))
    optimizer = optimizers.Adam(learning_rate=1e-3)
    for epoch in range(50):
        for step, (x, y) in enumerate(train_db):
            # record the forward pass so that gradients can be computed
            with tf.GradientTape() as tape:
                # [b, 28, 28, 1] => [b, 10]
                logits = model(x, training=True)
                # [b] => [b, 10]
                y_onehot = tf.one_hot(y, depth=10)
                # compute the loss and average it over the batch
                loss = tf.losses.categorical_crossentropy(y_onehot, logits, from_logits=True)
                loss = tf.reduce_mean(loss)
            # compute the gradients
            grads = tape.gradient(loss, model.trainable_variables)
            # pass grads and variables to the optimizer to update the weights
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

            if step % 100 == 0:
                print(epoch, step, 'losses:', float(loss))

        # evaluate on the test set after each epoch
        total_num = 0
        total_correct = 0
        for x, y in test_db:
            logits = model(x, training=False)
            prob = tf.nn.softmax(logits, axis=1)
            pred = tf.argmax(prob, axis=1)
            pred = tf.cast(pred, dtype=tf.int32)
            correct = tf.cast(tf.equal(pred, y), dtype=tf.int32)
            correct = tf.reduce_sum(correct)

            total_num += x.shape[0]
            total_correct += int(correct)
        acc = total_correct / total_num
        print(epoch, 'acc:', acc)


if __name__ == '__main__':
    main()
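After training, predictions for a single test batch can be inspected with the same model; a minimal sketch, assuming it is placed at the end of main() so that model and test_db are still in scope:

# predict one batch from the test set with the trained model
x_batch, y_batch = next(iter(test_db))
logits = model(x_batch, training=False)
pred = tf.argmax(tf.nn.softmax(logits, axis=1), axis=1, output_type=tf.int32)
print('predicted  :', pred[:10].numpy())
print('true labels:', y_batch[:10].numpy())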