Compare commits

..

36 Commits

Author SHA1 Message Date
Lenoctambule
fca796173f fix: missing dependencies in pyproject.toml 2026-06-09 15:45:44 +02:00
Lenoctambule
583fc796f6 refactor: code de-dup __str__ method 2026-04-17 19:53:58 +02:00
Lenoctambule
6eaaa43285 fix: use inverse func in p method in Label class 2026-04-17 04:52:23 +02:00
Lenoctambule
65c6d3bbee fix: wrong axis typo in Label's observe method 2026-04-17 02:31:33 +02:00
Lenoctambule
251d66a625 feat: test label accuracy in mnist example 2026-04-15 18:13:24 +02:00
Lenoctambule
0f1c9f920b build: update package version 1.2 => 1.3 2026-04-14 21:17:31 +02:00
Lenoctambule
bf55c8eb7a Merge pull request #3 from lenoctambule/dev
Add simple post-training labeling + Noise layer
2026-04-14 21:11:11 +02:00
Lenoctambule
32a4a39ab9 fix: add history mem cap and mid point pruning 2026-04-14 21:03:04 +02:00
Lenoctambule
4cc349c22c feat: simple distances instead of std+mean for labeling 2026-04-14 20:53:13 +02:00
Lenoctambule
b635bf0467 feat: add monte-carlo method and MSE to labeling method 2026-04-14 19:24:20 +02:00
Lenoctambule
b1dc34e699 feat: post training and online labeling VAE class 2026-04-14 14:12:22 +02:00
Lenoctambule
e6c1229a7e fix: return encoding after sample layer 2026-04-13 23:07:11 +02:00
Lenoctambule
82acc6b4f3 feat: NoiseLayer class + keep loss across trainings 2026-04-12 20:31:31 +02:00
Lenoctambule
a4334568ec refactor: separate gradient back and weight updates 2026-04-12 19:40:04 +02:00
Lenoctambule
d23f3a903a build: update package version 1.1 => 1.2 2026-04-10 22:26:42 +02:00
Lenoctambule
475ca0cf6f Merge pull request #2 from lenoctambule/dev
Refactor of plotting and kb interrupt logic
2026-04-10 22:24:10 +02:00
Lenoctambule
7a822782a5 refactor: move kb interrupt handling to autoencoder classes 2026-04-10 22:20:35 +02:00
Lenoctambule
5ff6cfe55e feat(plotters.py): add VAEPlotter class + seperate training logic 2026-04-10 20:37:03 +02:00
Lenoctambule
849d988de5 feat(autoencoder.py): __str__ method for VariationalAutoencoder class 2026-04-10 15:00:04 +02:00
Lenoctambule
d4679712a6 docs: fix typo in README.md 2026-04-10 14:33:25 +02:00
Lenoctambule
ea8a4079ac refactor: move plot logic to plotters.py 2026-04-09 22:47:22 +02:00
Lenoctambule
9d718a6bc8 docs: readme full url to media 2026-04-09 19:55:50 +02:00
Lenoctambule
5e83983d96 Merge pull request #1 from lenoctambule/dev
Bug fixes and README improvements
2026-04-09 19:25:21 +02:00
Lenoctambule
9b42e9db85 build: update package version 1.0 => 1.1 2026-04-09 19:23:25 +02:00
Lenoctambule
0b46028702 docs: illustration of MNIST latent-space representation 2026-04-09 19:21:13 +02:00
Lenoctambule
b4963513cc fix: unneeded func params 2026-04-09 19:19:59 +02:00
Lenoctambule
075382cfb0 fix(mnist_test.py): subplot invalid ncols 2026-04-09 18:43:27 +02:00
Lenoctambule
15865812d8 refactor(autoencoder.py): __init__ code de-dup 2026-04-09 18:31:32 +02:00
Lenoctambule
058b7a0f2a docs: update readme.md w/ remote install 2026-04-08 19:08:16 +02:00
Lenoctambule
d048ddc6db feat: packaging, project structure + updated README.md 2026-04-08 18:58:32 +02:00
Lenoctambule
e9c79f463f fix: missing args in np.clip 2026-04-08 14:49:56 +02:00
Lenoctambule
510ad8720c feat: plot 2d latent space + signal handling + fix SGD in Sampler 2026-04-07 22:25:39 +02:00
Lenoctambule
3440de851a fix: dup load method 2026-04-05 08:41:17 +02:00
Lenoctambule
82d61dd10f fix: bad type hint and typo for forward method return value 2026-04-05 01:44:10 +02:00
Lenoctambule
5a8fb2c48b feat: working implementation of VAE 2026-04-05 01:17:51 +02:00
Lenoctambule
577e679425 feat: VariationalAutoencoder class + sampling nn layer 2026-04-01 22:32:35 +02:00
15 changed files with 802 additions and 397 deletions

3
.gitignore vendored
View File

@@ -3,3 +3,6 @@ __pycache__
*.npz *.npz
*.npy *.npy
.venv .venv
dist
*.egg-info
.env

