feat: VariationalAutoencoder class + sampling nn layer
This commit is contained in:
@@ -4,7 +4,7 @@ from abc import ABC, abstractmethod
|
|||||||
|
|
||||||
class ActivationFunc(ABC):
|
class ActivationFunc(ABC):
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def derivative(v: np.ndarray) -> np.ndarray:
|
def d(v: np.ndarray) -> np.ndarray:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@@ -12,7 +12,7 @@ class ReLU(ActivationFunc):
|
|||||||
def __call__(self, x):
|
def __call__(self, x):
|
||||||
return x * (x > 0)
|
return x * (x > 0)
|
||||||
|
|
||||||
def derivative(self, x):
|
def d(self, x):
|
||||||
return x > 0
|
return x > 0
|
||||||
|
|
||||||
|
|
||||||
@@ -23,7 +23,7 @@ class LeakyReLU(ActivationFunc):
|
|||||||
def __call__(self, x):
|
def __call__(self, x):
|
||||||
return x * (x > 0) + self.k * x * (x <= 0)
|
return x * (x > 0) + self.k * x * (x <= 0)
|
||||||
|
|
||||||
def derivative(self, x):
|
def d(self, x):
|
||||||
return (x > 0) + self.k * (x <= 0)
|
return (x > 0) + self.k * (x <= 0)
|
||||||
|
|
||||||
|
|
||||||
@@ -31,5 +31,5 @@ class Identity(ActivationFunc):
|
|||||||
def __call__(self, x):
|
def __call__(self, x):
|
||||||
return x
|
return x
|
||||||
|
|
||||||
def derivative(x):
|
def d(x):
|
||||||
return 1
|
return 1
|
||||||
|
|||||||
125
autoencoder.py
125
autoencoder.py
@@ -3,43 +3,14 @@ from utils import (dynamic_loss_plot_init,
|
|||||||
dynamic_loss_plot_update,
|
dynamic_loss_plot_update,
|
||||||
dynamic_loss_plot_finish)
|
dynamic_loss_plot_finish)
|
||||||
from tqdm import tqdm
|
from tqdm import tqdm
|
||||||
from layers import DeepNNLayer
|
from layers import DeepNNLayer, SampleLayer
|
||||||
from activations import ActivationFunc
|
from activations import ActivationFunc
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
LOADER = ['⡿', '⣟', '⣯', '⣷', '⣾', '⣽', '⣻', '⢿']
|
LOADER = ['⡿', '⣟', '⣯', '⣷', '⣾', '⣽', '⣻', '⢿']
|
||||||
|
|
||||||
|
|
||||||
class Autoencoder:
|
class AAutoencoder(ABC):
|
||||||
def __init__(self,
|
|
||||||
encoder_layers: list[int],
|
|
||||||
decoder_layers: list[int],
|
|
||||||
lr: float,
|
|
||||||
activation_func: ActivationFunc):
|
|
||||||
if encoder_layers[-1] != decoder_layers[0]:
|
|
||||||
raise Exception(
|
|
||||||
f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa
|
|
||||||
)
|
|
||||||
self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
|
|
||||||
self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return f'Encoder:\n{self.encoder}\n\nDecoder:\n{self.decoder}'
|
|
||||||
|
|
||||||
def loss(self, data_set: list[np.ndarray]) -> float:
|
|
||||||
loss = 0
|
|
||||||
for x in data_set:
|
|
||||||
loss += np.sum(np.abs(x - self.forward(x)[0])) / len(x)
|
|
||||||
return loss / len(data_set)
|
|
||||||
|
|
||||||
def train(self, v: np.ndarray):
|
|
||||||
out = self.decoder.forward(
|
|
||||||
self.encoder.forward(v)
|
|
||||||
)
|
|
||||||
self.encoder.backprop(
|
|
||||||
self.decoder.backprop(out - v)
|
|
||||||
)
|
|
||||||
return np.sum(np.abs(out - v)) / len(v)
|
|
||||||
|
|
||||||
def train_dataset(self,
|
def train_dataset(self,
|
||||||
data_set: list[np.ndarray],
|
data_set: list[np.ndarray],
|
||||||
max_epoch: int,
|
max_epoch: int,
|
||||||
@@ -80,6 +51,60 @@ class Autoencoder:
|
|||||||
dynamic_loss_plot_finish(ax, line)
|
dynamic_loss_plot_finish(ax, line)
|
||||||
return losses
|
return losses
|
||||||
|
|
||||||
|
def save(self, path: str):
|
||||||
|
path = path.removesuffix('.npy')
|
||||||
|
np.save(path, self)
|
||||||
|
|
||||||
|
def load(path: str) -> 'ClassicalAutoencoder':
|
||||||
|
path = path.removesuffix('.npy') + '.npy'
|
||||||
|
data = np.load(path, allow_pickle=True)
|
||||||
|
return data.item()
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def loss(self, data_set: list[np.ndarray]) -> float:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def train(self, v: np.ndarray) -> float:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def forward(self, v: np.ndarray) -> np.ndarray:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ClassicalAutoencoder(AAutoencoder):
|
||||||
|
def __init__(self,
|
||||||
|
encoder_layers: list[int],
|
||||||
|
decoder_layers: list[int],
|
||||||
|
lr: float,
|
||||||
|
activation_func: ActivationFunc):
|
||||||
|
if encoder_layers[-1] != decoder_layers[0]:
|
||||||
|
raise Exception(
|
||||||
|
f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa
|
||||||
|
)
|
||||||
|
self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
|
||||||
|
self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f'Encoder:\n{self.encoder}\n\nDecoder:\n{self.decoder}'
|
||||||
|
|
||||||
|
def loss(self, data_set: list[np.ndarray]) -> float:
|
||||||
|
loss = 0
|
||||||
|
for x in data_set:
|
||||||
|
loss += np.sum(np.abs(x - self.forward(x)[0])) / len(x)
|
||||||
|
return loss / len(data_set)
|
||||||
|
|
||||||
|
def train(self, v: np.ndarray):
|
||||||
|
out = self.decoder.forward(
|
||||||
|
self.encoder.forward(v)
|
||||||
|
)
|
||||||
|
error = out - v
|
||||||
|
self.encoder.backprop(
|
||||||
|
self.decoder.backprop(error)
|
||||||
|
)
|
||||||
|
return np.sum(np.abs(error)) / len(v)
|
||||||
|
|
||||||
def encode(self, v: np.ndarray) -> np.ndarray:
|
def encode(self, v: np.ndarray) -> np.ndarray:
|
||||||
return self.encoder.forward(v)
|
return self.encoder.forward(v)
|
||||||
|
|
||||||
@@ -91,11 +116,39 @@ class Autoencoder:
|
|||||||
out = self.decode(code)
|
out = self.decode(code)
|
||||||
return out, code
|
return out, code
|
||||||
|
|
||||||
def save(self, path: str):
|
|
||||||
path = path.removesuffix('.npy')
|
|
||||||
np.save(path, self)
|
|
||||||
|
|
||||||
def load(path: str) -> 'Autoencoder':
|
class VariationalAutoencoder(AAutoencoder):
|
||||||
|
def __init__(self,
|
||||||
|
encoder_layers: list[int],
|
||||||
|
decoder_layers: list[int],
|
||||||
|
sampling_size: int,
|
||||||
|
lr: float,
|
||||||
|
activation_func: ActivationFunc):
|
||||||
|
if encoder_layers[-1] != decoder_layers[0]:
|
||||||
|
raise Exception(
|
||||||
|
f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa
|
||||||
|
)
|
||||||
|
self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
|
||||||
|
self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
|
||||||
|
self.sampler = SampleLayer(self.encoder.out_size, lr, activation_func)
|
||||||
|
self.sampling_size = sampling_size
|
||||||
|
|
||||||
|
def load(path: str) -> 'ClassicalAutoencoder':
|
||||||
path = path.removesuffix('.npy') + '.npy'
|
path = path.removesuffix('.npy') + '.npy'
|
||||||
data = np.load(path, allow_pickle=True)
|
data = np.load(path, allow_pickle=True)
|
||||||
return data.item()
|
return data.item()
|
||||||
|
|
||||||
|
def train(self, v: np.ndarray) -> float:
|
||||||
|
out_enc = self.encoder.forward(v)
|
||||||
|
in_samples = np.zeros(
|
||||||
|
(self.sampling_size, self.encoder.out_size)
|
||||||
|
)
|
||||||
|
out_samples = np.zeros(
|
||||||
|
(self.sampling_size, self.decoder.out_size)
|
||||||
|
)
|
||||||
|
for i in range(self.sampling_size):
|
||||||
|
in_samples[i] = self.sampler.forward(out_enc)
|
||||||
|
out_samples[i] = self.decoder.forward(in_samples[i])
|
||||||
|
|
||||||
|
def forward(self, v: np.ndarray) -> np.ndarray:
|
||||||
|
pass
|
||||||
|
|||||||
33
layers.py
33
layers.py
@@ -20,8 +20,8 @@ class NNLayer:
|
|||||||
def __str__(self):
|
def __str__(self):
|
||||||
return f'[ {self.W.shape[0]} => {self.W.shape[1]}\tlr:{self.lr}\tactivation:{self.activation_func.__class__.__name__} ]' # noqa
|
return f'[ {self.W.shape[0]} => {self.W.shape[1]}\tlr:{self.lr}\tactivation:{self.activation_func.__class__.__name__} ]' # noqa
|
||||||
|
|
||||||
def forward(self, V: np.ndarray) -> np.ndarray:
|
def forward(self, v: np.ndarray) -> np.ndarray:
|
||||||
self.input = normalize(V)
|
self.input = normalize(v)
|
||||||
self.output_linear = self.input @ self.W + self.B
|
self.output_linear = self.input @ self.W + self.B
|
||||||
self.output = self.activation_func(
|
self.output = self.activation_func(
|
||||||
self.output_linear
|
self.output_linear
|
||||||
@@ -38,6 +38,33 @@ class NNLayer:
|
|||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
|
||||||
|
class SampleLayer:
|
||||||
|
def __init__(self,
|
||||||
|
in_size: int,
|
||||||
|
lr: float,
|
||||||
|
activation_func: ActivationFunc):
|
||||||
|
self.input = None
|
||||||
|
self.mean_nn = NNLayer(
|
||||||
|
in_size,
|
||||||
|
in_size,
|
||||||
|
lr,
|
||||||
|
activation_func)
|
||||||
|
self.std_nn = NNLayer(
|
||||||
|
in_size,
|
||||||
|
in_size,
|
||||||
|
lr,
|
||||||
|
activation_func)
|
||||||
|
|
||||||
|
def forward(self, v: np.ndarray) -> np.ndarray:
|
||||||
|
self.input = v
|
||||||
|
mean = self.mean_nn.forward(v)
|
||||||
|
std = self.std_nn.forward(v)
|
||||||
|
return np.random.normal(mean, std, 1)
|
||||||
|
|
||||||
|
def backprop(self, errors: np.ndarray) -> np.ndarray:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class DeepNNLayer:
|
class DeepNNLayer:
|
||||||
def __init__(self,
|
def __init__(self,
|
||||||
layers: list[int],
|
layers: list[int],
|
||||||
@@ -52,6 +79,8 @@ class DeepNNLayer:
|
|||||||
lr,
|
lr,
|
||||||
activation_func)
|
activation_func)
|
||||||
)
|
)
|
||||||
|
self.in_size = layers[0]
|
||||||
|
self.out_size = layers[-1]
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return '\n'.join([str(layer) for layer in self.layers])
|
return '\n'.join([str(layer) for layer in self.layers])
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
import matplotlib.pyplot as plt
|
import matplotlib.pyplot as plt
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from autoencoder import Autoencoder
|
from autoencoder import ClassicalAutoencoder
|
||||||
from activations import LeakyReLU
|
from activations import LeakyReLU
|
||||||
import os
|
import os
|
||||||
|
|
||||||
@@ -21,7 +21,7 @@ def mnist_train(
|
|||||||
filename: str,
|
filename: str,
|
||||||
max_epoch: int,
|
max_epoch: int,
|
||||||
patience: int,
|
patience: int,
|
||||||
) -> Autoencoder:
|
) -> ClassicalAutoencoder:
|
||||||
x_train, _, x_test, _ = load_mnist()
|
x_train, _, x_test, _ = load_mnist()
|
||||||
in_len = x_train[0].shape[0] * x_train[0].shape[0]
|
in_len = x_train[0].shape[0] * x_train[0].shape[0]
|
||||||
x_train.resize(x_train.shape[0], in_len)
|
x_train.resize(x_train.shape[0], in_len)
|
||||||
@@ -29,9 +29,9 @@ def mnist_train(
|
|||||||
x_train = x_train / 255
|
x_train = x_train / 255
|
||||||
x_test = x_test / 255
|
x_test = x_test / 255
|
||||||
if os.path.exists(filename):
|
if os.path.exists(filename):
|
||||||
autoencoder = Autoencoder.load(filename)
|
autoencoder = ClassicalAutoencoder.load(filename)
|
||||||
else:
|
else:
|
||||||
autoencoder = Autoencoder(
|
autoencoder = ClassicalAutoencoder(
|
||||||
[in_len, 64, 16],
|
[in_len, 64, 16],
|
||||||
[16, 64, in_len],
|
[16, 64, in_len],
|
||||||
0.01,
|
0.01,
|
||||||
@@ -46,7 +46,7 @@ def mnist_train(
|
|||||||
return autoencoder
|
return autoencoder
|
||||||
|
|
||||||
|
|
||||||
def mnist_test(model: str | Autoencoder):
|
def mnist_test(model: str | ClassicalAutoencoder):
|
||||||
x_train, _, x_test, y_test = load_mnist()
|
x_train, _, x_test, y_test = load_mnist()
|
||||||
in_len = x_train[0].shape[0] * x_train[0].shape[0]
|
in_len = x_train[0].shape[0] * x_train[0].shape[0]
|
||||||
img_shape = x_train[0].shape
|
img_shape = x_train[0].shape
|
||||||
@@ -55,7 +55,7 @@ def mnist_test(model: str | Autoencoder):
|
|||||||
x_train = x_train / 255
|
x_train = x_train / 255
|
||||||
x_test = x_test / 255
|
x_test = x_test / 255
|
||||||
if isinstance(model, str):
|
if isinstance(model, str):
|
||||||
autoencoder: Autoencoder = Autoencoder.load(model)
|
autoencoder: ClassicalAutoencoder = ClassicalAutoencoder.load(model)
|
||||||
else:
|
else:
|
||||||
autoencoder = model
|
autoencoder = model
|
||||||
print(autoencoder)
|
print(autoencoder)
|
||||||
|
|||||||
Reference in New Issue
Block a user