SOL4Py Sample: VegeFruitsAutoEncoder
|
#******************************************************************************
#
# Copyright (c) 2018-2019 Antillia.com TOSHIYUKI ARAI. ALL RIGHTS RESERVED.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#******************************************************************************
# 2019/05/13
# encodig: utf-8
import sys
import os
import time
import traceback
import socket
import matplotlib.pyplot as plt
import numpy as np
import keras
#from keras.preprocessing.image import ImageDataGenerator
from keras import backend as K
from keras.utils import np_utils
sys.path.append('../../')
from SOL4Py.ZMain import *
from SOL4Py.ZMLModel import *
from SOL4Py.keras.ZDataSetAugmentor import *
from SOL4Py.keras.ZDataSetLoader import *
from SOL4Py.keras.ZEpochChangeNotifier import *
from SOL4Py.keras.ZSimpleAutoEncoderModel import *
VegeFruits_10 = 0
############################################################
# Classifier Model class
class VegeFruitsAutoEncoder(ZMLModel):
IMAGE_SIZE = 128
CHANNELS = 3
################
#Inner class to define VegeFruitsAutoEncoderMode which inherits ZSimpleAutoEncoderModel
class VegeFruitsAutoEncoderModel(ZSimpleAutoEncoderModel):
# Construcotr
def __init__(self, input_shape):
ZSimpleAutoEncoderModel.__init__(self, input_shape)
def encode(self, input_image):
x = Conv2D(64, (3, 3), activation='relu', padding='same')(input_image)
x = MaxPooling2D((2, 2), padding='same')(x)
x = Conv2D(32, (3, 3), activation='relu', padding='same')(x)
x = MaxPooling2D((2, 2), padding='same')(x)
x = Conv2D(16, (3, 3), activation='relu', padding='same')(x)
encoded = MaxPooling2D((2, 2), padding='same')(x)
return encoded
def decode(self, encoded):
x = Conv2D(16, (3, 3), activation='relu', padding='same')(encoded)
x = UpSampling2D((2, 2))(x)
x = Conv2D(32, (3, 3), activation='relu', padding='same')(x)
x = UpSampling2D((2, 2))(x)
x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
x = UpSampling2D((2, 2))(x)
decoded = Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)
return decoded
##
# Constructor
def __init__(self, dataset_id, epochs=50, mainv=None, ipaddress="127.0.0.1", port=7777, use_checkpoint_cb=False):
super(VegeFruitsAutoEncoder, self).__init__(dataset_id, mainv)
self._start(self.__init__.__name__)
self.write("dataset_id:{}, ephochs:{}, mainv:{}".format(dataset_id, epochs, mainv) )
self.model = None # Keras model
self.dataset_id = dataset_id
self.dataset = None
self.epochs = epochs
self.dataset_id = dataset_id
self.mini_dataset = ("./mini_dataset", "jpg")
self.augmented_dataset = ("./augmented_dataset", "png")
self.image_size = self.IMAGE_SIZE
self.use_checkpoint_cb = use_checkpoint_cb
self.callbacks = [ZEpochChangeNotifier(ipaddress, port, self.__class__.__name__, self.epochs+10)]
self.set_weight_filepath()
self._end(self.__init__.__name__)
def set_weight_filepath(self):
self._start(self.set_weight_filepath.__name__)
weight_file = self.__class__.__name__ + "_" + str(self.dataset_id) + ".h5"
current_dir = os.path.dirname(os.path.abspath(__file__))
self.weight_filepath = os.path.join(current_dir, weight_file)
self.write("WeightFilePath " + self.weight_filepath)
self._end(self.set_weight_filepath.__name__)
def build(self):
self.write("====================================")
self._start(self.build.__name__)
if self.is_trained() != True:
try:
self.generate_dataset()
self.load_dataset()
self.create()
self.compile()
self.train()
self.save()
except:
traceback.print_exc()
else:
# If our cifar model trained, i.e, if weight_filepath.h5 exists
self.load_dataset()
self.create()
self.load()
self.compile()
self._end(self.build.__name__)
def generate_dataset(self):
self.augmentor = ZDataSetAugmentor()
self.n_augmentation = 100
self.augmentor.generate(self.mini_dataset, self.augmented_dataset,
image_size = self.image_size,
n_augmentation = self.n_augmentation)
def load_dataset(self):
self._start(self.load_dataset.__name__)
self.loader = ZDataSetLoader()
self.loader.load_dataset(self.augmented_dataset, image_size=self.image_size)
self.x_train = self.loader.x_train
self.y_train = self.loader.y_train
self.x_test = self.loader.x_test
self.y_test = self.loader.y_test
self.n_classes = self.loader.n_classes
self.classes = self.loader.classes
#self.loader.show_summary(show_images=False)
self._end(self.load_dataset.__name__)
# Create a sequential model
def create(self):
self._start(self.create.__name__)
input_shape = self.x_train.shape[1:]
print(input_shape)
self.model = self.VegeFruitsAutoEncoderModel(input_shape)
self._end(self.create.__name__)
def compile(self):
self._start(self.compile.__name__)
#self.model.compile(optimizer='adam',
# loss='categorical_crossentropy', metrics = ['accuracy'])
self.model.compile(optimizer='adadelta', loss='binary_crossentropy')
self._end(self.compile.__name__)
def train(self):
self._start(self.train.__name__)
start = time.time()
print("Epochs " + str(self.epochs))
print(self.x_train.shape)
print(self.y_train.shape)
if (self.use_checkpoint_cb == True) :
check_point_cb = ModelCheckpoint(self.weight_filepath,
monitor="acc", verbose=1,
save_best_only=True, save_weights_only=True)
self.callbacks.append(check_point_cb)
print(self.callbacks)
self.model.fit(self.x_train, self.x_train,
epochs= self.epochs,
batch_size=128,
shuffle=True,
verbose=True,
validation_data=(self.x_test, self.x_test),
callbacks = self.callbacks
)
elapsed_time = time.time() - start
elapsed = str("Train elapsed_time:{0}".format(elapsed_time) + "[sec]")
self.write(elapsed)
self.model.summary()
self._end(self.train.__name__)
def predict(self):
self._start(self.predict.__name__)
# Call self.model.predict method to get decoded_images from x_test image
self.decoded_images = self.model.predict(self.x_test)
self._end(self.predict.__name__)
def show_images(self, n=10):
fig = plt.figure() #figsize=(20, 4))
for i in range(1, n+1):
# Display original x_test images
ax = plt.subplot(2, n, i)
plt.imshow(self.x_test[i].reshape(self.IMAGE_SIZE, self.IMAGE_SIZE, self.CHANNELS))
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
# Display decoded images predicted from original x_test images.
ax = plt.subplot(2, n, i + n)
plt.imshow(self.decoded_images[i].reshape(self.IMAGE_SIZE, self.IMAGE_SIZE, self.CHANNELS))
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
fig.tight_layout()
plt.show()
def save(self):
self._start(self.save.__name__)
try:
self.model.save_weights(self.weight_filepath)
self.write("Saved weight file {}".format(self.weight_filepath))
except:
print(formatted_traceback())
self._end(self.save.__name__)
def load(self):
self._start(self.load.__name__)
try:
self.model.load_weights(self.weight_filepath)
self.write("Loaded a weight file:{}".format(self.weight_filepath))
except:
self.write( formatted_traceback() )
self._end(self.load.__name__)
def get_model(self):
return self.model
def is_trained(self):
rc = False
if os.path.isfile(self.weight_filepath) == True:
self.write("Found weight_filepath:'{}'".format(self.weight_filepath))
rc = True
return rc
def plot(self, filename=None):
from keras.utils import plot_model
if filename == None:
filename = self.__class__.__name__ + "_model.png"
plot_model(self.model, to_file=filename,show_shapes=True)
############################################################
#
if main(__name__):
try:
app_name = os.path.basename(sys.argv[0])
dataset_id = VegeFruits_10
epochs = 20
if len(sys.argv) ==2:
epochs = int(sys.argv[1])
print("dataset_id:{} epochs:{}".format(dataset_id, epochs))
model = VegeFruitsAutoEncoder(dataset_id, epochs, None,
ipaddress="127.0.0.1", port=7777, use_checkpoint_cb=True)
model.build()
model.predict()
model.show_images()
except:
traceback.print_exc()
Last modified:20 Sep. 2019