# -*- coding: utf-8 -*-
r"""
Implementation of the Merged Growing Neural Gas for temporal data.
"""

__author__ = "Stefan Ulbrich"
__copyright__ = "Copyright 2020, All rights reserved."
# __credits__ = []
__license__ = "Confidential"
__version__ = "0.1"
__maintainer__ = "Stefan Ulbrich"
__email__ = "Stefan.Ulbrich@gmail.com"
__status__ = "alpha"
__date__ = "2020-01-27"
__all__ = ["MergeGNG"]

import logging
from typing import List, Tuple

import numpy as np
from attr import attrib, attributes
from numpy.linalg import norm


from mgng.helpers import get_dymmy_2D_data
from mgng.validators import is_greater_zero, is_weight_factor, repr_ndarray

logger = logging.getLogger(__name__)

@attributes
class MergeGNG:
    r"""
    Class that represents a Merge Growing Neural Gas.

    Differences to the default implementation:

    * All neurons are kept in memory to allow numpy block operations.
    * Introduce a half-life and a threshold for connections (planned). For now, connections
      only decrease.
    * The adaptation rate should depend on the connection strength.
    * Introduce a method (half-life? decay on all synapses) to remove very old movements
      (I am sure that the original implementation allows for orphans).
    * Compare with an approach of a regular neural gas with a refractory time.
    * Add a threshold for an activity to trigger a new neuron (hey, make a FIFO). I really
      want to enforce this: if a neuron gets activated 3 times in a row, it's time for a
      new neuron!
    * REALLY CONSIDER REMOVING THE DIAGONAL ELEMENTS! Implement a neighbor learn rate,
      maybe weighted by synapse strength.
    * Activity is never 0 unless the neuron was never used or was removed because it had
      no connections.
    * Todo: did we remove neurons without connections?

    Parameters
    ----------
    n_neurons : int
        Maximum number of neurons.
    n_dim : int
        Output dimension (of the feature space).
    connection_decay : float
        Hyperparameter influencing the decay of neuron connections. NOT USED RIGHT NOW.
    temporal_influence : float
        The influence of the temporal memory on finding the winning neuron.
    memory_weight : float
        Determines the influence of past samples in the sequence (i.e., "how far" the
        model looks back into the past).
    life_span : int
        Number of iterations until a synapse is deleted. Note that only the synapses of
        the winning neuron are decayed (it forgets "wrong" neighbors).
    max_activity : float
        Maximal activity allowed for a neuron (cf. refractory period). If a neuron is more
        active than this threshold, a new neuron is inserted between it and its most
        active neighbor. Note that each time a neuron is the winning neuron, its activity
        level is increased by 1.0 and then decreases continuously in each iteration
        (cf. `decrease_activity`). This differs from the reference paper, where the
        network grows in regular intervals. Our approach reflects a more "on demand"
        strategy and prevents the network from growing unnecessarily.
    decrease_activity : float
        Factor of the exponential decay of the activity in each iteration. Only relevant
        if there are few iterations between recurring sequences.
    learn_rate : float
        Adaptation rate for the weight and context vectors of the winning neuron.
    learn_rate_neighbors : float
        Adaptation rate for the weight and context vectors of the winner's neighbors.
    delta : float
        Needs a better name. Determines the activity assigned to a newly inserted neuron
        (and the share subtracted from the two neurons it is inserted between).
    allow_removal : bool
        Whether weak neurons may be removed. NOT USED RIGHT NOW.
    creation_frequency : int
        Number of iterations between insertions of new neurons (alternative,
        interval-based growth criterion, cf. `learn`).
    debug : bool
        If set, a snapshot of the internal state is stored in `past` at each adaptation
        step.

    Attributes
    ----------
    _weights : np.ndarray, :math:`n_{\text{neurons}} \times n_{\text{dim}}`
        The number of neurons is constant in this implementation for reasons of
        simplicity and speed (block operations).
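
    Examples
    --------
    A minimal usage sketch (shapes and parameter values are illustrative only):

    >>> import numpy as np
    >>> gng = MergeGNG(n_neurons=10, n_dim=3)
    >>> samples = np.random.rand(50, 3)
    >>> gng.learn(samples, epochs=2)
    >>> first, second = gng.adapt(samples[0, :])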
""" # FIXME: Until pylint + attrs work nicely together (or pylint and typehints) # pylint: disable=unsubscriptable-object,unsupported-assignment-operation,no-member # Note that comment type hints are used to ensure Python 3.5 support _and_ VSCode autocomplete # See https://www.attrs.org/en/stable/types.html#mypy n_neurons = attrib(default=100) # type: int n_dim = attrib(default=3) # type: int connection_decay = attrib(default=0.1, validator=[is_weight_factor]) # type: float temporal_influence = attrib(default=0.5, validator=[is_weight_factor]) # type: float memory_weight = attrib(default=0.5, validator=[is_weight_factor]) # type: float life_span = attrib(default=10) # type: int learn_rate = attrib(default=0.2, validator=[is_weight_factor]) # type: float learn_rate_neighbors = attrib(default=0.2, validator=[is_weight_factor]) # type: float decrease_activity = attrib(default=0.8, validator=[is_weight_factor]) # type: float # TODO find a goood name for delta (in fact, update all names) delta = attrib(default=0.8, validator=[is_weight_factor]) # type: float max_activity = attrib(default=2.0, validator=[is_greater_zero]) # type: float allow_removal = attrib(default=True) # type: bool # I don't want this parameter truth to be told creation_frequency = attrib(default=5) # type: int # Private variables. Default initializers depend on n_neurons and n_dim. The order matters! _weights = attrib(init=False, repr=repr_ndarray) # type: np.ndarray _context = attrib(init=False, repr=repr_ndarray) # type: np.ndarray _connections = attrib(init=False, repr=repr_ndarray) # type: np.ndarray _counter = attrib(init=False, repr=False) # type: np.ndarray _global_context = attrib(init=False, repr=repr_ndarray) # type: np.ndarray debug = attrib(default=False) # type: bool past = attrib(init=False, factory=list, repr=False) # type: List[List[np.ndarray]]
    @_weights.default
    def _default_weights(self):
        return np.random.rand(self.n_neurons, self.n_dim)
    @_context.default
    def _default_context(self):
        return np.random.rand(self.n_neurons, self.n_dim)
    @_global_context.default
    def _default_global_context(self):
        return np.random.rand(self.n_dim)
    @_connections.default
    def _default_connections(self):
        # XXX: we keep all neurons in memory such that we can do block operations
        return np.zeros((self.n_neurons, self.n_neurons))
    @_counter.default
    def _default_counter(self):
        return np.zeros(self.n_neurons)
    def _decay(self, first: int):
        """
        Decrease all synapses of a neuron but don't allow negative synapses.

        Parameters
        ----------
        first : int
            Index of the neuron
        """

        def decay_vector(vector: np.ndarray):
            vector -= 1.0 / self.life_span
            vector[vector < 0] = 0

        logger.debug("Decaying connections before:\n%s", self._connections)
        decay_vector(self._connections[first, :])
        decay_vector(self._connections[:, first])
        logger.debug("after\n%s", self._connections)
    def kill_orphans(self):
        """Reset the activity counter of all neurons that have no connections left."""
        # argwhere is not suitable for indexing
        orphans = np.nonzero(np.sum(self._connections, axis=1) == 0)
        logger.debug("Orphans: %s", orphans)
        logger.debug("counter before: %s", self._counter)
        self._counter[orphans] = 0
        logger.debug("counter after: %s", self._counter)
    def adapt(self, sample: np.ndarray) -> Tuple[int, int]:
        r"""
        Single adaptation step.

        Parameters
        ----------
        sample : np.ndarray, shape: :math:`(n_{\text{dim}},)`
            A single sample.

        Returns
        -------
        Tuple[int, int]
            The indices of the first and second winning neurons (used for Hebbian
            learning).
        """
        if self.debug:
            self.past.append(
                [
                    self._weights.copy(),
                    self._context.copy(),
                    self._connections.copy(),
                    self._counter.copy(),
                    self.get_active_weights(),  # fancy indexing already returns copies
                    sample.copy(),
                ]
            )

        dist_weights = norm(self._weights - sample[np.newaxis, :], axis=-1)
        dist_context = norm(self._context - self._global_context, axis=-1)

        logger.debug("|weights| = %s, |context| = %s", dist_weights, dist_context)
        logger.debug("connections at beginning:\n%s", self._connections)

        # TODO: remove this variable
        distance = (
            (1 - self.temporal_influence) * dist_weights
            + self.temporal_influence * dist_context
        )
        winners = np.argsort(distance)

        logger.debug("winners %s, %s", winners, distance[winners])

        first = winners[0]
        second = winners[1]
        assert distance[first] <= distance[second]

        old_global_context = self._global_context.copy()
        # fmt: off
        self._global_context = (
            (1 - self.memory_weight) * self._weights[first, :]
            + self.memory_weight * self._context[first, :]
        )
        # fmt: on

        # Decay first so that the new connection has maximal value
        self._decay(first)

        logger.debug("Adding edge to:\n%s", self._connections)
        # Symmetric connection matrix
        self._connections[first, second] = self._connections[second, first] = 1.0
        # The diagonal is only needed when the connection values are used in the update
        # rule below; then it should probably not be 1.0.
        # self._connections[first, first] = 1.0
        logger.debug("after\n%s", self._connections)

        # Needs to happen after new connections are created, otherwise the counter of
        # `first` might be reset.
        self.kill_orphans()

        self._weights[first, :] += self.learn_rate * (sample - self._weights[first, :])
        self._context[first, :] += self.learn_rate * (old_global_context - self._context[first, :])

        neighbors = np.nonzero(self._connections[first, :])  # == non-zeros in the row
        logger.debug("winning neuron's neighbors %s", neighbors)

        # Suggestion: weight the adaptation by the age of the synapse
        # self._weights[neighbors, :] += self.learn_rate * self._connections[first, neighbors] * \
        #     (sample - self._weights[neighbors, :])
        # self._context[neighbors, :] += self.learn_rate * self._connections[first, neighbors] * \
        #     (old_global_context - self._context[neighbors, :])

        self._weights[neighbors, :] += self.learn_rate_neighbors * (
            sample - self._weights[neighbors, :]
        )
        self._context[neighbors, :] += self.learn_rate_neighbors * (
            old_global_context - self._context[neighbors, :]
        )

        self._counter[first] += 1
        logger.debug("New counter: %s, \n%s", self._counter, self._connections)

        return first, second
    def grow(self):
        """
        Entropy maximization by adding neurons in regions of high activity.

        Note: the new neuron replaces the weakest one (cf. `kill_weakest`).
        TODO: this needs to be implemented too!
        """
        # Warning: this raises an error when the most active neuron has no neighbors
        # (which is pretty much impossible).
        most = np.argmax(self._counter)
        its_neighbors = np.nonzero(self._connections[most, :])  # e.g., (array([0, 2]),)
        logger.debug(its_neighbors)

        # Get the activations of the neighbors and sort them
        most_active_neighbors = np.argsort(self._counter[its_neighbors])
        logger.debug(most_active_neighbors)

        # WATCH OUT: the last entry would be the winning neuron itself unless the diagonal
        # is zero. Here the diagonal is zero, so -1 picks the most active neighbor.
        neighbor = its_neighbors[0][most_active_neighbors[-1]]

        logger.debug(
            "Most active: %d\nIts neighbors: %s, Its most active neighbor: %s",
            most,
            its_neighbors,
            neighbor,
        )

        new = self.kill_weakest()

        self._weights[new, :] = 0.5 * (self._weights[most, :] + self._weights[neighbor, :])
        self._context[new, :] = 0.5 * (self._context[most, :] + self._context[neighbor, :])

        self._counter[new] = self.delta * (self._counter[most] + self._counter[neighbor])
        self._counter[most] *= 1 - self.delta
        self._counter[neighbor] *= 1 - self.delta

        self._connections[most, neighbor] = self._connections[neighbor, most] = 0.0
        self._connections[new, neighbor] = self._connections[neighbor, new] = 1.0
        self._connections[most, new] = self._connections[new, most] = 1.0
    def kill_weakest(self) -> np.signedinteger:
        """
        Finds the weakest neuron (or the first with zero activity in the list) and
        returns its index.

        Returns
        -------
        int
            Index of the neuron
        """
        least = np.argmin(self._counter)  # Is that a good metric? Probably yes.
        logger.info("Least active neuron: %d, value: %f", least, self._counter[least])
        logger.info("Did it have connections?\n%s", self._connections[least, :])

        if np.sum(self._connections[least, :]) > 0:
            logger.warning(
                "Killing existing neuron. Consider a larger pool! Activity: %f",
                self._counter[least],
            )

        # Remove connections:
        self._connections[least, :] = 0.0
        self._connections[:, least] = 0.0

        return least
    def learn(self, samples: np.ndarray, epochs: int):
        r"""
        Batch learning.

        Parameters
        ----------
        samples : np.ndarray
            Row array of points. Shape :math:`n_{\text{samples}} \times n_{\text{dim}}`.
        epochs : int
            Number of repetitions.
        """
        assert samples.shape[1] == self.n_dim

        for e in range(epochs):
            for i, sample in enumerate(samples):
                logger.info("\n\n\n%s\nSample: %d, Epoch: %d", "*" * 24, i, e)
                self.adapt(sample)
                self._counter *= self.decrease_activity
                if True:  # Toggle between activity-based and interval-based growing
                    if np.max(self._counter) >= self.max_activity:
                        self.grow()
                else:
                    if i % self.creation_frequency == self.creation_frequency - 1:
                        # TODO: make this a factor depending on the activity of neurons
                        self.grow()
    def get_active_weights(self):
        """Return the weights of the active and of the inactive neurons as a tuple."""
        # Watch out: there is `np.argwhere`, which is not the same as `np.nonzero`
        return (
            self._weights[np.nonzero(self._counter > 0), :],
            self._weights[np.nonzero(self._counter <= 0), :],
        )
if __name__ == "__main__":
    import matplotlib.pyplot as plt  # type: ignore

    mgng = MergeGNG(connection_decay=0.1)
    print(mgng)

    X = get_dymmy_2D_data(20)
    print(repr_ndarray(X))

    plt.plot(X[0, :], X[1, :])
    plt.show()
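    # A minimal training sketch (assumption: `get_dymmy_2D_data` returns a
    # 2 x n_samples array, as suggested by the plotting call above; the parameter
    # values are illustrative only).
    mgng_2d = MergeGNG(n_dim=2, n_neurons=20)
    mgng_2d.learn(X.T, epochs=3)

    # Plot the learned weights of the active neurons on top of the data. Note that
    # `get_active_weights` returns arrays of shape (1, k, n_dim) due to fancy indexing.
    active, _inactive = mgng_2d.get_active_weights()
    plt.plot(X[0, :], X[1, :], label="data")
    plt.scatter(active[0, :, 0], active[0, :, 1], label="active neurons")
    plt.legend()
    plt.show()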