# Source code for fABBA.digitization

# License: BSD 3 clause

# Copyright (c) 2021, Stefan Güttel, Xinye Chen
# All rights reserved.

# Digitization -- based on aggregation


try:
    try:
        from .separate.aggregation_cm import aggregate as aggregate_fc 
        # cython with memory view
        from .extmod.fabba_agg_cm import aggregate as aggregate_fabba 
        
    except ModuleNotFoundError:
        from .separate.aggregation_c import aggregate as aggregate_fc 
        from .extmod.fabba_agg_c import aggregate as aggregate_fabba 
    

except (ModuleNotFoundError):
    from .separate.aggregation import aggregate as aggregate_fc 
    from .fabba_agg import aggregate as aggregate_fabba
    from .inverse_t import *
    
import collections
import numpy as np
import pandas as pd
from dataclasses import dataclass


@dataclass
class Model:
    """Parameter container produced by ``digitize`` and consumed by
    ``inverse_digitize``: the cluster centers, the starting-point data of the
    aggregation, and the label<->symbol alphabet mapping."""
    centers: np.ndarray # store aggregation centers
    splist: np.ndarray # store start point data
    alphabets: np.ndarray # labels -> symbols, symbols -> labels
    
    
    
def digitize(pieces, alpha=0.5, sorting='norm', scl=1, alphabet_set=0):
    """Greedy 2D clustering of pieces (an Nx2 numpy array), using tolerance
    alpha and len/inc scaling parameter scl.

    A 'temporary' group center, which we call a starting point, is used when
    assigning pieces to clusters. This temporary cluster is the first piece
    available after appropriate scaling and sorting of all pieces. After the
    grouping procedure finishes, the centers are calculated as the mean value
    of the objects within each cluster.

    Parameters
    ----------
    pieces - numpy.ndarray
        The compressed pieces of numpy.ndarray with shape
        (n_samples, n_features) after compression; only the first two
        columns (len, inc) are used.
    alpha - float, default=0.5
        Aggregation tolerance passed to the clustering routine.
    sorting - str, default='norm'
        One of 'lexi', '2-norm', '1-norm', 'norm', 'pca'.
    scl - float, default=1
        Scaling of the length column relative to the increment column.
    alphabet_set - int or list, default=0
        Forwarded to ``symbolsAssign`` to choose the symbol alphabet.

    Returns
    ----------
    string (str or list)
        String sequence.
    parameters - Model
        Centers, start points and alphabets needed for the inverse transform.

    Raises
    ------
    ValueError
        If ``sorting`` is not one of the accepted options.
    """
    pieces = np.array(pieces)[:, :2].astype(np.float64)

    if sorting not in ["lexi", "2-norm", "1-norm", "norm", "pca"]:
        # fixed: the message now lists every accepted option ('norm' and
        # 'pca' were accepted by the check above but missing from the text)
        raise ValueError("Please refer to a specific and correct sorting way, "
                         "namely 'lexi', '2-norm', '1-norm', 'norm' and 'pca'")

    _std = np.std(pieces, axis=0)
    if _std[0] != 0:
        # scale both columns by their std; scl weights the length column
        npieces = pieces * np.array([scl, 1]) / _std
    else:
        # len column has zero std (e.g. max_len=1 during compression) —
        # dividing by it would corrupt the aggregation, so scale only inc
        npieces = pieces * np.array([scl, 1])
        npieces[:, 1] = npieces[:, 1] / _std[1]

    if sorting in ["lexi", "2-norm", "1-norm"]:
        labels, splist = aggregate_fabba(npieces, sorting, alpha)
    else:
        labels, splist = aggregate_fc(npieces, sorting, alpha)

    # centers are means of the RAW (unscaled) pieces within each group;
    # collect then stack once instead of growing the array per iteration
    rows = []
    for c in range(len(splist)):
        indc = np.argwhere(labels == c)
        rows.append(np.mean(pieces[indc, :], axis=0))
    if rows:
        centers = np.vstack(rows)
    else:
        centers = np.zeros((0, 2))

    string, alphabets = symbolsAssign(labels, alphabet_set)
    parameters = Model(centers, np.array(splist), alphabets)
    return string, parameters
