"""
@author: Inki
@contact: inki.yinji@qq.com
@version: Created in 2020 0903, last modified in 2020 1221.
@note: Some common functions; all given vector data must be of type numpy.ndarray.
"""
import time
import numpy as np
import sys
import scipy.io as scio
import torch
import torch.utils.data
import torchvision.transforms as transforms
import torchvision
from torch import nn
from multiprocessing import cpu_count
def get_iter(tr, tr_lab, te, te_lab):
"""
Get iterator.
:param
tr:
The training set.
tr_lab:
The training set's label.
te:
The test set.
te_lab:
The test set's label.
"""
yield tr, tr_lab, te, te_lab
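# Usage sketch (the toy arrays below are illustrative, not from the original post):
#     >>> temp_tr, temp_tr_lab = np.zeros((5, 3)), np.zeros(5)
#     >>> temp_te, temp_te_lab = np.zeros((2, 3)), np.zeros(2)
#     >>> next(get_iter(temp_tr, temp_tr_lab, temp_te, temp_te_lab))[0].shape
#     (5, 3)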
def is_print(para_str, para_is_print=True):
"""
Is print?
:param
para_str:
The print string.
para_is_print:
True print else not.
"""
if para_is_print:
print(para_str)
def load_file(para_path):
"""
Load file.
:param
para_file_name:
The path of the given file.
:return
The data.
"""
temp_type = para_path.split('.')[-1]
if temp_type == 'mat':
ret_data = scio.loadmat(para_path)
return ret_data['data']
else:
with open(para_path) as temp_fd:
ret_data = temp_fd.readlines()
return ret_data
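# Usage sketch (the path below is hypothetical; the .mat file must store its
# matrix under the key 'data', as load_file assumes):
#     >>> temp_data = load_file('D:/Data/benchmark/musk1.mat')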
def load_data_fashion_mnist(batch_size=10, root='D:/Data/Datasets/FashionMNIST', resize=None):
"""
Download the fashion mnist dataset and then load into memory.
"""
trans = []
if resize:
trans.append(transforms.Resize(size=resize))
trans.append(transforms.ToTensor())
transform = transforms.Compose(trans)
mnist_train = torchvision.datasets.FashionMNIST(root=root, train=True, download=True, transform=transform)
mnist_test = torchvision.datasets.FashionMNIST(root=root, train=False, download=True, transform=transform)
    if sys.platform.startswith('win'):
        num_workers = 0  # 0 disables multi-process data loading, which is safer on Windows.
    else:
        num_workers = cpu_count()
train_iter = torch.utils.data.DataLoader(mnist_train, batch_size=batch_size, shuffle=True, num_workers=num_workers)
test_iter = torch.utils.data.DataLoader(mnist_test, batch_size=batch_size, shuffle=False, num_workers=num_workers)
return train_iter, test_iter
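# Usage sketch: pull one batch to check shapes (assumes the default Windows
# data root; pass root=... to point elsewhere):
#     >>> temp_tr_iter, temp_te_iter = load_data_fashion_mnist(batch_size=32, resize=96)
#     >>> temp_x, temp_y = next(iter(temp_tr_iter))
#     >>> temp_x.shape, temp_y.shape
#     (torch.Size([32, 1, 96, 96]), torch.Size([32]))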
def owa_weight(para_num, para_type='linear_decrease'):
"""
The ordered weighted averaging operators (OWA) can replace the maximum or minimum operators.
And the purpose of this function is to generate the owa weights.
And the more refer is:
R. R. Yager, J. Kacprzyk, The ordered weighted averaging operators: Theory and applications, Springer Science &
Business Media, 2012.
:param
para_num:
The length of weights list.
para_type:
'linear_decrease';
'inverse_additive',
and its default setting is 'linear_decrease'.
:return
The owa weights.
"""
if para_num == 1:
return np.array([1])
else:
if para_type == 'linear_decrease':
temp_num = 2 / para_num / (para_num + 1)
return np.array([(para_num - i) * temp_num for i in range(para_num)])
elif para_type == 'inverse_additive':
temp_num = np.sum([1 / i for i in range(1, para_num + 1)])
return np.array([1 / i / temp_num for i in range(1, para_num + 1)])
        else:
            # Unknown type: fall back to the default 'linear_decrease' weights.
            return owa_weight(para_num)
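# A quick worked check (not in the original post): for para_num = 4, linear decrease
# gives (4 - i) * 2 / (4 * 5), and inverse additive normalizes 1 / i by H_4 = 25 / 12.
#     >>> owa_weight(4)
#     array([0.4, 0.3, 0.2, 0.1])
#     >>> owa_weight(4, para_type='inverse_additive')
#     array([0.48, 0.24, 0.16, 0.12])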
def print_go_round(para_idx, para_str='Program processing'):
"""
Print the round.
:param
para_idx:
The current index.
para_str:
The print words.
"""
round_list = ["\\", "|", "/", "-"]
print('\r' + para_str + ': ' + round_list[para_idx % 4], end="")
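# Usage sketch: spin the indicator over a dummy loop.
#     >>> for temp_i in range(100):
#     ...     print_go_round(temp_i)
#     ...     time.sleep(0.01)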
def print_progress_bar(para_idx, para_len):
"""
Print the progress bar.
:param
para_idx:
The current index.
para_len:
The loop length.
"""
print('\r' + '▇' * int(para_idx // (para_len / 50)) + str(np.ceil((para_idx + 1) * 100 / para_len)) + '%', end='')
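# Usage sketch: drive the bar over a dummy loop of length 200.
#     >>> for temp_i in range(200):
#     ...     print_progress_bar(temp_i, 200)
#     ...     time.sleep(0.01)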
def train(net, tr_iter, te_iter, batch_size, optimizer,
loss=nn.CrossEntropyLoss(),
device=torch.device('cuda' if torch.cuda.is_available() else 'cpu'),
num_epochs=100):
"""
The train function.
"""
net = net.to(device)
temp_batch_count = 0
print("Training on", device)
for epoch in range(num_epochs):
temp_tr_loss_sum, temp_tr_acc_sum, temp_num, temp_start_time = 0., 0., 0, time.time()
for x, y in tr_iter:
x = x.to(device)
y = y.to(device)
temp_y_pred = net(x)
temp_loss = loss(temp_y_pred, y)
optimizer.zero_grad()
temp_loss.backward()
optimizer.step()
temp_tr_loss_sum += temp_loss.cpu().item()
temp_tr_acc_sum += (temp_y_pred.argmax(dim=1) == y).sum().cpu().item()
temp_num += y.shape[0]
temp_batch_count += 1
test_acc = evaluate_accuracy(te_iter, net)
print("Epoch %d, loss %.4f, training acc %.3f, test ass %.3f, time %.1f s" %
(epoch + 1, temp_tr_loss_sum / temp_batch_count, temp_tr_acc_sum / temp_num, test_acc,
time.time() - temp_start_time))
def evaluate_accuracy(data_iter, net, device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')):
"""
The evaluate function, and the performance measure is accuracy.
"""
ret_acc, temp_num = 0., 0
with torch.no_grad():
for x, y in data_iter:
            net.eval()  # Evaluation mode: disables dropout and uses running batch-norm statistics.
            ret_acc += (net(x.to(device)).argmax(dim=1) == y.to(device)).float().sum().cpu().item()
            net.train()  # Switch back to training mode.
temp_num += y.shape[0]
return ret_acc / temp_num
class Count(dict):
"""
The count class with dict.
"""
def __missing__(self, __key):
return 0
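# Usage sketch: count label frequencies without pre-initializing the keys.
#     >>> temp_counter = Count()
#     >>> for temp_lab in [0, 1, 1, 2]:
#     ...     temp_counter[temp_lab] += 1
#     >>> temp_counter[1], temp_counter[9]
#     (2, 0)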
class FlattenLayer(torch.nn.Module):
    """
    Flatten each sample to a vector, e.g. [N, C, H, W] -> [N, C * H * W].
    """
    def __init__(self):
        super(FlattenLayer, self).__init__()
    def forward(self, x):
        return x.view(x.shape[0], -1)
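# Usage sketch: a quick shape check for FlattenLayer.
#     >>> FlattenLayer()(torch.zeros(32, 1, 28, 28)).shape
#     torch.Size([32, 784])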
if __name__ == '__main__':
    temp_tr_iter, temp_te_iter = load_data_fashion_mnist()
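    # A minimal end-to-end sketch (not in the original post): train a
    # softmax-regression baseline with the helpers above; the hyper-parameters
    # (lr=0.1, 5 epochs) are illustrative only.
    temp_net = nn.Sequential(FlattenLayer(), nn.Linear(28 * 28, 10))
    temp_optimizer = torch.optim.SGD(temp_net.parameters(), lr=0.1)
    train(temp_net, temp_tr_iter, temp_te_iter, 10, temp_optimizer, num_epochs=5)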