Compare commits

..

1 Commits

Author SHA1 Message Date
Lenoctambule
e6b508f739 feat: sampling layer w/ forward method + abstract autoencoder 2026-03-31 19:10:06 +02:00
15 changed files with 397 additions and 802 deletions

3
.gitignore vendored
View File

@@ -3,6 +3,3 @@ __pycache__
*.npz *.npz
*.npy *.npy
.venv .venv
dist
*.egg-info
.env

View File

@@ -1,53 +1,25 @@
# Python AutoEncoder from scratch using Numpy # Python AutoEncoder from scratch using Numpy
<figure>
<p align="center">
<img
src="https://raw.githubusercontent.com/lenoctambule/autoencoder/refs/heads/main/media/latent-space.png"
alt="Latent-space of the MNIST dataset"
width=70%>
<figcaption>
<p align="center">
Latent-space representation of the MNIST dataset using Variational Autoencoder
</p>
</figcaption>
</p>
</figure>
## Usage ## Usage
1. To install from source : 1. Install requirements :
```sh ```sh
$ git clone git@github.com:lenoctambule/autoencoder.git $ pip install -r requirements.txt
$ pip install -e autoencoder/
``` ```
Or install from PyPI : 2. Optionally run mnist_test.py.
```sh ```sh
$ pip install easyvae
```
2. Optionally, run mnist_test.py to see it in action on the MNIST dataset.
```sh
$ cd examples
$ py mnist_test.py $ py mnist_test.py
``` ```
## Training ## Training
Instatiate an `ClassicalAutoencoder` or `VariationalAutoencoder` object : Instatiate an `Autoencoder` object :
```py ```py
from easyvae.autoencoder import ClassicalAutoencoder, VariationalAutoencoder from autoencoder import Autoencoder
from easyvae.activations import LeakyReLU from activations import LeakyReLU
autoencoder = ClassicalAutoencoder( autoencoder = Autoencoder(
[768, 64, 16],
[16, 64, 768],
0.01,
LeakyReLU()
)
# or
autoencoder = VariationalAutoencoder(
[768, 64, 16], [768, 64, 16],
[16, 64, 768], [16, 64, 768],
0.01, 0.01,
@@ -58,17 +30,11 @@ And then via the `train_dataset` method to train over a dataset :
```py ```py
autoencoder.train_dataset(data) autoencoder.train_dataset(data)
``` ```
Or via the `train` method to input each data points iteratively : Or via the `train` to input each data points iteratively :
```py ```py
autoencoder.train(v) autoencoder.train(v)
``` ```
After training, you can save your model via the `save` method and load that model using `load` method :
```
autoencoder.save("mymodel.npy")
autoencoder.load("mymodel.npy")
```
## Inference ## Inference
Use your `Autoencoder` object with the `encode`, `decode`, `forward` methods like so : Use your `Autoencoder` object with the `encode`, `decode`, `forward` methods like so :

View File

@@ -4,7 +4,7 @@ from abc import ABC, abstractmethod
class ActivationFunc(ABC): class ActivationFunc(ABC):
@abstractmethod @abstractmethod
def d(v: np.ndarray) -> np.ndarray: def derivative(v: np.ndarray) -> np.ndarray:
pass pass
@@ -12,7 +12,7 @@ class ReLU(ActivationFunc):
def __call__(self, x): def __call__(self, x):
return x * (x > 0) return x * (x > 0)
def d(self, x): def derivative(self, x):
return x > 0 return x > 0
@@ -23,7 +23,7 @@ class LeakyReLU(ActivationFunc):
def __call__(self, x): def __call__(self, x):
return x * (x > 0) + self.k * x * (x <= 0) return x * (x > 0) + self.k * x * (x <= 0)
def d(self, x): def derivative(self, x):
return (x > 0) + self.k * (x <= 0) return (x > 0) + self.k * (x <= 0)
@@ -31,5 +31,5 @@ class Identity(ActivationFunc):
def __call__(self, x): def __call__(self, x):
return x return x
def d(self, x): def derivative(x):
return 1 return 1

137
autoencoder.py Normal file
View File

@@ -0,0 +1,137 @@
import numpy as np
from utils import (dynamic_loss_plot_init,
dynamic_loss_plot_update,
dynamic_loss_plot_finish)
from tqdm import tqdm
from layers import DeepNNLayer, SamplingLayer
from activations import ActivationFunc
from abc import ABC, abstractmethod
LOADER = ['', '', '', '', '', '', '', '']
class AAutoencoder(ABC):
def train_dataset(self,
data_set: list[np.ndarray],
max_epoch: int,
patience: int,
display_loss: bool = False) -> list[float]:
losses = [self.loss(data_set)]
if display_loss is True:
ax, line = dynamic_loss_plot_init(losses)
epoch = 0
no_improv = 0
prev_error = losses[0]
with tqdm(bar_format="{desc} {elapsed} {rate_fmt}") as lbar:
while True:
lbar.set_description(
f"{LOADER[epoch % len(LOADER)]} Training ({epoch=} error={float(prev_error):.6f})", # noqa
)
lbar.update()
error = 0
for x in tqdm(data_set, leave=False):
error += self.train(x)
error /= len(data_set)
derror = prev_error - error
if derror <= 0 or abs(derror) < 1e-4:
no_improv += 1
else:
no_improv = 0
prev_error = float(error)
losses.append(error)
if display_loss is True:
dynamic_loss_plot_update(ax, line, losses)
if no_improv > patience:
break
if epoch > max_epoch:
break
epoch += 1
print("Training complete !")
if display_loss is True:
dynamic_loss_plot_finish(ax, line)
return losses
def loss(self, data_set: list[np.ndarray]) -> float:
loss = 0
for x in data_set:
loss += np.sum(np.abs(x - self.forward(x)[0])) / len(x)
return loss / len(data_set)
def save(self, path: str):
path = path.removesuffix('.npy')
np.save(path, self)
def load(path: str) -> 'Autoencoder':
path = path.removesuffix('.npy') + '.npy'
data = np.load(path, allow_pickle=True)
return data.item()
@abstractmethod
def train(self, v: np.ndarray) -> float:
pass
@abstractmethod
def encode(self, v: np.ndarray) -> np.ndarray:
return self.encoder.forward(v)
@abstractmethod
def decode(self, v: np.ndarray) -> np.ndarray:
return self.decoder.forward(v)
@abstractmethod
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
code = self.encode(v)
out = self.decode(code)
return out, code
class Autoencoder(AAutoencoder):
def __init__(self,
encoder_layers: list[int],
decoder_layers: list[int],
lr: float,
activation_func: ActivationFunc):
if encoder_layers[-1] != decoder_layers[0]:
raise Exception(
f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa
)
self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
def train(self, v: np.ndarray) -> float:
out = self.decoder.forward(
self.encoder.forward(v)
)
self.encoder.backprop(
self.decoder.backprop(out - v)
)
return np.sum(np.abs(out - v)) / len(v)
def __str__(self):
return f'Encoder:\n{self.encoder}\n\nDecoder:\n{self.decoder}'
def encode(self, v: np.ndarray) -> np.ndarray:
return self.encoder.forward(v)
def decode(self, v: np.ndarray) -> np.ndarray:
return self.decoder.forward(v)
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
code = self.encode(v)
out = self.decode(code)
return out, code
class VariationalAutoencoder(AAutoencoder):
def __init__(self,
encoder_layers: list[int],
decoder_layers: list[int],
lr: float,
activation_func: ActivationFunc):
if encoder_layers[-1] != decoder_layers[0]:
raise Exception(
f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa
)
self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
self.sampler = SamplingLayer(decoder_layers[0], lr, activation_func)

View File

@@ -1,178 +0,0 @@
import matplotlib.pyplot as plt
import numpy as np
import os
from easyvae.autoencoder import ( # noqa
VariationalAutoencoder,
ClassicalAutoencoder,
LabelingVAE,
AAutoencoder
)
from easyvae.activations import LeakyReLU
from tqdm import tqdm
def load_mnist() -> list[np.ndarray]:
import requests
mnist_path = "./mnist.npz"
mnist_url = "https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz" # noqa
if not os.path.exists(mnist_path):
with open(mnist_path, "w+b") as f:
f.write(requests.get(mnist_url, stream=True).content)
res = np.load(mnist_path)
return res["x_train"], res["y_train"], res["x_test"], res["y_test"]
def mnist_train(
filename: str,
max_epoch: int,
patience: int,
cls: type[AAutoencoder],) -> AAutoencoder:
x_train, _, _, _ = load_mnist()
in_len = x_train[0].shape[0] * x_train[0].shape[0]
x_train.resize(x_train.shape[0], in_len)
x_train = x_train / 255
if os.path.exists(filename):
autoencoder = cls.load(filename)
else:
autoencoder = cls(
[in_len, 256, 2],
[2, 256, in_len],
0.001,
LeakyReLU()
)
print("CTRL+C to interrupt training.")
autoencoder.train_dataset(
x_train,
max_epoch,
patience,
display_loss=True)
autoencoder.save(filename)
print("Training complete !")
return autoencoder
def plot_mnist_latent_space(autoencoder: AAutoencoder, x: np.ndarray, y,):
codes = []
for x in x:
_, c = autoencoder.forward(x.flatten())
codes.append(c)
codes = np.array(codes)
if codes.shape[1] == 2:
plt.figure(figsize=(6, 6))
scatter = plt.scatter(
codes[:, 0],
codes[:, 1],
c=y,
cmap='tab10',
s=5,
alpha=0.7
)
plt.colorbar(scatter)
plt.grid(True)
def plot_random_reconstruction(
autoencoder: AAutoencoder,
example: np.ndarray,
img_shape,
y):
output, code = autoencoder.forward(example.flatten())
plt.subplot(1, 2, 1)
plt.matshow(
example.reshape(img_shape),
fignum=False)
plt.title(f"Input ({y})")
plt.subplot(1, 2, 2)
plt.matshow(
output.reshape(img_shape),
fignum=False)
plt.title(f"Output ({y})")
print(f'{code.tolist()}')
def labeling_accuracy(autoencoder: LabelingVAE, x_test, y_test):
accuracy = 0
for x, y in tqdm(
zip(x_test, y_test),
desc="Testing labeling",
total=len(x_test)
):
res = autoencoder.label(x)
res = list(res.items())[0][0]
if res == str(int(y)):
accuracy += 1
accuracy /= len(y_test)
print(f"Accuracy : {accuracy * 100:.2f}%")
def mnist_test(model: str | AAutoencoder | LabelingVAE):
x_train, y_train, x_test, y_test = load_mnist()
in_len = x_train[0].shape[0] * x_train[0].shape[0]
img_shape = x_train[0].shape
x_train.resize(x_train.shape[0], in_len)
x_test.resize(x_test.shape[0], in_len)
x_train = x_train / 255
x_test = x_test / 255
if isinstance(model, str):
autoencoder: AAutoencoder = AAutoencoder.load(model)
else:
autoencoder = model
print("Testing model ...\n")
print(autoencoder)
idx = np.random.randint(0, len(x_test))
example: np.ndarray = x_test[idx]
labels_train = [str(int(i)) for i in y_train]
if isinstance(autoencoder, LabelingVAE):
autoencoder.learn_labels(x_train, labels_train)
labeling_accuracy(autoencoder, x_test, y_test)
res = autoencoder.label(example)
for k, v in res.items():
print(f"{k} => {v}")
plot_random_reconstruction(autoencoder, example, img_shape, y_test[idx])
if autoencoder.space_dim == 2:
plot_mnist_latent_space(autoencoder, x_test, y_test)
plt.show()
if __name__ == "__main__":
import argparse
import sys
parser = argparse.ArgumentParser()
parser.add_argument(
'-e',
type=int,
nargs='?',
default=30,
help='Max epochs'
)
parser.add_argument(
'-p',
type=int,
nargs='?',
default=30,
help='Patience'
)
parser.add_argument(
'-m',
type=str, nargs='?',
default='autoencoder_mnist.npy',
help='Model filename to save in run mode or load in training mode'
)
parser.add_argument(
'-r',
action='store_true',
help='Run the model'
)
args = parser.parse_args(sys.argv[1:])
if args.r:
mnist_test(args.m)
else:
autoencoder = mnist_train(
args.m,
args.e,
args.p,
LabelingVAE
)
mnist_test(autoencoder)

84
layers.py Normal file
View File

@@ -0,0 +1,84 @@
import numpy as np
from utils import normalize
from activations import ActivationFunc
class NNLayer:
def __init__(self,
in_size: int,
out_size: int,
lr: float,
activation_func: ActivationFunc):
self.W = np.random.uniform(-1, 1, (in_size, out_size))
self.B = np.zeros((out_size))
self.lr = lr
self.input = None
self.output = None
self.output_linear = None
self.activation_func = activation_func
def __str__(self):
return f'[ {self.W.shape[0]} => {self.W.shape[1]}\tlr:{self.lr}\tactivation:{self.activation_func.__class__.__name__} ]' # noqa
def forward(self, V: np.ndarray) -> np.ndarray:
self.input = normalize(V)
self.output_linear = self.input @ self.W + self.B
self.output = self.activation_func(
self.output_linear
)
return self.output
def backprop(self, error: np.ndarray) -> np.ndarray:
error *= self.activation_func.derivative(self.output_linear)
ret = self.W @ error
dW = np.outer(self.input, error) * self.lr
dB = error * self.lr
self.W -= dW
self.B -= dB
return ret
class SamplingLayer:
def __init__(self,
in_size: int,
lr: float,
activation_func: ActivationFunc):
self.W_mean = np.random.uniform(-0.1, 0.1, (in_size, in_size))
self.W_variance = np.random.uniform(-0.1, 0.1, (in_size, in_size))
def forward(self, v) -> np.ndarray:
mean = self.W_mean @ v
variance = self.W_variance @ v
return np.random.normal(mean, variance)
def backprop(self, error: np.ndarray) -> np.ndarray:
pass
class DeepNNLayer:
def __init__(self,
layers: list[int],
lr: float,
activation_func: ActivationFunc):
self.layers: list[NNLayer] = []
for i in range(len(layers) - 1):
self.layers.append(
NNLayer(
layers[i],
layers[i+1],
lr,
activation_func)
)
def __str__(self):
return '\n'.join([str(layer) for layer in self.layers])
def forward(self, v: np.ndarray) -> np.ndarray:
for layer in self.layers:
v = layer.forward(v)
return v
def backprop(self, error: np.ndarray) -> np.ndarray:
for layer in self.layers[::-1]:
error = layer.backprop(error)
return error

Binary file not shown.

Before

Width:  |  Height:  |  Size: 181 KiB

118
mnist_test.py Normal file
View File

@@ -0,0 +1,118 @@
import matplotlib.pyplot as plt
import numpy as np
from autoencoder import Autoencoder
from activations import LeakyReLU
import os
def load_mnist() -> list[np.ndarray]:
import requests
mnist_path = "./mnist.npz"
mnist_url = "https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz" # noqa
if not os.path.exists(mnist_path):
with open(mnist_path, "w+b") as f:
f.write(requests.get(mnist_url, stream=True).content)
res = np.load(mnist_path)
return res["x_train"], res["y_train"], res["x_test"], res["y_test"]
def mnist_train(
filename: str,
max_epoch: int,
patience: int,
) -> Autoencoder:
x_train, _, x_test, _ = load_mnist()
in_len = x_train[0].shape[0] * x_train[0].shape[0]
x_train.resize(x_train.shape[0], in_len)
x_test.resize(x_test.shape[0], in_len)
x_train = x_train / 255
x_test = x_test / 255
if os.path.exists(filename):
autoencoder = Autoencoder.load(filename)
else:
autoencoder = Autoencoder(
[in_len, 64, 16],
[16, 64, in_len],
0.01,
LeakyReLU()
)
autoencoder.train_dataset(
x_train,
max_epoch,
patience,
display_loss=True)
autoencoder.save(filename)
return autoencoder
def mnist_test(model: str | Autoencoder):
x_train, _, x_test, y_test = load_mnist()
in_len = x_train[0].shape[0] * x_train[0].shape[0]
img_shape = x_train[0].shape
x_train.resize(x_train.shape[0], in_len)
x_test.resize(x_test.shape[0], in_len)
x_train = x_train / 255
x_test = x_test / 255
if isinstance(model, str):
autoencoder: Autoencoder = Autoencoder.load(model)
else:
autoencoder = model
print(autoencoder)
idx = np.random.randint(0, len(x_test))
example: np.ndarray = x_test[idx]
output, code = autoencoder.forward(example.flatten())
plt.subplot(1, 3, 1)
plt.matshow(
example.reshape(img_shape),
fignum=False)
plt.title(f"Input ({y_test[idx]})")
plt.subplot(1, 3, 2)
plt.matshow(
output.reshape(img_shape),
fignum=False)
plt.title(f"Output ({y_test[idx]})")
plt.subplot(1, 3, 3)
s = int(np.ceil(np.sqrt(code.shape[0])))
code.resize((s, s), refcheck=False)
plt.matshow(code, fignum=False)
plt.title(f"Code ({y_test[idx]})")
plt.show()
if __name__ == "__main__":
import argparse
import sys
parser = argparse.ArgumentParser()
parser.add_argument(
'-e',
type=int,
nargs='?',
default=1000,
help='Max epochs'
)
parser.add_argument(
'-p',
type=int,
nargs='?',
default=5,
help='Patience'
)
parser.add_argument(
'-m',
type=str, nargs='?',
default='autoencoder_mnist.npy',
help='Model filename to save in run mode or load in training mode'
)
parser.add_argument(
'-r',
action='store_true',
help='Run mode'
)
args = parser.parse_args(sys.argv[1:])
if args.r:
mnist_test(args.m)
else:
autoencoder = mnist_train(args.m, args.e, args.p)
mnist_test(autoencoder)

View File

@@ -1,28 +0,0 @@
[project]
name = "easyvae"
version = "1.3.3"
authors = [
{ name="Ravaka RALAMBOARIVONY", email="ravaka.rlb.pro@gmail.com" },
]
description = "Python implementation of a Classical and Variational Autoencoders using NumPy"
readme = "README.md"
requires-python = ">=3.10"
classifiers = [
"Programming Language :: Python :: 3",
"Operating System :: OS Independent",
]
license = "MIT"
license-files = ["LICEN[CS]E*"]
dependencies = [
"numpy",
"matplotlib",
"tqdm",
]
[project.urls]
Homepage = "https://github.com/lenoctambule/autoencoder"
Issues = "https://github.com/lenoctambule/autoencoder/issues"
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

View File

@@ -1,299 +0,0 @@
import numpy as np
from tqdm import tqdm
from .layers import DeepNNLayer, SampleLayer, NoiseLayer
from .activations import ActivationFunc, Identity
from .plotters import Plotter, CAPlotter, VAEPlotter
from .utils import interruptable
from abc import ABC, abstractmethod
LOADER = ['', '', '', '', '', '', '', '']
SQRT_2PI = np.sqrt(2 * np.pi)
class AAutoencoder(ABC):
plotter_cls = Plotter
@abstractmethod
def __init__(self,
encoder_layers: list[int],
decoder_layers: list[int],
lr: float,
activation_func: ActivationFunc,
noise=0):
if encoder_layers[-1] != decoder_layers[0]:
raise Exception(
f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa
)
self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
self.noise = NoiseLayer(noise)
self.space_dim = decoder_layers[0]
self.lr = lr
self.losses = [0]
def save(self, path: str):
path = path.removesuffix('.npy')
np.save(path, self)
def load(path: str) -> 'AAutoencoder':
path = path.removesuffix('.npy') + '.npy'
data = np.load(path, allow_pickle=True)
return data.item()
@abstractmethod
def loss(self, data_set: list[np.ndarray]) -> float:
pass
@abstractmethod
def train(self, v: np.ndarray) -> float:
pass
@abstractmethod
def forward(self, v: np.ndarray) -> np.ndarray:
pass
@abstractmethod
def train_dataset(self, *args, **kwargs) -> list[float]:
pass
def __str__(self):
return "\n".join((
f"Type: {self.__class__.__name__}",
"Encoder:",
f"{self.encoder}",
"Decoder:",
f"{self.decoder}"
)
)
class ClassicalAutoencoder(AAutoencoder):
plotter_cls = CAPlotter
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.losses = []
def loss(self, data_set: list[np.ndarray]) -> float:
loss = 0
for x in data_set:
loss += np.sum(np.abs(x - self.forward(x)[0])) / len(x)
return loss / len(data_set)
def train(self, v: np.ndarray):
out, _ = self.forward(
self.noise.forward(v)
)
error = out - v
self.encoder.back(
self.decoder.back(error)
)
self.encoder.backprop()
self.decoder.backprop()
return np.sum(np.abs(error)) / len(v)
@interruptable
def train_dataset(self,
data_set: list[np.ndarray],
max_epoch: int,
patience: int,
display_loss: bool = False) -> list[float]:
plotter = self.plotter_cls(self) if display_loss else Plotter(self)
if len(self.losses) == 0:
self.losses = [self.loss(data_set)]
epoch = 0
no_improv = 0
prev_error = self.losses[-1]
with tqdm(bar_format="{desc} {elapsed} {rate_fmt}") as lbar:
while True:
lbar.set_description(
f"{LOADER[epoch % len(LOADER)]} Training ({epoch=} error={float(prev_error):.6f})", # noqa
)
lbar.update()
error = 0
for x in tqdm(data_set, leave=False):
error += self.train(x)
error /= len(data_set)
derror = prev_error - error
if abs(derror) < 1e-4:
no_improv += 1
else:
no_improv = 0
prev_error = float(error)
self.losses.append(error)
if no_improv > patience:
break
if epoch > max_epoch:
break
plotter.update()
epoch += 1
def encode(self, v: np.ndarray) -> np.ndarray:
return self.encoder.forward(v)
def decode(self, v: np.ndarray) -> np.ndarray:
return self.decoder.forward(v)
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
code = self.encode(v)
out = self.decode(code)
return out, code
class VariationalAutoencoder(AAutoencoder):
plotter_cls = VAEPlotter
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.sampler = SampleLayer(self.encoder.out_size, self.lr, Identity())
self.KL_losses = []
self.recon_losses = []
def loss(self, data_set: list[np.ndarray]) -> float:
kl_loss = 0
recon_loss = 0
for x in data_set:
out = self.forward(x)[0]
kl = self.sampler.DKL()
recon_loss += np.mean((out - x) ** 2)
kl_loss += kl
kl_loss /= len(data_set)
recon_loss /= len(data_set)
return recon_loss, kl_loss
def train(self, v: np.ndarray) -> tuple[float, float]:
out, _ = self.forward(
self.noise.forward(v)
)
error = out - v
self.encoder.back(
self.sampler.back(
self.decoder.back(error)
)
)
self.encoder.backprop()
self.sampler.backprop()
self.decoder.backprop()
return np.mean(error ** 2), self.sampler.DKL()
@interruptable
def train_dataset(self,
data_set: list[np.ndarray],
max_epoch: int,
patience: int,
display_loss: bool = False) -> list[float]:
plotter = self.plotter_cls(self) if display_loss else Plotter(self)
if len(self.recon_losses) == 0:
recon_0, kl_0 = self.loss(data_set)
self.recon_losses = [recon_0]
self.KL_losses = [kl_0]
epoch = 0
no_improv = 0
prev_loss = self.recon_losses[-1] + self.KL_losses[-1]
with tqdm(bar_format="{desc} {elapsed} {rate_fmt}") as lbar:
while True:
lbar.set_description(
f"{LOADER[epoch % len(LOADER)]} Training ({epoch=} loss={float(prev_loss):.6f})", # noqa
)
lbar.update()
dkl = 0
recon = 0
for x in tqdm(data_set, leave=False):
recon_i, dkl_i = self.train(x)
dkl += dkl_i
recon += recon_i
recon /= len(data_set)
dkl /= len(data_set)
loss = recon + dkl
dloss = prev_loss - loss
if dloss <= 0 or abs(dloss) < 1e-4:
no_improv += 1
else:
no_improv = 0
prev_loss = float(loss)
self.recon_losses.append(recon)
self.KL_losses.append(dkl)
if no_improv > patience:
break
if epoch > max_epoch:
break
plotter.update()
epoch += 1
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
code = self.encoder.forward(v)
sample = self.sampler.forward(code)
out = self.decoder.forward(sample)
return out, sample
def encode(self, v: np.ndarray) -> np.ndarray:
return self.sampler.forward(
self.encoder.forward(v)
)
def decode(self, v: np.ndarray) -> np.ndarray:
return self.decoder.forward(v)
class Label:
def __init__(self,
name: str,
embedding_size: int,
N=100):
self.name = name
self.embedding_size = embedding_size
self.N = N
self.idx = 0
self.history = np.zeros((self.N, embedding_size))
def observe(self, code: np.ndarray):
if self.idx < self.N:
self.history[self.idx] = code
self.idx += 1
else:
diffs = np.linalg.norm(self.history - code, axis=1)
idx = np.argmin(diffs)
self.history[idx] = (self.history[idx] + code) / 2
def p(self, x: np.ndarray):
return 1 / (1e-4 + np.mean(np.abs(self.history - x)))
class LabelingVAE(VariationalAutoencoder):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.labels: list[Label] = []
self.labels_idxs: dict[str, int] = {}
def learn_labels(self, data: np.ndarray, labels: list[list[str]]):
self.labels.clear()
self.labels_idxs.clear()
for x_i, labels_i in zip(data, labels):
y_i = self.encode(x_i)
for c in labels_i:
idx = self.labels_idxs.get(c, None)
if idx is None:
label = Label(c, self.encoder.out_size)
self.labels.append(label)
self.labels_idxs[c] = len(self.labels) - 1
else:
label = self.labels[idx]
label.observe(y_i)
def label(self, x: np.ndarray):
probs = {}
total = 0
code = self.encode(x)
for label in self.labels:
p = label.p(code)
probs[label.name] = p
total += p
for k in probs:
probs[k] = float(probs[k] / total)
return dict(
sorted(
probs.items(),
key=lambda item: item[1],
reverse=True
)
)

View File

@@ -1,126 +0,0 @@
import numpy as np
from .activations import ActivationFunc, Identity
class NNLayer:
def __init__(self,
in_size: int,
out_size: int,
lr: float,
activation_func: ActivationFunc):
limit = np.sqrt(6 / (in_size + out_size))
self.W = np.random.uniform(-limit, limit, (in_size, out_size))
self.B = np.zeros((out_size))
self.lr = lr
self.input = None
self.output = None
self.output_linear = None
self.error = None
self.activation_func = activation_func
def __str__(self):
return f'[ {self.W.shape[0]} => {self.W.shape[1]}\tlr:{self.lr}\tactivation:{self.activation_func.__class__.__name__} ]' # noqa
def forward(self, v: np.ndarray) -> np.ndarray:
self.input = v
self.output_linear = self.input @ self.W + self.B
self.output = self.activation_func(
self.output_linear
)
return self.output
def back(self, error: np.ndarray) -> np.ndarray:
self.error = error * self.activation_func.d(self.output_linear)
return self.W @ self.error
def backprop(self) -> np.ndarray:
dW = np.outer(self.input, self.error) * self.lr
dB = self.error * self.lr
self.W -= dW
self.B -= dB
class SampleLayer:
def __init__(self,
in_size: int,
lr: float,
activation_func: ActivationFunc):
self.input = None
self.mean_nn = NNLayer(
in_size,
in_size,
lr,
activation_func)
self.std_nn = NNLayer(
in_size,
in_size,
lr,
activation_func)
def DKL(self):
return -0.5 * np.mean(1 + self.logvar - self.mean ** 2 - np.exp(self.logvar)) # noqa
def forward(self, v: np.ndarray) -> np.ndarray:
self.input = v
self.mean = self.mean_nn.forward(v)
self.logvar = np.clip(self.std_nn.forward(v), -10, 10)
self.std = np.exp(0.5 * self.logvar)
self.eps = np.random.normal(0, 1, self.mean.shape)
return 0.5 * self.eps * self.std + self.mean
def back(self, error: np.ndarray) -> np.ndarray:
dmean = error + self.mean
dstd = error * self.eps + 0.5 * (np.exp(self.logvar) - 1)
mean_error = self.mean_nn.back(dmean)
logvar_error = self.std_nn.back(dstd * self.std)
return mean_error + logvar_error
def backprop(self):
self.mean_nn.backprop()
self.std_nn.backprop()
class DeepNNLayer:
def __init__(self,
layers: list[int],
lr: float,
activation_func: ActivationFunc):
self.layers: list[NNLayer] = []
for i in range(len(layers) - 1):
self.layers.append(
NNLayer(
layers[i],
layers[i+1],
lr,
activation_func if i != len(layers) - 2 else Identity()
)
)
self.in_size = layers[0]
self.out_size = layers[-1]
def __str__(self):
return '\n'.join([str(layer) for layer in self.layers])
def forward(self, v: np.ndarray) -> np.ndarray:
for layer in self.layers:
v = layer.forward(v)
return v
def back(self, error: np.ndarray):
for layer in self.layers[::-1]:
error = layer.back(error)
return error
def backprop(self) -> np.ndarray:
for layer in self.layers:
layer.backprop()
class NoiseLayer:
def __init__(self, amount=0.1):
self.amount = amount
def forward(self, v: np.ndarray):
if self.amount == 0:
return v
return v + np.random.normal(0, self.amount, v.shape)

View File

@@ -1,93 +0,0 @@
import matplotlib.pyplot as plt
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .autoencoder import AAutoencoder, VariationalAutoencoder
class Plotter:
def __init__(self, autoencoder: 'AAutoencoder'):
pass
def update(self):
pass
def close(self):
pass
def __del__(self):
self.close()
class CAPlotter(Plotter):
def __init__(self, autoencoder: 'AAutoencoder'):
self.autoencoder = autoencoder
plt.ion()
self.fig, self.ax = plt.subplots()
self.line, = self.ax.plot(
list(range(len(autoencoder.losses))),
autoencoder.losses,
label="Loss"
)
self.ax.set_xlabel("Epoch")
self.ax.set_ylabel("Loss")
self.ax.set_title("Training MSE Loss")
self.ax.legend()
self.update()
def update(self):
self.line.set_xdata(range(len(self.autoencoder.losses)))
self.line.set_ydata(self.autoencoder.losses)
self.ax.relim()
self.ax.autoscale_view()
plt.draw()
plt.pause(0.1)
def close(self):
plt.ioff()
plt.close(self.fig)
class VAEPlotter(Plotter):
def __init__(self, autoencoder: 'VariationalAutoencoder'):
self.autoencoder = autoencoder
plt.ion()
self.fig, (self.ax_recon, self.ax_dkl) = plt.subplots(1, 2)
self.line, = self.ax_recon.plot(
list(range(len(self.autoencoder.recon_losses))),
self.autoencoder.recon_losses,
label="Loss"
)
self.ax_recon.set_xlabel("Epoch")
self.ax_recon.set_ylabel("Loss")
self.ax_recon.set_title("Reconstruction MSE Loss")
self.ax_recon.legend()
self.dkl_line, = self.ax_dkl.plot(
list(range(len(self.autoencoder.KL_losses))),
self.autoencoder.KL_losses,
label="DKL Loss",
)
self.ax_dkl.set_xlabel("Epoch")
self.ax_dkl.set_ylabel("Loss")
self.ax_dkl.set_title("DKL Loss")
self.ax_dkl.legend()
self.update()
def update(self):
self.line.set_xdata(range(len(self.autoencoder.recon_losses)))
self.line.set_ydata(self.autoencoder.recon_losses)
self.ax_recon.relim()
self.ax_recon.autoscale_view()
self.dkl_line.set_xdata(range(len(self.autoencoder.KL_losses)))
self.dkl_line.set_ydata(self.autoencoder.KL_losses)
self.ax_dkl.relim()
self.ax_dkl.autoscale_view()
plt.draw()
plt.pause(0.1)
def close(self):
plt.ioff()
plt.close(self.fig)

View File

@@ -1,29 +0,0 @@
import numpy as np
def softmax(v: np.ndarray) -> np.ndarray:
v = v - np.max(v)
exp_v = np.exp(v)
return exp_v / np.sum(exp_v)
def normalize(v: np.ndarray) -> np.ndarray:
return v / (np.linalg.norm(v) + 1e-8)
def regularize(v: np.ndarray) -> np.ndarray:
v_min = v.min(axis=0)
v_max = v.max(axis=0)
if v_min - v_max == 0:
return v
return (v - v_min) / (v_max - v_min)
def interruptable(func):
def inner(*args, **kwargs):
try:
return func(*args, **kwargs)
except KeyboardInterrupt:
pass
return inner

46
utils.py Normal file
View File

@@ -0,0 +1,46 @@
import numpy as np
import matplotlib.pyplot as plt
def softmax(v: np.ndarray) -> np.ndarray:
v = v - np.max(v)
exp_v = np.exp(v)
return exp_v / np.sum(exp_v)
def normalize(v: np.ndarray) -> np.ndarray:
return v / (np.linalg.norm(v) + 1e-8)
def regularize(v: np.ndarray) -> np.ndarray:
v_min = v.min(axis=0)
v_max = v.max(axis=0)
if v_min - v_max == 0:
return v
return (v - v_min) / (v_max - v_min)
def dynamic_loss_plot_init(losses: list):
plt.ion()
fig, ax = plt.subplots()
line, = ax.plot([0], losses, label="Loss")
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
ax.set_title("Training Loss")
ax.legend()
return ax, line
def dynamic_loss_plot_update(ax, line, loss):
line.set_xdata(range(len(loss)))
line.set_ydata(loss)
ax.relim()
ax.autoscale_view()
plt.draw()
plt.pause(0.1)
def dynamic_loss_plot_finish(ax, line):
plt.ioff()
plt.show()