Compare commits
12 Commits
6bd230c070
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e6b508f739 | ||
|
|
cc74b62afd | ||
|
|
a93bb0a692 | ||
|
|
8a3d408b7a | ||
|
|
53c7f73055 | ||
|
|
44bf4c0286 | ||
|
|
7aabc5db48 | ||
|
|
09835e9afa | ||
|
|
efd328e530 | ||
|
|
6155649655 | ||
|
|
e5520bf050 | ||
|
|
a50a09b337 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -1,4 +1,5 @@
|
||||
__pycache__
|
||||
*.pyc
|
||||
*.npz
|
||||
*.npy
|
||||
.venv
|
||||
mnist.npz
|
||||
11
README.md
11
README.md
@@ -17,8 +17,14 @@ $ py mnist_test.py
|
||||
Instatiate an `Autoencoder` object :
|
||||
```py
|
||||
from autoencoder import Autoencoder
|
||||
from activations import LeakyReLU
|
||||
|
||||
autoencoder = Autoencoder(in_len=300, bottleneck=50, 0.001, relu)
|
||||
autoencoder = Autoencoder(
|
||||
[768, 64, 16],
|
||||
[16, 64, 768],
|
||||
0.01,
|
||||
LeakyReLU()
|
||||
)
|
||||
```
|
||||
And then via the `train_dataset` method to train over a dataset :
|
||||
```py
|
||||
@@ -31,9 +37,10 @@ autoencoder.train(v)
|
||||
|
||||
## Inference
|
||||
|
||||
Use your `Autoencoder` object with the `encode` and `decode` methods like so :
|
||||
Use your `Autoencoder` object with the `encode`, `decode`, `forward` methods like so :
|
||||
```py
|
||||
example = ...
|
||||
code = autoencoder.encode(example)
|
||||
output = autoencoder.decode(code)
|
||||
output, code = autoencoder.forward(example)
|
||||
```
|
||||
|
||||
35
activations.py
Normal file
35
activations.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import numpy as np
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class ActivationFunc(ABC):
|
||||
@abstractmethod
|
||||
def derivative(v: np.ndarray) -> np.ndarray:
|
||||
pass
|
||||
|
||||
|
||||
class ReLU(ActivationFunc):
|
||||
def __call__(self, x):
|
||||
return x * (x > 0)
|
||||
|
||||
def derivative(self, x):
|
||||
return x > 0
|
||||
|
||||
|
||||
class LeakyReLU(ActivationFunc):
|
||||
def __init__(self, k=0.01):
|
||||
self.k = k
|
||||
|
||||
def __call__(self, x):
|
||||
return x * (x > 0) + self.k * x * (x <= 0)
|
||||
|
||||
def derivative(self, x):
|
||||
return (x > 0) + self.k * (x <= 0)
|
||||
|
||||
|
||||
class Identity(ActivationFunc):
|
||||
def __call__(self, x):
|
||||
return x
|
||||
|
||||
def derivative(x):
|
||||
return 1
|
||||
125
autoencoder.py
125
autoencoder.py
@@ -3,51 +3,37 @@ from utils import (dynamic_loss_plot_init,
|
||||
dynamic_loss_plot_update,
|
||||
dynamic_loss_plot_finish)
|
||||
from tqdm import tqdm
|
||||
from layers import NNLayer
|
||||
from layers import DeepNNLayer, SamplingLayer
|
||||
from activations import ActivationFunc
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
LOADER = ['⡿', '⣟', '⣯', '⣷', '⣾', '⣽', '⣻', '⢿']
|
||||
|
||||
|
||||
class Autoencoder:
|
||||
def __init__(self,
|
||||
in_len: int,
|
||||
bottleneck: int,
|
||||
lr: float,
|
||||
activation_func):
|
||||
self.encoder = NNLayer(in_len, bottleneck, lr, activation_func)
|
||||
self.decoder = NNLayer(bottleneck, in_len, lr, activation_func)
|
||||
|
||||
def train(self, v: np.ndarray) -> float:
|
||||
encoded = self.encoder.forward(v)
|
||||
reconstructed = self.decoder.forward(encoded)
|
||||
error = self.decoder.backprop(reconstructed - v)
|
||||
self.encoder.backprop(error)
|
||||
error = v - reconstructed
|
||||
return np.sum(np.abs(error))
|
||||
|
||||
class AAutoencoder(ABC):
|
||||
def train_dataset(self,
|
||||
data_set: list[np.ndarray],
|
||||
max_epoch: int,
|
||||
patience: int,
|
||||
display_loss: bool = False) -> list[float]:
|
||||
losses = [self.loss(data_set)]
|
||||
if display_loss is True:
|
||||
ax, line = dynamic_loss_plot_init()
|
||||
losses = []
|
||||
ax, line = dynamic_loss_plot_init(losses)
|
||||
epoch = 0
|
||||
no_improv = 0
|
||||
prev_error = float('inf')
|
||||
prev_error = losses[0]
|
||||
with tqdm(bar_format="{desc} {elapsed} {rate_fmt}") as lbar:
|
||||
while True:
|
||||
lbar.set_description(
|
||||
f"{LOADER[epoch % len(LOADER)]} Training ({epoch=} error={prev_error:.2f})", # noqa
|
||||
f"{LOADER[epoch % len(LOADER)]} Training ({epoch=} error={float(prev_error):.6f})", # noqa
|
||||
)
|
||||
lbar.update()
|
||||
error = 0
|
||||
for x in data_set:
|
||||
input = x.flatten()
|
||||
error += self.train(input)
|
||||
for x in tqdm(data_set, leave=False):
|
||||
error += self.train(x)
|
||||
error /= len(data_set)
|
||||
if prev_error - error <= 1e-8:
|
||||
derror = prev_error - error
|
||||
if derror <= 0 or abs(derror) < 1e-4:
|
||||
no_improv += 1
|
||||
else:
|
||||
no_improv = 0
|
||||
@@ -60,13 +46,92 @@ class Autoencoder:
|
||||
if epoch > max_epoch:
|
||||
break
|
||||
epoch += 1
|
||||
if display_loss is True:
|
||||
dynamic_loss_plot_finish(ax, line)
|
||||
print("#Training complete !")
|
||||
return losses
|
||||
print("Training complete !")
|
||||
if display_loss is True:
|
||||
dynamic_loss_plot_finish(ax, line)
|
||||
return losses
|
||||
|
||||
def loss(self, data_set: list[np.ndarray]) -> float:
|
||||
loss = 0
|
||||
for x in data_set:
|
||||
loss += np.sum(np.abs(x - self.forward(x)[0])) / len(x)
|
||||
return loss / len(data_set)
|
||||
|
||||
def save(self, path: str):
|
||||
path = path.removesuffix('.npy')
|
||||
np.save(path, self)
|
||||
|
||||
def load(path: str) -> 'Autoencoder':
|
||||
path = path.removesuffix('.npy') + '.npy'
|
||||
data = np.load(path, allow_pickle=True)
|
||||
return data.item()
|
||||
|
||||
@abstractmethod
|
||||
def train(self, v: np.ndarray) -> float:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def encode(self, v: np.ndarray) -> np.ndarray:
|
||||
return self.encoder.forward(v)
|
||||
|
||||
@abstractmethod
|
||||
def decode(self, v: np.ndarray) -> np.ndarray:
|
||||
return self.decoder.forward(v)
|
||||
|
||||
@abstractmethod
|
||||
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
|
||||
code = self.encode(v)
|
||||
out = self.decode(code)
|
||||
return out, code
|
||||
|
||||
|
||||
class Autoencoder(AAutoencoder):
|
||||
def __init__(self,
|
||||
encoder_layers: list[int],
|
||||
decoder_layers: list[int],
|
||||
lr: float,
|
||||
activation_func: ActivationFunc):
|
||||
if encoder_layers[-1] != decoder_layers[0]:
|
||||
raise Exception(
|
||||
f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa
|
||||
)
|
||||
self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
|
||||
self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
|
||||
|
||||
def train(self, v: np.ndarray) -> float:
|
||||
out = self.decoder.forward(
|
||||
self.encoder.forward(v)
|
||||
)
|
||||
self.encoder.backprop(
|
||||
self.decoder.backprop(out - v)
|
||||
)
|
||||
return np.sum(np.abs(out - v)) / len(v)
|
||||
|
||||
def __str__(self):
|
||||
return f'Encoder:\n{self.encoder}\n\nDecoder:\n{self.decoder}'
|
||||
|
||||
def encode(self, v: np.ndarray) -> np.ndarray:
|
||||
return self.encoder.forward(v)
|
||||
|
||||
def decode(self, v: np.ndarray) -> np.ndarray:
|
||||
return self.decoder.forward(v)
|
||||
|
||||
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
|
||||
code = self.encode(v)
|
||||
out = self.decode(code)
|
||||
return out, code
|
||||
|
||||
|
||||
class VariationalAutoencoder(AAutoencoder):
|
||||
def __init__(self,
|
||||
encoder_layers: list[int],
|
||||
decoder_layers: list[int],
|
||||
lr: float,
|
||||
activation_func: ActivationFunc):
|
||||
if encoder_layers[-1] != decoder_layers[0]:
|
||||
raise Exception(
|
||||
f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa
|
||||
)
|
||||
self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
|
||||
self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
|
||||
self.sampler = SamplingLayer(decoder_layers[0], lr, activation_func)
|
||||
|
||||
85
layers.py
85
layers.py
@@ -1,6 +1,6 @@
|
||||
import numpy as np
|
||||
import types
|
||||
from utils import regularize
|
||||
from utils import normalize
|
||||
from activations import ActivationFunc
|
||||
|
||||
|
||||
class NNLayer:
|
||||
@@ -8,22 +8,77 @@ class NNLayer:
|
||||
in_size: int,
|
||||
out_size: int,
|
||||
lr: float,
|
||||
activation_func: types.FunctionType):
|
||||
activation_func: ActivationFunc):
|
||||
self.W = np.random.uniform(-1, 1, (in_size, out_size))
|
||||
self.B = np.zeros((out_size))
|
||||
self.lr = lr
|
||||
self.last_input = None
|
||||
self.last_output = None
|
||||
self.input = None
|
||||
self.output = None
|
||||
self.output_linear = None
|
||||
self.activation_func = activation_func
|
||||
|
||||
def forward(self, V: np.ndarray) -> np.ndarray:
|
||||
self.last_input = V
|
||||
res = V @ self.W + self.B
|
||||
self.last_output = regularize(self.activation_func(res))
|
||||
return self.last_output
|
||||
def __str__(self):
|
||||
return f'[ {self.W.shape[0]} => {self.W.shape[1]}\tlr:{self.lr}\tactivation:{self.activation_func.__class__.__name__} ]' # noqa
|
||||
|
||||
def backprop(self, error: np.ndarray):
|
||||
dW = np.outer(self.last_input, error)
|
||||
self.W -= self.lr * dW
|
||||
self.B -= self.lr * error
|
||||
return error @ self.W.T
|
||||
def forward(self, V: np.ndarray) -> np.ndarray:
|
||||
self.input = normalize(V)
|
||||
self.output_linear = self.input @ self.W + self.B
|
||||
self.output = self.activation_func(
|
||||
self.output_linear
|
||||
)
|
||||
return self.output
|
||||
|
||||
def backprop(self, error: np.ndarray) -> np.ndarray:
|
||||
error *= self.activation_func.derivative(self.output_linear)
|
||||
ret = self.W @ error
|
||||
dW = np.outer(self.input, error) * self.lr
|
||||
dB = error * self.lr
|
||||
self.W -= dW
|
||||
self.B -= dB
|
||||
return ret
|
||||
|
||||
|
||||
class SamplingLayer:
|
||||
def __init__(self,
|
||||
in_size: int,
|
||||
lr: float,
|
||||
activation_func: ActivationFunc):
|
||||
self.W_mean = np.random.uniform(-0.1, 0.1, (in_size, in_size))
|
||||
self.W_variance = np.random.uniform(-0.1, 0.1, (in_size, in_size))
|
||||
|
||||
def forward(self, v) -> np.ndarray:
|
||||
mean = self.W_mean @ v
|
||||
variance = self.W_variance @ v
|
||||
return np.random.normal(mean, variance)
|
||||
|
||||
def backprop(self, error: np.ndarray) -> np.ndarray:
|
||||
pass
|
||||
|
||||
|
||||
class DeepNNLayer:
|
||||
def __init__(self,
|
||||
layers: list[int],
|
||||
lr: float,
|
||||
activation_func: ActivationFunc):
|
||||
self.layers: list[NNLayer] = []
|
||||
for i in range(len(layers) - 1):
|
||||
self.layers.append(
|
||||
NNLayer(
|
||||
layers[i],
|
||||
layers[i+1],
|
||||
lr,
|
||||
activation_func)
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
return '\n'.join([str(layer) for layer in self.layers])
|
||||
|
||||
def forward(self, v: np.ndarray) -> np.ndarray:
|
||||
for layer in self.layers:
|
||||
v = layer.forward(v)
|
||||
return v
|
||||
|
||||
def backprop(self, error: np.ndarray) -> np.ndarray:
|
||||
for layer in self.layers[::-1]:
|
||||
error = layer.backprop(error)
|
||||
return error
|
||||
|
||||
111
mnist_test.py
111
mnist_test.py
@@ -1,11 +1,11 @@
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
from autoencoder import Autoencoder
|
||||
from utils import relu
|
||||
from activations import LeakyReLU
|
||||
import os
|
||||
|
||||
|
||||
def load_mnist():
|
||||
import os
|
||||
def load_mnist() -> list[np.ndarray]:
|
||||
import requests
|
||||
|
||||
mnist_path = "./mnist.npz"
|
||||
@@ -17,24 +17,66 @@ def load_mnist():
|
||||
return res["x_train"], res["y_train"], res["x_test"], res["y_test"]
|
||||
|
||||
|
||||
def mnist_test(
|
||||
bottleneck: int,
|
||||
def mnist_train(
|
||||
filename: str,
|
||||
max_epoch: int,
|
||||
patience: int,
|
||||
):
|
||||
) -> Autoencoder:
|
||||
x_train, _, x_test, _ = load_mnist()
|
||||
x_train = np.divide(x_train, 255)
|
||||
x_test = np.divide(x_train, 255)
|
||||
in_len = x_train[0].shape[0] * x_train[0].shape[0]
|
||||
autoencoder = Autoencoder(in_len, bottleneck, 0.001, relu)
|
||||
autoencoder.train_dataset(x_train, max_epoch, patience, display_loss=True)
|
||||
example: np.ndarray = x_test[np.random.randint(0, len(x_test))]
|
||||
code = autoencoder.encode(example.flatten())
|
||||
output = autoencoder.decode(code)
|
||||
plt.subplot(1, 2, 1)
|
||||
plt.matshow(example, fignum=False)
|
||||
plt.subplot(1, 2, 2)
|
||||
plt.matshow(output.reshape(example.shape), fignum=False)
|
||||
x_train.resize(x_train.shape[0], in_len)
|
||||
x_test.resize(x_test.shape[0], in_len)
|
||||
x_train = x_train / 255
|
||||
x_test = x_test / 255
|
||||
if os.path.exists(filename):
|
||||
autoencoder = Autoencoder.load(filename)
|
||||
else:
|
||||
autoencoder = Autoencoder(
|
||||
[in_len, 64, 16],
|
||||
[16, 64, in_len],
|
||||
0.01,
|
||||
LeakyReLU()
|
||||
)
|
||||
autoencoder.train_dataset(
|
||||
x_train,
|
||||
max_epoch,
|
||||
patience,
|
||||
display_loss=True)
|
||||
autoencoder.save(filename)
|
||||
return autoencoder
|
||||
|
||||
|
||||
def mnist_test(model: str | Autoencoder):
|
||||
x_train, _, x_test, y_test = load_mnist()
|
||||
in_len = x_train[0].shape[0] * x_train[0].shape[0]
|
||||
img_shape = x_train[0].shape
|
||||
x_train.resize(x_train.shape[0], in_len)
|
||||
x_test.resize(x_test.shape[0], in_len)
|
||||
x_train = x_train / 255
|
||||
x_test = x_test / 255
|
||||
if isinstance(model, str):
|
||||
autoencoder: Autoencoder = Autoencoder.load(model)
|
||||
else:
|
||||
autoencoder = model
|
||||
print(autoencoder)
|
||||
idx = np.random.randint(0, len(x_test))
|
||||
example: np.ndarray = x_test[idx]
|
||||
output, code = autoencoder.forward(example.flatten())
|
||||
plt.subplot(1, 3, 1)
|
||||
plt.matshow(
|
||||
example.reshape(img_shape),
|
||||
fignum=False)
|
||||
plt.title(f"Input ({y_test[idx]})")
|
||||
plt.subplot(1, 3, 2)
|
||||
plt.matshow(
|
||||
output.reshape(img_shape),
|
||||
fignum=False)
|
||||
plt.title(f"Output ({y_test[idx]})")
|
||||
plt.subplot(1, 3, 3)
|
||||
s = int(np.ceil(np.sqrt(code.shape[0])))
|
||||
code.resize((s, s), refcheck=False)
|
||||
plt.matshow(code, fignum=False)
|
||||
plt.title(f"Code ({y_test[idx]})")
|
||||
plt.show()
|
||||
|
||||
|
||||
@@ -42,10 +84,35 @@ if __name__ == "__main__":
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
options = "b:e:p:"
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('-b', type=int, nargs='?', default=50)
|
||||
parser.add_argument('-e', type=int, nargs='?', default=1000)
|
||||
parser.add_argument('-p', type=int, nargs='?', default=5)
|
||||
parser.add_argument(
|
||||
'-e',
|
||||
type=int,
|
||||
nargs='?',
|
||||
default=1000,
|
||||
help='Max epochs'
|
||||
)
|
||||
parser.add_argument(
|
||||
'-p',
|
||||
type=int,
|
||||
nargs='?',
|
||||
default=5,
|
||||
help='Patience'
|
||||
)
|
||||
parser.add_argument(
|
||||
'-m',
|
||||
type=str, nargs='?',
|
||||
default='autoencoder_mnist.npy',
|
||||
help='Model filename to save in run mode or load in training mode'
|
||||
)
|
||||
parser.add_argument(
|
||||
'-r',
|
||||
action='store_true',
|
||||
help='Run mode'
|
||||
)
|
||||
args = parser.parse_args(sys.argv[1:])
|
||||
mnist_test(args.b, args.e, args.p)
|
||||
if args.r:
|
||||
mnist_test(args.m)
|
||||
else:
|
||||
autoencoder = mnist_train(args.m, args.e, args.p)
|
||||
mnist_test(autoencoder)
|
||||
|
||||
8
utils.py
8
utils.py
@@ -9,10 +9,6 @@ def softmax(v: np.ndarray) -> np.ndarray:
|
||||
return exp_v / np.sum(exp_v)
|
||||
|
||||
|
||||
def relu(x: np.ndarray) -> np.ndarray:
|
||||
return x * (x > 0)
|
||||
|
||||
|
||||
def normalize(v: np.ndarray) -> np.ndarray:
|
||||
return v / (np.linalg.norm(v) + 1e-8)
|
||||
|
||||
@@ -25,10 +21,10 @@ def regularize(v: np.ndarray) -> np.ndarray:
|
||||
return (v - v_min) / (v_max - v_min)
|
||||
|
||||
|
||||
def dynamic_loss_plot_init():
|
||||
def dynamic_loss_plot_init(losses: list):
|
||||
plt.ion()
|
||||
fig, ax = plt.subplots()
|
||||
line, = ax.plot([], [], label="Loss")
|
||||
line, = ax.plot([0], losses, label="Loss")
|
||||
ax.set_xlabel("Epoch")
|
||||
ax.set_ylabel("Loss")
|
||||
ax.set_title("Training Loss")
|
||||
|
||||
Reference in New Issue
Block a user