# SOL4Py Sample: TorchMNISTAutoEncoder

# SOL4Py Samples



#******************************************************************************
#
#  Copyright (c) 2018-2019 Antillia.com TOSHIYUKI ARAI. ALL RIGHTS RESERVED.
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#******************************************************************************

 
# 2019/07/20

#  TorchMNISTAutoEncoder.py

# This is based on the following sample program.
# https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html
#  https://www.kaggle.com/tinydman/fashion-mnist-with-pytorch

# encoding: utf-8

import sys
import os
import time
import traceback

import matplotlib.pyplot as plt
import numpy as np
import torch
import torchvision
from torch.autograd import Variable

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torchvision.datasets import MNIST, FashionMNIST

sys.path.append('../../')


from SOL4Py.ZMLModel import *
from SOL4Py.ZMain    import *

from SOL4Py.torch.ZTorchSimpleAutoEncoderModel import ZTorchSimpleAutoEncoderModel
from SOL4Py.torch.ZTorchEpochChangeNotifier import ZTorchEpochChangeNotifier
from SOL4Py.torch.ZTorchModelCheckPoint import ZTorchModelCheckPoint

# Dataset identifiers accepted as `dataset_id` by TorchMNISTAutoEncoder.
# NOTE: assigning MNIST here rebinds the name `MNIST` that was imported from
# torchvision.datasets above; the dataset classes are therefore referenced as
# torchvision.datasets.MNIST / torchvision.datasets.FashionMNIST in this file.
MNIST         = 0
FASHION_MNIST = 1


############################################################
# AutoEncoder model class

class TorchMNISTAutoEncoderModel(ZTorchSimpleAutoEncoderModel):
  """Fully-connected autoencoder with a 2-dimensional bottleneck.

  The encoder narrows a flattened ``size * size`` input through
  128 -> 64 -> 12 -> 2 units; the decoder mirrors that path back to
  ``size * size`` outputs and finishes with ``Tanh``.

  NOTE(review): ``self.size`` is not set in this class; it is presumably
  provided by ZTorchSimpleAutoEncoderModel, which presumably also invokes
  create_encoder()/create_decoder() -- confirm against the base class.
  """

  ##
  # Constructor
  def __init__(self, image_size, n_classes, model_filename):
    """Forward all arguments unchanged to the base-class constructor."""
    super(TorchMNISTAutoEncoderModel, self).__init__(image_size, n_classes, model_filename)

  def create_encoder(self):
    """Build ``self.encoder``: Linear/ReLU stack from size*size down to 2 units."""
    n_pixels = self.size * self.size
    # Layers are instantiated in network order, first to last.
    encoder_layers = [
      nn.Linear(n_pixels, 128),
      nn.ReLU(True),
      nn.Linear(128, 64),
      nn.ReLU(True),
      nn.Linear(64, 12),
      nn.ReLU(True),
      nn.Linear(12, 2),
    ]
    self.encoder = nn.Sequential(*encoder_layers)

  def create_decoder(self):
    """Build ``self.decoder``: Linear/ReLU stack from 2 units up to size*size, then Tanh."""
    n_pixels = self.size * self.size
    # Mirror image of the encoder, closed by a Tanh activation.
    decoder_layers = [
      nn.Linear(2, 12),
      nn.ReLU(True),
      nn.Linear(12, 64),
      nn.ReLU(True),
      nn.Linear(64, 128),
      nn.ReLU(True),
      nn.Linear(128, n_pixels),
      nn.Tanh(),
    ]
    self.decoder = nn.Sequential(*decoder_layers)


       
