Compare commits

...

12 Commits

Author SHA1 Message Date
Lenoctambule
e6b508f739 feat: sampling layer w/ forward method + abstract autoencoder 2026-03-31 19:10:06 +02:00
Lenoctambule
cc74b62afd feat: identity activation func 2026-03-30 05:14:02 +02:00
Lenoctambule
a93bb0a692 feat: error handling and re-train in mnist_test 2026-03-29 21:12:19 +02:00
Lenoctambule
8a3d408b7a docs(README.md): update Autoencoder class usage 2026-03-29 19:25:42 +02:00
Lenoctambule
53c7f73055 feat: ActivationFunc classes ReLU and LeakyReLU 2026-03-29 19:20:05 +02:00
Lenoctambule
44bf4c0286 feat: str methods for Autoencoder 2026-03-29 09:41:33 +02:00
Lenoctambule
7aabc5db48 feat: leaky relu func 2026-03-29 09:19:15 +02:00
Lenoctambule
09835e9afa fix: missing activation func derivative + send error before update 2026-03-29 08:23:15 +02:00
Lenoctambule
efd328e530 feat: save and load methods for Autoencoder 2026-03-28 17:50:27 +01:00
Lenoctambule
6155649655 feat: loss method + mv data reshaping out of Autoencoder class 2026-03-28 02:40:10 +01:00
Lenoctambule
e5520bf050 feat: use DeepNNLayer in Autoencoder 2026-03-28 02:09:47 +01:00
Lenoctambule
a50a09b337 feat: DeepNNLayer class 2026-03-28 02:09:47 +01:00
7 changed files with 302 additions and 76 deletions

3
.gitignore vendored
View File

@@ -1,4 +1,5 @@
__pycache__ __pycache__
*.pyc *.pyc
*.npz
*.npy
.venv .venv
mnist.npz

View File

@@ -17,8 +17,14 @@ $ py mnist_test.py
Instantiate an `Autoencoder` object : Instantiate an `Autoencoder` object :
```py ```py
from autoencoder import Autoencoder from autoencoder import Autoencoder
from activations import LeakyReLU
autoencoder = Autoencoder(in_len=300, bottleneck=50, 0.001, relu) autoencoder = Autoencoder(
[768, 64, 16],
[16, 64, 768],
0.01,
LeakyReLU()
)
``` ```
And then via the `train_dataset` method to train over a dataset : And then via the `train_dataset` method to train over a dataset :
```py ```py
@@ -31,9 +37,10 @@ autoencoder.train(v)
## Inference ## Inference
Use your `Autoencoder` object with the `encode` and `decode` methods like so : Use your `Autoencoder` object with the `encode`, `decode`, `forward` methods like so :
```py ```py
example = ... example = ...
code = autoencoder.encode(example) code = autoencoder.encode(example)
output = autoencoder.decode(code) output = autoencoder.decode(code)
output, code = autoencoder.forward(example)
``` ```

35
activations.py Normal file
View File

@@ -0,0 +1,35 @@
import numpy as np
from abc import ABC, abstractmethod
class ActivationFunc(ABC):
    """Interface for activation functions used by the NN layers.

    Subclasses must be callable (apply the activation element-wise) and
    implement ``derivative`` for use during backpropagation.
    """

    @abstractmethod
    def derivative(self, v: np.ndarray) -> np.ndarray:
        # BUG FIX: ``self`` was missing from the abstract signature.
        """Return the element-wise derivative evaluated at *v*."""


class ReLU(ActivationFunc):
    """Rectified linear unit: ``max(0, x)`` element-wise."""

    def __call__(self, x):
        return x * (x > 0)

    def derivative(self, x):
        # Boolean array: truthy (1) where x > 0, 0 elsewhere.
        return x > 0


class LeakyReLU(ActivationFunc):
    """Leaky ReLU with negative-side slope *k* (default 0.01)."""

    def __init__(self, k=0.01):
        self.k = k

    def __call__(self, x):
        return x * (x > 0) + self.k * x * (x <= 0)

    def derivative(self, x):
        return (x > 0) + self.k * (x <= 0)


class Identity(ActivationFunc):
    """Identity activation: returns its input unchanged."""

    def __call__(self, x):
        return x

    def derivative(self, x):
        # BUG FIX: ``self`` was missing, so ``Identity().derivative(x)``
        # raised TypeError (2 positional args passed to a 1-arg function).
        return 1

