Compare commits
37 Commits
e6b508f739
...
9fbdb759d7
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9fbdb759d7 | ||
|
|
24f1a8c9de | ||
|
|
583fc796f6 | ||
|
|
6eaaa43285 | ||
|
|
65c6d3bbee | ||
|
|
251d66a625 | ||
|
|
0f1c9f920b | ||
|
|
bf55c8eb7a | ||
|
|
32a4a39ab9 | ||
|
|
4cc349c22c | ||
|
|
b635bf0467 | ||
|
|
b1dc34e699 | ||
|
|
e6c1229a7e | ||
|
|
82acc6b4f3 | ||
|
|
a4334568ec | ||
|
|
d23f3a903a | ||
|
|
475ca0cf6f | ||
|
|
7a822782a5 | ||
|
|
5ff6cfe55e | ||
|
|
849d988de5 | ||
|
|
d4679712a6 | ||
|
|
ea8a4079ac | ||
|
|
9d718a6bc8 | ||
|
|
5e83983d96 | ||
|
|
9b42e9db85 | ||
|
|
0b46028702 | ||
|
|
b4963513cc | ||
|
|
075382cfb0 | ||
|
|
15865812d8 | ||
|
|
058b7a0f2a | ||
|
|
d048ddc6db | ||
|
|
e9c79f463f | ||
|
|
510ad8720c | ||
|
|
3440de851a | ||
|
|
82d61dd10f | ||
|
|
5a8fb2c48b | ||
|
|
577e679425 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -3,3 +3,6 @@ __pycache__
|
|||||||
*.npz
|
*.npz
|
||||||
*.npy
|
*.npy
|
||||||
.venv
|
.venv
|
||||||
|
dist
|
||||||
|
*.egg-info
|
||||||
|
.env
|
||||||
50
README.md
50
README.md
@@ -1,25 +1,53 @@
|
|||||||
# Python AutoEncoder from scratch using Numpy
|
# Python AutoEncoder from scratch using Numpy
|
||||||
|
|
||||||
|
<figure>
|
||||||
|
<p align="center">
|
||||||
|
<img
|
||||||
|
src="https://raw.githubusercontent.com/lenoctambule/autoencoder/refs/heads/main/media/latent-space.png"
|
||||||
|
alt="Latent-space of the MNIST dataset"
|
||||||
|
width=70%>
|
||||||
|
<figcaption>
|
||||||
|
<p align="center">
|
||||||
|
Latent-space representation of the MNIST dataset using Variational Autoencoder
|
||||||
|
</p>
|
||||||
|
</figcaption>
|
||||||
|
</p>
|
||||||
|
</figure>
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
1. Install requirements :
|
1. To install from source :
|
||||||
```sh
|
```sh
|
||||||
$ pip install -r requirements.txt
|
$ git clone git@github.com:lenoctambule/autoencoder.git
|
||||||
|
$ pip install -e autoencoder/
|
||||||
```
|
```
|
||||||
|
|
||||||
2. Optionally run mnist_test.py.
|
Or install from PyPI :
|
||||||
```sh
|
```sh
|
||||||
|
$ pip install easyvae
|
||||||
|
```
|
||||||
|
|
||||||
|
2. Optionally, run mnist_test.py to see it in action on the MNIST dataset.
|
||||||
|
```sh
|
||||||
|
$ cd examples
|
||||||
$ py mnist_test.py
|
$ py mnist_test.py
|
||||||
```
|
```
|
||||||
|
|
||||||
## Training
|
## Training
|
||||||
|
|
||||||
Instatiate an `Autoencoder` object :
|
Instatiate an `ClassicalAutoencoder` or `VariationalAutoencoder` object :
|
||||||
```py
|
```py
|
||||||
from autoencoder import Autoencoder
|
from easyvae.autoencoder import ClassicalAutoencoder, VariationalAutoencoder
|
||||||
from activations import LeakyReLU
|
from easyvae.activations import LeakyReLU
|
||||||
|
|
||||||
autoencoder = Autoencoder(
|
autoencoder = ClassicalAutoencoder(
|
||||||
|
[768, 64, 16],
|
||||||
|
[16, 64, 768],
|
||||||
|
0.01,
|
||||||
|
LeakyReLU()
|
||||||
|
)
|
||||||
|
# or
|
||||||
|
autoencoder = VariationalAutoencoder(
|
||||||
[768, 64, 16],
|
[768, 64, 16],
|
||||||
[16, 64, 768],
|
[16, 64, 768],
|
||||||
0.01,
|
0.01,
|
||||||
@@ -30,11 +58,17 @@ And then via the `train_dataset` method to train over a dataset :
|
|||||||
```py
|
```py
|
||||||
autoencoder.train_dataset(data)
|
autoencoder.train_dataset(data)
|
||||||
```
|
```
|
||||||
Or via the `train` to input each data points iteratively :
|
Or via the `train` method to input each data points iteratively :
|
||||||
```py
|
```py
|
||||||
autoencoder.train(v)
|
autoencoder.train(v)
|
||||||
```
|
```
|
||||||
|
|
||||||
|
After training, you can save your model via the `save` method and load that model using `load` method :
|
||||||
|
```
|
||||||
|
autoencoder.save("mymodel.npy")
|
||||||
|
autoencoder.load("mymodel.npy")
|
||||||
|
```
|
||||||
|
|
||||||
## Inference
|
## Inference
|
||||||
|
|
||||||
Use your `Autoencoder` object with the `encode`, `decode`, `forward` methods like so :
|
Use your `Autoencoder` object with the `encode`, `decode`, `forward` methods like so :
|
||||||
|
|||||||
101
autoencoder.py
101
autoencoder.py
@@ -1,101 +0,0 @@
|
|||||||
import numpy as np
|
|
||||||
from utils import (dynamic_loss_plot_init,
|
|
||||||
dynamic_loss_plot_update,
|
|
||||||
dynamic_loss_plot_finish)
|
|
||||||
from tqdm import tqdm
|
|
||||||
from layers import DeepNNLayer
|
|
||||||
from activations import ActivationFunc
|
|
||||||
|
|
||||||
LOADER = ['⡿', '⣟', '⣯', '⣷', '⣾', '⣽', '⣻', '⢿']
|
|
||||||
|
|
||||||
|
|
||||||
class Autoencoder:
|
|
||||||
def __init__(self,
|
|
||||||
encoder_layers: list[int],
|
|
||||||
decoder_layers: list[int],
|
|
||||||
lr: float,
|
|
||||||
activation_func: ActivationFunc):
|
|
||||||
if encoder_layers[-1] != decoder_layers[0]:
|
|
||||||
raise Exception(
|
|
||||||
f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa
|
|
||||||
)
|
|
||||||
self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
|
|
||||||
self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return f'Encoder:\n{self.encoder}\n\nDecoder:\n{self.decoder}'
|
|
||||||
|
|
||||||
def loss(self, data_set: list[np.ndarray]) -> float:
|
|
||||||
loss = 0
|
|
||||||
for x in data_set:
|
|
||||||
loss += np.sum(np.abs(x - self.forward(x)[0])) / len(x)
|
|
||||||
return loss / len(data_set)
|
|
||||||
|
|
||||||
def train(self, v: np.ndarray):
|
|
||||||
out = self.decoder.forward(
|
|
||||||
self.encoder.forward(v)
|
|
||||||
)
|
|
||||||
self.encoder.backprop(
|
|
||||||
self.decoder.backprop(out - v)
|
|
||||||
)
|
|
||||||
return np.sum(np.abs(out - v)) / len(v)
|
|
||||||
|
|
||||||
def train_dataset(self,
|
|
||||||
data_set: list[np.ndarray],
|
|
||||||
max_epoch: int,
|
|
||||||
patience: int,
|
|
||||||
display_loss: bool = False) -> list[float]:
|
|
||||||
losses = [self.loss(data_set)]
|
|
||||||
if display_loss is True:
|
|
||||||
ax, line = dynamic_loss_plot_init(losses)
|
|
||||||
epoch = 0
|
|
||||||
no_improv = 0
|
|
||||||
prev_error = losses[0]
|
|
||||||
with tqdm(bar_format="{desc} {elapsed} {rate_fmt}") as lbar:
|
|
||||||
while True:
|
|
||||||
lbar.set_description(
|
|
||||||
f"{LOADER[epoch % len(LOADER)]} Training ({epoch=} error={float(prev_error):.6f})", # noqa
|
|
||||||
)
|
|
||||||
lbar.update()
|
|
||||||
error = 0
|
|
||||||
for x in tqdm(data_set, leave=False):
|
|
||||||
error += self.train(x)
|
|
||||||
error /= len(data_set)
|
|
||||||
derror = prev_error - error
|
|
||||||
if derror <= 0 or abs(derror) < 1e-4:
|
|
||||||
no_improv += 1
|
|
||||||
else:
|
|
||||||
no_improv = 0
|
|
||||||
prev_error = float(error)
|
|
||||||
losses.append(error)
|
|
||||||
if display_loss is True:
|
|
||||||
dynamic_loss_plot_update(ax, line, losses)
|
|
||||||
if no_improv > patience:
|
|
||||||
break
|
|
||||||
if epoch > max_epoch:
|
|
||||||
break
|
|
||||||
epoch += 1
|
|
||||||
print("Training complete !")
|
|
||||||
if display_loss is True:
|
|
||||||
dynamic_loss_plot_finish(ax, line)
|
|
||||||
return losses
|
|
||||||
|
|
||||||
def encode(self, v: np.ndarray) -> np.ndarray:
|
|
||||||
return self.encoder.forward(v)
|
|
||||||
|
|
||||||
def decode(self, v: np.ndarray) -> np.ndarray:
|
|
||||||
return self.decoder.forward(v)
|
|
||||||
|
|
||||||
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
|
|
||||||
code = self.encode(v)
|
|
||||||
out = self.decode(code)
|
|
||||||
return out, code
|
|
||||||
|
|
||||||
def save(self, path: str):
|
|
||||||
path = path.removesuffix('.npy')
|
|
||||||
np.save(path, self)
|
|
||||||
|
|
||||||
def load(path: str) -> 'Autoencoder':
|
|
||||||
path = path.removesuffix('.npy') + '.npy'
|
|
||||||
data = np.load(path, allow_pickle=True)
|
|
||||||
return data.item()
|
|
||||||
178
examples/mnist_test.py
Normal file
178
examples/mnist_test.py
Normal file
@@ -0,0 +1,178 @@
|
|||||||
|
import matplotlib.pyplot as plt
|
||||||
|
import numpy as np
|
||||||
|
import os
|
||||||
|
from easyvae.autoencoder import ( # noqa
|
||||||
|
VariationalAutoencoder,
|
||||||
|
ClassicalAutoencoder,
|
||||||
|
LabelingVAE,
|
||||||
|
AAutoencoder
|
||||||
|
)
|
||||||
|
from easyvae.activations import LeakyReLU
|
||||||
|
from tqdm import tqdm
|
||||||
|
|
||||||
|
|
||||||
|
def load_mnist() -> list[np.ndarray]:
|
||||||
|
import requests
|
||||||
|
|
||||||
|
mnist_path = "./mnist.npz"
|
||||||
|
mnist_url = "https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz" # noqa
|
||||||
|
if not os.path.exists(mnist_path):
|
||||||
|
with open(mnist_path, "w+b") as f:
|
||||||
|
f.write(requests.get(mnist_url, stream=True).content)
|
||||||
|
res = np.load(mnist_path)
|
||||||
|
return res["x_train"], res["y_train"], res["x_test"], res["y_test"]
|
||||||
|
|
||||||
|
|
||||||
|
def mnist_train(
|
||||||
|
filename: str,
|
||||||
|
max_epoch: int,
|
||||||
|
patience: int,
|
||||||
|
cls: type[AAutoencoder],) -> AAutoencoder:
|
||||||
|
x_train, _, _, _ = load_mnist()
|
||||||
|
in_len = x_train[0].shape[0] * x_train[0].shape[0]
|
||||||
|
x_train.resize(x_train.shape[0], in_len)
|
||||||
|
x_train = x_train / 255
|
||||||
|
if os.path.exists(filename):
|
||||||
|
autoencoder = cls.load(filename)
|
||||||
|
else:
|
||||||
|
autoencoder = cls(
|
||||||
|
[in_len, 256, 2],
|
||||||
|
[2, 256, in_len],
|
||||||
|
0.001,
|
||||||
|
LeakyReLU()
|
||||||
|
)
|
||||||
|
print("CTRL+C to interrupt training.")
|
||||||
|
autoencoder.train_dataset(
|
||||||
|
x_train,
|
||||||
|
max_epoch,
|
||||||
|
patience,
|
||||||
|
display_loss=True)
|
||||||
|
autoencoder.save(filename)
|
||||||
|
print("Training complete !")
|
||||||
|
return autoencoder
|
||||||
|
|
||||||
|
|
||||||
|
def plot_mnist_latent_space(autoencoder: AAutoencoder, x: np.ndarray, y,):
|
||||||
|
codes = []
|
||||||
|
for x in x:
|
||||||
|
_, c = autoencoder.forward(x.flatten())
|
||||||
|
codes.append(c)
|
||||||
|
codes = np.array(codes)
|
||||||
|
if codes.shape[1] == 2:
|
||||||
|
plt.figure(figsize=(6, 6))
|
||||||
|
scatter = plt.scatter(
|
||||||
|
codes[:, 0],
|
||||||
|
codes[:, 1],
|
||||||
|
c=y,
|
||||||
|
cmap='tab10',
|
||||||
|
s=5,
|
||||||
|
alpha=0.7
|
||||||
|
)
|
||||||
|
plt.colorbar(scatter)
|
||||||
|
plt.grid(True)
|
||||||
|
|
||||||
|
|
||||||
|
def plot_random_reconstruction(
|
||||||
|
autoencoder: AAutoencoder,
|
||||||
|
example: np.ndarray,
|
||||||
|
img_shape,
|
||||||
|
y):
|
||||||
|
output, code = autoencoder.forward(example.flatten())
|
||||||
|
plt.subplot(1, 2, 1)
|
||||||
|
plt.matshow(
|
||||||
|
example.reshape(img_shape),
|
||||||
|
fignum=False)
|
||||||
|
plt.title(f"Input ({y})")
|
||||||
|
plt.subplot(1, 2, 2)
|
||||||
|
plt.matshow(
|
||||||
|
output.reshape(img_shape),
|
||||||
|
fignum=False)
|
||||||
|
plt.title(f"Output ({y})")
|
||||||
|
print(f'{code.tolist()}')
|
||||||
|
|
||||||
|
|
||||||
|
def labeling_accuracy(autoencoder: LabelingVAE, x_test, y_test):
|
||||||
|
accuracy = 0
|
||||||
|
for x, y in tqdm(
|
||||||
|
zip(x_test, y_test),
|
||||||
|
desc="Testing labeling",
|
||||||
|
total=len(x_test)
|
||||||
|
):
|
||||||
|
res = autoencoder.label(x)
|
||||||
|
res = list(res.items())[0][0]
|
||||||
|
if res == str(int(y)):
|
||||||
|
accuracy += 1
|
||||||
|
accuracy /= len(y_test)
|
||||||
|
print(f"Accuracy : {accuracy * 100:.2f}%")
|
||||||
|
|
||||||
|
|
||||||
|
def mnist_test(model: str | AAutoencoder | LabelingVAE):
|
||||||
|
x_train, y_train, x_test, y_test = load_mnist()
|
||||||
|
in_len = x_train[0].shape[0] * x_train[0].shape[0]
|
||||||
|
img_shape = x_train[0].shape
|
||||||
|
x_train.resize(x_train.shape[0], in_len)
|
||||||
|
x_test.resize(x_test.shape[0], in_len)
|
||||||
|
x_train = x_train / 255
|
||||||
|
x_test = x_test / 255
|
||||||
|
if isinstance(model, str):
|
||||||
|
autoencoder: AAutoencoder = AAutoencoder.load(model)
|
||||||
|
else:
|
||||||
|
autoencoder = model
|
||||||
|
print("Testing model ...\n")
|
||||||
|
print(autoencoder)
|
||||||
|
idx = np.random.randint(0, len(x_test))
|
||||||
|
example: np.ndarray = x_test[idx]
|
||||||
|
labels_train = [str(int(i)) for i in y_train]
|
||||||
|
if isinstance(autoencoder, LabelingVAE):
|
||||||
|
autoencoder.learn_labels(x_train, labels_train)
|
||||||
|
labeling_accuracy(autoencoder, x_test, y_test)
|
||||||
|
res = autoencoder.label(example)
|
||||||
|
for k, v in res.items():
|
||||||
|
print(f"{k} => {v}")
|
||||||
|
plot_random_reconstruction(autoencoder, example, img_shape, y_test[idx])
|
||||||
|
if autoencoder.space_dim == 2:
|
||||||
|
plot_mnist_latent_space(autoencoder, x_test, y_test)
|
||||||
|
plt.show()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import argparse
|
||||||
|
import sys
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument(
|
||||||
|
'-e',
|
||||||
|
type=int,
|
||||||
|
nargs='?',
|
||||||
|
default=30,
|
||||||
|
help='Max epochs'
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
'-p',
|
||||||
|
type=int,
|
||||||
|
nargs='?',
|
||||||
|
default=30,
|
||||||
|
help='Patience'
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
'-m',
|
||||||
|
type=str, nargs='?',
|
||||||
|
default='autoencoder_mnist.npy',
|
||||||
|
help='Model filename to save in run mode or load in training mode'
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
'-r',
|
||||||
|
action='store_true',
|
||||||
|
help='Run the model'
|
||||||
|
)
|
||||||
|
args = parser.parse_args(sys.argv[1:])
|
||||||
|
if args.r:
|
||||||
|
mnist_test(args.m)
|
||||||
|
else:
|
||||||
|
autoencoder = mnist_train(
|
||||||
|
args.m,
|
||||||
|
args.e,
|
||||||
|
args.p,
|
||||||
|
LabelingVAE
|
||||||
|
)
|
||||||
|
mnist_test(autoencoder)
|
||||||
67
layers.py
67
layers.py
@@ -1,67 +0,0 @@
|
|||||||
import numpy as np
|
|
||||||
from utils import normalize
|
|
||||||
from activations import ActivationFunc
|
|
||||||
|
|
||||||
|
|
||||||
class NNLayer:
|
|
||||||
def __init__(self,
|
|
||||||
in_size: int,
|
|
||||||
out_size: int,
|
|
||||||
lr: float,
|
|
||||||
activation_func: ActivationFunc):
|
|
||||||
self.W = np.random.uniform(-1, 1, (in_size, out_size))
|
|
||||||
self.B = np.zeros((out_size))
|
|
||||||
self.lr = lr
|
|
||||||
self.input = None
|
|
||||||
self.output = None
|
|
||||||
self.output_linear = None
|
|
||||||
self.activation_func = activation_func
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return f'[ {self.W.shape[0]} => {self.W.shape[1]}\tlr:{self.lr}\tactivation:{self.activation_func.__class__.__name__} ]' # noqa
|
|
||||||
|
|
||||||
def forward(self, V: np.ndarray) -> np.ndarray:
|
|
||||||
self.input = normalize(V)
|
|
||||||
self.output_linear = self.input @ self.W + self.B
|
|
||||||
self.output = self.activation_func(
|
|
||||||
self.output_linear
|
|
||||||
)
|
|
||||||
return self.output
|
|
||||||
|
|
||||||
def backprop(self, error: np.ndarray) -> np.ndarray:
|
|
||||||
error *= self.activation_func.derivative(self.output_linear)
|
|
||||||
ret = self.W @ error
|
|
||||||
dW = np.outer(self.input, error) * self.lr
|
|
||||||
dB = error * self.lr
|
|
||||||
self.W -= dW
|
|
||||||
self.B -= dB
|
|
||||||
return ret
|
|
||||||
|
|
||||||
|
|
||||||
class DeepNNLayer:
|
|
||||||
def __init__(self,
|
|
||||||
layers: list[int],
|
|
||||||
lr: float,
|
|
||||||
activation_func: ActivationFunc):
|
|
||||||
self.layers: list[NNLayer] = []
|
|
||||||
for i in range(len(layers) - 1):
|
|
||||||
self.layers.append(
|
|
||||||
NNLayer(
|
|
||||||
layers[i],
|
|
||||||
layers[i+1],
|
|
||||||
lr,
|
|
||||||
activation_func)
|
|
||||||
)
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return '\n'.join([str(layer) for layer in self.layers])
|
|
||||||
|
|
||||||
def forward(self, v: np.ndarray) -> np.ndarray:
|
|
||||||
for layer in self.layers:
|
|
||||||
v = layer.forward(v)
|
|
||||||
return v
|
|
||||||
|
|
||||||
def backprop(self, error: np.ndarray) -> np.ndarray:
|
|
||||||
for layer in self.layers[::-1]:
|
|
||||||
error = layer.backprop(error)
|
|
||||||
return error
|
|
||||||
BIN
media/latent-space.png
Normal file
BIN
media/latent-space.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 181 KiB |
118
mnist_test.py
118
mnist_test.py
@@ -1,118 +0,0 @@
|
|||||||
import matplotlib.pyplot as plt
|
|
||||||
import numpy as np
|
|
||||||
from autoencoder import Autoencoder
|
|
||||||
from activations import LeakyReLU
|
|
||||||
import os
|
|
||||||
|
|
||||||
|
|
||||||
def load_mnist() -> list[np.ndarray]:
|
|
||||||
import requests
|
|
||||||
|
|
||||||
mnist_path = "./mnist.npz"
|
|
||||||
mnist_url = "https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz" # noqa
|
|
||||||
if not os.path.exists(mnist_path):
|
|
||||||
with open(mnist_path, "w+b") as f:
|
|
||||||
f.write(requests.get(mnist_url, stream=True).content)
|
|
||||||
res = np.load(mnist_path)
|
|
||||||
return res["x_train"], res["y_train"], res["x_test"], res["y_test"]
|
|
||||||
|
|
||||||
|
|
||||||
def mnist_train(
|
|
||||||
filename: str,
|
|
||||||
max_epoch: int,
|
|
||||||
patience: int,
|
|
||||||
) -> Autoencoder:
|
|
||||||
x_train, _, x_test, _ = load_mnist()
|
|
||||||
in_len = x_train[0].shape[0] * x_train[0].shape[0]
|
|
||||||
x_train.resize(x_train.shape[0], in_len)
|
|
||||||
x_test.resize(x_test.shape[0], in_len)
|
|
||||||
x_train = x_train / 255
|
|
||||||
x_test = x_test / 255
|
|
||||||
if os.path.exists(filename):
|
|
||||||
autoencoder = Autoencoder.load(filename)
|
|
||||||
else:
|
|
||||||
autoencoder = Autoencoder(
|
|
||||||
[in_len, 64, 16],
|
|
||||||
[16, 64, in_len],
|
|
||||||
0.01,
|
|
||||||
LeakyReLU()
|
|
||||||
)
|
|
||||||
autoencoder.train_dataset(
|
|
||||||
x_train,
|
|
||||||
max_epoch,
|
|
||||||
patience,
|
|
||||||
display_loss=True)
|
|
||||||
autoencoder.save(filename)
|
|
||||||
return autoencoder
|
|
||||||
|
|
||||||
|
|
||||||
def mnist_test(model: str | Autoencoder):
|
|
||||||
x_train, _, x_test, y_test = load_mnist()
|
|
||||||
in_len = x_train[0].shape[0] * x_train[0].shape[0]
|
|
||||||
img_shape = x_train[0].shape
|
|
||||||
x_train.resize(x_train.shape[0], in_len)
|
|
||||||
x_test.resize(x_test.shape[0], in_len)
|
|
||||||
x_train = x_train / 255
|
|
||||||
x_test = x_test / 255
|
|
||||||
if isinstance(model, str):
|
|
||||||
autoencoder: Autoencoder = Autoencoder.load(model)
|
|
||||||
else:
|
|
||||||
autoencoder = model
|
|
||||||
print(autoencoder)
|
|
||||||
idx = np.random.randint(0, len(x_test))
|
|
||||||
example: np.ndarray = x_test[idx]
|
|
||||||
output, code = autoencoder.forward(example.flatten())
|
|
||||||
plt.subplot(1, 3, 1)
|
|
||||||
plt.matshow(
|
|
||||||
example.reshape(img_shape),
|
|
||||||
fignum=False)
|
|
||||||
plt.title(f"Input ({y_test[idx]})")
|
|
||||||
plt.subplot(1, 3, 2)
|
|
||||||
plt.matshow(
|
|
||||||
output.reshape(img_shape),
|
|
||||||
fignum=False)
|
|
||||||
plt.title(f"Output ({y_test[idx]})")
|
|
||||||
plt.subplot(1, 3, 3)
|
|
||||||
s = int(np.ceil(np.sqrt(code.shape[0])))
|
|
||||||
code.resize((s, s), refcheck=False)
|
|
||||||
plt.matshow(code, fignum=False)
|
|
||||||
plt.title(f"Code ({y_test[idx]})")
|
|
||||||
plt.show()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
import argparse
|
|
||||||
import sys
|
|
||||||
|
|
||||||
parser = argparse.ArgumentParser()
|
|
||||||
parser.add_argument(
|
|
||||||
'-e',
|
|
||||||
type=int,
|
|
||||||
nargs='?',
|
|
||||||
default=1000,
|
|
||||||
help='Max epochs'
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
'-p',
|
|
||||||
type=int,
|
|
||||||
nargs='?',
|
|
||||||
default=5,
|
|
||||||
help='Patience'
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
'-m',
|
|
||||||
type=str, nargs='?',
|
|
||||||
default='autoencoder_mnist.npy',
|
|
||||||
help='Model filename to save in run mode or load in training mode'
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
'-r',
|
|
||||||
action='store_true',
|
|
||||||
help='Run mode'
|
|
||||||
)
|
|
||||||
args = parser.parse_args(sys.argv[1:])
|
|
||||||
if args.r:
|
|
||||||
mnist_test(args.m)
|
|
||||||
else:
|
|
||||||
autoencoder = mnist_train(args.m, args.e, args.p)
|
|
||||||
mnist_test(autoencoder)
|
|
||||||
23
pyproject.toml
Normal file
23
pyproject.toml
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
[project]
|
||||||
|
name = "easyvae"
|
||||||
|
version = "1.3.2"
|
||||||
|
authors = [
|
||||||
|
{ name="Ravaka RALAMBOARIVONY", email="ravaka.rlb.pro@gmail.com" },
|
||||||
|
]
|
||||||
|
description = "Python implementation of a Classical and Variational Autoencoders using NumPy"
|
||||||
|
readme = "README.md"
|
||||||
|
requires-python = ">=3.10"
|
||||||
|
classifiers = [
|
||||||
|
"Programming Language :: Python :: 3",
|
||||||
|
"Operating System :: OS Independent",
|
||||||
|
]
|
||||||
|
license = "MIT"
|
||||||
|
license-files = ["LICEN[CS]E*"]
|
||||||
|
|
||||||
|
[project.urls]
|
||||||
|
Homepage = "https://github.com/lenoctambule/autoencoder"
|
||||||
|
Issues = "https://github.com/lenoctambule/autoencoder/issues"
|
||||||
|
|
||||||
|
[build-system]
|
||||||
|
requires = ["setuptools"]
|
||||||
|
build-backend = "setuptools.build_meta"
|
||||||
0
src/easyvae/__init__.py
Normal file
0
src/easyvae/__init__.py
Normal file
@@ -4,7 +4,7 @@ from abc import ABC, abstractmethod
|
|||||||
|
|
||||||
class ActivationFunc(ABC):
|
class ActivationFunc(ABC):
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def derivative(v: np.ndarray) -> np.ndarray:
|
def d(v: np.ndarray) -> np.ndarray:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@@ -12,7 +12,7 @@ class ReLU(ActivationFunc):
|
|||||||
def __call__(self, x):
|
def __call__(self, x):
|
||||||
return x * (x > 0)
|
return x * (x > 0)
|
||||||
|
|
||||||
def derivative(self, x):
|
def d(self, x):
|
||||||
return x > 0
|
return x > 0
|
||||||
|
|
||||||
|
|
||||||
@@ -23,7 +23,7 @@ class LeakyReLU(ActivationFunc):
|
|||||||
def __call__(self, x):
|
def __call__(self, x):
|
||||||
return x * (x > 0) + self.k * x * (x <= 0)
|
return x * (x > 0) + self.k * x * (x <= 0)
|
||||||
|
|
||||||
def derivative(self, x):
|
def d(self, x):
|
||||||
return (x > 0) + self.k * (x <= 0)
|
return (x > 0) + self.k * (x <= 0)
|
||||||
|
|
||||||
|
|
||||||
@@ -31,5 +31,5 @@ class Identity(ActivationFunc):
|
|||||||
def __call__(self, x):
|
def __call__(self, x):
|
||||||
return x
|
return x
|
||||||
|
|
||||||
def derivative(x):
|
def d(self, x):
|
||||||
return 1
|
return 1
|
||||||
299
src/easyvae/autoencoder.py
Normal file
299
src/easyvae/autoencoder.py
Normal file
@@ -0,0 +1,299 @@
|
|||||||
|
import numpy as np
|
||||||
|
from tqdm import tqdm
|
||||||
|
from .layers import DeepNNLayer, SampleLayer, NoiseLayer
|
||||||
|
from .activations import ActivationFunc, Identity
|
||||||
|
from .plotters import Plotter, CAPlotter, VAEPlotter
|
||||||
|
from .utils import interruptable
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
LOADER = ['⡿', '⣟', '⣯', '⣷', '⣾', '⣽', '⣻', '⢿']
|
||||||
|
SQRT_2PI = np.sqrt(2 * np.pi)
|
||||||
|
|
||||||
|
|
||||||
|
class AAutoencoder(ABC):
|
||||||
|
plotter_cls = Plotter
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def __init__(self,
|
||||||
|
encoder_layers: list[int],
|
||||||
|
decoder_layers: list[int],
|
||||||
|
lr: float,
|
||||||
|
activation_func: ActivationFunc,
|
||||||
|
noise=0):
|
||||||
|
if encoder_layers[-1] != decoder_layers[0]:
|
||||||
|
raise Exception(
|
||||||
|
f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa
|
||||||
|
)
|
||||||
|
self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
|
||||||
|
self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
|
||||||
|
self.noise = NoiseLayer(noise)
|
||||||
|
self.space_dim = decoder_layers[0]
|
||||||
|
self.lr = lr
|
||||||
|
self.losses = [0]
|
||||||
|
|
||||||
|
def save(self, path: str):
|
||||||
|
path = path.removesuffix('.npy')
|
||||||
|
np.save(path, self)
|
||||||
|
|
||||||
|
def load(path: str) -> 'AAutoencoder':
|
||||||
|
path = path.removesuffix('.npy') + '.npy'
|
||||||
|
data = np.load(path, allow_pickle=True)
|
||||||
|
return data.item()
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def loss(self, data_set: list[np.ndarray]) -> float:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def train(self, v: np.ndarray) -> float:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def forward(self, v: np.ndarray) -> np.ndarray:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def train_dataset(self, *args, **kwargs) -> list[float]:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "\n".join((
|
||||||
|
f"Type: {self.__class__.__name__}",
|
||||||
|
"Encoder:",
|
||||||
|
f"{self.encoder}",
|
||||||
|
"Decoder:",
|
||||||
|
f"{self.decoder}"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ClassicalAutoencoder(AAutoencoder):
|
||||||
|
plotter_cls = CAPlotter
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.losses = []
|
||||||
|
|
||||||
|
def loss(self, data_set: list[np.ndarray]) -> float:
|
||||||
|
loss = 0
|
||||||
|
for x in data_set:
|
||||||
|
loss += np.sum(np.abs(x - self.forward(x)[0])) / len(x)
|
||||||
|
return loss / len(data_set)
|
||||||
|
|
||||||
|
def train(self, v: np.ndarray):
|
||||||
|
out, _ = self.forward(
|
||||||
|
self.noise.forward(v)
|
||||||
|
)
|
||||||
|
error = out - v
|
||||||
|
self.encoder.back(
|
||||||
|
self.decoder.back(error)
|
||||||
|
)
|
||||||
|
self.encoder.backprop()
|
||||||
|
self.decoder.backprop()
|
||||||
|
return np.sum(np.abs(error)) / len(v)
|
||||||
|
|
||||||
|
@interruptable
|
||||||
|
def train_dataset(self,
|
||||||
|
data_set: list[np.ndarray],
|
||||||
|
max_epoch: int,
|
||||||
|
patience: int,
|
||||||
|
display_loss: bool = False) -> list[float]:
|
||||||
|
plotter = self.plotter_cls(self) if display_loss else Plotter(self)
|
||||||
|
if len(self.losses) == 0:
|
||||||
|
self.losses = [self.loss(data_set)]
|
||||||
|
epoch = 0
|
||||||
|
no_improv = 0
|
||||||
|
prev_error = self.losses[-1]
|
||||||
|
with tqdm(bar_format="{desc} {elapsed} {rate_fmt}") as lbar:
|
||||||
|
while True:
|
||||||
|
lbar.set_description(
|
||||||
|
f"{LOADER[epoch % len(LOADER)]} Training ({epoch=} error={float(prev_error):.6f})", # noqa
|
||||||
|
)
|
||||||
|
lbar.update()
|
||||||
|
error = 0
|
||||||
|
for x in tqdm(data_set, leave=False):
|
||||||
|
error += self.train(x)
|
||||||
|
error /= len(data_set)
|
||||||
|
derror = prev_error - error
|
||||||
|
if abs(derror) < 1e-4:
|
||||||
|
no_improv += 1
|
||||||
|
else:
|
||||||
|
no_improv = 0
|
||||||
|
prev_error = float(error)
|
||||||
|
self.losses.append(error)
|
||||||
|
if no_improv > patience:
|
||||||
|
break
|
||||||
|
if epoch > max_epoch:
|
||||||
|
break
|
||||||
|
plotter.update()
|
||||||
|
epoch += 1
|
||||||
|
|
||||||
|
def encode(self, v: np.ndarray) -> np.ndarray:
|
||||||
|
return self.encoder.forward(v)
|
||||||
|
|
||||||
|
def decode(self, v: np.ndarray) -> np.ndarray:
|
||||||
|
return self.decoder.forward(v)
|
||||||
|
|
||||||
|
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
|
||||||
|
code = self.encode(v)
|
||||||
|
out = self.decode(code)
|
||||||
|
return out, code
|
||||||
|
|
||||||
|
|
||||||
|
class VariationalAutoencoder(AAutoencoder):
|
||||||
|
plotter_cls = VAEPlotter
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.sampler = SampleLayer(self.encoder.out_size, self.lr, Identity())
|
||||||
|
self.KL_losses = []
|
||||||
|
self.recon_losses = []
|
||||||
|
|
||||||
|
def loss(self, data_set: list[np.ndarray]) -> float:
|
||||||
|
kl_loss = 0
|
||||||
|
recon_loss = 0
|
||||||
|
for x in data_set:
|
||||||
|
out = self.forward(x)[0]
|
||||||
|
kl = self.sampler.DKL()
|
||||||
|
recon_loss += np.mean((out - x) ** 2)
|
||||||
|
kl_loss += kl
|
||||||
|
kl_loss /= len(data_set)
|
||||||
|
recon_loss /= len(data_set)
|
||||||
|
return recon_loss, kl_loss
|
||||||
|
|
||||||
|
def train(self, v: np.ndarray) -> tuple[float, float]:
|
||||||
|
out, _ = self.forward(
|
||||||
|
self.noise.forward(v)
|
||||||
|
)
|
||||||
|
error = out - v
|
||||||
|
self.encoder.back(
|
||||||
|
self.sampler.back(
|
||||||
|
self.decoder.back(error)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.encoder.backprop()
|
||||||
|
self.sampler.backprop()
|
||||||
|
self.decoder.backprop()
|
||||||
|
return np.mean(error ** 2), self.sampler.DKL()
|
||||||
|
|
||||||
|
@interruptable
|
||||||
|
def train_dataset(self,
|
||||||
|
data_set: list[np.ndarray],
|
||||||
|
max_epoch: int,
|
||||||
|
patience: int,
|
||||||
|
display_loss: bool = False) -> list[float]:
|
||||||
|
plotter = self.plotter_cls(self) if display_loss else Plotter(self)
|
||||||
|
if len(self.recon_losses) == 0:
|
||||||
|
recon_0, kl_0 = self.loss(data_set)
|
||||||
|
self.recon_losses = [recon_0]
|
||||||
|
self.KL_losses = [kl_0]
|
||||||
|
epoch = 0
|
||||||
|
no_improv = 0
|
||||||
|
prev_loss = self.recon_losses[-1] + self.KL_losses[-1]
|
||||||
|
with tqdm(bar_format="{desc} {elapsed} {rate_fmt}") as lbar:
|
||||||
|
while True:
|
||||||
|
lbar.set_description(
|
||||||
|
f"{LOADER[epoch % len(LOADER)]} Training ({epoch=} loss={float(prev_loss):.6f})", # noqa
|
||||||
|
)
|
||||||
|
lbar.update()
|
||||||
|
dkl = 0
|
||||||
|
recon = 0
|
||||||
|
for x in tqdm(data_set, leave=False):
|
||||||
|
recon_i, dkl_i = self.train(x)
|
||||||
|
dkl += dkl_i
|
||||||
|
recon += recon_i
|
||||||
|
recon /= len(data_set)
|
||||||
|
dkl /= len(data_set)
|
||||||
|
loss = recon + dkl
|
||||||
|
dloss = prev_loss - loss
|
||||||
|
if dloss <= 0 or abs(dloss) < 1e-4:
|
||||||
|
no_improv += 1
|
||||||
|
else:
|
||||||
|
no_improv = 0
|
||||||
|
prev_loss = float(loss)
|
||||||
|
self.recon_losses.append(recon)
|
||||||
|
self.KL_losses.append(dkl)
|
||||||
|
if no_improv > patience:
|
||||||
|
break
|
||||||
|
if epoch > max_epoch:
|
||||||
|
break
|
||||||
|
plotter.update()
|
||||||
|
epoch += 1
|
||||||
|
|
||||||
|
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
|
||||||
|
code = self.encoder.forward(v)
|
||||||
|
sample = self.sampler.forward(code)
|
||||||
|
out = self.decoder.forward(sample)
|
||||||
|
return out, sample
|
||||||
|
|
||||||
|
def encode(self, v: np.ndarray) -> np.ndarray:
|
||||||
|
return self.sampler.forward(
|
||||||
|
self.encoder.forward(v)
|
||||||
|
)
|
||||||
|
|
||||||
|
def decode(self, v: np.ndarray) -> np.ndarray:
|
||||||
|
return self.decoder.forward(v)
|
||||||
|
|
||||||
|
|
||||||
|
class Label:
|
||||||
|
def __init__(self,
|
||||||
|
name: str,
|
||||||
|
embedding_size: int,
|
||||||
|
N=100):
|
||||||
|
self.name = name
|
||||||
|
self.embedding_size = embedding_size
|
||||||
|
self.N = N
|
||||||
|
self.idx = 0
|
||||||
|
self.history = np.zeros((self.N, embedding_size))
|
||||||
|
|
||||||
|
def observe(self, code: np.ndarray):
|
||||||
|
if self.idx < self.N:
|
||||||
|
self.history[self.idx] = code
|
||||||
|
self.idx += 1
|
||||||
|
else:
|
||||||
|
diffs = np.linalg.norm(self.history - code, axis=1)
|
||||||
|
idx = np.argmin(diffs)
|
||||||
|
self.history[idx] = (self.history[idx] + code) / 2
|
||||||
|
|
||||||
|
def p(self, x: np.ndarray):
|
||||||
|
return 1 / (1e-4 + np.mean(np.abs(self.history - x)))
|
||||||
|
|
||||||
|
|
||||||
|
class LabelingVAE(VariationalAutoencoder):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.labels: list[Label] = []
|
||||||
|
self.labels_idxs: dict[str, int] = {}
|
||||||
|
|
||||||
|
def learn_labels(self, data: np.ndarray, labels: list[list[str]]):
|
||||||
|
self.labels.clear()
|
||||||
|
self.labels_idxs.clear()
|
||||||
|
for x_i, labels_i in zip(data, labels):
|
||||||
|
y_i = self.encode(x_i)
|
||||||
|
for c in labels_i:
|
||||||
|
idx = self.labels_idxs.get(c, None)
|
||||||
|
if idx is None:
|
||||||
|
label = Label(c, self.encoder.out_size)
|
||||||
|
self.labels.append(label)
|
||||||
|
self.labels_idxs[c] = len(self.labels) - 1
|
||||||
|
else:
|
||||||
|
label = self.labels[idx]
|
||||||
|
label.observe(y_i)
|
||||||
|
|
||||||
|
def label(self, x: np.ndarray):
|
||||||
|
probs = {}
|
||||||
|
total = 0
|
||||||
|
code = self.encode(x)
|
||||||
|
for label in self.labels:
|
||||||
|
p = label.p(code)
|
||||||
|
probs[label.name] = p
|
||||||
|
total += p
|
||||||
|
for k in probs:
|
||||||
|
probs[k] = float(probs[k] / total)
|
||||||
|
return dict(
|
||||||
|
sorted(
|
||||||
|
probs.items(),
|
||||||
|
key=lambda item: item[1],
|
||||||
|
reverse=True
|
||||||
|
)
|
||||||
|
)
|
||||||
126
src/easyvae/layers.py
Normal file
126
src/easyvae/layers.py
Normal file
@@ -0,0 +1,126 @@
|
|||||||
|
import numpy as np
|
||||||
|
from .activations import ActivationFunc, Identity
|
||||||
|
|
||||||
|
|
||||||
|
class NNLayer:
|
||||||
|
def __init__(self,
|
||||||
|
in_size: int,
|
||||||
|
out_size: int,
|
||||||
|
lr: float,
|
||||||
|
activation_func: ActivationFunc):
|
||||||
|
limit = np.sqrt(6 / (in_size + out_size))
|
||||||
|
self.W = np.random.uniform(-limit, limit, (in_size, out_size))
|
||||||
|
self.B = np.zeros((out_size))
|
||||||
|
self.lr = lr
|
||||||
|
self.input = None
|
||||||
|
self.output = None
|
||||||
|
self.output_linear = None
|
||||||
|
self.error = None
|
||||||
|
self.activation_func = activation_func
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f'[ {self.W.shape[0]} => {self.W.shape[1]}\tlr:{self.lr}\tactivation:{self.activation_func.__class__.__name__} ]' # noqa
|
||||||
|
|
||||||
|
def forward(self, v: np.ndarray) -> np.ndarray:
|
||||||
|
self.input = v
|
||||||
|
self.output_linear = self.input @ self.W + self.B
|
||||||
|
self.output = self.activation_func(
|
||||||
|
self.output_linear
|
||||||
|
)
|
||||||
|
return self.output
|
||||||
|
|
||||||
|
def back(self, error: np.ndarray) -> np.ndarray:
|
||||||
|
self.error = error * self.activation_func.d(self.output_linear)
|
||||||
|
return self.W @ self.error
|
||||||
|
|
||||||
|
def backprop(self) -> np.ndarray:
|
||||||
|
dW = np.outer(self.input, self.error) * self.lr
|
||||||
|
dB = self.error * self.lr
|
||||||
|
self.W -= dW
|
||||||
|
self.B -= dB
|
||||||
|
|
||||||
|
|
||||||
|
class SampleLayer:
|
||||||
|
def __init__(self,
|
||||||
|
in_size: int,
|
||||||
|
lr: float,
|
||||||
|
activation_func: ActivationFunc):
|
||||||
|
self.input = None
|
||||||
|
self.mean_nn = NNLayer(
|
||||||
|
in_size,
|
||||||
|
in_size,
|
||||||
|
lr,
|
||||||
|
activation_func)
|
||||||
|
self.std_nn = NNLayer(
|
||||||
|
in_size,
|
||||||
|
in_size,
|
||||||
|
lr,
|
||||||
|
activation_func)
|
||||||
|
|
||||||
|
def DKL(self):
|
||||||
|
return -0.5 * np.mean(1 + self.logvar - self.mean ** 2 - np.exp(self.logvar)) # noqa
|
||||||
|
|
||||||
|
def forward(self, v: np.ndarray) -> np.ndarray:
|
||||||
|
self.input = v
|
||||||
|
self.mean = self.mean_nn.forward(v)
|
||||||
|
self.logvar = np.clip(self.std_nn.forward(v), -10, 10)
|
||||||
|
self.std = np.exp(0.5 * self.logvar)
|
||||||
|
self.eps = np.random.normal(0, 1, self.mean.shape)
|
||||||
|
return 0.5 * self.eps * self.std + self.mean
|
||||||
|
|
||||||
|
def back(self, error: np.ndarray) -> np.ndarray:
|
||||||
|
dmean = error + self.mean
|
||||||
|
dstd = error * self.eps + 0.5 * (np.exp(self.logvar) - 1)
|
||||||
|
mean_error = self.mean_nn.back(dmean)
|
||||||
|
logvar_error = self.std_nn.back(dstd * self.std)
|
||||||
|
return mean_error + logvar_error
|
||||||
|
|
||||||
|
def backprop(self):
|
||||||
|
self.mean_nn.backprop()
|
||||||
|
self.std_nn.backprop()
|
||||||
|
|
||||||
|
|
||||||
|
class DeepNNLayer:
|
||||||
|
def __init__(self,
|
||||||
|
layers: list[int],
|
||||||
|
lr: float,
|
||||||
|
activation_func: ActivationFunc):
|
||||||
|
self.layers: list[NNLayer] = []
|
||||||
|
for i in range(len(layers) - 1):
|
||||||
|
self.layers.append(
|
||||||
|
NNLayer(
|
||||||
|
layers[i],
|
||||||
|
layers[i+1],
|
||||||
|
lr,
|
||||||
|
activation_func if i != len(layers) - 2 else Identity()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.in_size = layers[0]
|
||||||
|
self.out_size = layers[-1]
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return '\n'.join([str(layer) for layer in self.layers])
|
||||||
|
|
||||||
|
def forward(self, v: np.ndarray) -> np.ndarray:
|
||||||
|
for layer in self.layers:
|
||||||
|
v = layer.forward(v)
|
||||||
|
return v
|
||||||
|
|
||||||
|
def back(self, error: np.ndarray):
|
||||||
|
for layer in self.layers[::-1]:
|
||||||
|
error = layer.back(error)
|
||||||
|
return error
|
||||||
|
|
||||||
|
def backprop(self) -> np.ndarray:
|
||||||
|
for layer in self.layers:
|
||||||
|
layer.backprop()
|
||||||
|
|
||||||
|
|
||||||
|
class NoiseLayer:
|
||||||
|
def __init__(self, amount=0.1):
|
||||||
|
self.amount = amount
|
||||||
|
|
||||||
|
def forward(self, v: np.ndarray):
|
||||||
|
if self.amount == 0:
|
||||||
|
return v
|
||||||
|
return v + np.random.normal(0, self.amount, v.shape)
|
||||||
93
src/easyvae/plotters.py
Normal file
93
src/easyvae/plotters.py
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
import matplotlib.pyplot as plt
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from .autoencoder import AAutoencoder, VariationalAutoencoder
|
||||||
|
|
||||||
|
|
||||||
|
class Plotter:
|
||||||
|
def __init__(self, autoencoder: 'AAutoencoder'):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def update(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
self.close()
|
||||||
|
|
||||||
|
|
||||||
|
class CAPlotter(Plotter):
|
||||||
|
def __init__(self, autoencoder: 'AAutoencoder'):
|
||||||
|
self.autoencoder = autoencoder
|
||||||
|
plt.ion()
|
||||||
|
self.fig, self.ax = plt.subplots()
|
||||||
|
self.line, = self.ax.plot(
|
||||||
|
list(range(len(autoencoder.losses))),
|
||||||
|
autoencoder.losses,
|
||||||
|
label="Loss"
|
||||||
|
)
|
||||||
|
self.ax.set_xlabel("Epoch")
|
||||||
|
self.ax.set_ylabel("Loss")
|
||||||
|
self.ax.set_title("Training MSE Loss")
|
||||||
|
self.ax.legend()
|
||||||
|
self.update()
|
||||||
|
|
||||||
|
def update(self):
|
||||||
|
self.line.set_xdata(range(len(self.autoencoder.losses)))
|
||||||
|
self.line.set_ydata(self.autoencoder.losses)
|
||||||
|
self.ax.relim()
|
||||||
|
self.ax.autoscale_view()
|
||||||
|
plt.draw()
|
||||||
|
plt.pause(0.1)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
plt.ioff()
|
||||||
|
plt.close(self.fig)
|
||||||
|
|
||||||
|
|
||||||
|
class VAEPlotter(Plotter):
|
||||||
|
def __init__(self, autoencoder: 'VariationalAutoencoder'):
|
||||||
|
self.autoencoder = autoencoder
|
||||||
|
plt.ion()
|
||||||
|
self.fig, (self.ax_recon, self.ax_dkl) = plt.subplots(1, 2)
|
||||||
|
self.line, = self.ax_recon.plot(
|
||||||
|
list(range(len(self.autoencoder.recon_losses))),
|
||||||
|
self.autoencoder.recon_losses,
|
||||||
|
label="Loss"
|
||||||
|
)
|
||||||
|
self.ax_recon.set_xlabel("Epoch")
|
||||||
|
self.ax_recon.set_ylabel("Loss")
|
||||||
|
self.ax_recon.set_title("Reconstruction MSE Loss")
|
||||||
|
self.ax_recon.legend()
|
||||||
|
|
||||||
|
self.dkl_line, = self.ax_dkl.plot(
|
||||||
|
list(range(len(self.autoencoder.KL_losses))),
|
||||||
|
self.autoencoder.KL_losses,
|
||||||
|
label="DKL Loss",
|
||||||
|
)
|
||||||
|
self.ax_dkl.set_xlabel("Epoch")
|
||||||
|
self.ax_dkl.set_ylabel("Loss")
|
||||||
|
self.ax_dkl.set_title("DKL Loss")
|
||||||
|
self.ax_dkl.legend()
|
||||||
|
self.update()
|
||||||
|
|
||||||
|
def update(self):
|
||||||
|
self.line.set_xdata(range(len(self.autoencoder.recon_losses)))
|
||||||
|
self.line.set_ydata(self.autoencoder.recon_losses)
|
||||||
|
self.ax_recon.relim()
|
||||||
|
self.ax_recon.autoscale_view()
|
||||||
|
|
||||||
|
self.dkl_line.set_xdata(range(len(self.autoencoder.KL_losses)))
|
||||||
|
self.dkl_line.set_ydata(self.autoencoder.KL_losses)
|
||||||
|
self.ax_dkl.relim()
|
||||||
|
self.ax_dkl.autoscale_view()
|
||||||
|
|
||||||
|
plt.draw()
|
||||||
|
plt.pause(0.1)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
plt.ioff()
|
||||||
|
plt.close(self.fig)
|
||||||
29
src/easyvae/utils.py
Normal file
29
src/easyvae/utils.py
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
def softmax(v: np.ndarray) -> np.ndarray:
|
||||||
|
v = v - np.max(v)
|
||||||
|
exp_v = np.exp(v)
|
||||||
|
return exp_v / np.sum(exp_v)
|
||||||
|
|
||||||
|
|
||||||
|
def normalize(v: np.ndarray) -> np.ndarray:
|
||||||
|
return v / (np.linalg.norm(v) + 1e-8)
|
||||||
|
|
||||||
|
|
||||||
|
def regularize(v: np.ndarray) -> np.ndarray:
|
||||||
|
v_min = v.min(axis=0)
|
||||||
|
v_max = v.max(axis=0)
|
||||||
|
if v_min - v_max == 0:
|
||||||
|
return v
|
||||||
|
return (v - v_min) / (v_max - v_min)
|
||||||
|
|
||||||
|
|
||||||
|
def interruptable(func):
|
||||||
|
def inner(*args, **kwargs):
|
||||||
|
try:
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
return inner
|
||||||
46
utils.py
46
utils.py
@@ -1,46 +0,0 @@
|
|||||||
|
|
||||||
import numpy as np
|
|
||||||
import matplotlib.pyplot as plt
|
|
||||||
|
|
||||||
|
|
||||||
def softmax(v: np.ndarray) -> np.ndarray:
|
|
||||||
v = v - np.max(v)
|
|
||||||
exp_v = np.exp(v)
|
|
||||||
return exp_v / np.sum(exp_v)
|
|
||||||
|
|
||||||
|
|
||||||
def normalize(v: np.ndarray) -> np.ndarray:
|
|
||||||
return v / (np.linalg.norm(v) + 1e-8)
|
|
||||||
|
|
||||||
|
|
||||||
def regularize(v: np.ndarray) -> np.ndarray:
|
|
||||||
v_min = v.min(axis=0)
|
|
||||||
v_max = v.max(axis=0)
|
|
||||||
if v_min - v_max == 0:
|
|
||||||
return v
|
|
||||||
return (v - v_min) / (v_max - v_min)
|
|
||||||
|
|
||||||
|
|
||||||
def dynamic_loss_plot_init(losses: list):
|
|
||||||
plt.ion()
|
|
||||||
fig, ax = plt.subplots()
|
|
||||||
line, = ax.plot([0], losses, label="Loss")
|
|
||||||
ax.set_xlabel("Epoch")
|
|
||||||
ax.set_ylabel("Loss")
|
|
||||||
ax.set_title("Training Loss")
|
|
||||||
ax.legend()
|
|
||||||
return ax, line
|
|
||||||
|
|
||||||
|
|
||||||
def dynamic_loss_plot_update(ax, line, loss):
|
|
||||||
line.set_xdata(range(len(loss)))
|
|
||||||
line.set_ydata(loss)
|
|
||||||
ax.relim()
|
|
||||||
ax.autoscale_view()
|
|
||||||
plt.draw()
|
|
||||||
plt.pause(0.1)
|
|
||||||
|
|
||||||
|
|
||||||
def dynamic_loss_plot_finish(ax, line):
|
|
||||||
plt.ioff()
|
|
||||||
plt.show()
|
|
||||||
Reference in New Issue
Block a user