def inverse_digitize(strings, parameters):
    """Convert a symbolic representation back into compressed pieces
    for reconstruction.

    Parameters
    ----------
    strings - string
        Time series in symbolic representation using unicode characters
        starting with character 'a'.
    parameters - Model
        Holds ``centers`` (cluster centers, one per symbol) and
        ``alphabets`` (the symbol for each cluster label).

    Returns
    -------
    pieces - np.array
        Time series in compressed format. See compression.
    """
    # map each symbol to the row index of its center; setdefault keeps the
    # first occurrence, matching list.index() semantics
    symbol_to_row = {}
    for row, symbol in enumerate(parameters.alphabets.tolist()):
        symbol_to_row.setdefault(symbol, row)

    recovered = np.vstack([parameters.centers[symbol_to_row[s]] for s in strings])
    return recovered[:, 0:2]
def calculate_group_centers(data, labels):
    """Compute the mean of the rows of ``data`` for each distinct label.

    Parameters
    ----------
    data - numpy.ndarray
        2D array of samples.
    labels - numpy.ndarray
        Cluster label of each row of ``data``.

    Returns
    -------
    numpy.ndarray
        One row per distinct label: the mean of that label's samples.
    """
    agg_centers = list()
    for c in set(labels):
        center = np.mean(data[labels == c, :], axis=0).tolist()
        agg_centers.append(center)
    return np.array(agg_centers)


def wcss(data, labels, centers):
    """Within-cluster sum of squares (inertia) of a labelled partition.

    Parameters
    ----------
    data - numpy.ndarray
        2D array of samples.
    labels - numpy.ndarray
        Cluster label of each row; label ``i`` indexes ``centers[i]``.
    centers - numpy.ndarray
        Cluster centers.

    Returns
    -------
    float
        Sum over clusters of squared Euclidean distances to the center.
    """
    inertia_ = 0
    for i in np.unique(labels):
        c = centers[i]
        partition = data[labels == i]
        inertia_ = inertia_ + np.sum(np.linalg.norm(partition - c, ord=2, axis=1)**2)
    return inertia_


def symbolsAssign(clusters, alphabet_set=0):
    """Automatically assign symbols to different groups, start with '!'.

    Parameters
    ----------
    clusters - list or pd.Series or array
        The list of labels.
    alphabet_set - int or list
        0 or 1 selects one of the two built-in letter orderings; a
        non-empty list supplies a custom alphabet.

    Returns
    -------
    string (numpy.ndarray), alphabets (numpy.ndarray)
        The symbolic sequence, and the mapping from labels to symbols
        (label ``i`` -> ``alphabets[i]``), respectively.
    """
    if alphabet_set == 0:
        alphabets = ['A','a','B','b','C','c','D','d','E','e',
                     'F','f','G','g','H','h','I','i','J','j',
                     'K','k','L','l','M','m','N','n','O','o',
                     'P','p','Q','q','R','r','S','s','T','t',
                     'U','u','V','v','W','w','X','x','Y','y','Z','z']

    elif alphabet_set == 1:
        alphabets = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J',
                     'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T',
                     'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd',
                     'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
                     'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x',
                     'y', 'z']

    elif isinstance(alphabet_set, list) and len(alphabet_set):
        # fixed: the original tested len(alphabets), which is unbound at
        # this point and raised NameError for any custom alphabet list
        alphabets = alphabet_set

    else:
        alphabets = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j',
                     'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
                     'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D',
                     'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N',
                     'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
                     'Y', 'Z']

    clusters = pd.Series(clusters)
    N = len(clusters.unique())
    # (a frequency-ordered cluster list was computed here originally but
    # never used; removed as dead code)

    if N >= len(alphabets):
        # not enough letters: fall back to consecutive unicode characters
        # starting with '!' (chr(33))
        alphabets = [chr(i+33) for i in range(0, N)]
    else:
        alphabets = alphabets[:N]

    alphabets = np.asarray(alphabets)
    string = alphabets[clusters]
    return string, alphabets


def quantize(pieces):
    """Realign window lengths with integer grid.

    Parameters
    ----------
    pieces: Time series in compressed representation.

    Returns
    -------
    pieces: Time series in compressed representation with window length
        adjusted to integer grid.
    """
    if len(pieces) == 1:
        pieces[0,0] = round(pieces[0,0])
    else:
        for p in range(len(pieces)-1):
            # push the rounding error of this window onto the next one so
            # the total length is preserved
            corr = round(pieces[p,0]) - pieces[p,0]
            pieces[p,0] = round(pieces[p,0] + corr)
            pieces[p+1,0] = pieces[p+1,0] - corr
            if pieces[p,0] == 0:  # windows must be at least one sample long
                pieces[p,0] = 1
                pieces[p+1,0] -= 1
        pieces[-1,0] = round(pieces[-1,0],0)
    return pieces