class TorchMNISTAutoEncoder(ZMLModel):
  """Driver that loads an MNIST-style dataset and trains/loads an autoencoder.

  The dataset is selected by ``dataset_id`` (module constants ``MNIST`` or
  ``FASHION_MNIST``).  The trained model is stored in a file named
  ``<ClassName>_<dataset_id>.pt``; when that file already exists, build()
  loads it instead of training again.
  """
  IMAGE_SIZE = 28   # Width and height (pixels) of the square input images.
  CHANNELS   = 1    #1: Gray scale

  ##
  # Constructor
  def __init__(self, dataset_id, epochs=0, mainv=None, ipaddress="127.0.0.1", port= 8888):
    """Set up hyper-parameters, training callbacks and image transformers.

    Args:
      dataset_id: dataset selector, ``MNIST`` (0) or ``FASHION_MNIST`` (1).
      epochs:     number of epochs handed to the model's fit() in train().
      mainv:      forwarded to the ZMLModel constructor
                  (presumably a view used for log output -- confirm).
      ipaddress:  forwarded to ZTorchEpochChangeNotifier.
      port:       forwarded to ZTorchEpochChangeNotifier.
    """
    super(TorchMNISTAutoEncoder, self).__init__(dataset_id, mainv)
    self.write("====================================")
    self._start(self.__init__.__name__)

    self.model_filepath   = None

    self.set_input_shape()

    self.nclasses = 10

    self.epochs   = epochs
    self.batch_size = 128
    notifier = self.__class__.__name__ + "-" + str(self.dataset_id)
    print("epochs {}".format(self.epochs))

    # NOTE(review): the meaning of the 4th notifier argument (epochs + 10) is
    # defined by ZTorchEpochChangeNotifier -- confirm against that class.
    self.callbacks = [ZTorchEpochChangeNotifier(ipaddress, port, notifier, int(self.epochs)+10),
                      ZTorchModelCheckPoint(dataset_id=dataset_id)]

    self.model_filename  = self._default_model_filename()
    self.write(self.model_filename)

    # Create training and validation transformer.
    self.create_image_transformer()

    self._end(self.__init__.__name__)


  def _default_model_filename(self):
    """Return the model file name derived from the class name and dataset id."""
    return self.__class__.__name__ + "_" + str(self.dataset_id) + ".pt"


  # Define your own create_image_transformer method in a subclass derived from this class if required.
  def create_image_transformer(self):
    """Create the training and validation transforms (both just ToTensor)."""
    self.train_transformer = transforms.Compose([transforms.ToTensor()])
    self.valid_transformer = transforms.Compose([transforms.ToTensor()])


  def build(self):
    """Load the dataset, create the model, then train+save or load it.

    Training is skipped when is_trained() reports an existing model file.
    Any Exception raised along the way is printed with its traceback and
    not re-raised; KeyboardInterrupt/SystemExit are allowed to propagate.
    """
    self._start(self.build.__name__)
    try:
      self.load_dataset()
      self.create()

      if self.is_trained():
        self.load()
      else:
        self.train()
        self.save()

    except Exception:
      traceback.print_exc()

    self._end(self.build.__name__)


  def set_input_shape(self):
    """Set ``self.input_shape`` to (IMAGE_SIZE, IMAGE_SIZE, 1)."""
    self.input_shape = (self.IMAGE_SIZE, self.IMAGE_SIZE, 1)


  def set_dataset_id(self, dataset_id):
    """Switch the dataset id and recompute the model file name.

    An already created ``self.model`` and already loaded data loaders are
    left untouched by this method.
    """
    self.dataset_id = dataset_id
    self.model_filename  = self._default_model_filename()


  def load_dataset(self):
    """Download (if needed) the selected dataset and create the data loaders.

    Sets ``train_data``, ``train_loader`` (shuffled), ``test_data`` and
    ``valid_loader`` (not shuffled).  Data is stored under ``./data``.

    Raises:
      ValueError: if ``self.dataset_id`` is neither MNIST nor FASHION_MNIST.
    """
    self._start(self.load_dataset.__name__)

    # The module-level name MNIST is the integer id, so the torchvision
    # dataset classes are referenced through the torchvision.datasets module.
    dataset_classes = {
      MNIST:         torchvision.datasets.MNIST,
      FASHION_MNIST: torchvision.datasets.FashionMNIST,
    }
    dataset_class = dataset_classes.get(self.dataset_id)
    if dataset_class is None:
      # Fail here with a clear message instead of a later AttributeError on
      # the missing train_loader/valid_loader attributes.
      raise ValueError("Unsupported dataset_id: {!r}".format(self.dataset_id))

    self.train_data = dataset_class("./data", train=True, download=True, transform=self.train_transformer)
    self.train_loader = torch.utils.data.DataLoader(self.train_data,
                       batch_size=self.batch_size,
                       shuffle=True)

    self.test_data   = dataset_class("./data", train=False, download=True, transform=self.valid_transformer)
    self.valid_loader = torch.utils.data.DataLoader(self.test_data,
                       batch_size=self.batch_size,
                       shuffle=False)

    self._end(self.load_dataset.__name__)


  def create(self):
    """Instantiate ``self.model`` for (1, IMAGE_SIZE, IMAGE_SIZE) images."""
    self._start(self.create.__name__)
    self.image_size = (1, self.IMAGE_SIZE, self.IMAGE_SIZE)
    self.model_filename  = self._default_model_filename()

    self.model = TorchMNISTAutoEncoderModel(self.image_size,  self.nclasses, self.model_filename )

    self._end(self.create.__name__)


  def train(self):
    """Fit the model with MSE loss and Adam, then report the elapsed time.

    Requires load_dataset() and create() to have been called first, since
    it reads ``train_loader``, ``valid_loader`` and ``model``.
    """
    self._start(self.train.__name__)
    start = time.time()
    criterion = nn.MSELoss()
    optimizer = optim.Adam(self.model.parameters(),lr=0.01, weight_decay=1e-5)

    self.model.fit(self.train_loader,
                   self.valid_loader,
                   self.callbacks,
                   self.epochs,
                   criterion,
                   optimizer)

    elapsed_time = time.time() - start
    self.write("Train elapsed_time:{0}[sec]".format(elapsed_time))
    self.model.summary()
    self._end(self.train.__name__)


  # Remove the saved model file.
  def clear(self):
    """Delete the saved model file, if present, and drop ``self.model``."""
    if self.is_trained():
      os.remove(self.model_filename)
      self.model = None


  def is_trained(self):
    """Return True when the file ``self.model_filename`` exists."""
    self._start(self.is_trained.__name__)
    rc = os.path.isfile(self.model_filename)
    if rc:
      self.write("Found model file:'{}'".format(self.model_filename))
    self._end(self.is_trained.__name__)

    return rc


  def predict(self, dataset_loader, n_sampling=10):
    """Return the result of ``self.model.predict`` for ``dataset_loader``.

    Both arguments are forwarded unchanged; show_images() expects the result
    to be indexable, with each item offering ``.numpy()``.
    """
    return self.model.predict(dataset_loader, n_sampling=n_sampling)


  def show_images(self, dataset_loader, decoded_images,  n_sampling=10):
    """Plot original images (top row) above decoded images (bottom row).

    Column ``i`` shows the first image of the i-th batch drawn from
    ``dataset_loader`` and, below it, ``decoded_images[i]``.
    NOTE(review): whether those two actually correspond depends on how the
    model's predict() samples the loader -- confirm.

    Args:
      dataset_loader: iterable yielding ``(images, labels)`` batches.
      decoded_images: indexable collection whose items support ``.numpy()``.
      n_sampling:     number of columns; the loader must yield at least
                      this many batches.
    """
    it = iter(dataset_loader)
    n = n_sampling
    fig = plt.figure(figsize=(n * 2, 4))
    for i in range(1, n + 1):
      # Built-in next() works on any iterator; the iterator's own .next()
      # method is a Python 2 idiom that is not available here.
      images, _ = next(it)

      # Display original x_test images
      ax = plt.subplot(2, n, i)
      npimage = images[0].numpy()
      _, h, w = npimage.shape
      plt.imshow(npimage.reshape(h, w))

      plt.gray()
      ax.get_xaxis().set_visible(False)
      ax.get_yaxis().set_visible(False)

      # Display decoded images predicted from original images.
      ax = plt.subplot(2, n, i + n)
      npimage = decoded_images[i-1].numpy()

      plt.imshow(npimage.reshape(h, w))
      plt.gray()
      ax.get_xaxis().set_visible(False)
      ax.get_yaxis().set_visible(False)
    fig.tight_layout()
    plt.show()


  def save(self):
    """Persist the model by delegating to ``self.model.save()``."""
    self._start(self.save.__name__)
    self.model.save()
    self._end(self.save.__name__)


  def load(self):
    """Restore the model by delegating to ``self.model.load_model()``."""
    self._start(self.load.__name__)
    self.model.load_model()
    self._end(self.load.__name__)



############################################################
#
#

# Script entry point.
# Usage: python TorchMNISTAutoEncoder.py [dataset_id [epochs]]
#   dataset_id: 0 (MNIST, default) or 1 (FASHION_MNIST)
#   epochs:     number of training epochs (default 10)
if main(__name__):
  try:
    dataset_id = 0
    epochs     = 10

    if len(sys.argv) >= 2:
      dataset_id = int(sys.argv[1])

    # Use >= so the epochs argument is still honoured when extra
    # arguments follow it on the command line.
    if len(sys.argv) >= 3:
      epochs = int(sys.argv[2])

    model = TorchMNISTAutoEncoder(dataset_id=dataset_id, epochs=epochs)
    model.build()

    # Decode a few validation samples and show them next to the originals.
    sampling = 10
    decoded_images = model.predict(model.valid_loader,    n_sampling=sampling)

    model.show_images(model.valid_loader, decoded_images, n_sampling=sampling)

  except Exception:
    # Report the failure; KeyboardInterrupt/SystemExit propagate normally.
    traceback.print_exc()


# Last modified: 20 Sep. 2019