import os
import warnings
import numpy as np
import pandas as pd
import collections
import platform
import itertools
from collections import defaultdict
from dataclasses import dataclass
from multiprocessing.pool import ThreadPool as Pool
import collections
from typing import List, Tuple, Any
from sklearn.cluster import KMeans
from .fkmns import sampledKMeansInter
from joblib import parallel_backend
try:
# # %load_ext Cython
# !python3 setup.py build_ext --inplace
from .compmem import compress
if platform.system() == 'Linux':
from .aggmem import aggregate # cython with memory view
else:
from .aggwin import aggregate # cython with memory view
from .inversetc import *
except ModuleNotFoundError:
warnings.warn("cython fail.")
from .comp import compress
from .agg import aggregate
from .inverset import *
import multiprocessing
import subprocess
def check_faiss_installation() -> bool:
"""
Checks if FAISS is installed and, if so, whether the GPU version is
detected by checking the number of available GPUs through the library.
Returns:
bool: True if FAISS is installed AND detects one or more GPUs.
"""
try:
# Check for FAISS import
import faiss
# Check for GPU capability using FAISS's built-in function
num_gpus = faiss.get_num_gpus()
if num_gpus > 0:
print(f"[INFO] FAISS installed. GPU version detected with {num_gpus} available GPU(s).")
return True
else:
print("[INFO] FAISS installed (CPU version) or no GPUs detected.")
return False
except ImportError:
print("[ERROR] FAISS is NOT installed. Please run 'pip install faiss-cpu' or 'pip install faiss-gpu'.")
return False
def get_macos_thread_affinity():
"""Attempt to get thread affinity on macOS using Mach thread_policy_get."""
import ctypes
from ctypes import c_uint32, c_int, c_size_t
try:
libc = ctypes.CDLL('libc.dylib')
THREAD_AFFINITY_POLICY = 4
THREAD_AFFINITY_POLICY_COUNT = 1
class ThreadAffinityPolicyData(ctypes.Structure):
_fields_ = [("affinity_tag", c_uint32)]
mach_thread_self = libc.mach_thread_self
mach_thread_self.restype = c_uint32
policy_data = ThreadAffinityPolicyData()
policy_count = c_uint32(THREAD_AFFINITY_POLICY_COUNT)
get_default = c_int(0)
ret = libc.thread_policy_get(
mach_thread_self(),
c_uint32(THREAD_AFFINITY_POLICY),
ctypes.byref(policy_data),
ctypes.byref(policy_count),
ctypes.byref(get_default)
)
if ret == 0 and policy_data.affinity_tag != 0:
# print(f"macOS thread_policy_get succeeded: Affinity tag {policy_data.affinity_tag}")
return [policy_data.affinity_tag]
else:
print(f"macOS thread_policy_get failed: Error code {ret}, Affinity tag: {policy_data.affinity_tag}")
return None
except Exception as e:
print(f"Exception in macOS thread_policy_get: {str(e)}")
return None
def get_macos_cpu_count():
"""Get the number of available CPUs on macOS using multiprocessing."""
try:
cpu_count = multiprocessing.cpu_count()
# print(f"Retrieved CPU count via multiprocessing: {cpu_count}")
return list(range(cpu_count))
except Exception as e:
print(f"Exception in multiprocessing.cpu_count: {str(e)}")
return None
def get_macos_cpu_count_sysctl():
"""Get the number of available CPUs on macOS using sysctl."""
try:
result = subprocess.run(['sysctl', '-n', 'hw.ncpu'], capture_output=True, text=True, check=True)
cpu_count = int(result.stdout.strip())
# print(f"Retrieved CPU count via sysctl: {cpu_count}")
return list(range(cpu_count))
except Exception as e:
print(f"Exception in sysctl hw.ncpu: {str(e)}")
return None
def get_cpu_affinity():
"""Get CPU affinity for the current process, with enhanced macOS support."""
import platform
system = platform.system()
try:
import psutil
except ImportError:
psutil = None
try:
# Linux: Try os.sched_getaffinity
if hasattr(os, 'sched_getaffinity') and system == 'Linux':
affinity = os.sched_getaffinity(0)
# print(f"Retrieved Linux affinity: {affinity}")
return affinity
# Cross-platform: Try psutil.cpu_affinity
if psutil is not None:
process = psutil.Process()
if hasattr(process, 'cpu_affinity'):
try:
affinity = process.cpu_affinity()
# print(f"Retrieved psutil affinity: {affinity}")
return affinity
except Exception as e:
print(f"psutil.cpu_affinity failed: {str(e)}")
else:
print("psutil.cpu_affinity not available in this version")
# macOS: Try thread affinity and fallbacks
if system == 'Darwin':
# print("Attempting macOS thread affinity retrieval...")
affinity = get_macos_thread_affinity()
if affinity:
return affinity
# print("Falling back to CPU count as macOS affinity proxy...")
affinity = get_macos_cpu_count()
if affinity:
# print("Note: This is the list of available CPUs, not true affinity")
return affinity
# print("Falling back to sysctl CPU count...")
affinity = get_macos_cpu_count_sysctl()
if affinity:
# print("Note: This is the list of available CPUs, not true affinity")
return affinity
# print("macOS does not provide direct CPU affinity information")
# Other platforms
else:
print(f"CPU affinity not supported on {system}")
except Exception as e:
print(f"Unexpected error in get_cpu_affinity: {str(e)}")
return None
def symbolsAssign(clusters, alphabet_set=0):
"""
Automatically assign symbols to different groups, start with '!'
Parameters
----------
clusters - list or pd.Series or array
The list of labels.
alphabet_set - int or list
The list of alphabet letter.
----------
Return:
string (list of string), alphabets(numpy.ndarray): for the
corresponding symbolic sequence and for mapping from symbols to labels or
labels to symbols, repectively.
"""
clusters = pd.Series(clusters)
N = len(clusters.unique())
if alphabet_set == 0:
alphabets = ['A','a','B','b','C','c','D','d','E','e',
'F','f','G','g','H','h','I','i','J','j',
'K','k','L','l','M','m','N','n','O','o',
'P','p','Q','q','R','r','S','s','T','t',
'U','u','V','v','W','w','X','x','Y','y','Z','z']
elif alphabet_set == 1:
alphabets = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L',
'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j',
'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v',
'w', 'x', 'y', 'z']
elif isinstance(alphabet_set, list):
if N <= len(alphabet_set):
alphabets = alphabet_set
else:
raise ValueError("Please ensure the length of ``alphabet_set`` is greatere than ``clusters``.")
else:
alphabets = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l',
'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x',
'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J',
'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V',
'W', 'X', 'Y', 'Z']
cluster_sort = [0] * N
counter = collections.Counter(clusters)
for ind, el in enumerate(counter.most_common()):
cluster_sort[ind] = el[0]
if N >= len(alphabets):
alphabets = [chr(i+33) for i in range(0, N)]
else:
alphabets = alphabets[:N]
alphabets = np.asarray(alphabets)
string = alphabets[clusters]
return string.tolist(), alphabets
@dataclass
class Model:
"""
save ABBA model - parameters
"""
centers: np.ndarray # store aggregation centers
""" dictionary """
alphabets: np.ndarray # labels -> symbols, symbols -> labels
def general_compress(pabba, data, adjust=True, n_jobs=-1):
if len(data.shape) > 3:
raise ValueError("Please transform the shape of data into 1D, 2D or 3D.")
elif len(data.shape) == 3:
pabba.d_shape = data.shape
data = data.reshape(data.shape[0], data.shape[1]*data.shape[2])
else:
pabba.d_shape = data.shape
if adjust:
_mean = data.mean(axis=0)
_std = data.std(axis=0)
if np.any(_std == 0):
_std[_std == 0] = 1
data = (data - _mean) / _std
strings = pabba.fit_transform(data, n_jobs=n_jobs)
pabba.d_norm = (_mean, _std)
else:
pabba.d_norm = None
strings = pabba.fit_transform(data, n_jobs=n_jobs)
return strings
def general_decompress(pabba, strings, int_type=True, n_jobs=-1):
reconstruction = np.array(pabba.inverse_transform(strings, n_jobs=n_jobs))
if pabba.d_norm is not None:
try:
reconstruction = reconstruction*pabba.d_norm[1] + pabba.d_norm[0]
except ValueError:
raise ValueError("The number of symbol is not enough to reconstruct this data, please use more symbols.")
if len(pabba.d_shape) == 3:
if int_type:
reconstruction = reconstruction.round().reshape(pabba.d_shape).astype(np.uint8)
else:
reconstruction = reconstruction.reshape(pabba.d_shape)
return reconstruction
def flatten_to_2d_keep_last(x: Any, keep_last: bool = True, verbose: bool = False) -> Tuple[np.ndarray, Tuple[int, ...]]:
"""
Flatten all dimensions except the last one (or first one) into a single dimension.
Parameters
----------
x : array-like
Input of any dimension >= 2
keep_last : bool
If True → keep last dim → output: (everything_else, last_dim) ← most common
If False → keep first dim → output: (first_dim, everything_else)
Returns
-------
x_2d : np.ndarray
2D array
original_shape : tuple
Original shape (for restoring later)
"""
x = np.asarray(x) # handle list, torch, etc.
if x.ndim < 2:
raise ValueError(f"Input must have at least 2 dimensions, got {x.ndim}")
original_shape = x.shape
if keep_last:
new_shape = (-1, x.shape[-1]) # -1 means "infer this"
description = f"Flattened all except last dim: {original_shape} -> {new_shape}"
else:
new_shape = (x.shape[0], -1)
description = f"Flattened all except first dim: {original_shape} -> {new_shape}"
x_2d = x.reshape(new_shape)
if verbose:
print(description)
return x_2d, original_shape, x_2d.shape
def restore_from_2d(x_2d: np.ndarray, original_shape: Tuple[int, ...]) -> np.ndarray:
"""
Restore the original high-dimensional shape from the 2D flattened version.
"""
x_restored = x_2d.reshape(original_shape)
return x_restored
def pad_or_truncate_2d_list(
data: List[List[Any]],
target_len: int
) -> np.ndarray:
"""
Pad or truncate each inner list to target_len.
- If longer: truncate
- If shorter: pad with last element
"""
padded = []
for row in data:
if len(row) == 0:
raise ValueError("Empty inner list is not allowed")
if len(row) >= target_len:
padded.append(row[:target_len])
else:
pad_value = row[-1]
padded.append(row + [pad_value] * (target_len - len(row)))
return np.asarray(padded)
def pad_or_trim_keep_ends(
data: List[List[Any]],
target_len: int
) -> np.ndarray:
"""
For each row:
- If len > target_len:
keep first & last, remove from the back of the middle part
- If len < target_len:
pad with last element
"""
if target_len < 1:
raise ValueError("target_len must be >= 1")
out = []
for row in data:
if len(row) == 0:
raise ValueError("Empty inner list is not allowed")
n = len(row)
# --- case 1: too long ---
if n > target_len:
if target_len == 1:
# edge case: only one slot → keep last (or first, your choice)
new_row = [row[-1]]
elif target_len == 2:
new_row = [row[0], row[-1]]
else:
middle_len = target_len - 2
new_row = [row[0]] + row[1:1 + middle_len] + [row[-1]]
# --- case 2: too short ---
elif n < target_len:
pad_value = row[-1]
new_row = row + [pad_value] * (target_len - n)
# --- case 3: exact ---
else:
new_row = row
out.append(new_row)
return np.asarray(out)
[docs]
class JABBA(object):
"""
Parallel version of ABBA with fast implementation.
Parameters
----------
tol - double, default=0.5
Tolerance for compression.
k - int, default=1
The number of clusters (distinct symbols) specified for ABBA.
r - float, default=0.5
The rate of data sampling to perform k-means.
alpha - double, default=None
Tolerance for digitization. If None is set, auto-digitization will be enabled.
init - str, default='agg'
The clustering algorithm in digitization. optional: 'f-kmeans', 'kmeans', 'gpu-kmeans'.
sorting - str, default="norm".
Apply sorting data before aggregation (inside digitization). Alternative option: "pca".
max_len - int
The max length of series contained in each compression pieces.
max_iter - int, default=2
The max iteration for fast k-means algorithm.
batch_size - int, default=1024
Size of the mini batches for mini-batch kmeans in digitization. For faster compuations,
you can set the batch_size greater than 256 * number of cores to enable parallelism on all cores.
verbose - int or boolean, default=1
Enable verbose output.
partition_rate - float or int, default=None
This parameter is to get the number of partitions of time series.
when this parameter is not None, the partitions will be
n_jobs*int(np.round(np.exp(1/self.partition_rate), 0))
partition - int:
The number of subsequences for time series to be partitioned.
scl - int or float, default=1
Scale the length of compression pieces. The larger the value is, the more important of the length information is.
Therefore, it can solve some problem resulted from peak shift.
eta - float, default=None,
Parameter to control the auto-digitization. If None, eta = 3.
last_dim - boolean, default=True,
The method to process the varying shape (>=2) of time series. True as default otherwise flatten the shape dimension > 1.
auto_digitize - boolean, default=False
Enable auto digitization without prior knowledge of alpha.
trim_method - str, default='pad'
The method to process the varying length of time series.
'pad' : pad the time series to the max length with the last value.
'keep_ends' : keep the first and last elements, trim or pad the middle part to the max length.
Attributes
----------
params: dict
Parameters of trained model.
string_ - str or list
Contains the ABBA representation.
"""
def __init__(self, tol=0.2, init='agg', k=2, r=0.5, alpha=None,
sorting="norm", scl=1, max_iter=10,
partition_rate=None, partition=None,
max_len=np.inf, verbose=1, random_state=2022, eta=None,
fillna='ffill', last_dim=True, auto_digitize=False, trim_method='keep_ends'):
self.tol = tol
self.alpha = alpha
self.k = k
self.scl = scl
self.init = init
if self.alpha == None:
auto_digitize = True
self.max_len = max_len
self.max_iter = max_iter
self.sorting = sorting
self.verbose = verbose
self.r = r
self.eta = eta
self.partition = partition
self.partition_rate = partition_rate
self.temp_symbols = None
self.symbols_ = None
self.auto_digitize = auto_digitize
self.d_norm = None
self.d_shape = None
self.eta = None # The coefficient applied to auto digitization. Needless to specify.
self.fillna = fillna
self.random_state = random_state
self.recap_shape = None
self.last_dim = last_dim
self.stack_last_dim = False
self.new_shape = None
self.trim_method = trim_method
[docs]
def fit(self, series, n_jobs=-1, alphabet_set=0):
"""
Fitted the numerical series.
Parameters
----------
series - numpy.ndarray, 2-dimension or 1-dimension
Univariate or multivariate time series
n_jobs - int, default=-1
The mumber of processors used for parallelism. When n_jobs < 0, use all of available
processors the machine allows. Note: For the univariate time series, if n_jobs = 1,
PABBA will degenerate to fABBA, but the result may be diferent since PABBA use
aggregated groups starting points for reconstruction instead of aggregated groups
centers.
alphabet_set - int or list, default=0
The list of alphabet letter. Here provide two different kinds of alphabet letters, namely 0 and 1.
"""
self.pieces = self.parallel_compress(series, n_jobs=n_jobs)
self.string_ = self.digitize(series, self.pieces, alphabet_set, n_jobs)
return self
[docs]
def parallel_compress(self, series, n_jobs=-1):
"""
Compress the numerical series in a parallel manner.
Parameters
----------
series - numpy.ndarray, 2-dimension or 1-dimension
Univariate or multivariate time series
n_jobs - int, default=-1
The mumber of processors used for parallelism. When n_jobs < 0, use all of available
processors the machine allows. Note: For the univariate time series, if n_jobs = 1,
PABBA will degenerate to fABBA, but the result may be diferent since PABBA use
aggregated groups starting points for reconstruction instead of aggregated groups
centers.
"""
len_ts = len(series)
n_jobs = self.n_jobs_init(n_jobs, _max=len_ts)
if isinstance(series, np.ndarray):
if len(series.shape) == 1:
uni_dim = True
else:
uni_dim = False
elif isinstance(series, list):
if not isinstance(series[0], list):
uni_dim = True
else:
uni_dim = False
else:
raise ValueError('Please enter time series with correct shape.')
if uni_dim:
series = np.asarray(series)
if series.dtype != 'float64':
series = np.asarray(series).astype('float64')
# Partition time series for parallelism (for n_jobs > 1 or = -1) if it is univarite
self.return_series_univariate = True # means the series is univariate,
# so the reconstruction can automatically
# determine if should return the univariate series.
if self.partition == None:
if self.partition_rate == None:
partition = n_jobs
else:
partition = int(np.round(np.exp(1/self.partition_rate), 0))*n_jobs
if partition > len_ts:
warnings.warn("Partition has exceed the maximum length of series.")
partition = len_ts
else:
if self.partition < len_ts:
partition = self.partition
if n_jobs > partition: # to prevent useless processors
n_jobs = partition
else:
warnings.warn("Partition has exceed the maximum length of series.")
partition = n_jobs
# for i in range(partition,0,-1):
# if len_ts % i == 0:
interval = int(len_ts / partition)
series = np.vstack([series[i*interval : (i+1)*interval] for i in range(partition)])
if self.verbose:
if partition != 1:
print("Partition series into {} parts".format(partition))
print("Init {} processors.".format(n_jobs))
else:
self.return_series_univariate = False
if isinstance(series, np.ndarray):
if len(series.shape) > 2:
series, self.recap_shape, self.new_shape = flatten_to_2d_keep_last(series, self.last_dim, verbose=self.verbose)
self.stack_last_dim= True
elif isinstance(series, list):
pass
pieces = list()
self.start_set = list()
p = Pool(n_jobs)
self.start_set = [ts[0] for ts in series]
pieces = [p.apply_async(compress, args=(fillna(np.asarray(ts).astype(np.double), self.fillna), self.tol, self.max_len)) for ts in series]
p.close()
p.join()
pieces = [p.get() for p in pieces]
return pieces
[docs]
def digitize(self, series, pieces, alphabet_set=0, n_jobs=-1):
""" Digitization
Parameters
----------
pieces - numpy.ndarray
The compressed pieces of numpy.ndarray with shape (n_samples, n_features) after compression
len_ts - int
The length of time series.
num_pieces - int
The number of pieces.
init - str
Use aggregation, fast-kmeans or kmeans for digitization to get symbols.
alphabet_set - int or list, default=0
The list of alphabet letter. Here provide two different kinds of alphabet letters, namely 0 and 1.
"""
len_ts = len(series)
if len(series.shape) > 1:
sum_of_length = np.prod(series.shape)
if self.eta is None: self.eta = 3
else:
sum_of_length = len_ts
if self.eta is None: self.eta = 3
num_pieces = list()
for i in range(len(pieces)):
num_pieces.append(len(pieces[i]))
pieces = np.vstack(pieces)[:,:2]
self._std = np.std(pieces, axis=0)
if self._std[0] == 0: # prevent zero-division
self._std[0] = 1
if self._std[1] == 0:
self._std[1] = 1
if self.scl == 0:
len_pieces = pieces[:,0]
pieces = pieces * np.array([self.scl, 1]) / self._std
max_k = np.unique(pieces[:,:2],axis=0).shape[0]
N = pieces[:,:2].shape[0]
if self.init == 'agg':
if self.auto_digitize:
# self.alpha = ( (20 * (sum_of_length - N) * self.tol**2) / (N * (self.eta**4) * sum_of_length**2) ) ** 0.25
self.alpha = (30 * (sum_of_length - N) * self.tol**2 / (self.eta**4 * sum_of_length))**0.25
print(f"auto-digitization: alpha={self.alpha}")
labels, splist = aggregate(pieces, self.sorting, self.alpha)
splist = np.array(splist)
centers = splist[:,3:5] * self._std / np.array([self.scl, 1])
self.k = centers.shape[0]
elif self.init == 'f-kmeans':
if self.k > max_k:
self.k = max_k
warnings.warn("k is larger than the unique pieces size, so k reduces to unique pieces size.")
with parallel_backend('threading', n_jobs=n_jobs):
kmeans = sampledKMeansInter(n_clusters=self.k,
r=self.r,
init='k-means++',
max_iter=self.max_iter,
random_state=self.random_state)
kmeans.sampled_fit(pieces)
labels = kmeans.labels_
centers = kmeans.cluster_centers_ * self._std / np.array([self.scl, 1])
elif self.init == 'gpu-kmeans':
if check_faiss_installation():
from .gpu_kmeans import faiss_kmeans_cluster
centers , labels = faiss_kmeans_cluster(pieces, self.k, self.max_iter)
centers = centers * self._std / np.array([self.scl, 1])
splist = None
else:
warnings.warn("PyTorch is not installed or not properly configured for GPU K-means, falling back to CPU K-means.")
with parallel_backend('threading', n_jobs=n_jobs):
kmeans = KMeans(n_clusters=self.k, n_init="auto", max_iter=self.max_iter, random_state=0).fit(pieces)
labels = kmeans.labels_
centers = kmeans.cluster_centers_ * self._std / np.array([self.scl, 1])
splist = None
else: # default => 'kmeans'
if self.k >= max_k:
self.k = max_k
warnings.warn("k is larger than the unique pieces size, so k reduces to unique pieces size.")
with parallel_backend('threading', n_jobs=n_jobs):
kmeans = KMeans(n_clusters=self.k, n_init="auto", max_iter=self.max_iter, random_state=0).fit(pieces)
labels = kmeans.labels_
centers = kmeans.cluster_centers_ * self._std / np.array([self.scl, 1])
splist = None
if self.scl == 0:
centers[:, 0] = one_D_centers(len_pieces, labels, self.k)
string, alphabets = symbolsAssign(labels, alphabet_set)
self.parameters = Model(centers, alphabets)
self.num_grp = self.parameters.centers.shape[0]
if self.verbose:
print("Total number of pieces: {}".format(N))
print("Generate {} symbols".format(self.num_grp))
string_sequences = self.string_separation(string, num_pieces)
return string_sequences
[docs]
def piece_to_symbol(self, piece):
"""
Transform a piece to symbol.
Parameters
----------
piece: numpy.ndarray
A piece from compression pieces.
"""
splabels = np.argmin(np.linalg.norm(self.parameters.centers - piece, ord=2, axis=1))
return self.parameters.alphabets[splabels]
[docs]
def recast_shape(self, reconstruct_list):
"""Reshape the multiarray to the same shape of the input, the shape might be expanded or squeezed."""
size_list = [len(i) for i in reconstruct_list]
fixed_len = np.prod(self.recap_shape[1:])
if fixed_len > np.max(size_list):
warnings.warn('The reconstructed shape has been expanded.')
elif fixed_len < np.max(size_list):
warnings.warn('The reconstructed shape has been squeezed.')
org_size = len(reconstruct_list)
if self.recap_shape is not None:
reconstruct_list.append(fixed_len * [-1])
pad_token = [np.mean(i) for i in reconstruct_list]
padded = zip(*zip_longest(*reconstruct_list, fillvalue=pad_token))
padded = list(padded)
padded = np.asarray(padded)
padded = padded[:org_size, :fixed_len].reshape(-1, *self.recap_shape[1:])
else:
print(f"""Please ensure your fitted series (not this function input) is numpy.ndarray type with dimensions > 2.""")
return padded
[docs]
def string_separation(self, symbols, num_pieces):
"""
Separate symbols into symbolic subsequence.
"""
string_sequences = list()
num_pieces_csum = np.cumsum([0] + num_pieces)
for index in range(len(num_pieces)):
string_sequences.append(symbols[num_pieces_csum[index]:num_pieces_csum[index+1]])
return string_sequences
[docs]
def n_jobs_init(self, n_jobs=-1, _max=np.inf):
"""
Initialize parameter n_jobs.
"""
if n_jobs > _max:
n_jobs = _max
warnings.warn("n_jobs init warning, 'n_jobs' is set to maximum {}.".format(n_jobs))
if not isinstance(n_jobs,int):
raise TypeError('Expected a int type.')
if n_jobs == 0:
raise ValueError(
"Please feed an correct value for n_jobs.")
if n_jobs == None or n_jobs == -1:
cpu_affinity = get_cpu_affinity()
if cpu_affinity is not None:
n_jobs = len(cpu_affinity)
else:
n_jobs = 1
# int(mp.cpu_count()) , return the available usable CPUs
else:
n_jobs = n_jobs
return n_jobs
@property
def tol(self):
return self._tol
@tol.setter
def tol(self, value):
if not isinstance(value, float) and not isinstance(value,int):
raise TypeError("Expected a float or int type.")
if value <= 0:
raise ValueError(
"Please feed an correct value for tolerance.")
if value > 1:
warnings.warn("Might lead to bad aggregation.", DeprecationWarning)
self._tol = value
@property
def sorting(self):
return self._sorting
@sorting.setter
def sorting(self, value):
if not isinstance(value, str):
raise TypeError("Expected a string type")
if value not in ["lexi", "2-norm", "1-norm", "norm", "pca"]:
raise ValueError(
"Please refer to an correct sorting way, namely 'lexi', '2-norm' and '1-norm'.")
self._sorting = value
@property
def alpha(self):
return self._alpha
@alpha.setter
def alpha(self, value):
if value != None:
if not isinstance(value, float) and not isinstance(value,int):
raise TypeError("Expected a float or int type.")
if value <= 0:
raise ValueError(
"Please feed an correct value for alpha.")
self._alpha = value
@property
def max_len(self):
return self._max_len
@max_len.setter
def max_len(self, value):
if value != np.inf:
if not isinstance(value, float) and not isinstance(value,int):
raise TypeError("Expected a float or int type.")
if value <= 0:
raise ValueError(
"Please feed an correct value for max_len.")
self._max_len = value
@property
def partition(self):
return self._partition
@partition.setter
def partition(self, value):
if value != None:
if not isinstance(value, float) and not isinstance(value,int):
raise TypeError("Expected int type.")
if value <= 0:
raise ValueError(
"Please feed an correct value for partition.")
self._partition = value
@property
def partition_rate(self):
return self._partition_rate
@partition_rate.setter
def partition_rate(self, value):
if value != None:
if not isinstance(value, float) and not isinstance(value,int):
raise TypeError("Expected float or int type.")
if value > 1 or value < 0:
raise ValueError(
"Please feed an correct value for partition_rate.")
self._partition_rate = value
@property
def scl(self):
return self._scl
@scl.setter
def scl(self, value):
if not isinstance(value, float) and not isinstance(value,int):
raise TypeError("Expected float or int type.")
if value < 0:
raise ValueError(
"Please feed an correct value for scl.")
self._scl = value
@property
def eta(self):
return self._eta
@eta.setter
def eta(self, value):
if value is not None:
if not isinstance(value, float) and not isinstance(value,int):
raise TypeError("Expected float or int type.")
if value == 0:
raise ValueError(
"Please feed an correct value for eta.")
self._eta = value
@property
def k(self):
return self._k
@k.setter
def k(self, value):
if not isinstance(value,int):
raise TypeError("Expected int type.")
if value == 0:
raise ValueError(
"Please feed an correct value for k.")
self._k = value
class fastABBA(object):
def __init__(self, tol=0.2, k=2, r=0.5, scl=1, random_state=2022, n_jobs=1,
alphabet_set=0, max_len=np.inf, max_iter=2, verbose=True):
self.tol = tol
self.k = k
self.r = r
self.scl = scl
self.max_len = max_len
self.random_state = random_state
self.verbose = verbose
self.alphabet_set = alphabet_set
self.kmeans = None
self.max_iter = max_iter
self.n_jobs = n_jobs
def transform(self, series):
series = np.array(series)
pieces = self._compress(series)
return self._digitize(pieces)
def predict(self, series):
if self.kmeans is not None:
series = np.array(series)
pieces = self._compress(series)
pieces = np.array(pieces)[:,:2]
pieces = pieces * np.array([self.scl, 1]) / self._std
with parallel_backend('threading', n_jobs=self.n_jobs):
labels = self.kmeans.predict(pieces)
return self.parameters.alphabets[labels]
else:
raise ValueError("Please train the model first.")
def _compress(self, series):
return compress(series, tol=self.tol, max_len=self.max_len)
def _digitize(self, pieces):
pieces = np.array(pieces)[:,:2]
max_k = np.unique(pieces, axis=0).shape[0]
if self.k > max_k:
self.k = max_k
warnings.warn("k is larger than the unique pieces size, so k reduces to unique pieces size.")
self._std = np.std(pieces, axis=0)
if self._std[0] == 0: # prevent zero-division
self._std[0] = 1
if self._std[1] == 0:
self._std[1] = 1
if self.scl == 0:
len_pieces = pieces[:, 0]
pieces = pieces * np.array([self.scl, 1]) / self._std
with parallel_backend('threading', n_jobs=self.n_jobs):
self.kmeans = sampledKMeansInter(n_clusters=self.k,
r=self.r,
init='k-means++',
max_iter=self.max_iter,
random_state=self.random_state)
labels = self.kmeans.sampled_fit_predict(pieces[:,:2])
if self.scl != 0:
centers = self.kmeans.cluster_centers_ * self._std / np.array([self.scl, 1])
else:
centers = self.kmeans.cluster_centers_ * self._std
centers[:, 0] = one_D_centers(len_pieces, labels, self.k)
self.string_, alphabets = symbolsAssign(labels, self.alphabet_set)
self.parameters = Model(centers, alphabets)
self.num_grp = self.parameters.centers.shape[0]
if self.verbose:
print("Generate {} symbols".format(self.num_grp))
return self.string_
def inverse_transform(self, strings, start):
return inv_transform(strings, self.parameters.centers, self.parameters.alphabets.tolist(), start)
def one_D_centers(data, labels, k): # for scl = 0
centers = np.zeros(k)
for clust in np.unique(labels):
centers[clust] = np.mean(data[labels == clust])
return centers
# data = np.random.randn(10)
# labels = np.array([1,1,1,0,1,1,1,0,0,0])
# one_D_centers(data, labels)
def dtw(x, y, *, dist=lambda a, b: (a-b)*(a-b), return_path=False, filter_redundant=False):
"""
Compute dynamic time warping distance between two time series x and y.
Parameters
----------
x - list
First time series.
y - list
Second time series.
dist - lambda function
Lambda function defining the distance between two points of the time series.
By default we use (x-y)^2 to correspond to literature standard for
dtw. Note final distance d should be square rooted.
return_path - bool
Option to return tuple (d, path) where path is a list of tuples outlining
the route through time series taken to compute dtw distance.
filter_redundant - bool
Control filtering to remove `redundant` time series due to sampling
resolution. For example, if x = [0, 1, 2, 3, 4] and y = [0, 4]. The dynamic
time series distance is non-zero. If filter_redundant=True then we remove
the middle 3 time points from x where gradient is constant.
Returns
-------
d - numpy float
Summation of the dist(x[i], y[i]) along the optimal path to minimise overall
distance. Standard dynamic time warping distance given by default dist and
d**(0.5).
path - list
Path taken through time series.
"""
x = np.array(x)
y = np.array(y)
if filter_redundant:
# remove points
if len(x) > 2:
xdiff = np.diff(x)
x_keep = np.abs(xdiff[1:] - xdiff[0:-1]) >= 1e-14
x = x[np.hstack((True, x_keep, True))]
else:
x_keep = []
if len(y) > 2:
ydiff = np.diff(y)
y_keep = np.abs(ydiff[1:] - ydiff[0:-1]) >= 1e-14
y = y[np.hstack((True, y_keep, True))]
else:
y_keep = []
len_x, len_y = len(x), len(y)
window = [(i+1, j+1) for i in range(len_x) for j in range(len_y)]
D = defaultdict(lambda: (float('inf'),))
if return_path:
if filter_redundant:
x_ind = np.arange(1, len(x_keep)+1)
y_ind = np.arange(1, len(y_keep)+1)
x_ind = np.hstack((0, x_ind[x_keep], len(x_keep)+1))
y_ind = np.hstack((0, y_ind[y_keep], len(y_keep)+1))
else:
x_ind = np.arange(len(x))
y_ind = np.arange(len(y))
D[0, 0] = (0, 0, 0)
for i, j in window:
dt = dist(x[i-1], y[j-1])
D[i, j] = min((D[i-1, j][0]+dt, i-1, j), (D[i, j-1][0]+dt, i, j-1),
(D[i-1, j-1][0]+dt, i-1, j-1), key=lambda a: a[0])
path = []
i, j = len_x, len_y
while not (i == j == 0):
path.append((x_ind[i-1], y_ind[j-1]))
i, j = D[i, j][1], D[i, j][2]
path.reverse()
return (D[len_x, len_y][0], path)
else:
D[0, 0] = 0
for i, j in window:
dt = dist(x[i-1], y[j-1])
D[i, j] = min(D[i-1, j]+dt, D[i, j-1]+dt, D[i-1, j-1]+dt)
return D[len_x, len_y]
def fillna(series, method='ffill'):
"""Fill the NA values
Parameters
----------
series - numpy.ndarray or list
Time series of the shape (1, n_samples).
fillna - str, default = 'zero'
Fill NA/NaN values using the specified method.
'zero': Fill the holes of series with value of 0.
'mean': Fill the holes of series with mean value.
'median': Fill the holes of series with mean value.
'ffill': Forward last valid observation to fill gap.
If the first element is nan, then will set it to zero.
'bfill': Use next valid observation to fill gap.
If the last element is nan, then will set it to zero.
"""
if method == 'mean':
series[np.isnan(series)] = np.mean(series[~np.isnan(series)])
elif method == 'median':
series[np.isnan(series)] = np.median(series[~np.isnan(series)])
elif method == 'ffill':
for i in np.where(np.isnan(series))[0]:
if i > 0:
series[i] = series[i-1]
else:
series[i] = 0
elif method == 'bfill':
for i in sorted(np.where(np.isnan(series))[0], reverse=True):
if i < len(series):
series[i] = series[i+1]
else:
series[i] = 0
else:
series[np.isnan(series)] = 0
return series
def zip_longest(*iterables, fillvalue=None):
# zip_longest('ABCD', 'xy', fillvalue='-') → Ax By C- D-
iterators = list(map(iter, iterables))
num_active = len(iterators)
if not num_active:
return
while True:
values = []
for i, iterator in enumerate(iterators):
try:
value = next(iterator)
except StopIteration:
num_active -= 1
if not num_active:
return
iterators[i] = itertools.repeat(fillvalue[i])
value = fillvalue[i]
values.append(value)
yield tuple(values)