diff --git a/sigllm/core.py b/sigllm/core.py index 0008002..1059d94 100644 --- a/sigllm/core.py +++ b/sigllm/core.py @@ -15,7 +15,7 @@ LOGGER = logging.getLogger(__name__) INTERVAL_PRIMITIVE = 'mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1' -DECIMAL_PRIMITIVE = 'sigllm.primitives.transformation.Float2Scalar#1' +FLOAT2SCALAR_PRIMITIVE = 'sigllm.primitives.transformation.Float2Scalar#1' WINDOW_SIZE_PRIMITIVE = 'sigllm.primitives.forecasting.custom.rolling_window_sequences#1' @@ -35,8 +35,12 @@ class SigLLM(Orion): * A ``dict`` with an ``MLPipeline`` specification. interval (int): Number of time points between one sample and another. + strategy (str): + Discretization strategy: 'scaling' or 'binning'. Default to 'binning'. decimal (int): - Number of decimal points to keep from the float representation. + Number of decimal points to keep (scaling strategy only). + n_clusters (int): + Number of clusters for binning (binning strategy only). window_size (int): Size of the input window. 
hyperparameters (dict): @@ -46,7 +50,7 @@ class SigLLM(Orion): DEFAULT_PIPELINE = 'mistral_detector' def _augment_hyperparameters(self, primitive, key, value): - if not value: + if value is None: return if self._hyperparameters is None: @@ -61,7 +65,9 @@ def __init__( self, pipeline: Union[str, dict, MLPipeline] = None, interval: int = None, + strategy: str = None, decimal: int = None, + n_clusters: int = None, window_size: int = None, hyperparameters: dict = None, ): @@ -71,11 +77,15 @@ def __init__( self._fitted = False self.interval = interval + self.strategy = strategy self.decimal = decimal + self.n_clusters = n_clusters self.window_size = window_size self._augment_hyperparameters(INTERVAL_PRIMITIVE, 'interval', interval) - self._augment_hyperparameters(DECIMAL_PRIMITIVE, 'decimal', decimal) + self._augment_hyperparameters(FLOAT2SCALAR_PRIMITIVE, 'strategy', strategy) + self._augment_hyperparameters(FLOAT2SCALAR_PRIMITIVE, 'decimal', decimal) + self._augment_hyperparameters(FLOAT2SCALAR_PRIMITIVE, 'n_clusters', n_clusters) self._augment_hyperparameters(WINDOW_SIZE_PRIMITIVE, 'window_size', window_size) def __repr__(self): diff --git a/sigllm/primitives/jsons/sigllm.primitives.transformation.Float2Scalar.json b/sigllm/primitives/jsons/sigllm.primitives.transformation.Float2Scalar.json index 3bdaae2..2f3bbc9 100644 --- a/sigllm/primitives/jsons/sigllm.primitives.transformation.Float2Scalar.json +++ b/sigllm/primitives/jsons/sigllm.primitives.transformation.Float2Scalar.json @@ -34,17 +34,21 @@ "type": "ndarray" }, { - "name": "minimum", - "type": "float" - }, - { - "name": "decimal", - "type": "int" + "name": "metadata", + "type": "dict" } ] }, "hyperparameters": { "fixed": { + "strategy": { + "type": "str", + "default": "scaling" + }, + "n_clusters": { + "type": "int", + "default": 100 + }, "decimal": { "type": "int", "default": 2 diff --git a/sigllm/primitives/jsons/sigllm.primitives.transformation.Scalar2Float.json 
class Float2Scalar:
    """Convert an array of float values to integer scalars.

    Two discretization strategies are supported:

    * ``'scaling'``: optionally shift the data by the minimum learned in
      ``fit`` and keep ``decimal`` decimal places by multiplying by
      ``10**decimal`` and rounding to the nearest integer.

          1.05, 2., 3.1, 4.8342, 5, 0 -> 105, 200, 310, 483, 500, 0

    * ``'binning'``: replace every value by the index of its nearest centroid.
      Centroids are computed independently for each column of ``X``.

    Args:
        strategy (str):
            Discretization strategy, ``'scaling'`` or ``'binning'``.
            Default to ``'scaling'``.
        n_clusters (int):
            Maximum number of centroids per column (binning strategy only).
            Default to `100`.
        decimal (int):
            Number of decimal points to keep (scaling strategy only).
            Default to `2`.
        rescale (bool):
            Whether to subtract the fitted minimum before scaling
            (scaling strategy only). Default to `True`.
    """

    def __init__(self, strategy='scaling', n_clusters=100, decimal=2, rescale=True):
        self.strategy = strategy
        self.n_clusters = n_clusters
        self.decimal = decimal
        self.rescale = rescale
        self.minimum = None
        self.centroids = None
        self.labels = None

    def _check_strategy(self):
        """Raise ``ValueError`` unless ``self.strategy`` is a supported strategy."""
        if self.strategy not in ('scaling', 'binning'):
            raise ValueError(
                f"Unknown strategy '{self.strategy}'. Use 'scaling' or 'binning'."
            )

    def _fit_binning(self, X):
        """Store per-column centroids and nearest-centroid labels for ``X``."""
        centroids_list = []
        labels = []
        for col in X.T:
            unique_values = np.unique(col)
            if self.n_clusters >= len(unique_values):
                # Not more distinct values than clusters: every distinct
                # value becomes its own centroid and K-means is skipped.
                centroids = unique_values
            else:
                # Local import: scikit-learn is only needed on this branch.
                from sklearn.cluster import KMeans

                kmeans = KMeans(n_clusters=self.n_clusters, random_state=0)
                kmeans.fit(col.reshape(-1, 1))
                # Sorted so that a larger label always means a larger value.
                centroids = np.sort(kmeans.cluster_centers_.ravel())

            distances = np.abs(col[:, None] - centroids[None, :])
            labels.append(np.argmin(distances, axis=1))
            centroids_list.append(centroids)

        self.labels = np.column_stack(labels)
        self.centroids = centroids_list

    def fit(self, X):
        """Learn parameters from data.

        For scaling: learns the minimum value.
        For binning: learns the per-column centroids (and the labels of ``X``).

        Args:
            X (ndarray):
                Data to learn from. The binning strategy iterates over
                ``X.T``, i.e. one column at a time.

        Raises:
            ValueError:
                If ``strategy`` is neither ``'scaling'`` nor ``'binning'``.
        """
        self._check_strategy()
        if self.strategy == 'scaling':
            self.minimum = np.min(X)
        else:
            self._fit_binning(X)

    def transform(self, X):
        """Transform float data into integers.

        Args:
            X (ndarray):
                Data to transform.

        Returns:
            tuple:
                * ndarray of integers (scaled values or centroid indices).
                * dict with the information needed to invert the
                  transformation: ``strategy`` plus ``minimum`` and
                  ``decimal`` (scaling) or ``centroids`` (binning).

        Raises:
            ValueError:
                If ``strategy`` is neither ``'scaling'`` nor ``'binning'``.
        """
        import logging

        self._check_strategy()
        # Library code should not write to stdout; emit diagnostics at debug level.
        logging.getLogger(__name__).debug(
            'Float2Scalar using strategy: %s', self.strategy
        )

        if self.strategy == 'scaling':
            if self.rescale:
                X = X - self.minimum

            # Round the magnitude and re-apply the sign afterwards.
            sign = 1 * (X >= 0) - 1 * (X < 0)
            values = sign * np.round(np.abs(X) * 10**self.decimal).astype(int)

            metadata = {
                'strategy': 'scaling',
                'minimum': self.minimum,
                'decimal': self.decimal,
            }
            return values, metadata

        # NOTE(review): binning re-learns the centroids from ``X`` on every
        # call, overwriting whatever ``fit`` stored. Kept as-is because the
        # original marks it as intentional -- confirm this is the desired
        # behavior when transforming data that differs from the fit data.
        self._fit_binning(X)
        metadata = {
            'strategy': 'binning',
            'centroids': self.centroids,
        }
        return self.labels, metadata
class Scalar2Float:
    """Convert an array of integer values to float.

    Transforms an array of integers back to floats using the metadata from Float2Scalar.

    - 'scaling': Divide by 10^decimal and add minimum offset.
      Example: 105, 200, 310, 483, 500, 0 -> 1.05, 2., 3.1, 4.83, 5, 0

    - 'binning': Map cluster indices back to centroid values.
    """

    def transform(self, X, metadata):
        """Convert data from integer back to float.

        Args:
            X (ndarray):
                Integer values (scaling) or centroid indices (binning).
            metadata (dict):
                Information needed to invert the transformation. The
                ``strategy`` key selects the inverse and falls back to
                ``'binning'`` when missing. Scaling reads ``minimum``
                (default `0`) and ``decimal`` (default `2`); binning
                requires ``centroids``.

        Returns:
            ndarray:
                Float values.

        Raises:
            ValueError:
                If the strategy is unknown, or if the binning strategy is
                selected and ``centroids`` is missing from ``metadata``.
        """
        import logging

        strategy = metadata.get('strategy', 'binning')
        # Library code should not write to stdout; the metadata is only
        # formatted when debug logging is enabled.
        logging.getLogger(__name__).debug(
            'Scalar2Float using strategy: %s, metadata: %s', strategy, metadata
        )

        if strategy == 'scaling':
            minimum = metadata.get('minimum', 0)
            decimal = metadata.get('decimal', 2)
            # Float base: an integer base raised to a negative NumPy integer
            # is rejected by NumPy; the resulting factor is unchanged for
            # plain Python ints.
            return X * 10.0 ** (-decimal) + minimum

        if strategy == 'binning':
            centroids = metadata.get('centroids')
            if centroids is None:
                raise ValueError("centroids must be provided in metadata for binning strategy")

            # Only the first entry of ``centroids`` is used to decode ``X``.
            base_centroids = np.asarray(centroids[0])
            # Out-of-range indices are clipped to the nearest valid centroid.
            idx = np.clip(X.astype(int), 0, len(base_centroids) - 1)
            return np.take(base_centroids, idx)

        raise ValueError(f"Unknown strategy '{strategy}'. Use 'scaling' or 'binning'.")