View File

@@ -3,51 +3,37 @@ from utils import (dynamic_loss_plot_init,
dynamic_loss_plot_update, dynamic_loss_plot_update,
dynamic_loss_plot_finish) dynamic_loss_plot_finish)
from tqdm import tqdm from tqdm import tqdm
from layers import NNLayer from layers import DeepNNLayer, SamplingLayer
from activations import ActivationFunc
from abc import ABC, abstractmethod
LOADER = ['', '', '', '', '', '', '', ''] LOADER = ['', '', '', '', '', '', '', '']
class Autoencoder: class AAutoencoder(ABC):
def __init__(self,
in_len: int,
bottleneck: int,
lr: float,
activation_func):
self.encoder = NNLayer(in_len, bottleneck, lr, activation_func)
self.decoder = NNLayer(bottleneck, in_len, lr, activation_func)
def train(self, v: np.ndarray) -> float:
encoded = self.encoder.forward(v)
reconstructed = self.decoder.forward(encoded)
error = self.decoder.backprop(reconstructed - v)
self.encoder.backprop(error)
error = v - reconstructed
return np.sum(np.abs(error))
def train_dataset(self, def train_dataset(self,
data_set: list[np.ndarray], data_set: list[np.ndarray],
max_epoch: int, max_epoch: int,
patience: int, patience: int,
display_loss: bool = False) -> list[float]: display_loss: bool = False) -> list[float]:
losses = [self.loss(data_set)]
if display_loss is True: if display_loss is True:
ax, line = dynamic_loss_plot_init() ax, line = dynamic_loss_plot_init(losses)
losses = []
epoch = 0 epoch = 0
no_improv = 0 no_improv = 0
prev_error = float('inf') prev_error = losses[0]
with tqdm(bar_format="{desc} {elapsed} {rate_fmt}") as lbar: with tqdm(bar_format="{desc} {elapsed} {rate_fmt}") as lbar:
while True: while True:
lbar.set_description( lbar.set_description(
f"{LOADER[epoch % len(LOADER)]} Training ({epoch=} error={prev_error:.2f})", # noqa f"{LOADER[epoch % len(LOADER)]} Training ({epoch=} error={float(prev_error):.6f})", # noqa
) )
lbar.update() lbar.update()
error = 0 error = 0
for x in data_set: for x in tqdm(data_set, leave=False):
input = x.flatten() error += self.train(x)
error += self.train(input)
error /= len(data_set) error /= len(data_set)
if prev_error - error <= 1e-8: derror = prev_error - error
if derror <= 0 or abs(derror) < 1e-4:
no_improv += 1 no_improv += 1
else: else:
no_improv = 0 no_improv = 0
@@ -60,13 +46,92 @@ class Autoencoder:
if epoch > max_epoch: if epoch > max_epoch:
break break
epoch += 1 epoch += 1
print("Training complete !")
if display_loss is True: if display_loss is True:
dynamic_loss_plot_finish(ax, line) dynamic_loss_plot_finish(ax, line)
print("#Training complete !")
return losses return losses
def loss(self, data_set: list[np.ndarray]) -> float:
    """Mean absolute reconstruction error per element, averaged over *data_set*."""
    per_sample = [
        np.sum(np.abs(sample - self.forward(sample)[0])) / len(sample)
        for sample in data_set
    ]
    return sum(per_sample) / len(data_set)
def save(self, path: str):
    """Serialize this model to ``<path>.npy`` (numpy pickles the object)."""
    base = path.removesuffix('.npy')
    np.save(base, self)
def load(path: str) -> 'Autoencoder':
path = path.removesuffix('.npy') + '.npy'
data = np.load(path, allow_pickle=True)
return data.item()
@abstractmethod
def train(self, v: np.ndarray) -> float:
    """Run a single training step on sample *v* and return its loss."""
@abstractmethod
def encode(self, v: np.ndarray) -> np.ndarray:
    """Map *v* to its latent code (default: run the encoder stack)."""
    return self.encoder.forward(v)
@abstractmethod
def decode(self, v: np.ndarray) -> np.ndarray:
    """Map latent code *v* back to input space (default: decoder stack)."""
    return self.decoder.forward(v)
