Source code for galaxywitness.clusterization

from collections import defaultdict
# from itertools import groupby
import numpy as np
from scipy.spatial.transform import Rotation

import plotly.express as px
import plotly.graph_objects as go

from gudhi.clustering.tomato import Tomato
from gudhi.representations.preprocessing import ProminentPoints

"""
If one wants to compare two clusterizations they should collect the data (lists of coordinates and weights) needed for
the constructor. As a temporary solution this cold be done by adding several changes to `clustering` function
from __main__.py. Insert the following at the end of the function:

d = {'x': points[:, 0], 'y': points[:, 1], 'z': points[:, 2]}
df = pd.DataFrame(d)
df['weight'] = 1.0
df['cluster'] = tomato.labels_
return df

Also, we still need group points by cluster label, this code does the work:

cl = []
for c in df['cluster'].unique():
    d = df.loc[cl1['cluster'] == c, 'x':'weight'].to_numpy().T.tolist()
    cl.append(d)
o1 = Clusterization(len(cl), cl)

Now you can
cost = o1.compare_clusterization(o2)
print(cost)
"""


def center_of_mass_diff(c1, c2):
    sum_square = (c1[0] - c2[0]) ** 2 + (c1[1] - c2[1]) ** 2 + (c1[2] - c2[2]) ** 2
    dist = np.sqrt(sum_square)
    weight_diff = abs(c1[3] - c2[3])  # improve weight_diff metric
    return dist + weight_diff


def distances_matrix(centers1, centers2):
    """
    matrix is supposed to have a shape (n, m), n <= m
    """
    n_cntrs = centers2 if len(centers1) > len(centers2) else centers1
    m_cntrs = centers1 if len(centers1) > len(centers2) else centers2
    a = np.zeros((len(n_cntrs) + 1, len(m_cntrs) + 1))
    for i in range(1, a.shape[0]):
        for j in range(1, a.shape[1]):
            a[i][j] = center_of_mass_diff(n_cntrs[i - 1], m_cntrs[j - 1])
    return a


def Hungarian(a):
    n = a.shape[0] - 1  # истинные размеры матрицы расстояний, a.shape учитывает фиктивные нулевые строку и столбец
    m = a.shape[1] - 1
    u = [0] * (n + 1)
    v = [0] * (m + 1)
    matches = [0] * (m + 1)
    way = [0] * (m + 1)
    for i in range(1, n + 1):
        matches[0] = i
        j_free = 0
        minv = [np.inf] * (m + 1)
        used = [False] * (m + 1)
        while matches[j_free] != 0:
            used[j_free] = True
            i_free = matches[j_free]
            delta = np.inf
            j1 = 0
            for j in range(1, m + 1):
                if not used[j]:
                    cur = a[i_free][j] - u[i_free] - v[j]
                    if cur < minv[j]:
                        minv[j] = cur
                        way[j] = j_free
                    if minv[j] < delta:
                        delta = minv[j]
                        j1 = j
            for j in range(0, m + 1):
                if used[j]:
                    u[matches[j]] += delta
                    v[j] -= delta
                else:
                    minv[j] -= delta
            j_free = j1
        while j_free != 0:
            j1 = way[j_free]
            matches[j_free] = matches[j1]
            j_free = j1
    cost = -v[0]
    return cost, matches[1:]


