feat: packaging, project structure + updated README.md
This commit is contained in:
0
src/easyvae/__init__.py
Normal file
0
src/easyvae/__init__.py
Normal file
35
src/easyvae/activations.py
Normal file
35
src/easyvae/activations.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import numpy as np
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class ActivationFunc(ABC):
|
||||
@abstractmethod
|
||||
def d(v: np.ndarray) -> np.ndarray:
|
||||
pass
|
||||
|
||||
|
||||
class ReLU(ActivationFunc):
|
||||
def __call__(self, x):
|
||||
return x * (x > 0)
|
||||
|
||||
def d(self, x):
|
||||
return x > 0
|
||||
|
||||
|
||||
class LeakyReLU(ActivationFunc):
|
||||
def __init__(self, k=0.01):
|
||||
self.k = k
|
||||
|
||||
def __call__(self, x):
|
||||
return x * (x > 0) + self.k * x * (x <= 0)
|
||||
|
||||
def d(self, x):
|
||||
return (x > 0) + self.k * (x <= 0)
|
||||
|
||||
|
||||
class Identity(ActivationFunc):
|
||||
def __call__(self, x):
|
||||
return x
|
||||
|
||||
def d(self, x):
|
||||
return 1
|
||||
158
src/easyvae/autoencoder.py
Normal file
158
src/easyvae/autoencoder.py
Normal file
@@ -0,0 +1,158 @@
|
||||
import numpy as np
|
||||
from .utils import (
|
||||
dynamic_loss_plot_init,
|
||||
dynamic_loss_plot_update,
|
||||
dynamic_loss_plot_finish
|
||||
)
|
||||
from tqdm import tqdm
|
||||
from .layers import DeepNNLayer, SampleLayer
|
||||
from .activations import ActivationFunc, Identity
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
LOADER = ['⡿', '⣟', '⣯', '⣷', '⣾', '⣽', '⣻', '⢿']
|
||||
|
||||
|
||||
class AAutoencoder(ABC):
|
||||
def train_dataset(self,
|
||||
data_set: list[np.ndarray],
|
||||
max_epoch: int,
|
||||
patience: int,
|
||||
display_loss: bool = False) -> list[float]:
|
||||
losses = [self.loss(data_set)]
|
||||
if display_loss is True:
|
||||
ax, line = dynamic_loss_plot_init(losses)
|
||||
epoch = 0
|
||||
no_improv = 0
|
||||
prev_error = losses[0]
|
||||
with tqdm(bar_format="{desc} {elapsed} {rate_fmt}") as lbar:
|
||||
while True:
|
||||
lbar.set_description(
|
||||
f"{LOADER[epoch % len(LOADER)]} Training ({epoch=} error={float(prev_error):.6f})", # noqa
|
||||
)
|
||||
lbar.update()
|
||||
error = 0
|
||||
for x in tqdm(data_set, leave=False):
|
||||
error += self.train(x)
|
||||
error /= len(data_set)
|
||||
derror = prev_error - error
|
||||
if derror <= 0 or abs(derror) < 1e-4:
|
||||
no_improv += 1
|
||||
else:
|
||||
no_improv = 0
|
||||
prev_error = float(error)
|
||||
losses.append(error)
|
||||
if display_loss is True:
|
||||
dynamic_loss_plot_update(ax, line, losses)
|
||||
if no_improv > patience:
|
||||
break
|
||||
if epoch > max_epoch:
|
||||
break
|
||||
epoch += 1
|
||||
if display_loss is True:
|
||||
dynamic_loss_plot_finish(ax, line)
|
||||
return losses
|
||||
|
||||
def save(self, path: str):
|
||||
path = path.removesuffix('.npy')
|
||||
np.save(path, self)
|
||||
|
||||
def load(path: str) -> 'ClassicalAutoencoder':
|
||||
path = path.removesuffix('.npy') + '.npy'
|
||||
data = np.load(path, allow_pickle=True)
|
||||
return data.item()
|
||||
|
||||
@abstractmethod
|
||||
def loss(self, data_set: list[np.ndarray]) -> float:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def train(self, v: np.ndarray) -> float:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def forward(self, v: np.ndarray) -> np.ndarray:
|
||||
pass
|
||||
|
||||
|
||||
class ClassicalAutoencoder(AAutoencoder):
|
||||
def __init__(self,
|
||||
encoder_layers: list[int],
|
||||
decoder_layers: list[int],
|
||||
lr: float,
|
||||
activation_func: ActivationFunc):
|
||||
if encoder_layers[-1] != decoder_layers[0]:
|
||||
raise Exception(
|
||||
f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa
|
||||
)
|
||||
self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
|
||||
self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
|
||||
|
||||
def __str__(self):
|
||||
return f'Encoder:\n{self.encoder}\n\nDecoder:\n{self.decoder}'
|
||||
|
||||
def loss(self, data_set: list[np.ndarray]) -> float:
|
||||
loss = 0
|
||||
for x in data_set:
|
||||
loss += np.sum(np.abs(x - self.forward(x)[0])) / len(x)
|
||||
return loss / len(data_set)
|
||||
|
||||
def train(self, v: np.ndarray):
|
||||
out = self.decoder.forward(
|
||||
self.encoder.forward(v)
|
||||
)
|
||||
error = out - v
|
||||
self.encoder.backprop(
|
||||
self.decoder.backprop(error)
|
||||
)
|
||||
return np.sum(np.abs(error)) / len(v)
|
||||
|
||||
def encode(self, v: np.ndarray) -> np.ndarray:
|
||||
return self.encoder.forward(v)
|
||||
|
||||
def decode(self, v: np.ndarray) -> np.ndarray:
|
||||
return self.decoder.forward(v)
|
||||
|
||||
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
|
||||
code = self.encode(v)
|
||||
out = self.decode(code)
|
||||
return out, code
|
||||
|
||||
|
||||
class VariationalAutoencoder(AAutoencoder):
|
||||
def __init__(self,
|
||||
encoder_layers: list[int],
|
||||
decoder_layers: list[int],
|
||||
lr: float,
|
||||
activation_func: ActivationFunc):
|
||||
if encoder_layers[-1] != decoder_layers[0]:
|
||||
raise Exception(
|
||||
f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa
|
||||
)
|
||||
self.encoder = DeepNNLayer(encoder_layers, lr, activation_func)
|
||||
self.decoder = DeepNNLayer(decoder_layers, lr, activation_func)
|
||||
self.sampler = SampleLayer(self.encoder.out_size, lr, Identity())
|
||||
|
||||
def loss(self, data_set: list[np.ndarray]) -> float:
|
||||
loss = 0
|
||||
for x in data_set:
|
||||
out = self.forward(x)[0]
|
||||
kl = self.sampler.DKL()
|
||||
loss += np.mean((out - x) ** 2)
|
||||
loss += kl
|
||||
return loss / len(data_set)
|
||||
|
||||
def train(self, v: np.ndarray) -> float:
|
||||
out, _ = self.forward(v)
|
||||
error = out - v
|
||||
self.encoder.backprop(
|
||||
self.sampler.backprop(
|
||||
self.decoder.backprop(error)
|
||||
)
|
||||
)
|
||||
return np.mean(error ** 2) + self.sampler.DKL()
|
||||
|
||||
def forward(self, v: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
|
||||
code = self.encoder.forward(v)
|
||||
sample = self.sampler.forward(code)
|
||||
out = self.decoder.forward(sample)
|
||||
return out, code
|
||||
106
src/easyvae/layers.py
Normal file
106
src/easyvae/layers.py
Normal file
@@ -0,0 +1,106 @@
|
||||
import numpy as np
|
||||
from .activations import ActivationFunc, Identity
|
||||
|
||||
|
||||
class NNLayer:
|
||||
def __init__(self,
|
||||
in_size: int,
|
||||
out_size: int,
|
||||
lr: float,
|
||||
activation_func: ActivationFunc):
|
||||
limit = np.sqrt(6 / (in_size + out_size))
|
||||
self.W = np.random.uniform(-limit, limit, (in_size, out_size))
|
||||
self.B = np.zeros((out_size))
|
||||
self.lr = lr
|
||||
self.input = None
|
||||
self.output = None
|
||||
self.output_linear = None
|
||||
self.activation_func = activation_func
|
||||
|
||||
def __str__(self):
|
||||
return f'[ {self.W.shape[0]} => {self.W.shape[1]}\tlr:{self.lr}\tactivation:{self.activation_func.__class__.__name__} ]' # noqa
|
||||
|
||||
def forward(self, v: np.ndarray) -> np.ndarray:
|
||||
self.input = v
|
||||
self.output_linear = self.input @ self.W + self.B
|
||||
self.output = self.activation_func(
|
||||
self.output_linear
|
||||
)
|
||||
return self.output
|
||||
|
||||
def backprop(self, error: np.ndarray) -> np.ndarray:
|
||||
error *= self.activation_func.d(self.output_linear)
|
||||
ret = self.W @ error
|
||||
dW = np.outer(self.input, error) * self.lr
|
||||
dB = error * self.lr
|
||||
self.W -= dW
|
||||
self.B -= dB
|
||||
return ret
|
||||
|
||||
|
||||
class SampleLayer:
|
||||
def __init__(self,
|
||||
in_size: int,
|
||||
lr: float,
|
||||
activation_func: ActivationFunc):
|
||||
self.input = None
|
||||
self.mean_nn = NNLayer(
|
||||
in_size,
|
||||
in_size,
|
||||
lr,
|
||||
activation_func)
|
||||
self.std_nn = NNLayer(
|
||||
in_size,
|
||||
in_size,
|
||||
lr,
|
||||
activation_func)
|
||||
|
||||
def DKL(self):
|
||||
return -0.5 * np.mean(1 + self.logvar - self.mean ** 2 - np.exp(self.logvar)) # noqa
|
||||
|
||||
def forward(self, v: np.ndarray) -> np.ndarray:
|
||||
self.input = v
|
||||
self.mean = self.mean_nn.forward(v)
|
||||
self.logvar = np.clip(self.std_nn.forward(v), -10, 10)
|
||||
self.std = np.exp(0.5 * self.logvar)
|
||||
self.eps = np.random.normal(0, 1, self.mean.shape)
|
||||
return 0.5 * self.eps * self.std + self.mean
|
||||
|
||||
def backprop(self, error: np.ndarray) -> np.ndarray:
|
||||
dmean = error + self.mean
|
||||
dstd = error * self.eps + 0.5 * (np.exp(self.logvar) - 1)
|
||||
mean_error = self.mean_nn.backprop(dmean)
|
||||
logvar_error = self.std_nn.backprop(dstd * self.std)
|
||||
return mean_error + logvar_error
|
||||
|
||||
|
||||
class DeepNNLayer:
|
||||
def __init__(self,
|
||||
layers: list[int],
|
||||
lr: float,
|
||||
activation_func: ActivationFunc):
|
||||
self.layers: list[NNLayer] = []
|
||||
for i in range(len(layers) - 1):
|
||||
self.layers.append(
|
||||
NNLayer(
|
||||
layers[i],
|
||||
layers[i+1],
|
||||
lr,
|
||||
activation_func if i != len(layers) - 2 else Identity()
|
||||
)
|
||||
)
|
||||
self.in_size = layers[0]
|
||||
self.out_size = layers[-1]
|
||||
|
||||
def __str__(self):
|
||||
return '\n'.join([str(layer) for layer in self.layers])
|
||||
|
||||
def forward(self, v: np.ndarray) -> np.ndarray:
|
||||
for layer in self.layers:
|
||||
v = layer.forward(v)
|
||||
return v
|
||||
|
||||
def backprop(self, error: np.ndarray) -> np.ndarray:
|
||||
for layer in self.layers[::-1]:
|
||||
error = layer.backprop(error)
|
||||
return error
|
||||
46
src/easyvae/utils.py
Normal file
46
src/easyvae/utils.py
Normal file
@@ -0,0 +1,46 @@
|
||||
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
|
||||
def softmax(v: np.ndarray) -> np.ndarray:
|
||||
v = v - np.max(v)
|
||||
exp_v = np.exp(v)
|
||||
return exp_v / np.sum(exp_v)
|
||||
|
||||
|
||||
def normalize(v: np.ndarray) -> np.ndarray:
|
||||
return v / (np.linalg.norm(v) + 1e-8)
|
||||
|
||||
|
||||
def regularize(v: np.ndarray) -> np.ndarray:
|
||||
v_min = v.min(axis=0)
|
||||
v_max = v.max(axis=0)
|
||||
if v_min - v_max == 0:
|
||||
return v
|
||||
return (v - v_min) / (v_max - v_min)
|
||||
|
||||
|
||||
def dynamic_loss_plot_init(losses: list):
|
||||
plt.ion()
|
||||
fig, ax = plt.subplots()
|
||||
line, = ax.plot([0], losses, label="Loss")
|
||||
ax.set_xlabel("Epoch")
|
||||
ax.set_ylabel("Loss")
|
||||
ax.set_title("Training Loss")
|
||||
ax.legend()
|
||||
return ax, line
|
||||
|
||||
|
||||
def dynamic_loss_plot_update(ax, line, loss):
|
||||
line.set_xdata(range(len(loss)))
|
||||
line.set_ydata(loss)
|
||||
ax.relim()
|
||||
ax.autoscale_view()
|
||||
plt.draw()
|
||||
plt.pause(0.1)
|
||||
|
||||
|
||||
def dynamic_loss_plot_finish(ax, line):
|
||||
plt.ioff()
|
||||
plt.show()
|
||||
Reference in New Issue
Block a user