Compare commits
1 Commits
main
...
e6b508f739
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e6b508f739 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -3,6 +3,3 @@ __pycache__
|
|||||||
*.npz
|
*.npz
|
||||||
*.npy
|
*.npy
|
||||||
.venv
|
.venv
|
||||||
dist
|
|
||||||
*.egg-info
|
|
||||||
.env
|
|
||||||
50
README.md
50
README.md
@@ -1,53 +1,25 @@
|
|||||||
# Python AutoEncoder from scratch using Numpy
|
# Python AutoEncoder from scratch using Numpy
|
||||||
|
|
||||||
<figure>
|
|
||||||
<p align="center">
|
|
||||||
<img
|
|
||||||
src="https://raw.githubusercontent.com/lenoctambule/autoencoder/refs/heads/main/media/latent-space.png"
|
|
||||||
alt="Latent-space of the MNIST dataset"
|
|
||||||
width=70%>
|
|
||||||
<figcaption>
|
|
||||||
<p align="center">
|
|
||||||
Latent-space representation of the MNIST dataset using Variational Autoencoder
|
|
||||||
</p>
|
|
||||||
</figcaption>
|
|
||||||
</p>
|
|
||||||
</figure>
|
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
1. To install from source :
|
1. Install requirements :
|
||||||
```sh
|
```sh
|
||||||
$ git clone git@github.com:lenoctambule/autoencoder.git
|
$ pip install -r requirements.txt
|
||||||
$ pip install -e autoencoder/
|
|
||||||
```
|
```
|
||||||
|
|
||||||
Or install from PyPI :
|
2. Optionally run mnist_test.py.
|
||||||
```sh
|
```sh
|
||||||
$ pip install easyvae
|
|
||||||
```
|
|
||||||
|
|
||||||
2. Optionally, run mnist_test.py to see it in action on the MNIST dataset.
|
|
||||||
```sh
|
|
||||||
$ cd examples
|
|
||||||
$ py mnist_test.py
|
$ py mnist_test.py
|
||||||
```
|
```
|
||||||
|
|
||||||
## Training
|
## Training
|
||||||
|
|
||||||
Instatiate an `ClassicalAutoencoder` or `VariationalAutoencoder` object :
|
Instatiate an `Autoencoder` object :
|
||||||
```py
|
```py
|
||||||
from easyvae.autoencoder import ClassicalAutoencoder, VariationalAutoencoder
|
from autoencoder import Autoencoder
|
||||||
from easyvae.activations import LeakyReLU
|
from activations import LeakyReLU
|
||||||
|
|
||||||
autoencoder = ClassicalAutoencoder(
|
autoencoder = Autoencoder(
|
||||||
[768, 64, 16],
|
|
||||||
[16, 64, 768],
|
|
||||||
0.01,
|
|
||||||
LeakyReLU()
|
|
||||||
)
|
|
||||||
# or
|
|
||||||
autoencoder = VariationalAutoencoder(
|
|
||||||
[768, 64, 16],
|
[768, 64, 16],
|
||||||
[16, 64, 768],
|
[16, 64, 768],
|
||||||
0.01,
|
0.01,
|
||||||
@@ -58,17 +30,11 @@ And then via the `train_dataset` method to train over a dataset :
|
|||||||
```py
|
```py
|
||||||
autoencoder.train_dataset(data)
|
autoencoder.train_dataset(data)
|
||||||
```
|
```
|
||||||
Or via the `train` method to input each data points iteratively :
|
Or via the `train` to input each data points iteratively :
|
||||||
```py
|
```py
|
||||||
autoencoder.train(v)
|
autoencoder.train(v)
|
||||||
```
|
```
|
||||||
|
|
||||||
After training, you can save your model via the `save` method and load that model using `load` method :
|
|
||||||
```
|
|
||||||
autoencoder.save("mymodel.npy")
|
|
||||||
autoencoder.load("mymodel.npy")
|
|
||||||
```
|
|
||||||
|
|
||||||
## Inference
|
## Inference
|
||||||
|
|
||||||
Use your `Autoencoder` object with the `encode`, `decode`, `forward` methods like so :
|
Use your `Autoencoder` object with the `encode`, `decode`, `forward` methods like so :
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ from abc import ABC, abstractmethod
|
|||||||
|
|
||||||
class ActivationFunc(ABC):
|
class ActivationFunc(ABC):
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def d(v: np.ndarray) -> np.ndarray:
|
def derivative(v: np.ndarray) -> np.ndarray:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@@ -12,7 +12,7 @@ class ReLU(ActivationFunc):
|
|||||||
def __call__(self, x):
|
def __call__(self, x):
|
||||||
return x * (x > 0)
|
return x * (x > 0)
|
||||||
|
|
||||||
def d(self, x):
|
def derivative(self, x):
|
||||||
return x > 0
|
return x > 0
|
||||||
|
|
||||||
|
|
||||||
@@ -23,7 +23,7 @@ class LeakyReLU(ActivationFunc):
|
|||||||
def __call__(self, x):
|
def __call__(self, x):
|
||||||
return x * (x > 0) + self.k * x * (x <= 0)
|
return x * (x > 0) + self.k * x * (x <= 0)
|
||||||
|
|
||||||
def d(self, x):
|
def derivative(self, x):
|
||||||
return (x > 0) + self.k * (x <= 0)
|
return (x > 0) + self.k * (x <= 0)
|
||||||
|
|
||||||
|
|
||||||
@@ -31,5 +31,5 @@ class Identity(ActivationFunc):
|
|||||||
def __call__(self, x):
|
def __call__(self, x):
|
||||||
return x
|
return x
|
||||||
|
|
||||||
def d(self, x):
|
def derivative(x):
|
||||||
return 1
|
return 1
|
||||||
137
autoencoder.py
Normal file
137
autoencoder.py
Normal file
@@ -0,0 +1,137 @@
|
|||||||
|
import numpy as np
|
||||||
|
from utils import (dynamic_loss_plot_init,
|
||||||
|
dynamic_loss_plot_update,
|
||||||
|
dynamic_loss_plot_finish)
|
||||||
|
from tqdm import tqdm
|
||||||
|
from layers import DeepNNLayer, SamplingLayer
|
||||||
|
from activations import ActivationFunc
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
LOADER = ['⡿', '⣟', '⣯', '⣷', '⣾', '⣽', '⣻', '⢿']
|
||||||
|
|
||||||
|
|
||||||
|
class AAutoencoder(ABC):
|
||||||
|
def train_dataset(self,
|
||||||
|
data_set: list[np.ndarray],
|
||||||
|
max_epoch: int,
|
||||||
|
patience: int,
|
||||||
|
display_loss: bool = False) -> list[float]:
|
||||||
|
losses = [self.loss(data_set)]
|
||||||
|
if display_loss is True:
|
||||||
|
ax, line = dynamic_loss_plot_init(losses)
|
||||||
|
epoch = 0
|
||||||
|
no_improv = 0
|
||||||
|
prev_error = losses[0]
|
||||||
|
with tqdm(bar_format="{desc} {elapsed} {rate_fmt}") as lbar:
|
||||||
|
while True:
|
||||||
|
lbar.set_description(
|
||||||
|
f"{LOADER[epoch % len(LOADER)]} Training ({epoch=} error={float(prev_error):.6f})", # noqa
|
||||||
|
)
|
||||||
|
lbar.update()
|
||||||
|
error = 0
|
||||||
|
for x in tqdm(data_set, leave=False):
|
||||||
|
error += self.train(x)
|
||||||
|
error /= len(data_set)
|
||||||
|
derror = prev_error - error
|
||||||
|
if derror <= 0 or abs(derror) < 1e-4:
|
||||||
|
no_improv += 1
|
||||||
|
else:
|
||||||
|
no_improv = 0
|
||||||
|
prev_error = float(error)
|
||||||
|
losses.append(error)
|
||||||
|
if display_loss is True:
|
||||||
|
dynamic_loss_plot_update(ax, line, losses)
|
||||||
|
if no_improv > patience:
|
||||||
|
break
|
||||||
|
if epoch > max_epoch:
|
||||||
|
break
|
||||||
|
epoch += 1
|
||||||
|
print("Training complete !")
|
||||||
|
if display_loss is True:
|
||||||
|
dynamic_loss_plot_finish(ax, line)
|
||||||
|
return losses
|
||||||
|
|
||||||
|
def loss(self, data_set: list[np.ndarray]) -> float:
|
||||||
|
loss = 0
|
||||||
|
for x in data_set:
|
||||||
|
loss += np.sum(np.abs(x - self.forward(x)[0])) / len(x)
|
||||||
|
return loss / len(data_set)
|
||||||
|
|
||||||
|
def save(self, path: str):
|
||||||
|
path = path.removesuffix('.npy')
|
||||||
|
np.save(path, self)
|
||||||
|
|
||||||
|
def load(path: str) -> 'Autoencoder':
|
||||||
|
path = path.removesuffix('.npy') + '.npy'
|
||||||
|
data = np.load(path, allow_pickle=True)
|
||||||
|
return data.item()
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def train(self, v: np.ndarray) -> float:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def encode(self, v: np.ndarray) -> np.ndarray:
|
||||||
|
return self.encoder.forward(v)
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def decode(self, v: np.ndarray) -> np.ndarray:
|
||||||
|
return self.decoder.forward(v)
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
|
||||||
|
code = self.encode(v)
|
||||||
|
out = self.decode(code)
|
||||||
|
return out, code
|
||||||
|
|
||||||
|
|
||||||
|
class Autoencoder(AAutoencoder):
|
||||||
|
def __init__(self,
|
||||||
|
encoder_layers: list[int],
|
||||||
|
decoder_layers: list[int],
|
||||||
|
lr: float,
|
||||||
|
activation_func: ActivationFunc):
|
||||||
|
if encoder_layers[-1] != decoder_layers[0]:
|
||||||
|
raise Exception(
|
||||||
|
f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa
|
||||||
|
)
|
||||||
|
self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
|
||||||
|
self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
|
||||||
|
|
||||||
|
def train(self, v: np.ndarray) -> float:
|
||||||
|
out = self.decoder.forward(
|
||||||
|
self.encoder.forward(v)
|
||||||
|
)
|
||||||
|
self.encoder.backprop(
|
||||||
|
self.decoder.backprop(out - v)
|
||||||
|
)
|
||||||
|
return np.sum(np.abs(out - v)) / len(v)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f'Encoder:\n{self.encoder}\n\nDecoder:\n{self.decoder}'
|
||||||
|
|
||||||
|
def encode(self, v: np.ndarray) -> np.ndarray:
|
||||||
|
return self.encoder.forward(v)
|
||||||
|
|
||||||
|
def decode(self, v: np.ndarray) -> np.ndarray:
|
||||||
|
return self.decoder.forward(v)
|
||||||
|
|
||||||
|
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
|
||||||
|
code = self.encode(v)
|
||||||
|
out = self.decode(code)
|
||||||
|
return out, code
|
||||||
|
|
||||||
|
|
||||||
|
class VariationalAutoencoder(AAutoencoder):
|
||||||
|
def __init__(self,
|
||||||
|
encoder_layers: list[int],
|
||||||
|
decoder_layers: list[int],
|
||||||
|
lr: float,
|
||||||
|
activation_func: ActivationFunc):
|
||||||
|
if encoder_layers[-1] != decoder_layers[0]:
|
||||||
|
raise Exception(
|
||||||
|
f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa
|
||||||
|
)
|
||||||
|
self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
|
||||||
|
self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
|
||||||
|
self.sampler = SamplingLayer(decoder_layers[0], lr, activation_func)
|
||||||
@@ -1,178 +0,0 @@
|
|||||||
import matplotlib.pyplot as plt
|
|
||||||
import numpy as np
|
|
||||||
import os
|
|
||||||
from easyvae.autoencoder import ( # noqa
|
|
||||||
VariationalAutoencoder,
|
|
||||||
ClassicalAutoencoder,
|
|
||||||
LabelingVAE,
|
|
||||||
AAutoencoder
|
|
||||||
)
|
|
||||||
from easyvae.activations import LeakyReLU
|
|
||||||
from tqdm import tqdm
|
|
||||||
|
|
||||||
|
|
||||||
def load_mnist() -> list[np.ndarray]:
|
|
||||||
import requests
|
|
||||||
|
|
||||||
mnist_path = "./mnist.npz"
|
|
||||||
mnist_url = "https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz" # noqa
|
|
||||||
if not os.path.exists(mnist_path):
|
|
||||||
with open(mnist_path, "w+b") as f:
|
|
||||||
f.write(requests.get(mnist_url, stream=True).content)
|
|
||||||
res = np.load(mnist_path)
|
|
||||||
return res["x_train"], res["y_train"], res["x_test"], res["y_test"]
|
|
||||||
|
|
||||||
|
|
||||||
def mnist_train(
|
|
||||||
filename: str,
|
|
||||||
max_epoch: int,
|
|
||||||
patience: int,
|
|
||||||
cls: type[AAutoencoder],) -> AAutoencoder:
|
|
||||||
x_train, _, _, _ = load_mnist()
|
|
||||||
in_len = x_train[0].shape[0] * x_train[0].shape[0]
|
|
||||||
x_train.resize(x_train.shape[0], in_len)
|
|
||||||
x_train = x_train / 255
|
|
||||||
if os.path.exists(filename):
|
|
||||||
autoencoder = cls.load(filename)
|
|
||||||
else:
|
|
||||||
autoencoder = cls(
|
|
||||||
[in_len, 256, 2],
|
|
||||||
[2, 256, in_len],
|
|
||||||
0.001,
|
|
||||||
LeakyReLU()
|
|
||||||
)
|
|
||||||
print("CTRL+C to interrupt training.")
|
|
||||||
autoencoder.train_dataset(
|
|
||||||
x_train,
|
|
||||||
max_epoch,
|
|
||||||
patience,
|
|
||||||
display_loss=True)
|
|
||||||
autoencoder.save(filename)
|
|
||||||
print("Training complete !")
|
|
||||||
return autoencoder
|
|
||||||
|
|
||||||
|
|
||||||
def plot_mnist_latent_space(autoencoder: AAutoencoder, x: np.ndarray, y,):
|
|
||||||
codes = []
|
|
||||||
for x in x:
|
|
||||||
_, c = autoencoder.forward(x.flatten())
|
|
||||||
codes.append(c)
|
|
||||||
codes = np.array(codes)
|
|
||||||
if codes.shape[1] == 2:
|
|
||||||
plt.figure(figsize=(6, 6))
|
|
||||||
scatter = plt.scatter(
|
|
||||||
codes[:, 0],
|
|
||||||
codes[:, 1],
|
|
||||||
c=y,
|
|
||||||
cmap='tab10',
|
|
||||||
s=5,
|
|
||||||
alpha=0.7
|
|
||||||
)
|
|
||||||
plt.colorbar(scatter)
|
|
||||||
plt.grid(True)
|
|
||||||
|
|
||||||
|
|
||||||
def plot_random_reconstruction(
|
|
||||||
autoencoder: AAutoencoder,
|
|
||||||
example: np.ndarray,
|
|
||||||
img_shape,
|
|
||||||
y):
|
|
||||||
output, code = autoencoder.forward(example.flatten())
|
|
||||||
plt.subplot(1, 2, 1)
|
|
||||||
plt.matshow(
|
|
||||||
example.reshape(img_shape),
|
|
||||||
fignum=False)
|
|
||||||
plt.title(f"Input ({y})")
|
|
||||||
plt.subplot(1, 2, 2)
|
|
||||||
plt.matshow(
|
|
||||||
output.reshape(img_shape),
|
|
||||||
fignum=False)
|
|
||||||
plt.title(f"Output ({y})")
|
|
||||||
print(f'{code.tolist()}')
|
|
||||||
|
|
||||||
|
|
||||||
def labeling_accuracy(autoencoder: LabelingVAE, x_test, y_test):
|
|
||||||
accuracy = 0
|
|
||||||
for x, y in tqdm(
|
|
||||||
zip(x_test, y_test),
|
|
||||||
desc="Testing labeling",
|
|
||||||
total=len(x_test)
|
|
||||||
):
|
|
||||||
res = autoencoder.label(x)
|
|
||||||
res = list(res.items())[0][0]
|
|
||||||
if res == str(int(y)):
|
|
||||||
accuracy += 1
|
|
||||||
accuracy /= len(y_test)
|
|
||||||
print(f"Accuracy : {accuracy * 100:.2f}%")
|
|
||||||
|
|
||||||
|
|
||||||
def mnist_test(model: str | AAutoencoder | LabelingVAE):
|
|
||||||
x_train, y_train, x_test, y_test = load_mnist()
|
|
||||||
in_len = x_train[0].shape[0] * x_train[0].shape[0]
|
|
||||||
img_shape = x_train[0].shape
|
|
||||||
x_train.resize(x_train.shape[0], in_len)
|
|
||||||
x_test.resize(x_test.shape[0], in_len)
|
|
||||||
x_train = x_train / 255
|
|
||||||
x_test = x_test / 255
|
|
||||||
if isinstance(model, str):
|
|
||||||
autoencoder: AAutoencoder = AAutoencoder.load(model)
|
|
||||||
else:
|
|
||||||
autoencoder = model
|
|
||||||
print("Testing model ...\n")
|
|
||||||
print(autoencoder)
|
|
||||||
idx = np.random.randint(0, len(x_test))
|
|
||||||
example: np.ndarray = x_test[idx]
|
|
||||||
labels_train = [str(int(i)) for i in y_train]
|
|
||||||
if isinstance(autoencoder, LabelingVAE):
|
|
||||||
autoencoder.learn_labels(x_train, labels_train)
|
|
||||||
labeling_accuracy(autoencoder, x_test, y_test)
|
|
||||||
res = autoencoder.label(example)
|
|
||||||
for k, v in res.items():
|
|
||||||
print(f"{k} => {v}")
|
|
||||||
plot_random_reconstruction(autoencoder, example, img_shape, y_test[idx])
|
|
||||||
if autoencoder.space_dim == 2:
|
|
||||||
plot_mnist_latent_space(autoencoder, x_test, y_test)
|
|
||||||
plt.show()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
import argparse
|
|
||||||
import sys
|
|
||||||
|
|
||||||
parser = argparse.ArgumentParser()
|
|
||||||
parser.add_argument(
|
|
||||||
'-e',
|
|
||||||
type=int,
|
|
||||||
nargs='?',
|
|
||||||
default=30,
|
|
||||||
help='Max epochs'
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
'-p',
|
|
||||||
type=int,
|
|
||||||
nargs='?',
|
|
||||||
default=30,
|
|
||||||
help='Patience'
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
'-m',
|
|
||||||
type=str, nargs='?',
|
|
||||||
default='autoencoder_mnist.npy',
|
|
||||||
help='Model filename to save in run mode or load in training mode'
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
'-r',
|
|
||||||
action='store_true',
|
|
||||||
help='Run the model'
|
|
||||||
)
|
|
||||||
args = parser.parse_args(sys.argv[1:])
|
|
||||||
if args.r:
|
|
||||||
mnist_test(args.m)
|
|
||||||
else:
|
|
||||||
autoencoder = mnist_train(
|
|
||||||
args.m,
|
|
||||||
args.e,
|
|
||||||
args.p,
|
|
||||||
LabelingVAE
|
|
||||||
)
|
|
||||||
mnist_test(autoencoder)
|
|
||||||
84
layers.py
Normal file
84
layers.py
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
import numpy as np
|
||||||
|
from utils import normalize
|
||||||
|
from activations import ActivationFunc
|
||||||
|
|
||||||
|
|
||||||
|
class NNLayer:
|
||||||
|
def __init__(self,
|
||||||
|
in_size: int,
|
||||||
|
out_size: int,
|
||||||
|
lr: float,
|
||||||
|
activation_func: ActivationFunc):
|
||||||
|
self.W = np.random.uniform(-1, 1, (in_size, out_size))
|
||||||
|
self.B = np.zeros((out_size))
|
||||||
|
self.lr = lr
|
||||||
|
self.input = None
|
||||||
|
self.output = None
|
||||||
|
self.output_linear = None
|
||||||
|
self.activation_func = activation_func
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f'[ {self.W.shape[0]} => {self.W.shape[1]}\tlr:{self.lr}\tactivation:{self.activation_func.__class__.__name__} ]' # noqa
|
||||||
|
|
||||||
|
def forward(self, V: np.ndarray) -> np.ndarray:
|
||||||
|
self.input = normalize(V)
|
||||||
|
self.output_linear = self.input @ self.W + self.B
|
||||||
|
self.output = self.activation_func(
|
||||||
|
self.output_linear
|
||||||
|
)
|
||||||
|
return self.output
|
||||||
|
|
||||||
|
def backprop(self, error: np.ndarray) -> np.ndarray:
|
||||||
|
error *= self.activation_func.derivative(self.output_linear)
|
||||||
|
ret = self.W @ error
|
||||||
|
dW = np.outer(self.input, error) * self.lr
|
||||||
|
dB = error * self.lr
|
||||||
|
self.W -= dW
|
||||||
|
self.B -= dB
|
||||||
|
return ret
|
||||||
|
|
||||||
|
|
||||||
|
class SamplingLayer:
|
||||||
|
def __init__(self,
|
||||||
|
in_size: int,
|
||||||
|
lr: float,
|
||||||
|
activation_func: ActivationFunc):
|
||||||
|
self.W_mean = np.random.uniform(-0.1, 0.1, (in_size, in_size))
|
||||||
|
self.W_variance = np.random.uniform(-0.1, 0.1, (in_size, in_size))
|
||||||
|
|
||||||
|
def forward(self, v) -> np.ndarray:
|
||||||
|
mean = self.W_mean @ v
|
||||||
|
variance = self.W_variance @ v
|
||||||
|
return np.random.normal(mean, variance)
|
||||||
|
|
||||||
|
def backprop(self, error: np.ndarray) -> np.ndarray:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class DeepNNLayer:
|
||||||
|
def __init__(self,
|
||||||
|
layers: list[int],
|
||||||
|
lr: float,
|
||||||
|
activation_func: ActivationFunc):
|
||||||
|
self.layers: list[NNLayer] = []
|
||||||
|
for i in range(len(layers) - 1):
|
||||||
|
self.layers.append(
|
||||||
|
NNLayer(
|
||||||
|
layers[i],
|
||||||
|
layers[i+1],
|
||||||
|
lr,
|
||||||
|
activation_func)
|
||||||
|
)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return '\n'.join([str(layer) for layer in self.layers])
|
||||||
|
|
||||||
|
def forward(self, v: np.ndarray) -> np.ndarray:
|
||||||
|
for layer in self.layers:
|
||||||
|
v = layer.forward(v)
|
||||||
|
return v
|
||||||
|
|
||||||
|
def backprop(self, error: np.ndarray) -> np.ndarray:
|
||||||
|
for layer in self.layers[::-1]:
|
||||||
|
error = layer.backprop(error)
|
||||||
|
return error
|
||||||
Binary file not shown.
|
Before Width: | Height: | Size: 181 KiB |
118
mnist_test.py
Normal file
118
mnist_test.py
Normal file
@@ -0,0 +1,118 @@
|
|||||||
|
import matplotlib.pyplot as plt
|
||||||
|
import numpy as np
|
||||||
|
from autoencoder import Autoencoder
|
||||||
|
from activations import LeakyReLU
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
def load_mnist() -> list[np.ndarray]:
|
||||||
|
import requests
|
||||||
|
|
||||||
|
mnist_path = "./mnist.npz"
|
||||||
|
mnist_url = "https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz" # noqa
|
||||||
|
if not os.path.exists(mnist_path):
|
||||||
|
with open(mnist_path, "w+b") as f:
|
||||||
|
f.write(requests.get(mnist_url, stream=True).content)
|
||||||
|
res = np.load(mnist_path)
|
||||||
|
return res["x_train"], res["y_train"], res["x_test"], res["y_test"]
|
||||||
|
|
||||||
|
|
||||||
|
def mnist_train(
|
||||||
|
filename: str,
|
||||||
|
max_epoch: int,
|
||||||
|
patience: int,
|
||||||
|
) -> Autoencoder:
|
||||||
|
x_train, _, x_test, _ = load_mnist()
|
||||||
|
in_len = x_train[0].shape[0] * x_train[0].shape[0]
|
||||||
|
x_train.resize(x_train.shape[0], in_len)
|
||||||
|
x_test.resize(x_test.shape[0], in_len)
|
||||||
|
x_train = x_train / 255
|
||||||
|
x_test = x_test / 255
|
||||||
|
if os.path.exists(filename):
|
||||||
|
autoencoder = Autoencoder.load(filename)
|
||||||
|
else:
|
||||||
|
autoencoder = Autoencoder(
|
||||||
|
[in_len, 64, 16],
|
||||||
|
[16, 64, in_len],
|
||||||
|
0.01,
|
||||||
|
LeakyReLU()
|
||||||
|
)
|
||||||
|
autoencoder.train_dataset(
|
||||||
|
x_train,
|
||||||
|
max_epoch,
|
||||||
|
patience,
|
||||||
|
display_loss=True)
|
||||||
|
autoencoder.save(filename)
|
||||||
|
return autoencoder
|
||||||
|
|
||||||
|
|
||||||
|
def mnist_test(model: str | Autoencoder):
|
||||||
|
x_train, _, x_test, y_test = load_mnist()
|
||||||
|
in_len = x_train[0].shape[0] * x_train[0].shape[0]
|
||||||
|
img_shape = x_train[0].shape
|
||||||
|
x_train.resize(x_train.shape[0], in_len)
|
||||||
|
x_test.resize(x_test.shape[0], in_len)
|
||||||
|
x_train = x_train / 255
|
||||||
|
x_test = x_test / 255
|
||||||
|
if isinstance(model, str):
|
||||||
|
autoencoder: Autoencoder = Autoencoder.load(model)
|
||||||
|
else:
|
||||||
|
autoencoder = model
|
||||||
|
print(autoencoder)
|
||||||
|
idx = np.random.randint(0, len(x_test))
|
||||||
|
example: np.ndarray = x_test[idx]
|
||||||
|
output, code = autoencoder.forward(example.flatten())
|
||||||
|
plt.subplot(1, 3, 1)
|
||||||
|
plt.matshow(
|
||||||
|
example.reshape(img_shape),
|
||||||
|
fignum=False)
|
||||||
|
plt.title(f"Input ({y_test[idx]})")
|
||||||
|
plt.subplot(1, 3, 2)
|
||||||
|
plt.matshow(
|
||||||
|
output.reshape(img_shape),
|
||||||
|
fignum=False)
|
||||||
|
plt.title(f"Output ({y_test[idx]})")
|
||||||
|
plt.subplot(1, 3, 3)
|
||||||
|
s = int(np.ceil(np.sqrt(code.shape[0])))
|
||||||
|
code.resize((s, s), refcheck=False)
|
||||||
|
plt.matshow(code, fignum=False)
|
||||||
|
plt.title(f"Code ({y_test[idx]})")
|
||||||
|
plt.show()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import argparse
|
||||||
|
import sys
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument(
|
||||||
|
'-e',
|
||||||
|
type=int,
|
||||||
|
nargs='?',
|
||||||
|
default=1000,
|
||||||
|
help='Max epochs'
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
'-p',
|
||||||
|
type=int,
|
||||||
|
nargs='?',
|
||||||
|
default=5,
|
||||||
|
help='Patience'
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
'-m',
|
||||||
|
type=str, nargs='?',
|
||||||
|
default='autoencoder_mnist.npy',
|
||||||
|
help='Model filename to save in run mode or load in training mode'
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
'-r',
|
||||||
|
action='store_true',
|
||||||
|
help='Run mode'
|
||||||
|
)
|
||||||
|
args = parser.parse_args(sys.argv[1:])
|
||||||
|
if args.r:
|
||||||
|
mnist_test(args.m)
|
||||||
|
else:
|
||||||
|
autoencoder = mnist_train(args.m, args.e, args.p)
|
||||||
|
mnist_test(autoencoder)
|
||||||
@@ -1,23 +0,0 @@
|
|||||||
[project]
|
|
||||||
name = "easyvae"
|
|
||||||
version = "1.3.2"
|
|
||||||
authors = [
|
|
||||||
{ name="Ravaka RALAMBOARIVONY", email="ravaka.rlb.pro@gmail.com" },
|
|
||||||
]
|
|
||||||
description = "Python implementation of a Classical and Variational Autoencoders using NumPy"
|
|
||||||
readme = "README.md"
|
|
||||||
requires-python = ">=3.10"
|
|
||||||
classifiers = [
|
|
||||||
"Programming Language :: Python :: 3",
|
|
||||||
"Operating System :: OS Independent",
|
|
||||||
]
|
|
||||||
license = "MIT"
|
|
||||||
license-files = ["LICEN[CS]E*"]
|
|
||||||
|
|
||||||
[project.urls]
|
|
||||||
Homepage = "https://github.com/lenoctambule/autoencoder"
|
|
||||||
Issues = "https://github.com/lenoctambule/autoencoder/issues"
|
|
||||||
|
|
||||||
[build-system]
|
|
||||||
requires = ["setuptools"]
|
|
||||||
build-backend = "setuptools.build_meta"
|
|
||||||
@@ -1,299 +0,0 @@
|
|||||||
import numpy as np
|
|
||||||
from tqdm import tqdm
|
|
||||||
from .layers import DeepNNLayer, SampleLayer, NoiseLayer
|
|
||||||
from .activations import ActivationFunc, Identity
|
|
||||||
from .plotters import Plotter, CAPlotter, VAEPlotter
|
|
||||||
from .utils import interruptable
|
|
||||||
from abc import ABC, abstractmethod
|
|
||||||
|
|
||||||
LOADER = ['⡿', '⣟', '⣯', '⣷', '⣾', '⣽', '⣻', '⢿']
|
|
||||||
SQRT_2PI = np.sqrt(2 * np.pi)
|
|
||||||
|
|
||||||
|
|
||||||
class AAutoencoder(ABC):
|
|
||||||
plotter_cls = Plotter
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def __init__(self,
|
|
||||||
encoder_layers: list[int],
|
|
||||||
decoder_layers: list[int],
|
|
||||||
lr: float,
|
|
||||||
activation_func: ActivationFunc,
|
|
||||||
noise=0):
|
|
||||||
if encoder_layers[-1] != decoder_layers[0]:
|
|
||||||
raise Exception(
|
|
||||||
f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa
|
|
||||||
)
|
|
||||||
self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
|
|
||||||
self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
|
|
||||||
self.noise = NoiseLayer(noise)
|
|
||||||
self.space_dim = decoder_layers[0]
|
|
||||||
self.lr = lr
|
|
||||||
self.losses = [0]
|
|
||||||
|
|
||||||
def save(self, path: str):
|
|
||||||
path = path.removesuffix('.npy')
|
|
||||||
np.save(path, self)
|
|
||||||
|
|
||||||
def load(path: str) -> 'AAutoencoder':
|
|
||||||
path = path.removesuffix('.npy') + '.npy'
|
|
||||||
data = np.load(path, allow_pickle=True)
|
|
||||||
return data.item()
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def loss(self, data_set: list[np.ndarray]) -> float:
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def train(self, v: np.ndarray) -> float:
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def forward(self, v: np.ndarray) -> np.ndarray:
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def train_dataset(self, *args, **kwargs) -> list[float]:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return "\n".join((
|
|
||||||
f"Type: {self.__class__.__name__}",
|
|
||||||
"Encoder:",
|
|
||||||
f"{self.encoder}",
|
|
||||||
"Decoder:",
|
|
||||||
f"{self.decoder}"
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class ClassicalAutoencoder(AAutoencoder):
|
|
||||||
plotter_cls = CAPlotter
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
self.losses = []
|
|
||||||
|
|
||||||
def loss(self, data_set: list[np.ndarray]) -> float:
|
|
||||||
loss = 0
|
|
||||||
for x in data_set:
|
|
||||||
loss += np.sum(np.abs(x - self.forward(x)[0])) / len(x)
|
|
||||||
return loss / len(data_set)
|
|
||||||
|
|
||||||
def train(self, v: np.ndarray):
|
|
||||||
out, _ = self.forward(
|
|
||||||
self.noise.forward(v)
|
|
||||||
)
|
|
||||||
error = out - v
|
|
||||||
self.encoder.back(
|
|
||||||
self.decoder.back(error)
|
|
||||||
)
|
|
||||||
self.encoder.backprop()
|
|
||||||
self.decoder.backprop()
|
|
||||||
return np.sum(np.abs(error)) / len(v)
|
|
||||||
|
|
||||||
@interruptable
|
|
||||||
def train_dataset(self,
|
|
||||||
data_set: list[np.ndarray],
|
|
||||||
max_epoch: int,
|
|
||||||
patience: int,
|
|
||||||
display_loss: bool = False) -> list[float]:
|
|
||||||
plotter = self.plotter_cls(self) if display_loss else Plotter(self)
|
|
||||||
if len(self.losses) == 0:
|
|
||||||
self.losses = [self.loss(data_set)]
|
|
||||||
epoch = 0
|
|
||||||
no_improv = 0
|
|
||||||
prev_error = self.losses[-1]
|
|
||||||
with tqdm(bar_format="{desc} {elapsed} {rate_fmt}") as lbar:
|
|
||||||
while True:
|
|
||||||
lbar.set_description(
|
|
||||||
f"{LOADER[epoch % len(LOADER)]} Training ({epoch=} error={float(prev_error):.6f})", # noqa
|
|
||||||
)
|
|
||||||
lbar.update()
|
|
||||||
error = 0
|
|
||||||
for x in tqdm(data_set, leave=False):
|
|
||||||
error += self.train(x)
|
|
||||||
error /= len(data_set)
|
|
||||||
derror = prev_error - error
|
|
||||||
if abs(derror) < 1e-4:
|
|
||||||
no_improv += 1
|
|
||||||
else:
|
|
||||||
no_improv = 0
|
|
||||||
prev_error = float(error)
|
|
||||||
self.losses.append(error)
|
|
||||||
if no_improv > patience:
|
|
||||||
break
|
|
||||||
if epoch > max_epoch:
|
|
||||||
break
|
|
||||||
plotter.update()
|
|
||||||
epoch += 1
|
|
||||||
|
|
||||||
def encode(self, v: np.ndarray) -> np.ndarray:
|
|
||||||
return self.encoder.forward(v)
|
|
||||||
|
|
||||||
def decode(self, v: np.ndarray) -> np.ndarray:
|
|
||||||
return self.decoder.forward(v)
|
|
||||||
|
|
||||||
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
|
|
||||||
code = self.encode(v)
|
|
||||||
out = self.decode(code)
|
|
||||||
return out, code
|
|
||||||
|
|
||||||
|
|
||||||
class VariationalAutoencoder(AAutoencoder):
|
|
||||||
plotter_cls = VAEPlotter
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
self.sampler = SampleLayer(self.encoder.out_size, self.lr, Identity())
|
|
||||||
self.KL_losses = []
|
|
||||||
self.recon_losses = []
|
|
||||||
|
|
||||||
def loss(self, data_set: list[np.ndarray]) -> float:
|
|
||||||
kl_loss = 0
|
|
||||||
recon_loss = 0
|
|
||||||
for x in data_set:
|
|
||||||
out = self.forward(x)[0]
|
|
||||||
kl = self.sampler.DKL()
|
|
||||||
recon_loss += np.mean((out - x) ** 2)
|
|
||||||
kl_loss += kl
|
|
||||||
kl_loss /= len(data_set)
|
|
||||||
recon_loss /= len(data_set)
|
|
||||||
return recon_loss, kl_loss
|
|
||||||
|
|
||||||
def train(self, v: np.ndarray) -> tuple[float, float]:
|
|
||||||
out, _ = self.forward(
|
|
||||||
self.noise.forward(v)
|
|
||||||
)
|
|
||||||
error = out - v
|
|
||||||
self.encoder.back(
|
|
||||||
self.sampler.back(
|
|
||||||
self.decoder.back(error)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
self.encoder.backprop()
|
|
||||||
self.sampler.backprop()
|
|
||||||
self.decoder.backprop()
|
|
||||||
return np.mean(error ** 2), self.sampler.DKL()
|
|
||||||
|
|
||||||
@interruptable
|
|
||||||
def train_dataset(self,
|
|
||||||
data_set: list[np.ndarray],
|
|
||||||
max_epoch: int,
|
|
||||||
patience: int,
|
|
||||||
display_loss: bool = False) -> list[float]:
|
|
||||||
plotter = self.plotter_cls(self) if display_loss else Plotter(self)
|
|
||||||
if len(self.recon_losses) == 0:
|
|
||||||
recon_0, kl_0 = self.loss(data_set)
|
|
||||||
self.recon_losses = [recon_0]
|
|
||||||
self.KL_losses = [kl_0]
|
|
||||||
epoch = 0
|
|
||||||
no_improv = 0
|
|
||||||
prev_loss = self.recon_losses[-1] + self.KL_losses[-1]
|
|
||||||
with tqdm(bar_format="{desc} {elapsed} {rate_fmt}") as lbar:
|
|
||||||
while True:
|
|
||||||
lbar.set_description(
|
|
||||||
f"{LOADER[epoch % len(LOADER)]} Training ({epoch=} loss={float(prev_loss):.6f})", # noqa
|
|
||||||
)
|
|
||||||
lbar.update()
|
|
||||||
dkl = 0
|
|
||||||
recon = 0
|
|
||||||
for x in tqdm(data_set, leave=False):
|
|
||||||
recon_i, dkl_i = self.train(x)
|
|
||||||
dkl += dkl_i
|
|
||||||
recon += recon_i
|
|
||||||
recon /= len(data_set)
|
|
||||||
dkl /= len(data_set)
|
|
||||||
loss = recon + dkl
|
|
||||||
dloss = prev_loss - loss
|
|
||||||
if dloss <= 0 or abs(dloss) < 1e-4:
|
|
||||||
no_improv += 1
|
|
||||||
else:
|
|
||||||
no_improv = 0
|
|
||||||
prev_loss = float(loss)
|
|
||||||
self.recon_losses.append(recon)
|
|
||||||
self.KL_losses.append(dkl)
|
|
||||||
if no_improv > patience:
|
|
||||||
break
|
|
||||||
if epoch > max_epoch:
|
|
||||||
break
|
|
||||||
plotter.update()
|
|
||||||
epoch += 1
|
|
||||||
|
|
||||||
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
|
|
||||||
code = self.encoder.forward(v)
|
|
||||||
sample = self.sampler.forward(code)
|
|
||||||
out = self.decoder.forward(sample)
|
|
||||||
return out, sample
|
|
||||||
|
|
||||||
def encode(self, v: np.ndarray) -> np.ndarray:
|
|
||||||
return self.sampler.forward(
|
|
||||||
self.encoder.forward(v)
|
|
||||||
)
|
|
||||||
|
|
||||||
def decode(self, v: np.ndarray) -> np.ndarray:
|
|
||||||
return self.decoder.forward(v)
|
|
||||||
|
|
||||||
|
|
||||||
class Label:
|
|
||||||
def __init__(self,
|
|
||||||
name: str,
|
|
||||||
embedding_size: int,
|
|
||||||
N=100):
|
|
||||||
self.name = name
|
|
||||||
self.embedding_size = embedding_size
|
|
||||||
self.N = N
|
|
||||||
self.idx = 0
|
|
||||||
self.history = np.zeros((self.N, embedding_size))
|
|
||||||
|
|
||||||
def observe(self, code: np.ndarray):
|
|
||||||
if self.idx < self.N:
|
|
||||||
self.history[self.idx] = code
|
|
||||||
self.idx += 1
|
|
||||||
else:
|
|
||||||
diffs = np.linalg.norm(self.history - code, axis=1)
|
|
||||||
idx = np.argmin(diffs)
|
|
||||||
self.history[idx] = (self.history[idx] + code) / 2
|
|
||||||
|
|
||||||
def p(self, x: np.ndarray):
|
|
||||||
return 1 / (1e-4 + np.mean(np.abs(self.history - x)))
|
|
||||||
|
|
||||||
|
|
||||||
class LabelingVAE(VariationalAutoencoder):
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
self.labels: list[Label] = []
|
|
||||||
self.labels_idxs: dict[str, int] = {}
|
|
||||||
|
|
||||||
def learn_labels(self, data: np.ndarray, labels: list[list[str]]):
|
|
||||||
self.labels.clear()
|
|
||||||
self.labels_idxs.clear()
|
|
||||||
for x_i, labels_i in zip(data, labels):
|
|
||||||
y_i = self.encode(x_i)
|
|
||||||
for c in labels_i:
|
|
||||||
idx = self.labels_idxs.get(c, None)
|
|
||||||
if idx is None:
|
|
||||||
label = Label(c, self.encoder.out_size)
|
|
||||||
self.labels.append(label)
|
|
||||||
self.labels_idxs[c] = len(self.labels) - 1
|
|
||||||
else:
|
|
||||||
label = self.labels[idx]
|
|
||||||
label.observe(y_i)
|
|
||||||
|
|
||||||
def label(self, x: np.ndarray):
|
|
||||||
probs = {}
|
|
||||||
total = 0
|
|
||||||
code = self.encode(x)
|
|
||||||
for label in self.labels:
|
|
||||||
p = label.p(code)
|
|
||||||
probs[label.name] = p
|
|
||||||
total += p
|
|
||||||
for k in probs:
|
|
||||||
probs[k] = float(probs[k] / total)
|
|
||||||
return dict(
|
|
||||||
sorted(
|
|
||||||
probs.items(),
|
|
||||||
key=lambda item: item[1],
|
|
||||||
reverse=True
|
|
||||||
)
|
|
||||||
)
|
|
||||||
@@ -1,126 +0,0 @@
|
|||||||
import numpy as np
|
|
||||||
from .activations import ActivationFunc, Identity
|
|
||||||
|
|
||||||
|
|
||||||
class NNLayer:
|
|
||||||
def __init__(self,
|
|
||||||
in_size: int,
|
|
||||||
out_size: int,
|
|
||||||
lr: float,
|
|
||||||
activation_func: ActivationFunc):
|
|
||||||
limit = np.sqrt(6 / (in_size + out_size))
|
|
||||||
self.W = np.random.uniform(-limit, limit, (in_size, out_size))
|
|
||||||
self.B = np.zeros((out_size))
|
|
||||||
self.lr = lr
|
|
||||||
self.input = None
|
|
||||||
self.output = None
|
|
||||||
self.output_linear = None
|
|
||||||
self.error = None
|
|
||||||
self.activation_func = activation_func
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return f'[ {self.W.shape[0]} => {self.W.shape[1]}\tlr:{self.lr}\tactivation:{self.activation_func.__class__.__name__} ]' # noqa
|
|
||||||
|
|
||||||
def forward(self, v: np.ndarray) -> np.ndarray:
|
|
||||||
self.input = v
|
|
||||||
self.output_linear = self.input @ self.W + self.B
|
|
||||||
self.output = self.activation_func(
|
|
||||||
self.output_linear
|
|
||||||
)
|
|
||||||
return self.output
|
|
||||||
|
|
||||||
def back(self, error: np.ndarray) -> np.ndarray:
|
|
||||||
self.error = error * self.activation_func.d(self.output_linear)
|
|
||||||
return self.W @ self.error
|
|
||||||
|
|
||||||
def backprop(self) -> np.ndarray:
|
|
||||||
dW = np.outer(self.input, self.error) * self.lr
|
|
||||||
dB = self.error * self.lr
|
|
||||||
self.W -= dW
|
|
||||||
self.B -= dB
|
|
||||||
|
|
||||||
|
|
||||||
class SampleLayer:
|
|
||||||
def __init__(self,
|
|
||||||
in_size: int,
|
|
||||||
lr: float,
|
|
||||||
activation_func: ActivationFunc):
|
|
||||||
self.input = None
|
|
||||||
self.mean_nn = NNLayer(
|
|
||||||
in_size,
|
|
||||||
in_size,
|
|
||||||
lr,
|
|
||||||
activation_func)
|
|
||||||
self.std_nn = NNLayer(
|
|
||||||
in_size,
|
|
||||||
in_size,
|
|
||||||
lr,
|
|
||||||
activation_func)
|
|
||||||
|
|
||||||
def DKL(self):
|
|
||||||
return -0.5 * np.mean(1 + self.logvar - self.mean ** 2 - np.exp(self.logvar)) # noqa
|
|
||||||
|
|
||||||
def forward(self, v: np.ndarray) -> np.ndarray:
|
|
||||||
self.input = v
|
|
||||||
self.mean = self.mean_nn.forward(v)
|
|
||||||
self.logvar = np.clip(self.std_nn.forward(v), -10, 10)
|
|
||||||
self.std = np.exp(0.5 * self.logvar)
|
|
||||||
self.eps = np.random.normal(0, 1, self.mean.shape)
|
|
||||||
return 0.5 * self.eps * self.std + self.mean
|
|
||||||
|
|
||||||
def back(self, error: np.ndarray) -> np.ndarray:
|
|
||||||
dmean = error + self.mean
|
|
||||||
dstd = error * self.eps + 0.5 * (np.exp(self.logvar) - 1)
|
|
||||||
mean_error = self.mean_nn.back(dmean)
|
|
||||||
logvar_error = self.std_nn.back(dstd * self.std)
|
|
||||||
return mean_error + logvar_error
|
|
||||||
|
|
||||||
def backprop(self):
|
|
||||||
self.mean_nn.backprop()
|
|
||||||
self.std_nn.backprop()
|
|
||||||
|
|
||||||
|
|
||||||
class DeepNNLayer:
|
|
||||||
def __init__(self,
|
|
||||||
layers: list[int],
|
|
||||||
lr: float,
|
|
||||||
activation_func: ActivationFunc):
|
|
||||||
self.layers: list[NNLayer] = []
|
|
||||||
for i in range(len(layers) - 1):
|
|
||||||
self.layers.append(
|
|
||||||
NNLayer(
|
|
||||||
layers[i],
|
|
||||||
layers[i+1],
|
|
||||||
lr,
|
|
||||||
activation_func if i != len(layers) - 2 else Identity()
|
|
||||||
)
|
|
||||||
)
|
|
||||||
self.in_size = layers[0]
|
|
||||||
self.out_size = layers[-1]
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return '\n'.join([str(layer) for layer in self.layers])
|
|
||||||
|
|
||||||
def forward(self, v: np.ndarray) -> np.ndarray:
|
|
||||||
for layer in self.layers:
|
|
||||||
v = layer.forward(v)
|
|
||||||
return v
|
|
||||||
|
|
||||||
def back(self, error: np.ndarray):
|
|
||||||
for layer in self.layers[::-1]:
|
|
||||||
error = layer.back(error)
|
|
||||||
return error
|
|
||||||
|
|
||||||
def backprop(self) -> np.ndarray:
|
|
||||||
for layer in self.layers:
|
|
||||||
layer.backprop()
|
|
||||||
|
|
||||||
|
|
||||||
class NoiseLayer:
|
|
||||||
def __init__(self, amount=0.1):
|
|
||||||
self.amount = amount
|
|
||||||
|
|
||||||
def forward(self, v: np.ndarray):
|
|
||||||
if self.amount == 0:
|
|
||||||
return v
|
|
||||||
return v + np.random.normal(0, self.amount, v.shape)
|
|
||||||
@@ -1,93 +0,0 @@
|
|||||||
import matplotlib.pyplot as plt
|
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from .autoencoder import AAutoencoder, VariationalAutoencoder
|
|
||||||
|
|
||||||
|
|
||||||
class Plotter:
|
|
||||||
def __init__(self, autoencoder: 'AAutoencoder'):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def update(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def __del__(self):
|
|
||||||
self.close()
|
|
||||||
|
|
||||||
|
|
||||||
class CAPlotter(Plotter):
|
|
||||||
def __init__(self, autoencoder: 'AAutoencoder'):
|
|
||||||
self.autoencoder = autoencoder
|
|
||||||
plt.ion()
|
|
||||||
self.fig, self.ax = plt.subplots()
|
|
||||||
self.line, = self.ax.plot(
|
|
||||||
list(range(len(autoencoder.losses))),
|
|
||||||
autoencoder.losses,
|
|
||||||
label="Loss"
|
|
||||||
)
|
|
||||||
self.ax.set_xlabel("Epoch")
|
|
||||||
self.ax.set_ylabel("Loss")
|
|
||||||
self.ax.set_title("Training MSE Loss")
|
|
||||||
self.ax.legend()
|
|
||||||
self.update()
|
|
||||||
|
|
||||||
def update(self):
|
|
||||||
self.line.set_xdata(range(len(self.autoencoder.losses)))
|
|
||||||
self.line.set_ydata(self.autoencoder.losses)
|
|
||||||
self.ax.relim()
|
|
||||||
self.ax.autoscale_view()
|
|
||||||
plt.draw()
|
|
||||||
plt.pause(0.1)
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
plt.ioff()
|
|
||||||
plt.close(self.fig)
|
|
||||||
|
|
||||||
|
|
||||||
class VAEPlotter(Plotter):
|
|
||||||
def __init__(self, autoencoder: 'VariationalAutoencoder'):
|
|
||||||
self.autoencoder = autoencoder
|
|
||||||
plt.ion()
|
|
||||||
self.fig, (self.ax_recon, self.ax_dkl) = plt.subplots(1, 2)
|
|
||||||
self.line, = self.ax_recon.plot(
|
|
||||||
list(range(len(self.autoencoder.recon_losses))),
|
|
||||||
self.autoencoder.recon_losses,
|
|
||||||
label="Loss"
|
|
||||||
)
|
|
||||||
self.ax_recon.set_xlabel("Epoch")
|
|
||||||
self.ax_recon.set_ylabel("Loss")
|
|
||||||
self.ax_recon.set_title("Reconstruction MSE Loss")
|
|
||||||
self.ax_recon.legend()
|
|
||||||
|
|
||||||
self.dkl_line, = self.ax_dkl.plot(
|
|
||||||
list(range(len(self.autoencoder.KL_losses))),
|
|
||||||
self.autoencoder.KL_losses,
|
|
||||||
label="DKL Loss",
|
|
||||||
)
|
|
||||||
self.ax_dkl.set_xlabel("Epoch")
|
|
||||||
self.ax_dkl.set_ylabel("Loss")
|
|
||||||
self.ax_dkl.set_title("DKL Loss")
|
|
||||||
self.ax_dkl.legend()
|
|
||||||
self.update()
|
|
||||||
|
|
||||||
def update(self):
|
|
||||||
self.line.set_xdata(range(len(self.autoencoder.recon_losses)))
|
|
||||||
self.line.set_ydata(self.autoencoder.recon_losses)
|
|
||||||
self.ax_recon.relim()
|
|
||||||
self.ax_recon.autoscale_view()
|
|
||||||
|
|
||||||
self.dkl_line.set_xdata(range(len(self.autoencoder.KL_losses)))
|
|
||||||
self.dkl_line.set_ydata(self.autoencoder.KL_losses)
|
|
||||||
self.ax_dkl.relim()
|
|
||||||
self.ax_dkl.autoscale_view()
|
|
||||||
|
|
||||||
plt.draw()
|
|
||||||
plt.pause(0.1)
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
plt.ioff()
|
|
||||||
plt.close(self.fig)
|
|
||||||
@@ -1,29 +0,0 @@
|
|||||||
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
|
|
||||||
def softmax(v: np.ndarray) -> np.ndarray:
|
|
||||||
v = v - np.max(v)
|
|
||||||
exp_v = np.exp(v)
|
|
||||||
return exp_v / np.sum(exp_v)
|
|
||||||
|
|
||||||
|
|
||||||
def normalize(v: np.ndarray) -> np.ndarray:
|
|
||||||
return v / (np.linalg.norm(v) + 1e-8)
|
|
||||||
|
|
||||||
|
|
||||||
def regularize(v: np.ndarray) -> np.ndarray:
|
|
||||||
v_min = v.min(axis=0)
|
|
||||||
v_max = v.max(axis=0)
|
|
||||||
if v_min - v_max == 0:
|
|
||||||
return v
|
|
||||||
return (v - v_min) / (v_max - v_min)
|
|
||||||
|
|
||||||
|
|
||||||
def interruptable(func):
|
|
||||||
def inner(*args, **kwargs):
|
|
||||||
try:
|
|
||||||
return func(*args, **kwargs)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
|
||||||
return inner
|
|
||||||
46
utils.py
Normal file
46
utils.py
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
|
||||||
|
|
||||||
|
def softmax(v: np.ndarray) -> np.ndarray:
|
||||||
|
v = v - np.max(v)
|
||||||
|
exp_v = np.exp(v)
|
||||||
|
return exp_v / np.sum(exp_v)
|
||||||
|
|
||||||
|
|
||||||
|
def normalize(v: np.ndarray) -> np.ndarray:
|
||||||
|
return v / (np.linalg.norm(v) + 1e-8)
|
||||||
|
|
||||||
|
|
||||||
|
def regularize(v: np.ndarray) -> np.ndarray:
|
||||||
|
v_min = v.min(axis=0)
|
||||||
|
v_max = v.max(axis=0)
|
||||||
|
if v_min - v_max == 0:
|
||||||
|
return v
|
||||||
|
return (v - v_min) / (v_max - v_min)
|
||||||
|
|
||||||
|
|
||||||
|
def dynamic_loss_plot_init(losses: list):
|
||||||
|
plt.ion()
|
||||||
|
fig, ax = plt.subplots()
|
||||||
|
line, = ax.plot([0], losses, label="Loss")
|
||||||
|
ax.set_xlabel("Epoch")
|
||||||
|
ax.set_ylabel("Loss")
|
||||||
|
ax.set_title("Training Loss")
|
||||||
|
ax.legend()
|
||||||
|
return ax, line
|
||||||
|
|
||||||
|
|
||||||
|
def dynamic_loss_plot_update(ax, line, loss):
|
||||||
|
line.set_xdata(range(len(loss)))
|
||||||
|
line.set_ydata(loss)
|
||||||
|
ax.relim()
|
||||||
|
ax.autoscale_view()
|
||||||
|
plt.draw()
|
||||||
|
plt.pause(0.1)
|
||||||
|
|
||||||
|
|
||||||
|
def dynamic_loss_plot_finish(ax, line):
|
||||||
|
plt.ioff()
|
||||||
|
plt.show()
|
||||||
Reference in New Issue
Block a user