[docs]class Clusterization: """ Class for handling clusterization of a point cloud :param n_clusters: number of clusters. :type n_clusters: int :param clusters: collection of clusters, each cluster contains four subarrays: list of first coordinates, list of second coordinates, list of third coordinates, list of weights. Weights are optional to pass though for convenience __init__() fills the parameter with 1.0 in case it isn't present :type clusters: list, Iterable, dict, or DataFrame :param centers_of_mass_computed: flag is set to False unless centers of mass were computed for clusters. :type centers_of_mass_computed: bool :param centers_of_mass: collection of computed centers of mass, info contained about each center: [coord_1, coord_2, coord_3, weight] :type centers_of_mass: list """ __slots__ = [ 'points', 'labels', 'n_clusters', 'clusters', 'centers_of_mass_computed', 'centers_of_mass' ] def __init__(self, points, n_clusters=0, clusters=None): """ Constuctor """ self.points = points self.labels = None self.n_clusters = n_clusters self.clusters = clusters self.centers_of_mass_computed = False self.centers_of_mass = []
[docs] def center_of_mass(self): """ Compute centroids of clusters in clustering """ for cluster in self.clusters: weight = sum(cluster[3]) # av_x = sum(np.multiply(cluster[0], cluster[3])) / weight av_y = sum(np.multiply(cluster[1], cluster[3])) / weight av_z = sum(np.multiply(cluster[2], cluster[3])) / weight self.centers_of_mass.append([av_x, av_y, av_z, weight]) self.centers_of_mass_computed = True return self.centers_of_mass
def _build_clustering(self): if self.labels is None: return self.clusters = [] labels = defaultdict(list) for i, l in enumerate(self.labels): labels[l].append(i) for _, cluster in labels.items(): # temp = np.array(cluster) self.clusters.append([self.points[cluster, 0], self.points[cluster, 1], self.points[cluster, 2], [1.]*len(cluster)]) self.n_clusters = len(self.clusters) self.center_of_mass()
[docs] def import_clustering(self, labels): """ Import outer clustering :param labels: labels of outer clustering :type labels: list or np.ndarray """ self.labels = labels self._build_clustering()
[docs] def tomato(self, max_fil_val=7.5): """ Tomato clustering :param max_fil_val: maximum value of filtration :type max_fil_val: float """ tomato = Tomato(density_type='DTM') tomato.fit(self.points) self.n_clusters = self._compute_number_of_clusters(tomato.diagram_, max_fil_val) #tomato.n_clusters_ = self.n_clusters self.n_clusters = tomato.n_clusters_ self.labels = tomato.labels_ self._build_clustering()
def _compute_number_of_clusters(self, diag, max_fil_val=7.5): new_diag = [] if diag.size > 0: prom = ProminentPoints(use=True, num_pts=len(diag), threshold=max_fil_val) new_diag = prom.transform([diag]) return len(new_diag)
[docs] def draw_projections(self, num): """ Draw projections of clustering of point cloud on the several random planes :param num: number of planes :type num: int """ thetas = np.random.rand(num)*np.pi axes = np.random.randn(3, num) axes /= np.linalg.norm(axes, axis=0) for j in range(num): axis = axes[:, j] rotation = Rotation.from_quat([np.sin(thetas[j])*axis[0], np.sin(thetas[j])*axis[1],\ np.sin(thetas[j])*axis[2], np.cos(thetas[j])]) points = rotation.apply(self.points) fig = px.scatter(x=points[:, 0], y=points[:, 1], color=self.labels) fig.update_traces(marker_size=2) fig.show()
[docs] def draw_clustering(self): """ Draw clustering of point cloud """ fig = go.Figure(data=[go.Scatter3d(x=self.points[:, 0], y=self.points[:, 1], z=self.points[:, 2], mode='markers', marker=dict(size=1, color=self.labels))]) fig.update_layout(scene=dict(xaxis_title="X, Mpc", yaxis_title="Y, Mpc", zaxis_title="Z, Mpc")) fig.show()
[docs] def compare_clusterization(self, other): """ Compare two clusterizations (not ready) :param other: another clustering :type num: galaxywitness.clusterization.Clusterization """ if not self.centers_of_mass_computed: self.center_of_mass() if not other.centers_of_mass_computed: other.center_of_mass() print(self.n_clusters, other.n_clusters) a = distances_matrix(self.centers_of_mass, other.centers_of_mass) cost, _ = Hungarian(a) return cost