Source code for honeio.integrations.sklearn.qcmlsklearn

"""Sklearn wrapper for QCML models."""

from collections.abc import Callable
from typing import Self, TypedDict, Unpack

import numpy as np
import torch
from honeio.layers._killswitch import set_killswitch
from honeio.layers.dropout import DropoutLayer
from honeio.layers.general import PytorchGeneralHSM
from honeio.layers.weighted import WeightedLayer
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset


def _init_weighted_layer(
    *,
    hilbert_space_dim: int,
    input_operator_count: int,
    output_operator_count: int,
    device: str = "cpu",
    groups: list[list[int]] | None = None,
) -> WeightedLayer:
    hsm_layer = PytorchGeneralHSM(
        hilbert_space_dims=hilbert_space_dim,
        input_operator_count=input_operator_count,
        output_operator_count=output_operator_count,
        device=device,
    )
    return WeightedLayer(
        hsm_layer=hsm_layer,
        groups=groups,
        device=device,
    )


class ModelParameters(TypedDict):
    """Model parameters."""

    hilbert_space_dim: int
    epochs: int
    random_state: int
    lr: float
    weights_lr: float
    loss: str
    device: str
    batch_size: int | None
    groups: list[list[int]] | None
    dropout_rate: float
    input_operator_count: int
    output_operator_count: int
    classes_: list


class SaveModelKwargs(TypedDict):
    """Arguments for initializing the weighted layer."""

    scaler: StandardScaler
    weighted_layer_state_dict: dict
    model_parameters: ModelParameters


SaveModelFn = Callable[[Unpack[SaveModelKwargs]], None]
"""Function to save the model.

The function should take the target scaler, the state
dictionary of the weighted layer, and the model parameters.
"""

LoadStatesFn = Callable[[], tuple[StandardScaler, dict, ModelParameters]]
"""Function to load the model states.

The function should return the target scaler, the state
dictionary of the weighted layer and the model parameters.
"""


class QCMLBase(BaseEstimator):
    """Scikit-learn wrapper base class for QCML models."""

    save_model_fn: SaveModelFn | None = None
    optimizer: torch.optim.Optimizer

    def _train_loop(
        self,
        X: np.ndarray,
        y: np.ndarray,
    ) -> Self:
        # Initialize the optimizer.
        self.optimizer = torch.optim.Adam(
            params=[
                {
                    "params": [
                        p
                        for n, p in self.weighted_layer.named_parameters()
                        if not n.endswith("weights")
                    ]
                },
                {
                    "params": [
                        p
                        for n, p in self.weighted_layer.named_parameters()
                        if n.endswith("weights")
                    ],
                    "lr": self.weights_lr,
                },
            ],
            lr=self.lr,
            amsgrad=True,
            **(dict(betas=self.opt_betas) if self.opt_betas is not None else {}),
        )

        # Set the model to training mode.
        self.weighted_layer.train(True)

        # Fit the model.
        if self.batch_size and self.batch_size > 0:
            self._fit_batch(X, y)
        else:
            self._fit_no_batch(X, y)

        return self

    def _fit_no_batch(
        self,
        X: np.ndarray,
        y: np.ndarray,
    ) -> None:
        X_tensor = torch.tensor(X, dtype=torch.float32, device=self.device)
        y_tensor = torch.tensor(y, dtype=torch.float32, device=self.device)
        for _ in range(self.epochs):
            outputs = self.weighted_layer(X_tensor)
            loss = self.loss_fn(outputs, y_tensor)
            loss.backward()
            self.optimizer.step()
            self.optimizer.zero_grad()

    def _fit_batch(
        self,
        X: np.ndarray,
        y: np.ndarray,
        use_dataloader: bool = False,
    ) -> None:
        # Using a DataLoader can be slow for large batch sizes because it calls `__getitem__` row by row and then
        # collates the rows into a single tensor. On the other hand, a DataLoader may provide better memory management
        # and throughput in some use cases when `num_workers > 0`. In most cases we use `use_dataloader=False`,
        # but the DataLoader path is kept for future reference.
        if use_dataloader:
            dataset = TensorDataset(
                torch.tensor(X, dtype=torch.float32, device=self.device),
                torch.tensor(y, dtype=torch.float32, device=self.device),
            )
            dataloader = DataLoader(
                dataset,
                batch_size=self.batch_size,
                shuffle=True,
            )
        else:
            shuffle_index = torch.randperm(X.shape[0])
            X_shuffled = torch.tensor(
                X[shuffle_index], dtype=torch.float32, device=self.device
            )
            y_shuffled = torch.tensor(
                y[shuffle_index], dtype=torch.float32, device=self.device
            )
            dataloader = list(
                zip(
                    torch.split(X_shuffled, self.batch_size),
                    torch.split(y_shuffled, self.batch_size),
                    strict=False,
                )
            )

        for _ in range(self.epochs):
            for batch_X, batch_y in dataloader:
                outputs = self.weighted_layer(batch_X)
                loss = self.loss_fn(outputs, batch_y)
                loss.backward()
                self.optimizer.step()
                self.optimizer.zero_grad()

        self._is_fitted = True



class QCMLRegressor(QCMLBase, RegressorMixin):
    """Scikit-learn wrapper for QCML Regression models."""

    loss_fn: torch.nn.MSELoss | torch.nn.L1Loss | torch.nn.SmoothL1Loss

    def __init__(
        self,
        *,
        hilbert_space_dim: int = 8,
        epochs: int = 1000,
        random_state: int = 0,
        lr: float = 0.1,
        weights_lr: float | None = None,
        opt_betas: tuple[float, float] | None = None,
        loss: str = "L1",
        groups: list[list[int]] | None = None,
        device: str = "cpu",
        batch_size: int | None = None,
        dropout_rate: float = 0.0,
    ) -> None:
        """
        Initialize the QCMLRegressor.

        Parameters
        ----------
        hilbert_space_dim : int, optional
            The dimension of the Hilbert space, by default 8
        epochs : int, optional
            The number of epochs for training, by default 1000
        random_state : int, optional
            The random seed, by default 0
        lr : float, optional
            The learning rate, by default 0.1
        weights_lr : float, optional
            The learning rate for the weight layer, by default None,
            which uses the same learning rate as `lr`.
        opt_betas : tuple[float, float], optional
            The betas for the optimizer, by default None, which uses (0.9, 0.999)
        loss : str, optional
            The loss function to use, by default 'L1'
            Options: 'L1', 'L2', 'SmoothL1'
        groups : list[list[int]] | None, optional
            The indices of groups of input features that should be assigned the
            same weight in the weight layer. This is a list of lists, where each
            inner list contains the indices of the inputs in that group. This can
            be useful for one-hot encoded features where you may want to assign
            the same weight to all categories. If None, all input weights are
            learned independently. By default, None
        device : str, optional
            The device to use for training, by default 'cpu'
        batch_size : int, optional
            The batch size for training, by default None.
            If None or -1, no batching is performed.
        dropout_rate : float, optional
            The dropout rate for the model, by default 0.0.
            If 0.0, no dropout is applied.
        """
        self.hilbert_space_dim = hilbert_space_dim
        self.epochs = epochs
        self.random_state = random_state
        self.lr = lr
        self.weights_lr = weights_lr if weights_lr is not None else lr
        self.opt_betas = opt_betas
        self.device = device
        self.loss = loss
        self.batch_size = batch_size
        self.groups = groups
        self.dropout_rate = dropout_rate
        self.save_model_fn: SaveModelFn | None = None
        self._is_fitted: bool = False

        # If the model has been trained or loaded
        # these will be set.
        self.input_operator_count: int | None = None
        self.output_operator_count: int | None = None

        set_killswitch(self, on=["number_of_rows"])

    def _fit_transform_targets(
        self,
        y: np.ndarray,
    ) -> np.ndarray:
        self.scaler = StandardScaler()
        y = self.scaler.fit_transform(y)
        return y

    def _fit(
        self,
        X: np.ndarray,
        y: np.ndarray,
    ) -> Self:
        # Default weighted layer initialization.
        self.input_operator_count = X.shape[1]
        self.output_operator_count = y.shape[1] if y.ndim > 1 else 1
        assert self.input_operator_count is not None
        assert self.output_operator_count is not None
        self.weighted_layer = _init_weighted_layer(
            hilbert_space_dim=self.hilbert_space_dim,
            input_operator_count=self.input_operator_count,
            output_operator_count=self.output_operator_count,
            device=self.device,
            groups=self.groups,
        )
        if self.dropout_rate > 0.0:
            self.weighted_layer = DropoutLayer(
                underlying_layer=self.weighted_layer,
                dropout_rate=self.dropout_rate,
                device=self.device,
            )

        # Initialize the loss function
        if self.loss == "L2":
            self.loss_fn = torch.nn.MSELoss()
        elif self.loss == "L1":
            self.loss_fn = torch.nn.L1Loss()
        elif self.loss == "SmoothL1":
            self.loss_fn = torch.nn.SmoothL1Loss()
        else:
            raise ValueError(f"Unsupported loss function: {self.loss}")

        return self._train_loop(X, y)

    def fit(self, X: np.ndarray, y: np.ndarray) -> "QCMLRegressor":
        """Fit Method.

        Parameters
        ----------
        X : np.ndarray
            The input features.
        y : np.ndarray
            The target values.

        Returns
        -------
        QCMLRegressor
            The fitted model.
        """
        if y is None:
            raise ValueError("Target values (y) must be provided and cannot be None.")

        torch.manual_seed(self.random_state)

        if y.ndim == 1:
            y = y.reshape(-1, 1)

        # === Fit the scaler ===
        y = self._fit_transform_targets(y)

        # === Fit the model ===
        self._fit(X, y)

        self._is_fitted = True

        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict Method.

        Parameters
        ----------
        X : np.ndarray
            The input features.

        Returns
        -------
        np.ndarray
            The predicted values.
        """
        if not self._is_fitted:
            raise ValueError("Model has not been fitted yet.")

        test_data = torch.tensor(X, device=self.device)
        with torch.no_grad():
            self.weighted_layer.eval()
            y_pred = self.weighted_layer(test_data)

        if len(y_pred) > 0:
            y_pred = self.scaler.inverse_transform(y_pred.cpu().detach().numpy())
        else:
            y_pred = y_pred.cpu().detach().numpy()

        return y_pred

    def save(self) -> None:
        """Save the model.

        Needs the save_model_fn to be set.
        """
        if not self.save_model_fn:
            raise ValueError("Save model function not set.")

        # Model should have been fitted or loaded
        if not self._is_fitted:
            raise ValueError("Model has not been fitted yet.")

        # Input operator count and output operator count should have been set
        if not self.input_operator_count or not self.output_operator_count:
            raise ValueError(
                "Input operator count and output operator count have not been set."
            )

        self.save_model_fn(  # type: ignore
            scaler=self.scaler,
            weighted_layer_state_dict=self.weighted_layer.state_dict(),
            model_parameters={
                "hilbert_space_dim": self.hilbert_space_dim,
                "epochs": self.epochs,
                "random_state": self.random_state,
                "lr": self.lr,
                "weights_lr": self.weights_lr,
                "loss": self.loss,
                "device": self.device,
                "batch_size": self.batch_size,
                "groups": self.groups,
                "dropout_rate": self.dropout_rate,
                "input_operator_count": self.input_operator_count,
                "output_operator_count": self.output_operator_count,
                "classes_": [],
            },
        )

    @classmethod
    def load(cls, load_states_fn: LoadStatesFn) -> Self:
        """Load the model.

        This class method instantiates a new QCMLRegressor object and loads
        the states of the model using a load_states_fn.

        You can use the `load_states_pickle` function from the hooks module
        as an example of how to implement a load_states_fn.

        Parameters
        ----------
        load_states_fn : LoadStatesFn
            Function to load the states of the model.

        Returns
        -------
        QCMLRegressor
            The loaded model.
        """
        scaler, weighted_layer_state_dict, model_parameters = load_states_fn()

        # Initialize the model with the model parameters from the
        # deserialized state.
        model = cls(
            hilbert_space_dim=model_parameters["hilbert_space_dim"],
            epochs=model_parameters["epochs"],
            random_state=model_parameters["random_state"],
            lr=model_parameters["lr"],
            weights_lr=model_parameters["weights_lr"],
            loss=model_parameters["loss"],
            device=model_parameters["device"],
            batch_size=model_parameters["batch_size"],
            groups=model_parameters["groups"],
            dropout_rate=model_parameters["dropout_rate"],
        )

        # Set the scaler and the weighted layer.
        model.scaler = scaler

        # Set the input and output operator count.
        model.input_operator_count = model_parameters["input_operator_count"]
        model.output_operator_count = model_parameters["output_operator_count"]

        model.weighted_layer = _init_weighted_layer(
            hilbert_space_dim=model.hilbert_space_dim,
            input_operator_count=model.input_operator_count,
            output_operator_count=model.output_operator_count,
            device=model.device,
            groups=model.groups,
        )
        if model.dropout_rate > 0.0:
            model.weighted_layer = DropoutLayer(
                underlying_layer=model.weighted_layer,
                dropout_rate=model.dropout_rate,
                device=model.device,
            )

        # Set the fitted flag.
        model._is_fitted = True

        # Load the state of the weighted layer.
        model.weighted_layer.load_state_dict(weighted_layer_state_dict)

        return model
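

# Illustrative usage sketch (not part of the original module): fitting a
# QCMLRegressor on synthetic data. The shapes, hyperparameters, and the helper
# name below are assumptions made for this example only.
def _example_regressor_usage() -> None:
    rng = np.random.default_rng(0)
    X = rng.normal(size=(128, 4)).astype(np.float32)
    y = (X @ rng.normal(size=(4,)) + 0.1 * rng.normal(size=(128,))).astype(np.float32)

    # `groups=[[1, 2]]` would tie the weights of features 1 and 2 together,
    # e.g. for columns produced by one-hot encoding a single category.
    reg = QCMLRegressor(hilbert_space_dim=8, epochs=50, lr=0.05, batch_size=32)
    reg.fit(X, y)

    # Predictions come back on the original target scale: the internal
    # StandardScaler transform is inverted inside `predict`.
    y_pred = reg.predict(X)
    print(np.mean(np.abs(y_pred.ravel() - y)))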


class QCMLClassifier(QCMLBase, ClassifierMixin):
    """Scikit-learn wrapper for QCML Classification models."""

    loss_fn: torch.nn.CrossEntropyLoss

    def __init__(
        self,
        *,
        hilbert_space_dim: int = 8,
        epochs: int = 1000,
        random_state: int = 0,
        lr: float = 0.1,
        weights_lr: float | None = None,
        opt_betas: tuple[float, float] | None = None,
        loss: str = "cross_entropy",
        groups: list[list[int]] | None = None,
        device: str = "cpu",
        batch_size: int | None = None,
        dropout_rate: float = 0.0,
    ) -> None:
        """
        Initialize the QCMLClassifier.

        Parameters
        ----------
        hilbert_space_dim : int, optional
            The dimension of the Hilbert space, by default 8
        epochs : int, optional
            The number of epochs for training, by default 1000
        random_state : int, optional
            The random seed, by default 0
        lr : float, optional
            The learning rate, by default 0.1
        weights_lr : float, optional
            The learning rate for the weight layer, by default None,
            which uses the same learning rate as `lr`.
        opt_betas : tuple[float, float], optional
            The betas for the optimizer, by default None, which uses (0.9, 0.999)
        loss : str, optional
            The loss function to use, by default 'cross_entropy'
        groups : list[list[int]] | None, optional
            The indices of groups of input features that should be assigned the
            same weight in the weight layer. This is a list of lists, where each
            inner list contains the indices of the inputs in that group. This can
            be useful for one-hot encoded features where you may want to assign
            the same weight to all categories. If None, all input weights are
            learned independently. By default, None
        device : str, optional
            The device to use for training, by default 'cpu'
        batch_size : int, optional
            The batch size for training, by default None.
            If None or -1, no batching is performed.
        dropout_rate : float, optional
            The dropout rate for the model, by default 0.0.
            If 0.0, no dropout is applied.
        """
        self.hilbert_space_dim = hilbert_space_dim
        self.epochs = epochs
        self.random_state = random_state
        self.lr = lr
        self.weights_lr = weights_lr if weights_lr is not None else lr
        self.opt_betas = opt_betas
        self.device = device
        self.loss = loss
        self.batch_size = batch_size
        self.groups = groups
        self.dropout_rate = dropout_rate
        self.save_model_fn: SaveModelFn | None = None
        self._is_fitted: bool = False

        # If the model has been trained or loaded
        # these will be set.
        self.input_operator_count: int | None = None
        self.output_operator_count: int | None = None
        self.classes_: list

        set_killswitch(self, on=["number_of_rows"])

    def _fit(
        self,
        X: np.ndarray,
        y: np.ndarray,
        classes: list | None = None,
    ) -> Self:
        if classes is None:
            classes = np.unique(y)
        self.classes_ = np.array(classes)
        class_to_index = {c: i for i, c in enumerate(self.classes_)}
        y_indices = np.vectorize(class_to_index.get)(y)
        y_onehot = np.eye(len(self.classes_))[y_indices]

        # Default weighted layer initialization.
        self.input_operator_count = X.shape[1]
        self.output_operator_count = len(self.classes_)
        assert self.input_operator_count is not None
        assert self.output_operator_count is not None
        self.weighted_layer = _init_weighted_layer(
            hilbert_space_dim=self.hilbert_space_dim,
            input_operator_count=self.input_operator_count,
            output_operator_count=self.output_operator_count,
            device=self.device,
            groups=self.groups,
        )
        if self.dropout_rate > 0.0:
            self.weighted_layer = DropoutLayer(
                underlying_layer=self.weighted_layer,
                dropout_rate=self.dropout_rate,
                device=self.device,
            )

        # Initialize the loss function
        if self.loss == "cross_entropy":
            self.loss_fn = torch.nn.CrossEntropyLoss()
        else:
            raise ValueError(f"Unsupported loss function: {self.loss}")

        return self._train_loop(X, y_onehot)

    def fit(
        self, X: np.ndarray, y: np.ndarray, classes: list | None = None
    ) -> "QCMLClassifier":
        """Fit Method.

        Parameters
        ----------
        X : np.ndarray
            The input features.
        y : np.ndarray
            The target values.
        classes : list | None, optional
            List of possible classes for the target values.
            If None, the classes will be inferred from the target values.
            By default, None

        Returns
        -------
        QCMLClassifier
            The fitted model.
        """
        if y is None:
            raise ValueError("Target values (y) must be provided and cannot be None.")

        torch.manual_seed(self.random_state)

        # === Fit the model ===
        self._fit(X, y, classes)

        self._is_fitted = True

        return self

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Predict class probabilities.

        Parameters
        ----------
        X : np.ndarray
            The input features.

        Returns
        -------
        np.ndarray
            The predicted class probabilities.
        """
        if not self._is_fitted:
            raise ValueError("Model has not been fitted yet.")

        test_data = torch.tensor(X, device=self.device)
        with torch.no_grad():
            self.weighted_layer.eval()
            y_pred_logits = self.weighted_layer(test_data)

        y_pred_prob = torch.softmax(y_pred_logits, dim=1)
        return y_pred_prob.cpu().detach().numpy()

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict Method.

        Parameters
        ----------
        X : np.ndarray
            The input features.

        Returns
        -------
        np.ndarray
            The predicted class labels.
        """
        y_pred_prob = self.predict_proba(X)
        y_pred_indices = np.argmax(y_pred_prob, axis=1)
        return self.classes_[y_pred_indices]

    def save(self) -> None:
        """Save the model.

        Needs the save_model_fn to be set.
        """
        if not self.save_model_fn:
            raise ValueError("Save model function not set.")

        # Model should have been fitted or loaded
        if not self._is_fitted:
            raise ValueError("Model has not been fitted yet.")

        # Input operator count and output operator count should have been set
        if not self.input_operator_count or not self.output_operator_count:
            raise ValueError(
                "Input operator count and output operator count have not been set."
            )

        self.save_model_fn(  # type: ignore
            scaler=None,
            weighted_layer_state_dict=self.weighted_layer.state_dict(),
            model_parameters={
                "hilbert_space_dim": self.hilbert_space_dim,
                "epochs": self.epochs,
                "random_state": self.random_state,
                "lr": self.lr,
                "weights_lr": self.weights_lr,
                "loss": self.loss,
                "device": self.device,
                "batch_size": self.batch_size,
                "groups": self.groups,
                "dropout_rate": self.dropout_rate,
                "input_operator_count": self.input_operator_count,
                "output_operator_count": self.output_operator_count,
                "classes_": self.classes_,
            },
        )

    @classmethod
    def load(cls, load_states_fn: LoadStatesFn) -> Self:
        """Load the model.

        This class method instantiates a new QCMLClassifier object and loads
        the states of the model using a load_states_fn.

        You can use the `load_states_pickle` function from the hooks module
        as an example of how to implement a load_states_fn.

        Parameters
        ----------
        load_states_fn : LoadStatesFn
            Function to load the states of the model.

        Returns
        -------
        QCMLClassifier
            The loaded model.
        """
        scaler, weighted_layer_state_dict, model_parameters = load_states_fn()

        # Initialize the model with the model parameters from the
        # deserialized state.
        model = cls(
            hilbert_space_dim=model_parameters["hilbert_space_dim"],
            epochs=model_parameters["epochs"],
            random_state=model_parameters["random_state"],
            lr=model_parameters["lr"],
            weights_lr=model_parameters["weights_lr"],
            loss=model_parameters["loss"],
            device=model_parameters["device"],
            batch_size=model_parameters["batch_size"],
            groups=model_parameters["groups"],
            dropout_rate=model_parameters["dropout_rate"],
        )

        # Set the input and output operator count.
        model.input_operator_count = model_parameters["input_operator_count"]
        model.output_operator_count = model_parameters["output_operator_count"]
        model.classes_ = model_parameters["classes_"]

        model.weighted_layer = _init_weighted_layer(
            hilbert_space_dim=model.hilbert_space_dim,
            input_operator_count=model.input_operator_count,
            output_operator_count=model.output_operator_count,
            device=model.device,
            groups=model.groups,
        )
        if model.dropout_rate > 0.0:
            model.weighted_layer = DropoutLayer(
                underlying_layer=model.weighted_layer,
                dropout_rate=model.dropout_rate,
                device=model.device,
            )

        # Set the fitted flag.
        model._is_fitted = True

        # Load the state of the weighted layer.
        model.weighted_layer.load_state_dict(weighted_layer_state_dict)

        return model
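

# Illustrative usage sketch (not part of the original module): fitting a
# QCMLClassifier on synthetic labels and saving the trained state through a
# user-supplied save_model_fn. Names and shapes are assumptions for the example.
def _example_classifier_usage() -> None:
    rng = np.random.default_rng(0)
    X = rng.normal(size=(96, 3)).astype(np.float32)
    y = np.where(X[:, 0] + X[:, 1] > 0.0, "pos", "neg")

    clf = QCMLClassifier(hilbert_space_dim=8, epochs=50, lr=0.05)
    clf.fit(X, y)

    proba = clf.predict_proba(X)  # softmax probabilities, one column per class
    labels = clf.predict(X)  # original labels ("neg" / "pos"), mapped via classes_
    print(proba.shape, labels[:5])

    # `save` simply forwards the scaler (None for classifiers), the layer state
    # dict, and the model parameters to `save_model_fn`; here we capture them
    # in memory instead of writing to disk.
    captured: dict = {}
    clf.save_model_fn = lambda **kwargs: captured.update(kwargs)
    clf.save()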