@abstractmethod
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Full pass through the model: returns (reconstruction, latent code)."""
    latent = self.encode(v)
    return self.decode(latent), latent
class Autoencoder(AAutoencoder):
    """Deep autoencoder built from two DeepNNLayer stacks.

    ``encoder_layers`` / ``decoder_layers`` list the layer widths of each
    stack; the encoder's output width must equal the decoder's input width.
    """

    def __init__(self,
                 encoder_layers: list[int],
                 decoder_layers: list[int],
                 lr: float,
                 activation_func: ActivationFunc):
        """Build the encoder/decoder stacks.

        Raises:
            ValueError: if the bottleneck widths do not match.
        """
        if encoder_layers[-1] != decoder_layers[0]:
            # BUG FIX: the message interpolated encoder_layers[0] instead of
            # decoder_layers[0]; also raise the more specific ValueError
            # (still caught by handlers catching Exception).
            raise ValueError(
                f"Encoder output and decoder input don't match {encoder_layers[-1]} != {decoder_layers[0]}"  # noqa
            )
        self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
        self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)

    def train(self, v: np.ndarray) -> float:
        """One reconstruction step on *v*; returns its mean absolute error."""
        out = self.decoder.forward(
            self.encoder.forward(v)
        )
        # Propagate the reconstruction gradient back through both stacks.
        self.encoder.backprop(
            self.decoder.backprop(out - v)
        )
        return np.sum(np.abs(out - v)) / len(v)

    def __str__(self):
        return f'Encoder:\n{self.encoder}\n\nDecoder:\n{self.decoder}'

    def encode(self, v: np.ndarray) -> np.ndarray:
        return self.encoder.forward(v)

    def decode(self, v: np.ndarray) -> np.ndarray:
        return self.decoder.forward(v)

    def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
        code = self.encode(v)
        out = self.decode(code)
        return out, code
class VariationalAutoencoder(AAutoencoder):
    """Variational autoencoder skeleton (work in progress).

    NOTE(review): the abstract train/encode/decode/forward methods are not
    overridden yet, so this class cannot be instantiated until they are.
    """

    def __init__(self,
                 encoder_layers: list[int],
                 decoder_layers: list[int],
                 lr: float,
                 activation_func: ActivationFunc):
        """Build encoder/decoder stacks plus the sampling layer.

        Raises:
            ValueError: if the bottleneck widths do not match.
        """
        if encoder_layers[-1] != decoder_layers[0]:
            # BUG FIX: the message interpolated encoder_layers[0] instead of
            # decoder_layers[0].
            raise ValueError(
                f"Encoder output and decoder input don't match {encoder_layers[-1]} != {decoder_layers[0]}"  # noqa
            )
        self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
        self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
        # The sampler sits between encoder output and decoder input.
        self.sampler = SamplingLayer(decoder_layers[0], lr, activation_func)

View File

@@ -1,6 +1,6 @@
import numpy as np import numpy as np
import types from utils import normalize
from utils import regularize from activations import ActivationFunc
class NNLayer: class NNLayer:
@@ -8,22 +8,77 @@ class NNLayer:
in_size: int, in_size: int,
out_size: int, out_size: int,
lr: float, lr: float,
activation_func: types.FunctionType): activation_func: ActivationFunc):
self.W = np.random.uniform(-1, 1, (in_size, out_size)) self.W = np.random.uniform(-1, 1, (in_size, out_size))
self.B = np.zeros((out_size)) self.B = np.zeros((out_size))
self.lr = lr self.lr = lr
self.last_input = None self.input = None
self.last_output = None self.output = None
self.output_linear = None
self.activation_func = activation_func self.activation_func = activation_func
def forward(self, V: np.ndarray) -> np.ndarray: def __str__(self):
self.last_input = V return f'[ {self.W.shape[0]} => {self.W.shape[1]}\tlr:{self.lr}\tactivation:{self.activation_func.__class__.__name__} ]' # noqa
res = V @ self.W + self.B
self.last_output = regularize(self.activation_func(res))
return self.last_output
def backprop(self, error: np.ndarray): def forward(self, V: np.ndarray) -> np.ndarray:
dW = np.outer(self.last_input, error) self.input = normalize(V)
self.W -= self.lr * dW self.output_linear = self.input @ self.W + self.B
self.B -= self.lr * error self.output = self.activation_func(
return error @ self.W.T self.output_linear
)
return self.output
def backprop(self, error: np.ndarray) -> np.ndarray:
    """Apply one SGD update from *error* and return the upstream gradient.

    *error* is the loss gradient w.r.t. this layer's activated output;
    the returned array is the gradient w.r.t. this layer's input.
    """
    # BUG FIX: `error *= ...` mutated the caller's array in place; use an
    # out-of-place product so the caller's buffer is left untouched.
    error = error * self.activation_func.derivative(self.output_linear)
    # Upstream gradient must be computed with the PRE-update weights.
    upstream = self.W @ error
    self.W -= np.outer(self.input, error) * self.lr
    self.B -= error * self.lr
    return upstream
class SamplingLayer:
    """Gaussian sampling layer for a VAE (work in progress).

    Linearly projects the input to a mean vector and a spread vector, then
    draws one sample per component.
    """

    def __init__(self,
                 in_size: int,
                 lr: float,
                 activation_func: ActivationFunc):
        self.W_mean = np.random.uniform(-0.1, 0.1, (in_size, in_size))
        self.W_variance = np.random.uniform(-0.1, 0.1, (in_size, in_size))
        # BUG FIX: lr and activation_func were accepted but silently
        # dropped; store them so backprop can use them once implemented.
        self.lr = lr
        self.activation_func = activation_func

    def forward(self, v) -> np.ndarray:
        """Sample from N(mean(v), |spread(v)|) element-wise."""
        mean = self.W_mean @ v
        variance = self.W_variance @ v
        # BUG FIX: np.random.normal requires a non-negative scale, but a
        # linear projection can go negative — clamp with abs.
        # NOTE(review): the scale argument is a std-dev, not a variance;
        # consider predicting log-variance and using exp(0.5*logvar).
        return np.random.normal(mean, np.abs(variance))

    def backprop(self, error: np.ndarray) -> np.ndarray:
        # TODO: reparameterization-trick gradients not implemented yet.
        pass
class DeepNNLayer:
    """A stack of fully connected NNLayer blocks applied in sequence."""

    def __init__(self,
                 layers: list[int],
                 lr: float,
                 activation_func: ActivationFunc):
        # One NNLayer per consecutive pair of widths in *layers*.
        self.layers: list[NNLayer] = [
            NNLayer(fan_in, fan_out, lr, activation_func)
            for fan_in, fan_out in zip(layers, layers[1:])
        ]

    def __str__(self):
        return '\n'.join(str(layer) for layer in self.layers)

    def forward(self, v: np.ndarray) -> np.ndarray:
        """Run *v* through every layer, front to back."""
        out = v
        for layer in self.layers:
            out = layer.forward(out)
        return out

    def backprop(self, error: np.ndarray) -> np.ndarray:
        """Propagate *error* through every layer, back to front."""
        grad = error
        for layer in reversed(self.layers):
            grad = layer.backprop(grad)
        return grad

View File

@@ -1,11 +1,11 @@
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
from autoencoder import Autoencoder from autoencoder import Autoencoder
from utils import relu from activations import LeakyReLU
def load_mnist():
import os import os
def load_mnist() -> list[np.ndarray]:
import requests import requests
mnist_path = "./mnist.npz" mnist_path = "./mnist.npz"
@@ -17,24 +17,66 @@ def load_mnist():
return res["x_train"], res["y_train"], res["x_test"], res["y_test"] return res["x_train"], res["y_train"], res["x_test"], res["y_test"]
def mnist_test( def mnist_train(
bottleneck: int, filename: str,
max_epoch: int, max_epoch: int,
patience: int, patience: int,
): ) -> Autoencoder:
x_train, _, x_test, _ = load_mnist() x_train, _, x_test, _ = load_mnist()
x_train = np.divide(x_train, 255)
x_test = np.divide(x_train, 255)
in_len = x_train[0].shape[0] * x_train[0].shape[0] in_len = x_train[0].shape[0] * x_train[0].shape[0]
autoencoder = Autoencoder(in_len, bottleneck, 0.001, relu) x_train.resize(x_train.shape[0], in_len)
autoencoder.train_dataset(x_train, max_epoch, patience, display_loss=True) x_test.resize(x_test.shape[0], in_len)
example: np.ndarray = x_test[np.random.randint(0, len(x_test))] x_train = x_train / 255
code = autoencoder.encode(example.flatten()) x_test = x_test / 255
output = autoencoder.decode(code) if os.path.exists(filename):
plt.subplot(1, 2, 1) autoencoder = Autoencoder.load(filename)
plt.matshow(example, fignum=False) else:
plt.subplot(1, 2, 2) autoencoder = Autoencoder(
plt.matshow(output.reshape(example.shape), fignum=False) [in_len, 64, 16],
[16, 64, in_len],
0.01,
LeakyReLU()
)
autoencoder.train_dataset(
x_train,
max_epoch,
patience,
display_loss=True)
autoencoder.save(filename)
return autoencoder
def mnist_test(model: str | Autoencoder):
    """Show one random MNIST test digit: input, reconstruction and code.

    *model* is either a trained Autoencoder or a path to a saved one.
    """
    x_train, _, x_test, y_test = load_mnist()
    in_len = x_train[0].shape[0] * x_train[0].shape[0]
    # Capture the 2-D image shape before flattening the arrays in place.
    img_shape = x_train[0].shape
    x_train.resize(x_train.shape[0], in_len)
    x_test.resize(x_test.shape[0], in_len)
    x_train = x_train / 255
    x_test = x_test / 255
    if isinstance(model, str):
        autoencoder: Autoencoder = Autoencoder.load(model)
    else:
        autoencoder = model
    print(autoencoder)
    idx = np.random.randint(0, len(x_test))
    example: np.ndarray = x_test[idx]
    output, code = autoencoder.forward(example.flatten())
    # Left panel: the original digit.
    plt.subplot(1, 3, 1)
    plt.matshow(example.reshape(img_shape), fignum=False)
    plt.title(f"Input ({y_test[idx]})")
    # Middle panel: the reconstruction.
    plt.subplot(1, 3, 2)
    plt.matshow(output.reshape(img_shape), fignum=False)
    plt.title(f"Output ({y_test[idx]})")
    # Right panel: the latent code, zero-padded into a square image.
    plt.subplot(1, 3, 3)
    s = int(np.ceil(np.sqrt(code.shape[0])))
    code.resize((s, s), refcheck=False)
    plt.matshow(code, fignum=False)
    plt.title(f"Code ({y_test[idx]})")
    plt.show()
@@ -42,10 +84,35 @@ if __name__ == "__main__":
import argparse import argparse
import sys import sys
options = "b:e:p:"
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('-b', type=int, nargs='?', default=50) parser.add_argument(
parser.add_argument('-e', type=int, nargs='?', default=1000) '-e',
parser.add_argument('-p', type=int, nargs='?', default=5) type=int,
nargs='?',
default=1000,
help='Max epochs'
)
parser.add_argument(
'-p',
type=int,
nargs='?',
default=5,
help='Patience'
)
parser.add_argument(
'-m',
type=str, nargs='?',
default='autoencoder_mnist.npy',
help='Model filename to save in run mode or load in training mode'
)
parser.add_argument(
'-r',
action='store_true',
help='Run mode'
)
args = parser.parse_args(sys.argv[1:]) args = parser.parse_args(sys.argv[1:])
mnist_test(args.b, args.e, args.p) if args.r:
mnist_test(args.m)
else:
autoencoder = mnist_train(args.m, args.e, args.p)
mnist_test(autoencoder)

View File

@@ -9,10 +9,6 @@ def softmax(v: np.ndarray) -> np.ndarray:
return exp_v / np.sum(exp_v) return exp_v / np.sum(exp_v)
def relu(x: np.ndarray) -> np.ndarray:
return x * (x > 0)
def normalize(v: np.ndarray) -> np.ndarray: def normalize(v: np.ndarray) -> np.ndarray:
return v / (np.linalg.norm(v) + 1e-8) return v / (np.linalg.norm(v) + 1e-8)
@@ -25,10 +21,10 @@ def regularize(v: np.ndarray) -> np.ndarray:
return (v - v_min) / (v_max - v_min) return (v - v_min) / (v_max - v_min)
def dynamic_loss_plot_init(): def dynamic_loss_plot_init(losses: list):
plt.ion() plt.ion()
fig, ax = plt.subplots() fig, ax = plt.subplots()
line, = ax.plot([], [], label="Loss") line, = ax.plot([0], losses, label="Loss")
ax.set_xlabel("Epoch") ax.set_xlabel("Epoch")
ax.set_ylabel("Loss") ax.set_ylabel("Loss")
ax.set_title("Training Loss") ax.set_title("Training Loss")