import heapq
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import CountVectorizer
from scipy.sparse import vstack, csr_matrix
from numba import njit
from gismo.corpus import Corpus, toy_source_text
from gismo.embedding import Embedding
from gismo.parameters import RESOLUTION, WIDE
[docs]class Cluster:
"""
The 'Cluster' class is used for internal representation of hierarchical cluster. It stores
the attributes that describe a clustering structure and provides cluster basic addition
for merge operations.
Parameters
----------
indice: int
Index of the head (main element) of the cluster.
rank: int
The ranking order of a cluster.
vector: :class:`~scipy.sparse.csr_matrix`
The vector representation of the cluster.
Attributes
----------
indice: int
Index of the head (main element) of the cluster.
rank: int
The ranking order of a cluster.
vector: :class:`~scipy.sparse.csr_matrix`
The vector representation of the cluster.
intersection_vector: :class:`~scipy.sparse.csr_matrix` (deprecated)
The vector representation of the common points of a cluster.
members: list of int
The indices of the cluster elements.
focus: float in range [0.0, 1.0]
The consistency of the cluster (higher focus means that elements are more similar).
children: list of :class:`~gismo.clustering.Cluster`
The subclusters.
Examples
---------
>>> c1 = Cluster(indice=0, rank=1, vector=csr_matrix([1.0, 0.0, 1.0]))
>>> c2 = Cluster(indice=5, rank=0, vector=csr_matrix([1.0, 1.0, 0.0]))
>>> c3 = c1+c2
>>> c3.members
[0, 5]
>>> c3.indice
5
>>> c3.vector.toarray()
array([[2., 1., 1.]])
>>> c3.intersection_vector.toarray()
array([[1., 0., 0.]])
>>> c1 == sum([c1])
True
"""
def __init__(self, indice=None, rank=None, vector=None):
self.indice = indice
self.rank = rank
self.members = [indice] if indice is not None else []
self.focus = 1.0
self.vector = vector
self.intersection_vector = vector
self.children = []
def __add__(self, cluster):
result = Cluster()
if self.rank < cluster.rank:
result.indice = self.indice
result.rank = self.rank
else:
result.indice = cluster.indice
result.rank = cluster.rank
result.members = self.members + cluster.members
result.focus = min(self.focus, cluster.focus) # Don't forget external focus
result.vector = self.vector + cluster.vector
result.intersection_vector = self.intersection_vector.multiply(cluster.intersection_vector)
result.children = [] # Better update sons list outside class definition
return result
def __radd__(self, other):
if other == 0:
return self
else:
return self.__add__(other)
[docs]def merge_clusters(cluster_list: list, focus=1.0):
"""
Complete merge operation. In addition to the basic merge provided by
:class:`~gismo.clustering.Cluster`, it ensures the following:
* Consistency of focus by integrating the extra-focus (typically given by :func:`~gismo.clustering.subspace_partition`).
* Children (the members of the list) are sorted according to their respective rank.
Parameters
----------
cluster_list: list of :class:`~gismo.clustering.Cluster`
The clusters to merge into one cluster.
focus: float
Evaluation of the focus (similarity) between clusters.
Returns
-------
:class:`~gismo.clustering.Cluster`
The cluster merging the list.
"""
if len(cluster_list) < 2:
return cluster_list[0]
result = sum(cluster_list)
result.focus = min(result.focus, focus)
result.children = sorted(cluster_list, key=lambda c: c.rank)
return result
[docs]def subspace_partition(subspace, resolution=RESOLUTION):
"""
Proposes a partition of the subspace that merges together vectors with a similar direction.
Parameters
----------
subspace: :class:`~numpy.ndarray`, :class:`~scipy.sparse.csr_matrix`
A ``k x m`` matrix seen as a list of ``k`` ``m``-dimensional vectors sorted by importance order.
resolution: float in range [0.0, 1.0]
How strict the merging should be. ``0.0`` will merge all items together, while ``1.0``
will only merge mutually closest items.
Returns
-------
list
A list of subsets that form a partition. Each subset is represented by a pair ``(p, f)``. ``p``
is the set of indices of the subset, ``f`` is the typical similarity of the partition (called `focus`).
"""
# Applying a square distortion to resolution gives a more linear behavior in practice.
resolution = 2 * resolution - resolution ** 2
n, _ = subspace.shape
similarity_matrix = cosine_similarity(subspace, subspace) - 2 * np.identity(n)
similarity = np.max(similarity_matrix, axis=0)
closest = np.argmax(similarity_matrix, axis=0)
local_similarity = [similarity[closest[i]] for i in range(n)]
partition = [{i} for i in range(n)]
heads = np.arange(n)
cluster_similarity = np.ones(n)
for i in range(n):
for j in range(n):
if i == j:
continue
sij = similarity_matrix[i, j]
master = min(heads[i], heads[j])
slave = max(heads[i], heads[j])
if sij >= resolution * local_similarity[i]:
cluster_similarity[master] = min(cluster_similarity[master], sij)
if master != slave:
for k in partition[slave]:
heads[k] = master
partition[master] |= partition[slave]
partition[slave] = set()
return [(c, d) for c, d in zip(partition, cluster_similarity) if c]
[docs]def rec_clusterize(cluster_list: list, resolution=RESOLUTION):
"""
Auxiliary recursive function for clustering.
Parameters
----------
cluster_list: list of :class:`~gismo.clustering.Cluster`
Current aggregation state.
resolution: float in range [0.0, 1.0]
Sets the lazyness of aggregation. A 'resolution' set to 0.0 yields a one-step clustering
(*star* structure), while a 'resolution ' set to 1.0 yields, up to tie similarities, a binary tree
(*dendrogram*).
Returns
-------
list of :class:`~gismo.clustering.Cluster`
"""
if len(cluster_list) == 1:
return cluster_list
else:
partition = subspace_partition(vstack([c.vector for c in cluster_list]), resolution)
return rec_clusterize([merge_clusters([cluster_list[i] for i in p[0]], p[1]) for p in partition],
resolution)
[docs]def subspace_clusterize(subspace, resolution=RESOLUTION, indices=None):
"""
Converts a subspace (matrix seen as a list of vectors) to a Cluster object (hierarchical clustering).
Parameters
----------
subspace: :class:`~numpy.ndarray`, :class:`~scipy.sparse.csr_matrix`
A ``k x m`` matrix seen as a list of ``k`` ``m``-dimensional vectors sorted by importance order.
resolution: float in range [0.0, 1.0]
Sets the lazyness of aggregation. A 'resolution' set to 0.0 yields a one-step clustering
(*star* structure), while a 'resolution ' set to 1.0 yields, up to tie similarities, a binary tree
(*dendrogram*).
indices: list, optional
Indicates the index for each element of the subspace. Used when 'subspace'
is extracted from a larger space (e.g. X or Y). If not set, indices are set to ``range(k)``.
Returns
-------
Cluster
A cluster whose leaves are the `k` vectors from 'subspace'.
Example
_________
>>> corpus = Corpus(toy_source_text)
>>> vectorizer = CountVectorizer(dtype=float)
>>> embedding = Embedding(vectorizer=vectorizer)
>>> embedding.fit_transform(corpus)
>>> subspace = embedding.x[1:, :]
>>> cluster = subspace_clusterize(subspace)
>>> len(cluster.children)
2
>>> cluster = subspace_clusterize(subspace, resolution=.02)
>>> len(cluster.children)
4
"""
if indices is None:
n, _ = subspace.shape
indices = range(n)
return rec_clusterize([Cluster(indice=r, rank=i, vector=subspace[i, :]) for i, r in enumerate(indices)],
resolution)[0]
class Covering:
def __init__(self):
self.result = []
self.heap = []
self.used = set()
def set(self, cluster):
self.result = []
self.heap = []
self.used = set()
self.push(cluster)
def push(self, cluster):
heapq.heappush(self.heap, (cluster.focus, cluster.rank, cluster))
def update(self, item):
if item not in self.used:
self.used.add(item)
self.result.append(item)
def pop(self):
return heapq.heappop(self.heap)[2]
def core(self, cluster):
self.set(cluster)
while len(self.heap) > 0:
c = self.pop()
self.update(c.indice)
for child in c.children:
self.push(child)
return self.result
def wide(self, cluster):
self.set(cluster)
while len(self.heap) > 0:
c = self.pop()
for child in c.children:
self.push(child)
self.update(child.indice)
return self.result
[docs]def covering_order(cluster, wide=WIDE):
"""
Uses a hierarchical cluster to provide an ordering of the items that mixes rank and coverage.
This is done by exploring all cluster and subclusters by increasing similarity and rank (lexicographic order).
Two variants are proposed:
* `Core`: for each cluster, append its representant to the list if new. Central items tend to have better rank.
* `Wide`: for each cluster, append its children representants to the list if new. Marginal items tend to have better rank.
Parameters
----------
cluster: :class:`~gismo.clustering.Cluster`
The cluster to explore.
wide: :class:`bool`
Use Wide (``True``) or Core (``False``) variant.
Returns
-------
list of int
Sorted indices of the items of the cluster.
"""
if wide:
return Covering().wide(cluster)
else:
return Covering().core(cluster)
[docs]@njit
def subspace_distortion(indices, data, relevance, distortion: float):
"""
Apply inplace distortion of a subspace with relevance.
Parameters
----------
indices: :class:`~numpy.ndarray`
Indice attribute of the subspace :class:`~scipy.sparse.csr_matrix`.
data: :class:`~numpy.ndarray`
Data attribute of the subspace :class:`~scipy.sparse.csr_matrix`.
relevance: :class:`~numpy.ndarray`
Relevance values in the embedding space.
distortion: float in [0.0, 1.0]
Power applied to relevance for distortion.
"""
for i, indice in enumerate(indices):
data[i] *= relevance[indice] ** distortion
[docs]def get_sim(csr, arr):
"""
Simple similarity computation between csr_matrix and ndarray.
Parameters
----------
csr: :class:`~scipy.sparse.csr_matrix`
arr: :class:`~numpy.ndarray`
Returns
-------
float
"""
return csr.dot(arr)[0]/np.linalg.norm(csr.data)/np.linalg.norm(arr)