View File

@@ -1,25 +1,53 @@
# Python AutoEncoder from scratch using Numpy # Python AutoEncoder from scratch using Numpy
<figure>
<p align="center">
<img
src="https://raw.githubusercontent.com/lenoctambule/autoencoder/refs/heads/main/media/latent-space.png"
alt="Latent-space of the MNIST dataset"
width=70%>
<figcaption>
<p align="center">
Latent-space representation of the MNIST dataset using Variational Autoencoder
</p>
</figcaption>
</p>
</figure>
## Usage ## Usage
1. Install requirements : 1. To install from source :
```sh ```sh
$ pip install -r requirements.txt $ git clone git@github.com:lenoctambule/autoencoder.git
$ pip install -e autoencoder/
``` ```
2. Optionally run mnist_test.py. Or install from PyPI :
```sh ```sh
$ pip install easyvae
```
2. Optionally, run mnist_test.py to see it in action on the MNIST dataset.
```sh
$ cd examples
$ py mnist_test.py $ py mnist_test.py
``` ```
## Training ## Training
Instatiate an `Autoencoder` object : Instatiate an `ClassicalAutoencoder` or `VariationalAutoencoder` object :
```py ```py
from autoencoder import Autoencoder from easyvae.autoencoder import ClassicalAutoencoder, VariationalAutoencoder
from activations import LeakyReLU from easyvae.activations import LeakyReLU
autoencoder = Autoencoder( autoencoder = ClassicalAutoencoder(
[768, 64, 16],
[16, 64, 768],
0.01,
LeakyReLU()
)
# or
autoencoder = VariationalAutoencoder(
[768, 64, 16], [768, 64, 16],
[16, 64, 768], [16, 64, 768],
0.01, 0.01,
@@ -30,11 +58,17 @@ And then via the `train_dataset` method to train over a dataset :
```py ```py
autoencoder.train_dataset(data) autoencoder.train_dataset(data)
``` ```
Or via the `train` to input each data points iteratively : Or via the `train` method to input each data points iteratively :
```py ```py
autoencoder.train(v) autoencoder.train(v)
``` ```
After training, you can save your model via the `save` method and load that model using `load` method :
```
autoencoder.save("mymodel.npy")
autoencoder.load("mymodel.npy")
```
## Inference ## Inference
Use your `Autoencoder` object with the `encode`, `decode`, `forward` methods like so : Use your `Autoencoder` object with the `encode`, `decode`, `forward` methods like so :

View File

@@ -1,137 +0,0 @@
import numpy as np
from utils import (dynamic_loss_plot_init,
dynamic_loss_plot_update,
dynamic_loss_plot_finish)
from tqdm import tqdm
from layers import DeepNNLayer, SamplingLayer
from activations import ActivationFunc
from abc import ABC, abstractmethod
LOADER = ['', '', '', '', '', '', '', '']
class AAutoencoder(ABC):
def train_dataset(self,
data_set: list[np.ndarray],
max_epoch: int,
patience: int,
display_loss: bool = False) -> list[float]:
losses = [self.loss(data_set)]
if display_loss is True:
ax, line = dynamic_loss_plot_init(losses)
epoch = 0
no_improv = 0
prev_error = losses[0]
with tqdm(bar_format="{desc} {elapsed} {rate_fmt}") as lbar:
while True:
lbar.set_description(
f"{LOADER[epoch % len(LOADER)]} Training ({epoch=} error={float(prev_error):.6f})", # noqa
)
lbar.update()
error = 0
for x in tqdm(data_set, leave=False):
error += self.train(x)
error /= len(data_set)
derror = prev_error - error
if derror <= 0 or abs(derror) < 1e-4:
no_improv += 1
else:
no_improv = 0
prev_error = float(error)
losses.append(error)
if display_loss is True:
dynamic_loss_plot_update(ax, line, losses)
if no_improv > patience:
break
if epoch > max_epoch:
break
epoch += 1
print("Training complete !")
if display_loss is True:
dynamic_loss_plot_finish(ax, line)
return losses
def loss(self, data_set: list[np.ndarray]) -> float:
loss = 0
for x in data_set:
loss += np.sum(np.abs(x - self.forward(x)[0])) / len(x)
return loss / len(data_set)
def save(self, path: str):
path = path.removesuffix('.npy')
np.save(path, self)
def load(path: str) -> 'Autoencoder':
path = path.removesuffix('.npy') + '.npy'
data = np.load(path, allow_pickle=True)
return data.item()
@abstractmethod
def train(self, v: np.ndarray) -> float:
pass
@abstractmethod
def encode(self, v: np.ndarray) -> np.ndarray:
return self.encoder.forward(v)
@abstractmethod
def decode(self, v: np.ndarray) -> np.ndarray:
return self.decoder.forward(v)
@abstractmethod
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
code = self.encode(v)
out = self.decode(code)
return out, code
class Autoencoder(AAutoencoder):
def __init__(self,
encoder_layers: list[int],
decoder_layers: list[int],
lr: float,
activation_func: ActivationFunc):
if encoder_layers[-1] != decoder_layers[0]:
raise Exception(
f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa
)
self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
def train(self, v: np.ndarray) -> float:
out = self.decoder.forward(
self.encoder.forward(v)
)
self.encoder.backprop(
self.decoder.backprop(out - v)
)
return np.sum(np.abs(out - v)) / len(v)
def __str__(self):
return f'Encoder:\n{self.encoder}\n\nDecoder:\n{self.decoder}'
def encode(self, v: np.ndarray) -> np.ndarray:
return self.encoder.forward(v)
def decode(self, v: np.ndarray) -> np.ndarray:
return self.decoder.forward(v)
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
code = self.encode(v)
out = self.decode(code)
return out, code
class VariationalAutoencoder(AAutoencoder):
def __init__(self,
encoder_layers: list[int],
decoder_layers: list[int],
lr: float,
activation_func: ActivationFunc):
if encoder_layers[-1] != decoder_layers[0]:
raise Exception(
f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa
)
self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
self.sampler = SamplingLayer(decoder_layers[0], lr, activation_func)

178
examples/mnist_test.py Normal file
View File

@@ -0,0 +1,178 @@
import matplotlib.pyplot as plt
import numpy as np
import os
from easyvae.autoencoder import ( # noqa
VariationalAutoencoder,
ClassicalAutoencoder,
LabelingVAE,
AAutoencoder
)
from easyvae.activations import LeakyReLU
from tqdm import tqdm
def load_mnist() -> list[np.ndarray]:
import requests
mnist_path = "./mnist.npz"
mnist_url = "https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz" # noqa
if not os.path.exists(mnist_path):
with open(mnist_path, "w+b") as f:
f.write(requests.get(mnist_url, stream=True).content)
res = np.load(mnist_path)
return res["x_train"], res["y_train"], res["x_test"], res["y_test"]
def mnist_train(
filename: str,
max_epoch: int,
patience: int,
cls: type[AAutoencoder],) -> AAutoencoder:
x_train, _, _, _ = load_mnist()
in_len = x_train[0].shape[0] * x_train[0].shape[0]
x_train.resize(x_train.shape[0], in_len)
x_train = x_train / 255
if os.path.exists(filename):
autoencoder = cls.load(filename)
else:
autoencoder = cls(
[in_len, 256, 2],
[2, 256, in_len],
0.001,
LeakyReLU()
)
print("CTRL+C to interrupt training.")
autoencoder.train_dataset(
x_train,
max_epoch,
patience,
display_loss=True)
autoencoder.save(filename)
print("Training complete !")
return autoencoder
def plot_mnist_latent_space(autoencoder: AAutoencoder, x: np.ndarray, y,):
codes = []
for x in x:
_, c = autoencoder.forward(x.flatten())
codes.append(c)
codes = np.array(codes)
if codes.shape[1] == 2:
plt.figure(figsize=(6, 6))
scatter = plt.scatter(
codes[:, 0],
codes[:, 1],
c=y,
cmap='tab10',
s=5,
alpha=0.7
)
plt.colorbar(scatter)
plt.grid(True)
def plot_random_reconstruction(
autoencoder: AAutoencoder,
example: np.ndarray,
img_shape,
y):
output, code = autoencoder.forward(example.flatten())
plt.subplot(1, 2, 1)
plt.matshow(
example.reshape(img_shape),
fignum=False)
plt.title(f"Input ({y})")
plt.subplot(1, 2, 2)
plt.matshow(
output.reshape(img_shape),
fignum=False)
plt.title(f"Output ({y})")
print(f'{code.tolist()}')
def labeling_accuracy(autoencoder: LabelingVAE, x_test, y_test):
accuracy = 0
for x, y in tqdm(
zip(x_test, y_test),
desc="Testing labeling",
total=len(x_test)
):
res = autoencoder.label(x)
res = list(res.items())[0][0]
if res == str(int(y)):
accuracy += 1
accuracy /= len(y_test)
print(f"Accuracy : {accuracy * 100:.2f}%")
def mnist_test(model: str | AAutoencoder | LabelingVAE):
x_train, y_train, x_test, y_test = load_mnist()
in_len = x_train[0].shape[0] * x_train[0].shape[0]
img_shape = x_train[0].shape
x_train.resize(x_train.shape[0], in_len)
x_test.resize(x_test.shape[0], in_len)
x_train = x_train / 255
x_test = x_test / 255
if isinstance(model, str):
autoencoder: AAutoencoder = AAutoencoder.load(model)
else:
autoencoder = model
print("Testing model ...\n")
print(autoencoder)
idx = np.random.randint(0, len(x_test))
example: np.ndarray = x_test[idx]
labels_train = [str(int(i)) for i in y_train]
if isinstance(autoencoder, LabelingVAE):
autoencoder.learn_labels(x_train, labels_train)
labeling_accuracy(autoencoder, x_test, y_test)
res = autoencoder.label(example)
for k, v in res.items():
print(f"{k} => {v}")
plot_random_reconstruction(autoencoder, example, img_shape, y_test[idx])
if autoencoder.space_dim == 2:
plot_mnist_latent_space(autoencoder, x_test, y_test)
plt.show()
if __name__ == "__main__":
import argparse
import sys
parser = argparse.ArgumentParser()
parser.add_argument(
'-e',
type=int,
nargs='?',
default=30,
help='Max epochs'
)
parser.add_argument(
'-p',
type=int,
nargs='?',
default=30,
help='Patience'
)
parser.add_argument(
'-m',
type=str, nargs='?',
default='autoencoder_mnist.npy',
help='Model filename to save in run mode or load in training mode'
)
parser.add_argument(
'-r',
action='store_true',
help='Run the model'
)
args = parser.parse_args(sys.argv[1:])
if args.r:
mnist_test(args.m)
else:
autoencoder = mnist_train(
args.m,
args.e,
args.p,
LabelingVAE
)
mnist_test(autoencoder)

View File

@@ -1,84 +0,0 @@
import numpy as np
from utils import normalize
from activations import ActivationFunc
class NNLayer:
def __init__(self,
in_size: int,
out_size: int,
lr: float,
activation_func: ActivationFunc):
self.W = np.random.uniform(-1, 1, (in_size, out_size))
self.B = np.zeros((out_size))
self.lr = lr
self.input = None
self.output = None
self.output_linear = None
self.activation_func = activation_func
def __str__(self):
return f'[ {self.W.shape[0]} => {self.W.shape[1]}\tlr:{self.lr}\tactivation:{self.activation_func.__class__.__name__} ]' # noqa
def forward(self, V: np.ndarray) -> np.ndarray:
self.input = normalize(V)
self.output_linear = self.input @ self.W + self.B
self.output = self.activation_func(
self.output_linear
)
return self.output
def backprop(self, error: np.ndarray) -> np.ndarray:
error *= self.activation_func.derivative(self.output_linear)
ret = self.W @ error
dW = np.outer(self.input, error) * self.lr
dB = error * self.lr
self.W -= dW
self.B -= dB
return ret
class SamplingLayer:
def __init__(self,
in_size: int,
lr: float,
activation_func: ActivationFunc):
self.W_mean = np.random.uniform(-0.1, 0.1, (in_size, in_size))
self.W_variance = np.random.uniform(-0.1, 0.1, (in_size, in_size))
def forward(self, v) -> np.ndarray:
mean = self.W_mean @ v
variance = self.W_variance @ v
return np.random.normal(mean, variance)
def backprop(self, error: np.ndarray) -> np.ndarray:
pass
class DeepNNLayer:
def __init__(self,
layers: list[int],
lr: float,
activation_func: ActivationFunc):
self.layers: list[NNLayer] = []
for i in range(len(layers) - 1):
self.layers.append(
NNLayer(
layers[i],
layers[i+1],
lr,
activation_func)
)
def __str__(self):
return '\n'.join([str(layer) for layer in self.layers])
def forward(self, v: np.ndarray) -> np.ndarray:
for layer in self.layers:
v = layer.forward(v)
return v
def backprop(self, error: np.ndarray) -> np.ndarray:
for layer in self.layers[::-1]:
error = layer.backprop(error)
return error

BIN
media/latent-space.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 181 KiB

View File

@@ -1,118 +0,0 @@
import matplotlib.pyplot as plt
import numpy as np
from autoencoder import Autoencoder
from activations import LeakyReLU
import os
def load_mnist() -> list[np.ndarray]:
import requests
mnist_path = "./mnist.npz"
mnist_url = "https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz" # noqa
if not os.path.exists(mnist_path):
with open(mnist_path, "w+b") as f:
f.write(requests.get(mnist_url, stream=True).content)
res = np.load(mnist_path)
return res["x_train"], res["y_train"], res["x_test"], res["y_test"]
def mnist_train(
filename: str,
max_epoch: int,
patience: int,
) -> Autoencoder:
x_train, _, x_test, _ = load_mnist()
in_len = x_train[0].shape[0] * x_train[0].shape[0]
x_train.resize(x_train.shape[0], in_len)
x_test.resize(x_test.shape[0], in_len)
x_train = x_train / 255
x_test = x_test / 255
if os.path.exists(filename):
autoencoder = Autoencoder.load(filename)
else:
autoencoder = Autoencoder(
[in_len, 64, 16],
[16, 64, in_len],
0.01,
LeakyReLU()
)
autoencoder.train_dataset(
x_train,
max_epoch,
patience,
display_loss=True)
autoencoder.save(filename)
return autoencoder
def mnist_test(model: str | Autoencoder):
x_train, _, x_test, y_test = load_mnist()
in_len = x_train[0].shape[0] * x_train[0].shape[0]
img_shape = x_train[0].shape
x_train.resize(x_train.shape[0], in_len)
x_test.resize(x_test.shape[0], in_len)
x_train = x_train / 255
x_test = x_test / 255
if isinstance(model, str):
autoencoder: Autoencoder = Autoencoder.load(model)
else:
autoencoder = model
print(autoencoder)
idx = np.random.randint(0, len(x_test))
example: np.ndarray = x_test[idx]
output, code = autoencoder.forward(example.flatten())
plt.subplot(1, 3, 1)
plt.matshow(
example.reshape(img_shape),
fignum=False)
plt.title(f"Input ({y_test[idx]})")
plt.subplot(1, 3, 2)
plt.matshow(
output.reshape(img_shape),
fignum=False)
plt.title(f"Output ({y_test[idx]})")
plt.subplot(1, 3, 3)
s = int(np.ceil(np.sqrt(code.shape[0])))
code.resize((s, s), refcheck=False)
plt.matshow(code, fignum=False)
plt.title(f"Code ({y_test[idx]})")
plt.show()
if __name__ == "__main__":
import argparse
import sys
parser = argparse.ArgumentParser()
parser.add_argument(
'-e',
type=int,
nargs='?',
default=1000,
help='Max epochs'
)
parser.add_argument(
'-p',
type=int,
nargs='?',
default=5,
help='Patience'
)
parser.add_argument(
'-m',
type=str, nargs='?',
default='autoencoder_mnist.npy',
help='Model filename to save in run mode or load in training mode'
)
parser.add_argument(
'-r',
action='store_true',
help='Run mode'
)
args = parser.parse_args(sys.argv[1:])
if args.r:
mnist_test(args.m)
else:
autoencoder = mnist_train(args.m, args.e, args.p)
mnist_test(autoencoder)

28
pyproject.toml Normal file
View File

@@ -0,0 +1,28 @@
[project]
name = "easyvae"
version = "1.3.3"
authors = [
{ name="Ravaka RALAMBOARIVONY", email="ravaka.rlb.pro@gmail.com" },
]
description = "Python implementation of a Classical and Variational Autoencoders using NumPy"
readme = "README.md"
requires-python = ">=3.10"
classifiers = [
"Programming Language :: Python :: 3",
"Operating System :: OS Independent",
]
license = "MIT"
license-files = ["LICEN[CS]E*"]
dependencies = [
"numpy",
"matplotlib",
"tqdm",
]
[project.urls]
Homepage = "https://github.com/lenoctambule/autoencoder"
Issues = "https://github.com/lenoctambule/autoencoder/issues"
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

0
src/easyvae/__init__.py Normal file
View File

View File

@@ -4,7 +4,7 @@ from abc import ABC, abstractmethod
class ActivationFunc(ABC): class ActivationFunc(ABC):
@abstractmethod @abstractmethod
def derivative(v: np.ndarray) -> np.ndarray: def d(v: np.ndarray) -> np.ndarray:
pass pass
@@ -12,7 +12,7 @@ class ReLU(ActivationFunc):
def __call__(self, x): def __call__(self, x):
return x * (x > 0) return x * (x > 0)
def derivative(self, x): def d(self, x):
return x > 0 return x > 0
@@ -23,7 +23,7 @@ class LeakyReLU(ActivationFunc):
def __call__(self, x): def __call__(self, x):
return x * (x > 0) + self.k * x * (x <= 0) return x * (x > 0) + self.k * x * (x <= 0)
def derivative(self, x): def d(self, x):
return (x > 0) + self.k * (x <= 0) return (x > 0) + self.k * (x <= 0)
@@ -31,5 +31,5 @@ class Identity(ActivationFunc):
def __call__(self, x): def __call__(self, x):
return x return x
def derivative(x): def d(self, x):
return 1 return 1

299
src/easyvae/autoencoder.py Normal file
View File

@@ -0,0 +1,299 @@
import numpy as np
from tqdm import tqdm
from .layers import DeepNNLayer, SampleLayer, NoiseLayer
from .activations import ActivationFunc, Identity
from .plotters import Plotter, CAPlotter, VAEPlotter
from .utils import interruptable
from abc import ABC, abstractmethod
LOADER = ['', '', '', '', '', '', '', '']
SQRT_2PI = np.sqrt(2 * np.pi)
class AAutoencoder(ABC):
plotter_cls = Plotter
@abstractmethod
def __init__(self,
encoder_layers: list[int],
decoder_layers: list[int],
lr: float,
activation_func: ActivationFunc,
noise=0):
if encoder_layers[-1] != decoder_layers[0]:
raise Exception(
f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa
)
self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
self.noise = NoiseLayer(noise)
self.space_dim = decoder_layers[0]
self.lr = lr
self.losses = [0]
def save(self, path: str):
path = path.removesuffix('.npy')
np.save(path, self)
def load(path: str) -> 'AAutoencoder':
path = path.removesuffix('.npy') + '.npy'
data = np.load(path, allow_pickle=True)
return data.item()
@abstractmethod
def loss(self, data_set: list[np.ndarray]) -> float:
pass
@abstractmethod
def train(self, v: np.ndarray) -> float:
pass
@abstractmethod
def forward(self, v: np.ndarray) -> np.ndarray:
pass
@abstractmethod
def train_dataset(self, *args, **kwargs) -> list[float]:
pass
def __str__(self):
return "\n".join((
f"Type: {self.__class__.__name__}",
"Encoder:",
f"{self.encoder}",
"Decoder:",
f"{self.decoder}"
)
)
class ClassicalAutoencoder(AAutoencoder):
plotter_cls = CAPlotter
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.losses = []
def loss(self, data_set: list[np.ndarray]) -> float:
loss = 0
for x in data_set:
loss += np.sum(np.abs(x - self.forward(x)[0])) / len(x)
return loss / len(data_set)
def train(self, v: np.ndarray):
out, _ = self.forward(
self.noise.forward(v)
)
error = out - v
self.encoder.back(
self.decoder.back(error)
)
self.encoder.backprop()
self.decoder.backprop()
return np.sum(np.abs(error)) / len(v)
@interruptable
def train_dataset(self,
data_set: list[np.ndarray],
max_epoch: int,
patience: int,
display_loss: bool = False) -> list[float]:
plotter = self.plotter_cls(self) if display_loss else Plotter(self)
if len(self.losses) == 0:
self.losses = [self.loss(data_set)]
epoch = 0
no_improv = 0
prev_error = self.losses[-1]
with tqdm(bar_format="{desc} {elapsed} {rate_fmt}") as lbar:
while True:
lbar.set_description(
f"{LOADER[epoch % len(LOADER)]} Training ({epoch=} error={float(prev_error):.6f})", # noqa
)
lbar.update()
error = 0
for x in tqdm(data_set, leave=False):
error += self.train(x)
error /= len(data_set)
derror = prev_error - error
if abs(derror) < 1e-4:
no_improv += 1
else:
no_improv = 0
prev_error = float(error)
self.losses.append(error)
if no_improv > patience:
break
if epoch > max_epoch:
break
plotter.update()
epoch += 1
def encode(self, v: np.ndarray) -> np.ndarray:
return self.encoder.forward(v)
def decode(self, v: np.ndarray) -> np.ndarray:
return self.decoder.forward(v)
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
code = self.encode(v)
out = self.decode(code)
return out, code
class VariationalAutoencoder(AAutoencoder):
plotter_cls = VAEPlotter
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.sampler = SampleLayer(self.encoder.out_size, self.lr, Identity())
self.KL_losses = []
self.recon_losses = []
def loss(self, data_set: list[np.ndarray]) -> float:
kl_loss = 0
recon_loss = 0
for x in data_set:
out = self.forward(x)[0]
kl = self.sampler.DKL()
recon_loss += np.mean((out - x) ** 2)
kl_loss += kl
kl_loss /= len(data_set)
recon_loss /= len(data_set)
return recon_loss, kl_loss
def train(self, v: np.ndarray) -> tuple[float, float]:
out, _ = self.forward(
self.noise.forward(v)
)
error = out - v
self.encoder.back(
self.sampler.back(
self.decoder.back(error)
)
)
self.encoder.backprop()
self.sampler.backprop()
self.decoder.backprop()
return np.mean(error ** 2), self.sampler.DKL()
@interruptable
def train_dataset(self,
data_set: list[np.ndarray],
max_epoch: int,
patience: int,
display_loss: bool = False) -> list[float]:
plotter = self.plotter_cls(self) if display_loss else Plotter(self)
if len(self.recon_losses) == 0:
recon_0, kl_0 = self.loss(data_set)
self.recon_losses = [recon_0]
self.KL_losses = [kl_0]
epoch = 0
no_improv = 0
prev_loss = self.recon_losses[-1] + self.KL_losses[-1]
with tqdm(bar_format="{desc} {elapsed} {rate_fmt}") as lbar:
while True:
lbar.set_description(
f"{LOADER[epoch % len(LOADER)]} Training ({epoch=} loss={float(prev_loss):.6f})", # noqa
)
lbar.update()
dkl = 0
recon = 0
for x in tqdm(data_set, leave=False):
recon_i, dkl_i = self.train(x)
dkl += dkl_i
recon += recon_i
recon /= len(data_set)
dkl /= len(data_set)
loss = recon + dkl
dloss = prev_loss - loss
if dloss <= 0 or abs(dloss) < 1e-4:
no_improv += 1
else:
no_improv = 0
prev_loss = float(loss)
self.recon_losses.append(recon)
self.KL_losses.append(dkl)
if no_improv > patience:
break
if epoch > max_epoch:
break
plotter.update()
epoch += 1
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
code = self.encoder.forward(v)
sample = self.sampler.forward(code)
out = self.decoder.forward(sample)
return out, sample
def encode(self, v: np.ndarray) -> np.ndarray:
return self.sampler.forward(
self.encoder.forward(v)
)
def decode(self, v: np.ndarray) -> np.ndarray:
return self.decoder.forward(v)
class Label:
def __init__(self,
name: str,
embedding_size: int,
N=100):
self.name = name
self.embedding_size = embedding_size
self.N = N
self.idx = 0
self.history = np.zeros((self.N, embedding_size))
def observe(self, code: np.ndarray):
if self.idx < self.N:
self.history[self.idx] = code
self.idx += 1
else:
diffs = np.linalg.norm(self.history - code, axis=1)
idx = np.argmin(diffs)
self.history[idx] = (self.history[idx] + code) / 2
def p(self, x: np.ndarray):
return 1 / (1e-4 + np.mean(np.abs(self.history - x)))
class LabelingVAE(VariationalAutoencoder):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.labels: list[Label] = []
self.labels_idxs: dict[str, int] = {}
def learn_labels(self, data: np.ndarray, labels: list[list[str]]):
self.labels.clear()
self.labels_idxs.clear()
for x_i, labels_i in zip(data, labels):
y_i = self.encode(x_i)
for c in labels_i:
idx = self.labels_idxs.get(c, None)
if idx is None:
label = Label(c, self.encoder.out_size)
self.labels.append(label)
self.labels_idxs[c] = len(self.labels) - 1
else:
label = self.labels[idx]
label.observe(y_i)
def label(self, x: np.ndarray):
probs = {}
total = 0
code = self.encode(x)
for label in self.labels:
p = label.p(code)
probs[label.name] = p
total += p
for k in probs:
probs[k] = float(probs[k] / total)
return dict(
sorted(
probs.items(),
key=lambda item: item[1],
reverse=True
)
)

126
src/easyvae/layers.py Normal file
View File

@@ -0,0 +1,126 @@
import numpy as np
from .activations import ActivationFunc, Identity
class NNLayer:
def __init__(self,
in_size: int,
out_size: int,
lr: float,
activation_func: ActivationFunc):
limit = np.sqrt(6 / (in_size + out_size))
self.W = np.random.uniform(-limit, limit, (in_size, out_size))
self.B = np.zeros((out_size))
self.lr = lr
self.input = None
self.output = None
self.output_linear = None
self.error = None
self.activation_func = activation_func
def __str__(self):
return f'[ {self.W.shape[0]} => {self.W.shape[1]}\tlr:{self.lr}\tactivation:{self.activation_func.__class__.__name__} ]' # noqa
def forward(self, v: np.ndarray) -> np.ndarray:
self.input = v
self.output_linear = self.input @ self.W + self.B
self.output = self.activation_func(
self.output_linear
)
return self.output
def back(self, error: np.ndarray) -> np.ndarray:
self.error = error * self.activation_func.d(self.output_linear)
return self.W @ self.error
def backprop(self) -> np.ndarray:
dW = np.outer(self.input, self.error) * self.lr
dB = self.error * self.lr
self.W -= dW
self.B -= dB
class SampleLayer:
def __init__(self,
in_size: int,
lr: float,
activation_func: ActivationFunc):
self.input = None
self.mean_nn = NNLayer(
in_size,
in_size,
lr,
activation_func)
self.std_nn = NNLayer(
in_size,
in_size,
lr,
activation_func)
def DKL(self):
return -0.5 * np.mean(1 + self.logvar - self.mean ** 2 - np.exp(self.logvar)) # noqa
def forward(self, v: np.ndarray) -> np.ndarray:
self.input = v
self.mean = self.mean_nn.forward(v)
self.logvar = np.clip(self.std_nn.forward(v), -10, 10)
self.std = np.exp(0.5 * self.logvar)
self.eps = np.random.normal(0, 1, self.mean.shape)
return 0.5 * self.eps * self.std + self.mean
def back(self, error: np.ndarray) -> np.ndarray:
dmean = error + self.mean
dstd = error * self.eps + 0.5 * (np.exp(self.logvar) - 1)
mean_error = self.mean_nn.back(dmean)
logvar_error = self.std_nn.back(dstd * self.std)
return mean_error + logvar_error
def backprop(self):
self.mean_nn.backprop()
self.std_nn.backprop()
class DeepNNLayer:
def __init__(self,
layers: list[int],
lr: float,
activation_func: ActivationFunc):
self.layers: list[NNLayer] = []
for i in range(len(layers) - 1):
self.layers.append(
NNLayer(
layers[i],
layers[i+1],
lr,
activation_func if i != len(layers) - 2 else Identity()
)
)
self.in_size = layers[0]
self.out_size = layers[-1]
def __str__(self):
return '\n'.join([str(layer) for layer in self.layers])
def forward(self, v: np.ndarray) -> np.ndarray:
for layer in self.layers:
v = layer.forward(v)
return v
def back(self, error: np.ndarray):
for layer in self.layers[::-1]:
error = layer.back(error)
return error
def backprop(self) -> np.ndarray:
for layer in self.layers:
layer.backprop()
class NoiseLayer:
def __init__(self, amount=0.1):
self.amount = amount
def forward(self, v: np.ndarray):
if self.amount == 0:
return v
return v + np.random.normal(0, self.amount, v.shape)

93
src/easyvae/plotters.py Normal file
View File

@@ -0,0 +1,93 @@
import matplotlib.pyplot as plt
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .autoencoder import AAutoencoder, VariationalAutoencoder
class Plotter:
def __init__(self, autoencoder: 'AAutoencoder'):
pass
def update(self):
pass
def close(self):
pass
def __del__(self):
self.close()
class CAPlotter(Plotter):
def __init__(self, autoencoder: 'AAutoencoder'):
self.autoencoder = autoencoder
plt.ion()
self.fig, self.ax = plt.subplots()
self.line, = self.ax.plot(
list(range(len(autoencoder.losses))),
autoencoder.losses,
label="Loss"
)
self.ax.set_xlabel("Epoch")
self.ax.set_ylabel("Loss")
self.ax.set_title("Training MSE Loss")
self.ax.legend()
self.update()
def update(self):
self.line.set_xdata(range(len(self.autoencoder.losses)))
self.line.set_ydata(self.autoencoder.losses)
self.ax.relim()
self.ax.autoscale_view()
plt.draw()
plt.pause(0.1)
def close(self):
plt.ioff()
plt.close(self.fig)
class VAEPlotter(Plotter):
def __init__(self, autoencoder: 'VariationalAutoencoder'):
self.autoencoder = autoencoder
plt.ion()
self.fig, (self.ax_recon, self.ax_dkl) = plt.subplots(1, 2)
self.line, = self.ax_recon.plot(
list(range(len(self.autoencoder.recon_losses))),
self.autoencoder.recon_losses,
label="Loss"
)
self.ax_recon.set_xlabel("Epoch")
self.ax_recon.set_ylabel("Loss")
self.ax_recon.set_title("Reconstruction MSE Loss")
self.ax_recon.legend()
self.dkl_line, = self.ax_dkl.plot(
list(range(len(self.autoencoder.KL_losses))),
self.autoencoder.KL_losses,
label="DKL Loss",
)
self.ax_dkl.set_xlabel("Epoch")
self.ax_dkl.set_ylabel("Loss")
self.ax_dkl.set_title("DKL Loss")
self.ax_dkl.legend()
self.update()
def update(self):
self.line.set_xdata(range(len(self.autoencoder.recon_losses)))
self.line.set_ydata(self.autoencoder.recon_losses)
self.ax_recon.relim()
self.ax_recon.autoscale_view()
self.dkl_line.set_xdata(range(len(self.autoencoder.KL_losses)))
self.dkl_line.set_ydata(self.autoencoder.KL_losses)
self.ax_dkl.relim()
self.ax_dkl.autoscale_view()
plt.draw()
plt.pause(0.1)
def close(self):
plt.ioff()
plt.close(self.fig)

29
src/easyvae/utils.py Normal file
View File

@@ -0,0 +1,29 @@
import numpy as np
def softmax(v: np.ndarray) -> np.ndarray:
v = v - np.max(v)
exp_v = np.exp(v)
return exp_v / np.sum(exp_v)
def normalize(v: np.ndarray) -> np.ndarray:
return v / (np.linalg.norm(v) + 1e-8)
def regularize(v: np.ndarray) -> np.ndarray:
v_min = v.min(axis=0)
v_max = v.max(axis=0)
if v_min - v_max == 0:
return v
return (v - v_min) / (v_max - v_min)
def interruptable(func):
def inner(*args, **kwargs):
try:
return func(*args, **kwargs)
except KeyboardInterrupt:
pass
return inner

View File

@@ -1,46 +0,0 @@
import numpy as np
import matplotlib.pyplot as plt
def softmax(v: np.ndarray) -> np.ndarray:
v = v - np.max(v)
exp_v = np.exp(v)
return exp_v / np.sum(exp_v)
def normalize(v: np.ndarray) -> np.ndarray:
return v / (np.linalg.norm(v) + 1e-8)
def regularize(v: np.ndarray) -> np.ndarray:
v_min = v.min(axis=0)
v_max = v.max(axis=0)
if v_min - v_max == 0:
return v
return (v - v_min) / (v_max - v_min)
def dynamic_loss_plot_init(losses: list):
plt.ion()
fig, ax = plt.subplots()
line, = ax.plot([0], losses, label="Loss")
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
ax.set_title("Training Loss")
ax.legend()
return ax, line
def dynamic_loss_plot_update(ax, line, loss):
line.set_xdata(range(len(loss)))
line.set_ydata(loss)
ax.relim()
ax.autoscale_view()
plt.draw()
plt.pause(0.1)
def dynamic_loss_plot_finish(ax, line):
plt.ioff()
plt.show()