From a4334568ecf58cd3e17c9ea00f0f175f4c5371a9 Mon Sep 17 00:00:00 2001 From: Lenoctambule <106790775+lenoctambule@users.noreply.github.com> Date: Sun, 12 Apr 2026 19:40:04 +0200 Subject: [PATCH 1/7] refactor: separate gradient back and weight updates --- src/easyvae/autoencoder.py | 17 +++++++++++------ src/easyvae/layers.py | 33 ++++++++++++++++++++++----------- 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/src/easyvae/autoencoder.py b/src/easyvae/autoencoder.py index 3926481..e5f0fed 100644 --- a/src/easyvae/autoencoder.py +++ b/src/easyvae/autoencoder.py @@ -82,9 +82,11 @@ class ClassicalAutoencoder(AAutoencoder): self.encoder.forward(v) ) error = out - v - self.encoder.backprop( - self.decoder.backprop(error) + self.encoder.back( + self.decoder.back(error) ) + self.encoder.backprop() + self.decoder.backprop() return np.sum(np.abs(error)) / len(v) @interruptable @@ -109,7 +111,7 @@ class ClassicalAutoencoder(AAutoencoder): error += self.train(x) error /= len(data_set) derror = prev_error - error - if derror <= 0 or abs(derror) < 1e-4: + if abs(derror) < 1e-4: no_improv += 1 else: no_improv = 0 @@ -167,11 +169,14 @@ class VariationalAutoencoder(AAutoencoder): def train(self, v: np.ndarray) -> tuple[float, float]: out, _ = self.forward(v) error = out - v - self.encoder.backprop( - self.sampler.backprop( - self.decoder.backprop(error) + self.encoder.back( + self.sampler.back( + self.decoder.back(error) ) ) + self.encoder.backprop() + self.sampler.backprop() + self.decoder.backprop() return np.mean(error ** 2), self.sampler.DKL() @interruptable diff --git a/src/easyvae/layers.py b/src/easyvae/layers.py index a79c6cd..327b0f3 100644 --- a/src/easyvae/layers.py +++ b/src/easyvae/layers.py @@ -15,6 +15,7 @@ class NNLayer: self.input = None self.output = None self.output_linear = None + self.error = None self.activation_func = activation_func def __str__(self): @@ -27,15 +28,16 @@ class NNLayer: self.output_linear ) return self.output + + def back(self, error: np.ndarray) -> np.ndarray: + self.error = error * self.activation_func.d(self.output_linear) + return self.W @ self.error - def backprop(self, error: np.ndarray) -> np.ndarray: - error *= self.activation_func.d(self.output_linear) - ret = self.W @ error - dW = np.outer(self.input, error) * self.lr - dB = error * self.lr + def backprop(self) -> np.ndarray: + dW = np.outer(self.input, self.error) * self.lr + dB = self.error * self.lr self.W -= dW self.B -= dB - return ret class SampleLayer: @@ -66,13 +68,17 @@ class SampleLayer: self.eps = np.random.normal(0, 1, self.mean.shape) return 0.5 * self.eps * self.std + self.mean - def backprop(self, error: np.ndarray) -> np.ndarray: + def back(self, error: np.ndarray) -> np.ndarray: dmean = error + self.mean dstd = error * self.eps + 0.5 * (np.exp(self.logvar) - 1) - mean_error = self.mean_nn.backprop(dmean) - logvar_error = self.std_nn.backprop(dstd * self.std) + mean_error = self.mean_nn.back(dmean) + logvar_error = self.std_nn.back(dstd * self.std) return mean_error + logvar_error + def backprop(self): + self.mean_nn.backprop() + self.std_nn.backprop() + class DeepNNLayer: def __init__(self, @@ -100,7 +106,12 @@ class DeepNNLayer: v = layer.forward(v) return v - def backprop(self, error: np.ndarray) -> np.ndarray: + def back(self, error: np.ndarray): for layer in self.layers[::-1]: - error = layer.backprop(error) + error = layer.back(error) return error + + def backprop(self) -> np.ndarray: + for layer in self.layers: + layer.backprop() + From 82acc6b4f331b0509e0f67e32da77d0209df6a4d Mon Sep 17 00:00:00 2001 From: Lenoctambule <106790775+lenoctambule@users.noreply.github.com> Date: Sun, 12 Apr 2026 20:31:31 +0200 Subject: [PATCH 2/7] feat: NoiseLayer class + keep loss across trainings --- src/easyvae/autoencoder.py | 24 +++++++++++++++--------- src/easyvae/layers.py | 11 ++++++++++- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/src/easyvae/autoencoder.py b/src/easyvae/autoencoder.py index e5f0fed..1dc6c24 100644 --- a/src/easyvae/autoencoder.py +++ b/src/easyvae/autoencoder.py @@ -1,6 +1,6 @@ import numpy as np from tqdm import tqdm -from .layers import DeepNNLayer, SampleLayer +from .layers import DeepNNLayer, SampleLayer, NoiseLayer from .activations import ActivationFunc, Identity from .plotters import Plotter, CAPlotter, VAEPlotter from .utils import interruptable @@ -17,13 +17,15 @@ class AAutoencoder(ABC): encoder_layers: list[int], decoder_layers: list[int], lr: float, - activation_func: ActivationFunc): + activation_func: ActivationFunc, + noise=0): if encoder_layers[-1] != decoder_layers[0]: raise Exception( f"Encoder output and decoder input don't match {encoder_layers[-1]} != {encoder_layers[0]}" # noqa ) self.encoder = DeepNNLayer(encoder_layers, lr, activation_func) self.decoder = DeepNNLayer(decoder_layers, lr, activation_func) + self.noise = NoiseLayer(noise) self.space_dim = decoder_layers[0] self.lr = lr self.losses = [0] @@ -78,8 +80,8 @@ class ClassicalAutoencoder(AAutoencoder): return loss / len(data_set) def train(self, v: np.ndarray): - out = self.decoder.forward( - self.encoder.forward(v) + out, _ = self.forward( + self.noise.forward(v) ) error = out - v self.encoder.back( @@ -96,7 +98,8 @@ class ClassicalAutoencoder(AAutoencoder): patience: int, display_loss: bool = False) -> list[float]: plotter = self.plotter_cls(self) if display_loss else Plotter(self) - self.losses = [self.loss(data_set)] + if len(self.losses) == 0: + self.losses = [self.loss(data_set)] epoch = 0 no_improv = 0 prev_error = self.losses[0] @@ -167,7 +170,9 @@ class VariationalAutoencoder(AAutoencoder): return recon_loss, kl_loss def train(self, v: np.ndarray) -> tuple[float, float]: - out, _ = self.forward(v) + out, _ = self.forward( + self.noise.forward(v) + ) error = out - v self.encoder.back( self.sampler.back( @@ -186,9 +191,10 @@ class VariationalAutoencoder(AAutoencoder): patience: int, display_loss: bool = False) -> list[float]: plotter = self.plotter_cls(self) if display_loss else Plotter(self) - recon_0, kl_0 = self.loss(data_set) - self.recon_losses = [recon_0] - self.KL_losses = [kl_0] + if len(self.recon_losses) == 0: + recon_0, kl_0 = self.loss(data_set) + self.recon_losses = [recon_0] + self.KL_losses = [kl_0] epoch = 0 no_improv = 0 prev_loss = self.recon_losses[0] + self.KL_losses[0] diff --git a/src/easyvae/layers.py b/src/easyvae/layers.py index 327b0f3..dc4daef 100644 --- a/src/easyvae/layers.py +++ b/src/easyvae/layers.py @@ -28,7 +28,7 @@ class NNLayer: self.output_linear ) return self.output - + def back(self, error: np.ndarray) -> np.ndarray: self.error = error * self.activation_func.d(self.output_linear) return self.W @ self.error @@ -115,3 +115,12 @@ class DeepNNLayer: for layer in self.layers: layer.backprop() + +class NoiseLayer: + def __init__(self, amount=0.1): + self.amount = amount + + def forward(self, v: np.ndarray): + if self.amount == 0: + return v + return v + np.random.normal(0, self.amount, v.shape) From e6c1229a7e422acf760a374f854d1a56899c6669 Mon Sep 17 00:00:00 2001 From: Lenoctambule <106790775+lenoctambule@users.noreply.github.com> Date: Mon, 13 Apr 2026 23:07:11 +0200 Subject: [PATCH 3/7] fix: return encoding after sample layer --- src/easyvae/autoencoder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/easyvae/autoencoder.py b/src/easyvae/autoencoder.py index 1dc6c24..1880781 100644 --- a/src/easyvae/autoencoder.py +++ b/src/easyvae/autoencoder.py @@ -232,7 +232,7 @@ class VariationalAutoencoder(AAutoencoder): code = self.encoder.forward(v) sample = self.sampler.forward(code) out = self.decoder.forward(sample) - return out, code + return out, sample def encode(self, v: np.ndarray) -> np.ndarray: return self.sampler.forward( From b1dc34e699a1d4fa296422e21ec40ae9efb993d9 Mon Sep 17 00:00:00 2001 From: Lenoctambule <106790775+lenoctambule@users.noreply.github.com> Date: Tue, 14 Apr 2026 14:12:22 +0200 Subject: [PATCH 4/7] feat: post training and online labeling VAE class --- examples/mnist_test.py | 4 +-- src/easyvae/autoencoder.py | 64 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 64 insertions(+), 4 deletions(-) diff --git a/examples/mnist_test.py b/examples/mnist_test.py index 160cb16..7082e1b 100644 --- a/examples/mnist_test.py +++ b/examples/mnist_test.py @@ -4,6 +4,7 @@ import os from easyvae.autoencoder import ( # noqa VariationalAutoencoder, ClassicalAutoencoder, + LabelingVAE, AAutoencoder ) from easyvae.activations import LeakyReLU @@ -26,10 +27,9 @@ def mnist_train( max_epoch: int, patience: int, cls: type[AAutoencoder],) -> AAutoencoder: - x_train, _, x_test, _ = load_mnist() + x_train, _, _, _ = load_mnist() in_len = x_train[0].shape[0] * x_train[0].shape[0] x_train.resize(x_train.shape[0], in_len) - x_test.resize(x_test.shape[0], in_len) x_train = x_train / 255 if os.path.exists(filename): autoencoder = cls.load(filename) diff --git a/src/easyvae/autoencoder.py b/src/easyvae/autoencoder.py index 1880781..490b6c5 100644 --- a/src/easyvae/autoencoder.py +++ b/src/easyvae/autoencoder.py @@ -7,6 +7,7 @@ from .utils import interruptable from abc import ABC, abstractmethod LOADER = ['⡿', '⣟', '⣯', '⣷', '⣾', '⣽', '⣻', '⢿'] +SQRT_2PI = np.sqrt(2 * np.pi) class AAutoencoder(ABC): @@ -236,8 +237,67 @@ class VariationalAutoencoder(AAutoencoder): def encode(self, v: np.ndarray) -> np.ndarray: return self.sampler.forward( - self.encoder.forward(v) - ) + self.encoder.forward(v) + ) def decode(self, v: np.ndarray) -> np.ndarray: return self.decoder.forward(v) + + +class Label: + def __init__(self, name: str, embedding_size: int): + self.name = name + self.embedding_size = embedding_size + self.history = [] + self.mean = np.zeros(embedding_size) + self.std = np.zeros(embedding_size) + + def observe(self, code: np.ndarray): + self.history.append(code) + + def cache(self): + history = np.array(self.history) + self.mean = np.mean(history, axis=0) + self.std = np.std(history, axis=0, mean=self.mean) + + def p(self, x: np.ndarray): + return np.mean( + np.exp(-(x - self.mean) ** 2 / (2 * self.std)) / (self.std * SQRT_2PI) # noqa + ) + + +class LabelingVAE(VariationalAutoencoder): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.labels: list[Label] = [] + self.labels_idxs: dict[str, int] = {} + + def learn_labels(self, data: np.ndarray, labels: list[list[str]], epoch=5): + self.labels.clear() + self.labels_idxs.clear() + for _ in range(epoch): + for x_i, labels_i in zip(data, labels): + y_i = self.encode(x_i) + for c in labels_i: + idx = self.labels_idxs.get(c, None) + if idx is None: + label = Label(c, self.encoder.out_size) + self.labels.append(label) + self.labels_idxs[c] = len(self.labels) - 1 + else: + label = self.labels[idx] + label.observe(y_i) + for label in self.labels: + label.cache() + + def label(self, x: np.ndarray): + y = self.encode(x) + probs = {} + total = 0 + for label in self.labels: + p = label.p(y) + probs[label.name] = p + total += p + for k in probs: + probs[k] = float(probs[k] / total) + return dict(sorted(probs.items())) From b635bf0467b2b0d730bb8142057776e146361fbf Mon Sep 17 00:00:00 2001 From: Lenoctambule <106790775+lenoctambule@users.noreply.github.com> Date: Tue, 14 Apr 2026 19:24:20 +0200 Subject: [PATCH 5/7] feat: add monte-carlo method and MSE to labeling method --- examples/mnist_test.py | 13 +++++++++---- src/easyvae/autoencoder.py | 15 ++++++++++++--- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/examples/mnist_test.py b/examples/mnist_test.py index 7082e1b..236f507 100644 --- a/examples/mnist_test.py +++ b/examples/mnist_test.py @@ -37,7 +37,7 @@ def mnist_train( autoencoder = cls( [in_len, 256, 2], [2, 256, in_len], - 0.0001, + 0.001, LeakyReLU() ) print("CTRL+C to interrupt training.") @@ -91,8 +91,8 @@ def plot_random_reconstruction( print(f'{code.tolist()}') -def mnist_test(model: str | AAutoencoder): - x_train, _, x_test, y_test = load_mnist() +def mnist_test(model: str | AAutoencoder | LabelingVAE): + x_train, y_train, x_test, y_test = load_mnist() in_len = x_train[0].shape[0] * x_train[0].shape[0] img_shape = x_train[0].shape x_train.resize(x_train.shape[0], in_len) @@ -107,6 +107,11 @@ def mnist_test(model: str | AAutoencoder): print(autoencoder) idx = np.random.randint(0, len(x_test)) example: np.ndarray = x_test[idx] + y_train = [str(int(i)) for i in y_train] + autoencoder.learn_labels(x_train, y_train, 5) + res = autoencoder.label(x_train[idx]) + for k, v in res.items(): + print(f"{k} => {v}") plot_random_reconstruction(autoencoder, example, img_shape, y_test[idx]) if autoencoder.space_dim == 2: plot_mnist_latent_space(autoencoder, x_test, y_test) @@ -150,6 +155,6 @@ if __name__ == "__main__": args.m, args.e, args.p, - VariationalAutoencoder + LabelingVAE ) mnist_test(autoencoder) diff --git a/src/easyvae/autoencoder.py b/src/easyvae/autoencoder.py index 490b6c5..3c5dd30 100644 --- a/src/easyvae/autoencoder.py +++ b/src/easyvae/autoencoder.py @@ -290,8 +290,11 @@ class LabelingVAE(VariationalAutoencoder): for label in self.labels: label.cache() - def label(self, x: np.ndarray): - y = self.encode(x) + def label(self, x: np.ndarray, samples=10): + y = np.zeros((samples, self.encoder.out_size)) + for i in range(samples): + y[i] = self.encode(x) + y = np.mean(y, axis=0) probs = {} total = 0 for label in self.labels: @@ -300,4 +303,10 @@ class LabelingVAE(VariationalAutoencoder): total += p for k in probs: probs[k] = float(probs[k] / total) - return dict(sorted(probs.items())) + return dict( + sorted( + probs.items(), + key=lambda item: item[1], + reverse=True + ) + ) From 4cc349c22c8012c9109e14238bbf42f6ab00c617 Mon Sep 17 00:00:00 2001 From: Lenoctambule <106790775+lenoctambule@users.noreply.github.com> Date: Tue, 14 Apr 2026 20:53:13 +0200 Subject: [PATCH 6/7] feat: simple distances instead of std+mean for labeling --- examples/mnist_test.py | 8 +++---- src/easyvae/autoencoder.py | 46 ++++++++++++++++---------------------- 2 files changed, 23 insertions(+), 31 deletions(-) diff --git a/examples/mnist_test.py b/examples/mnist_test.py index 236f507..883128a 100644 --- a/examples/mnist_test.py +++ b/examples/mnist_test.py @@ -69,7 +69,6 @@ def plot_mnist_latent_space(autoencoder: AAutoencoder, x: np.ndarray, y,): ) plt.colorbar(scatter) plt.grid(True) - plt.show() def plot_random_reconstruction( @@ -107,14 +106,15 @@ def mnist_test(model: str | AAutoencoder | LabelingVAE): print(autoencoder) idx = np.random.randint(0, len(x_test)) example: np.ndarray = x_test[idx] - y_train = [str(int(i)) for i in y_train] - autoencoder.learn_labels(x_train, y_train, 5) - res = autoencoder.label(x_train[idx]) + labels_train = [str(int(i)) for i in y_train] + autoencoder.learn_labels(x_train, labels_train) + res = autoencoder.label(example) for k, v in res.items(): print(f"{k} => {v}") plot_random_reconstruction(autoencoder, example, img_shape, y_test[idx]) if autoencoder.space_dim == 2: plot_mnist_latent_space(autoencoder, x_test, y_test) + plt.show() if __name__ == "__main__": diff --git a/src/easyvae/autoencoder.py b/src/easyvae/autoencoder.py index 3c5dd30..e6c035e 100644 --- a/src/easyvae/autoencoder.py +++ b/src/easyvae/autoencoder.py @@ -249,20 +249,16 @@ class Label: self.name = name self.embedding_size = embedding_size self.history = [] - self.mean = np.zeros(embedding_size) - self.std = np.zeros(embedding_size) def observe(self, code: np.ndarray): self.history.append(code) def cache(self): - history = np.array(self.history) - self.mean = np.mean(history, axis=0) - self.std = np.std(history, axis=0, mean=self.mean) + self.history_np = np.array(self.history) def p(self, x: np.ndarray): return np.mean( - np.exp(-(x - self.mean) ** 2 / (2 * self.std)) / (self.std * SQRT_2PI) # noqa + np.exp(-np.abs(self.history_np - x)) ) @@ -272,33 +268,29 @@ class LabelingVAE(VariationalAutoencoder): self.labels: list[Label] = [] self.labels_idxs: dict[str, int] = {} - def learn_labels(self, data: np.ndarray, labels: list[list[str]], epoch=5): + def learn_labels(self, data: np.ndarray, labels: list[list[str]]): self.labels.clear() self.labels_idxs.clear() - for _ in range(epoch): - for x_i, labels_i in zip(data, labels): - y_i = self.encode(x_i) - for c in labels_i: - idx = self.labels_idxs.get(c, None) - if idx is None: - label = Label(c, self.encoder.out_size) - self.labels.append(label) - self.labels_idxs[c] = len(self.labels) - 1 - else: - label = self.labels[idx] - label.observe(y_i) - for label in self.labels: - label.cache() + for x_i, labels_i in zip(data, labels): + y_i = self.encode(x_i) + for c in labels_i: + idx = self.labels_idxs.get(c, None) + if idx is None: + label = Label(c, self.encoder.out_size) + self.labels.append(label) + self.labels_idxs[c] = len(self.labels) - 1 + else: + label = self.labels[idx] + label.observe(y_i) + for label in self.labels: + label.cache() - def label(self, x: np.ndarray, samples=10): - y = np.zeros((samples, self.encoder.out_size)) - for i in range(samples): - y[i] = self.encode(x) - y = np.mean(y, axis=0) + def label(self, x: np.ndarray): probs = {} total = 0 + code = self.encode(x) for label in self.labels: - p = label.p(y) + p = label.p(code) probs[label.name] = p total += p for k in probs: From 32a4a39ab9da69965456ce96de5b71fc0cff30ef Mon Sep 17 00:00:00 2001 From: Lenoctambule <106790775+lenoctambule@users.noreply.github.com> Date: Tue, 14 Apr 2026 21:03:04 +0200 Subject: [PATCH 7/7] fix: add history mem cap and mid point pruning --- src/easyvae/autoencoder.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/easyvae/autoencoder.py b/src/easyvae/autoencoder.py index e6c035e..1c0c978 100644 --- a/src/easyvae/autoencoder.py +++ b/src/easyvae/autoencoder.py @@ -245,20 +245,28 @@ class VariationalAutoencoder(AAutoencoder): class Label: - def __init__(self, name: str, embedding_size: int): + def __init__(self, + name: str, + embedding_size: int, + N=100): self.name = name self.embedding_size = embedding_size - self.history = [] + self.N = N + self.idx = 0 + self.history = np.zeros((self.N, embedding_size)) def observe(self, code: np.ndarray): - self.history.append(code) - - def cache(self): - self.history_np = np.array(self.history) + if self.idx < self.N: + self.history[self.idx] = code + self.idx += 1 + else: + diffs = np.linalg.norm(self.history - code, axis=0) + idx = np.argmin(diffs) + self.history[idx] = (self.history[idx] + code) / 2 def p(self, x: np.ndarray): return np.mean( - np.exp(-np.abs(self.history_np - x)) + np.exp(-np.abs(self.history - x)) ) @@ -282,8 +290,6 @@ class LabelingVAE(VariationalAutoencoder): else: label = self.labels[idx] label.observe(y_i) - for label in self.labels: - label.cache() def label(self, x: np.ndarray): probs = {}