# [Standalone DL] 13 Lab - #21 CNN with PyTorch
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import argparse
import numpy as np
import time
from copy import deepcopy # Add Deepcopy for args
import seaborn as sns
import matplotlib.pyplot as plt
# Preprocessing applied to every image: convert to a tensor, then normalize
# each of the three channels with mean 0.5 and standard deviation 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

# CIFAR-10 training images (downloaded into ./data on first use), randomly
# split into 40,000 training samples and 10,000 validation samples.
trainset = torchvision.datasets.CIFAR10(
    root='./data',
    train=True,
    download=True,
    transform=transform,
)
trainset, valset = torch.utils.data.random_split(trainset, lengths=[40000, 10000])

# Held-out CIFAR-10 test images, preprocessed the same way.
testset = torchvision.datasets.CIFAR10(
    root='./data',
    train=False,
    download=True,
    transform=transform,
)

# Lookup table used by train() / validate() / test() to pick their dataset.
partition = dict(train=trainset, val=valset, test=testset)
class MLP(nn.Module):
    """Fully connected network with a configurable number of hidden layers.

    Layout: ``fc1`` (in_dim -> hid_dim), then ``n_layer - 1`` hidden layers
    (hid_dim -> hid_dim), then ``fc2`` (hid_dim -> out_dim). Each hidden layer
    is followed by the activation, optional batch normalization and dropout.

    Args:
        in_dim: Number of input features.
        out_dim: Number of output features.
        hid_dim: Width of every hidden layer.
        n_layer: ``n_layer - 1`` hidden ``hid_dim -> hid_dim`` layers are created.
        act: Activation name: ``'relu'``, ``'tanh'`` or ``'sigmoid'``.
        dropout: Dropout probability passed to ``nn.Dropout``.
        use_bn: If true, add a ``BatchNorm1d`` after each hidden layer.
        use_xavier: If true, call :meth:`xavier_init` after construction.

    Raises:
        ValueError: If ``act`` is not one of the supported names.
    """

    def __init__(self, in_dim, out_dim, hid_dim, n_layer, act, dropout, use_bn, use_xavier):
        super(MLP, self).__init__()
        self.in_dim = in_dim
        self.out_dim = out_dim
        self.hid_dim = hid_dim
        self.n_layer = n_layer
        self.use_bn = use_bn
        self.use_xavier = use_xavier

        # ====== Create Linear Layers ====== #
        self.fc1 = nn.Linear(self.in_dim, self.hid_dim)
        self.linears = nn.ModuleList()
        self.bns = nn.ModuleList()
        for _ in range(self.n_layer - 1):
            self.linears.append(nn.Linear(self.hid_dim, self.hid_dim))
            if self.use_bn:
                self.bns.append(nn.BatchNorm1d(self.hid_dim))
        self.fc2 = nn.Linear(self.hid_dim, self.out_dim)

        # ====== Create Activation Function ====== #
        if act == 'relu':
            self.act = nn.ReLU()
        elif act == 'tanh':
            # Must be an assignment; a comparison here would leave the
            # activation unset and break forward().
            self.act = nn.Tanh()
        elif act == 'sigmoid':
            self.act = nn.Sigmoid()
        else:
            raise ValueError('no valid activation function selected!')

        # ====== Create Regularization Layer ======= #
        self.dropout = nn.Dropout(dropout)

        if self.use_xavier:
            self.xavier_init()

    def forward(self, x):
        """Return the output of ``fc2`` for the input batch ``x``."""
        x = self.act(self.fc1(x))
        for i, linear in enumerate(self.linears):
            x = self.act(linear(x))
            if self.use_bn:
                x = self.bns[i](x)
            x = self.dropout(x)
        return self.fc2(x)

    def xavier_init(self):
        """Re-initialize the hidden layers in ``self.linears``.

        Weights get Xavier-normal values and biases are set to 0.01.
        ``fc1`` and ``fc2`` are not touched.
        """
        for linear in self.linears:
            nn.init.xavier_normal_(linear.weight)
            linear.bias.data.fill_(0.01)
# Layer specifications keyed by model code. An integer is the number of
# output channels of a convolution layer; 'M' marks a max-pooling layer.
# Each row below ends at one pooling step.
cfg = {
    'VGG11': [
        64, 'M',
        128, 'M',
        256, 256, 'M',
        512, 512, 'M',
        512, 512, 'M',
    ],
    'VGG13': [
        64, 64, 'M',
        128, 128, 'M',
        256, 256, 'M',
        512, 512, 'M',
        512, 512, 'M',
    ],
    'VGG16': [
        64, 64, 'M',
        128, 128, 'M',
        256, 256, 256, 'M',
        512, 512, 512, 'M',
        512, 512, 512, 'M',
    ],
    'VGG19': [
        64, 64, 'M',
        128, 128, 'M',
        256, 256, 256, 256, 'M',
        512, 512, 512, 512, 'M',
        512, 512, 512, 512, 'M',
    ],
}
# Let's build a CNN architecture.
class CNN1(nn.Module):  # CNN variant 1
    """Small baseline CNN: two convolutions, one max-pool, one linear layer.

    The linear layer expects 65536 flattened features, i.e. 256 channels of
    16x16 maps, which is what a 3x32x32 input produces after the single
    2x2 pooling step.
    """

    def __init__(self):
        super().__init__()
        # 3x3 convolution, stride 1 (the default), zero-padding 1.
        # Output channel counts are conventionally powers of two.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)
        # 5x5 convolution, stride 1, zero-padding 2.
        self.conv2 = nn.Conv2d(64, 256, kernel_size=5, stride=1, padding=2)
        # One activation module is enough; it is reused after every convolution.
        self.act = nn.ReLU()
        # More pooling means smaller feature maps and therefore fewer
        # parameters in the layers that follow.
        self.maxpool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc = nn.Linear(65536, 10)

    def forward(self, x):
        """Return class scores of shape ``(batch, 10)`` for the batch ``x``."""
        features = self.act(self.conv1(x))
        features = self.maxpool1(self.act(self.conv2(features)))
        # Keep the batch dimension and flatten the rest, e.g. torch.Size([2, 65536]).
        flattened = features.view(features.size(0), -1)
        # Linear layer output, e.g. torch.Size([2, 10]).
        return self.fc(flattened)
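# With this setup, training is very slow.
# - Reason 1: the model is not moved to the GPU.
# - Reason 2: there are too few pooling layers, so far too many parameters are
#   created (the final linear layer alone takes a 65,536-dimensional input).
# In other words, stacking more layers and adding more pooling makes training
# much more efficient.
# So we build a new CNN.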
class CNN(nn.Module):
    """VGG-style CNN built from a layer specification.

    The convolutional part is generated by :meth:`_make_layers`. The
    classifier is ``Linear(512, 256) -> activation -> Linear(256, out_dim)``,
    so the convolutional part must produce exactly 512 features per sample
    after flattening.

    Args:
        model_code: Key into the module-level ``cfg`` table (e.g. ``'VGG11'``),
            or an explicit layer specification: a sequence whose items are
            either an int (output channels of a 3x3 convolution) or ``'M'``
            (2x2 max-pooling).
        in_channels: Number of channels of the input images.
        out_dim: Number of output classes.
        act: Activation name: ``'relu'``, ``'sigmoid'`` or ``'tanh'``.
        use_bn: If true, add ``BatchNorm2d`` after every convolution.

    Raises:
        ValueError: If ``act`` is not one of the supported names.
    """

    def __init__(self, model_code, in_channels, out_dim, act, use_bn):
        super(CNN, self).__init__()

        if act == 'relu':
            self.act = nn.ReLU()
        elif act == 'sigmoid':
            self.act = nn.Sigmoid()
        elif act == 'tanh':
            # The module is spelled nn.Tanh; nn.TanH does not exist.
            self.act = nn.Tanh()
        else:
            raise ValueError("Not a valid activation function code")

        self.layers = self._make_layers(model_code, in_channels, use_bn)
        # NOTE: the attribute name 'classifer' (sic) is kept because it is part
        # of the module's state_dict keys.
        self.classifer = nn.Sequential(nn.Linear(512, 256),
                                       self.act,
                                       nn.Linear(256, out_dim))

    def forward(self, x):
        """Return class scores of shape ``(batch, out_dim)``."""
        x = self.layers(x)
        x = x.view(x.size(0), -1)  # flatten everything except the batch dimension
        x = self.classifer(x)
        return x

    def _make_layers(self, model_code, in_channels, use_bn):
        """Build the convolutional stack as an ``nn.Sequential``."""
        # A string is looked up in cfg; anything else is used as the spec itself.
        layer_spec = cfg[model_code] if isinstance(model_code, str) else model_code

        layers = []
        for x in layer_spec:
            if x == 'M':
                layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
            else:
                layers += [nn.Conv2d(in_channels=in_channels,
                                     out_channels=x,
                                     kernel_size=3,
                                     stride=1,
                                     padding=1)]
                if use_bn:
                    layers += [nn.BatchNorm2d(x)]
                layers += [self.act]
                # Pooling keeps the channel count, but after a convolution its
                # out_channels become the in_channels of the next layer.
                in_channels = x
        return nn.Sequential(*layers)
def train(net, partition, optimizer, criterion, args):
    """Train ``net`` for one epoch on ``partition['train']``.

    Batches are moved to the device the model's parameters live on, so the
    function works for both CPU and CUDA models.

    Args:
        net: Model to train; switched to training mode.
        partition: Mapping with a ``'train'`` dataset.
        optimizer: Optimizer over ``net``'s parameters.
        criterion: Loss called as ``criterion(outputs, labels)``.
        args: Namespace providing ``train_batch_size`` and, optionally,
            ``num_workers`` (defaults to 2).

    Returns:
        Tuple ``(net, train_loss, train_acc)``: the same model object, the
        mean loss per batch, and the accuracy in percent.
    """
    trainloader = torch.utils.data.DataLoader(partition['train'],
                                              batch_size=args.train_batch_size,
                                              shuffle=True,
                                              num_workers=getattr(args, 'num_workers', 2))

    # Follow the model's device instead of assuming a GPU is present.
    first_param = next(net.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    net.train()

    correct = 0
    total = 0
    train_loss = 0.0
    for inputs, labels in trainloader:  # e.g. inputs: torch.Size([256, 3, 32, 32])
        optimizer.zero_grad()

        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = net(inputs)

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        # Predicted class = index of the largest score; no gradient needed.
        _, predicted = torch.max(outputs.detach(), 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    train_loss = train_loss / len(trainloader)
    train_acc = 100 * correct / total
    return net, train_loss, train_acc
def validate(net, partition, criterion, args):
    """Evaluate ``net`` on ``partition['val']`` without computing gradients.

    Batches are moved to the device the model's parameters live on, so the
    function works for both CPU and CUDA models.

    Args:
        net: Model to evaluate; switched to evaluation mode.
        partition: Mapping with a ``'val'`` dataset.
        criterion: Loss called as ``criterion(outputs, labels)``.
        args: Namespace providing ``test_batch_size`` and, optionally,
            ``num_workers`` (defaults to 2).

    Returns:
        Tuple ``(val_loss, val_acc)``: the mean loss per batch and the
        accuracy in percent.
    """
    valloader = torch.utils.data.DataLoader(partition['val'],
                                            batch_size=args.test_batch_size,
                                            shuffle=False,
                                            num_workers=getattr(args, 'num_workers', 2))

    # Follow the model's device instead of assuming a GPU is present.
    first_param = next(net.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    net.eval()

    correct = 0
    total = 0
    val_loss = 0
    with torch.no_grad():
        for images, labels in valloader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = net(images)

            loss = criterion(outputs, labels)

            val_loss += loss.item()
            # Predicted class = index of the largest score.
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    val_loss = val_loss / len(valloader)
    val_acc = 100 * correct / total
    return val_loss, val_acc
def test(net, partition, args):
    """Return the accuracy (in percent) of ``net`` on ``partition['test']``.

    Batches are moved to the device the model's parameters live on, so the
    function works for both CPU and CUDA models. No gradients are computed.

    Args:
        net: Model to evaluate; switched to evaluation mode.
        partition: Mapping with a ``'test'`` dataset.
        args: Namespace providing ``test_batch_size`` and, optionally,
            ``num_workers`` (defaults to 2).

    Returns:
        Test accuracy in percent.
    """
    testloader = torch.utils.data.DataLoader(partition['test'],
                                             batch_size=args.test_batch_size,
                                             shuffle=False,
                                             num_workers=getattr(args, 'num_workers', 2))

    # Follow the model's device instead of assuming a GPU is present.
    first_param = next(net.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    net.eval()

    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in testloader:
            images = images.to(device)
            labels = labels.to(device)

            outputs = net(images)
            # Predicted class = index of the largest score.
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    test_acc = 100 * correct / total
    return test_acc


# The name starts with "test", so pytest would otherwise try to collect this
# helper as a test case whenever it is imported into a test module.
test.__test__ = False
def experiment(partition, args):
    """Train, validate and test one CNN configuration.

    Args:
        partition: Mapping with ``'train'``, ``'val'`` and ``'test'`` datasets.
        args: Namespace with the model settings (``model_code``,
            ``in_channels``, ``out_dim``, ``act``, ``use_bn``), the optimizer
            settings (``optim`` as ``'SGD'``, ``'RMSprop'`` or ``'Adam'``,
            ``lr``, ``l2``) and the training settings (``epoch`` plus the
            batch sizes read by train/validate/test).

    Returns:
        Tuple ``(vars(args), result)``. ``result`` holds the per-epoch lists
        ``train_losses``, ``val_losses``, ``train_accs``, ``val_accs``, the
        last-epoch ``train_acc`` and ``val_acc`` (``None`` when
        ``args.epoch`` is 0) and the final ``test_acc``.

    Raises:
        ValueError: If ``args.optim`` is not a supported optimizer name.
    """
    net = CNN(model_code=args.model_code,
              in_channels=args.in_channels,
              out_dim=args.out_dim,
              act=args.act,
              use_bn=args.use_bn)
    # Use the GPU when one is available instead of failing on CPU-only machines.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    net.to(device)

    criterion = nn.CrossEntropyLoss()

    optimizer_classes = {'SGD': optim.SGD, 'RMSprop': optim.RMSprop, 'Adam': optim.Adam}
    optimizer_cls = optimizer_classes.get(args.optim)
    if optimizer_cls is None:
        raise ValueError('In-valid optimizer choice')
    # args.l2 is applied as the optimizer's weight decay.
    optimizer = optimizer_cls(net.parameters(), lr=args.lr, weight_decay=args.l2)

    train_losses = []
    val_losses = []
    train_accs = []
    val_accs = []

    for epoch in range(args.epoch):  # loop over the dataset multiple times
        ts = time.time()
        net, train_loss, train_acc = train(net, partition, optimizer, criterion, args)
        val_loss, val_acc = validate(net, partition, criterion, args)
        te = time.time()

        train_losses.append(train_loss)
        val_losses.append(val_loss)
        train_accs.append(train_acc)
        val_accs.append(val_acc)

        print('Epoch {}, Acc(train/val): {:2.2f}/{:2.2f}, Loss(train/val) {:2.2f}/{:2.2f}. Took {:2.2f} sec'.format(epoch, train_acc, val_acc, train_loss, val_loss, te-ts))

    test_acc = test(net, partition, args)

    result = {}
    result['train_losses'] = train_losses
    result['val_losses'] = val_losses
    result['train_accs'] = train_accs
    result['val_accs'] = val_accs
    # Last-epoch values, taken from the lists so that a run with zero epochs
    # does not hit an undefined variable.
    result['train_acc'] = train_accs[-1] if train_accs else None
    result['val_acc'] = val_accs[-1] if val_accs else None
    result['test_acc'] = test_acc
    return vars(args), result
import hashlib
import json
from os import listdir
from os.path import isfile, join
import pandas as pd
def save_exp_result(setting, result):
    """Write one experiment's settings and results to ``./results`` as JSON.

    The file is named ``<exp_name>-<hash>.json``, where the hash is the first
    six hex digits of the SHA-1 of the settings. ``epoch`` and
    ``test_batch_size`` are left out of both the hash and the saved record.
    The ``./results`` directory is created if needed, and neither ``setting``
    nor ``result`` is modified.

    Args:
        setting: Dict of experiment settings; must contain ``'exp_name'``.
        result: Dict of JSON-serializable results.

    Raises:
        KeyError: If ``setting`` has no ``'exp_name'`` entry.
    """
    import os  # local import: the file only imports selected names from os

    exp_name = setting['exp_name']
    # Filter into a copy. Deleting keys from `setting` would also delete them
    # from the caller's dict (and from the Namespace behind vars(args)).
    # Key order is preserved, so the hash matches previously written files.
    hashed_setting = {key: value for key, value in setting.items()
                      if key not in ('epoch', 'test_batch_size')}

    hash_key = hashlib.sha1(str(hashed_setting).encode()).hexdigest()[:6]
    os.makedirs('./results', exist_ok=True)
    filename = './results/{}-{}.json'.format(exp_name, hash_key)

    record = dict(result)
    record.update(hashed_setting)
    with open(filename, 'w') as f:
        json.dump(record, f)
def load_exp_result(exp_name):
    """Load the saved results of one experiment into a DataFrame.

    Reads every regular file in ``./results`` whose name ends with ``.json``
    and contains ``exp_name`` as a substring. Files are read in sorted name
    order so the row order does not depend on the file system.

    Args:
        exp_name: Experiment name to look for in the file names.

    Returns:
        A ``pandas.DataFrame`` with one row per matching file; empty when no
        file matches.
    """
    dir_path = './results'
    list_result = []
    for filename in sorted(listdir(dir_path)):
        path = join(dir_path, filename)
        # Require a real .json suffix so that backups such as "x.json.bak"
        # are not parsed.
        if not (isfile(path) and filename.endswith('.json')):
            continue
        if exp_name not in filename:
            continue
        with open(path, 'r') as infile:
            list_result.append(json.load(infile))
    return pd.DataFrame(list_result)
def plot_acc(var1, var2, df):
    """Draw train, validation and test accuracy as three side-by-side bar plots.

    Args:
        var1: Column of ``df`` placed on the x axis.
        var2: Column of ``df`` used for the bar hue.
        df: DataFrame with ``train_acc``, ``val_acc`` and ``test_acc`` columns.
    """
    fig, axes = plt.subplots(1, 3)
    fig.set_size_inches(15, 6)
    sns.set_style("darkgrid", {"axes.facecolor": ".9"})

    # One (metric column, panel title) pair per subplot, left to right.
    panels = (
        ('train_acc', 'Train Accuracy'),
        ('val_acc', 'Validation Accuracy'),
        ('test_acc', 'Test Accuracy'),
    )
    for axis, (metric, title) in zip(axes, panels):
        sns.barplot(x=var1, y=metric, hue=var2, data=df, ax=axis)
        axis.set_title(title)
def plot_loss_variation(var1, var2, df, **kwargs):
    """Plot train and validation loss curves for every (var1, var2) setting.

    Draws a seaborn ``FacetGrid`` with one column per value of ``var1`` and
    one row per value of ``var2``; each facet shows loss against epoch.
    Combinations with no matching run in ``df`` are skipped. When several
    rows match a combination, the first one is used.

    Args:
        var1: Column of ``df`` mapped to the grid columns.
        var2: Column of ``df`` mapped to the grid rows.
        df: DataFrame with ``train_losses`` and ``val_losses`` columns holding
            one list of per-epoch losses per run.
        **kwargs: Forwarded to ``sns.FacetGrid``.
    """
    list_data = []
    for value1 in df[var1].unique():
        for value2 in df[var2].unique():
            # Build one mask over df itself rather than filtering an
            # already-filtered frame with a mask from the full frame.
            rows = df.loc[(df[var1] == value1) & (df[var2] == value2)]
            if rows.empty:
                # This combination was never run (e.g. an incomplete sweep).
                continue
            run = rows.iloc[0]

            curves = (('train', run['train_losses']), ('val', run['val_losses']))
            for curve_type, losses in curves:
                for epoch, loss in enumerate(losses):
                    list_data.append({'type': curve_type, 'loss': loss, 'epoch': epoch,
                                      var1: value1, var2: value2})

    long_df = pd.DataFrame(list_data)
    g = sns.FacetGrid(long_df, row=var2, col=var1, hue='type', **kwargs)
    g = g.map(plt.plot, 'epoch', 'loss', marker='.')
    g.add_legend()
    g.fig.suptitle('Train loss vs Val loss')
    # If the title overlaps the plots, adjust `top`. Taking it as a function
    # argument would allow tuning it per figure.
    plt.subplots_adjust(top=0.89)
def plot_acc_variation(var1, var2, df, **kwargs):
    """Plot train and validation accuracy curves for every (var1, var2) setting.

    Draws a seaborn ``FacetGrid`` with one column per value of ``var1`` and
    one row per value of ``var2``; each facet shows accuracy against epoch
    and a text box with that run's test accuracy. Combinations with no
    matching run in ``df`` are skipped. When several rows match a
    combination, the first one is used.

    Args:
        var1: Column of ``df`` mapped to the grid columns.
        var2: Column of ``df`` mapped to the grid rows.
        df: DataFrame with ``train_accs`` and ``val_accs`` columns (one list
            of per-epoch accuracies per run) and a scalar ``test_acc`` column.
        **kwargs: Forwarded to ``sns.FacetGrid``.
    """
    list_data = []
    for value1 in df[var1].unique():
        for value2 in df[var2].unique():
            # Build one mask over df itself rather than filtering an
            # already-filtered frame with a mask from the full frame.
            rows = df.loc[(df[var1] == value1) & (df[var2] == value2)]
            if rows.empty:
                # This combination was never run (e.g. an incomplete sweep).
                continue
            run = rows.iloc[0]
            test_acc = run['test_acc']

            curves = (('train', run['train_accs']), ('val', run['val_accs']))
            for curve_type, accs in curves:
                for epoch, acc in enumerate(accs):
                    list_data.append({'type': curve_type, 'Acc': acc, 'test_acc': test_acc,
                                      'epoch': epoch, var1: value1, var2: value2})

    long_df = pd.DataFrame(list_data)
    g = sns.FacetGrid(long_df, row=var2, col=var1, hue='type', **kwargs)
    g = g.map(plt.plot, 'epoch', 'Acc', marker='.')

    def show_acc(x, y, metric, **kwargs):
        """Overlay faint points and a text box with the test accuracy."""
        plt.scatter(x, y, alpha=0.3, s=1)
        # Every record of a run carries the same test accuracy, so the first
        # value is enough.
        label = "Test Acc: {:1.3f}".format(list(metric.values)[0])
        plt.text(0.05, 0.95, label, horizontalalignment='left', verticalalignment='center',
                 transform=plt.gca().transAxes,
                 bbox=dict(facecolor='yellow', alpha=0.5, boxstyle="round,pad=0.1"))

    g = g.map(show_acc, 'epoch', 'Acc', 'test_acc')

    g.add_legend()
    g.fig.suptitle('Train Accuracy vs Val Accuracy')
    plt.subplots_adjust(top=0.89)
# Now let's actually run the experiment!
# ====== Random Seed Initialization ====== #
# NOTE(review): the train/val random_split earlier in this file runs before
# this seed is set, so the split is not covered by it. Confirm whether that
# is intended.
seed = 123
np.random.seed(seed)
torch.manual_seed(seed)

# Parsing an empty argument string yields a blank Namespace that is then
# filled in by hand. The order in which attributes are first assigned is the
# order in which they appear in vars(args), which save_exp_result() hashes,
# so keep the assignment order stable.
parser = argparse.ArgumentParser()
args = parser.parse_args("")
args.exp_name = "exp1_lr_model_code"

# ====== Model ====== #
args.model_code = 'VGG11'  # key into the cfg table
args.in_channels = 3
args.out_dim = 10
args.act = 'relu'

# ====== Regularization ======= #
args.l2 = 0.00001  # passed to the optimizer as weight_decay
args.use_bn = True

# ====== Optimizer & Training ====== #
args.optim = 'RMSprop'  # one of 'SGD', 'RMSprop', 'Adam' (the names experiment() checks for)
args.lr = 0.0015
args.epoch = 10

args.train_batch_size = 256
args.test_batch_size = 1024

# ====== Experiment Variable ====== #
# Names of the two args attributes that are swept, and the values tried for each.
name_var1 = 'lr'
name_var2 = 'model_code'
list_var1 = [0.0001, 0.00001]
list_var2 = ['VGG11', 'VGG13']

# Grid search: run one experiment per (var1, var2) combination.
for var1 in list_var1:
    for var2 in list_var2:
        setattr(args, name_var1, var1)
        setattr(args, name_var2, var2)
        print(args)

        # Pass a deep copy: save_exp_result() deletes keys from the settings
        # dict it receives, which is the Namespace's own __dict__.
        setting, result = experiment(partition, deepcopy(args))
        save_exp_result(setting, result)