"""Sklearn wrapper for QCML models."""
from collections.abc import Callable
from typing import Self, TypedDict, Unpack
import numpy as np
import torch
from honeio.layers._killswitch import set_killswitch
from honeio.layers.dropout import DropoutLayer
from honeio.layers.general import PytorchGeneralHSM
from honeio.layers.weighted import WeightedLayer
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
def _init_weighted_layer(
*,
hilbert_space_dim: int,
input_operator_count: int,
output_operator_count: int,
device: str = "cpu",
groups: list[list[int]] | None = None,
) -> WeightedLayer:
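"""Build a WeightedLayer around a PytorchGeneralHSM core.

The HSM layer maps `input_operator_count` input features through a
`hilbert_space_dim`-dimensional Hilbert space to `output_operator_count`
outputs; the WeightedLayer adds per-feature weights, optionally tied
across the index groups given in `groups`.
"""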
hsm_layer = PytorchGeneralHSM(
hilbert_space_dims=hilbert_space_dim,
input_operator_count=input_operator_count,
output_operator_count=output_operator_count,
device=device,
)
return WeightedLayer(
hsm_layer=hsm_layer,
groups=groups,
device=device,
)
class ModelParameters(TypedDict):
"""Model parameters."""
hilbert_space_dim: int
epochs: int
random_state: int
lr: float
weights_lr: float
loss: str
device: str
batch_size: int | None
groups: list[list[int]] | None
dropout_rate: float
input_operator_count: int
output_operator_count: int
classes_: list
class SaveModelKwargs(TypedDict):
"""Arguments for initializing the weighted layer."""
scaler: StandardScaler
weighted_layer_state_dict: dict
model_parameters: ModelParameters
SaveModelFn = Callable[[Unpack[SaveModelKwargs]], None]
"""Function to save the model.
The function should take the target scaler and the state
dictionary of the weighted layer.
"""
LoadStatesFn = Callable[[], tuple[StandardScaler, dict, ModelParameters]]
"""Function to load the model states.
The function should return the target scaler, the state
dictionary of the weighted layer, and the model parameters.
"""
class QCMLBase(BaseEstimator):
"""Scikit-learn wrapper base class for QCML models."""
save_model_fn: SaveModelFn | None = None
optimizer: torch.optim.Optimizer
def _train_loop(
self,
X: np.ndarray,
y: np.ndarray,
) -> Self:
# Initialize the optimizer with two parameter groups: parameters whose
# names end in "weights" use `weights_lr`; everything else uses `lr`.
self.optimizer = torch.optim.Adam(
params=[
{
"params": [
p
for n, p in self.weighted_layer.named_parameters()
if not n.endswith("weights")
]
},
{
"params": [
p
for n, p in self.weighted_layer.named_parameters()
if n.endswith("weights")
],
"lr": self.weights_lr,
},
],
lr=self.lr,
amsgrad=True,
**(dict(betas=self.opt_betas) if self.opt_betas is not None else {}),
)
# Set the model to training mode.
self.weighted_layer.train(True)
# Fit the model.
if self.batch_size and self.batch_size > 0:
self._fit_batch(X, y)
else:
self._fit_no_batch(X, y)
return self
def _fit_no_batch(
self,
X: np.ndarray,
y: np.ndarray,
) -> None:
X_tensor = torch.tensor(X, dtype=torch.float32, device=self.device)
y_tensor = torch.tensor(y, dtype=torch.float32, device=self.device)
for _ in range(self.epochs):
outputs = self.weighted_layer(X_tensor)
loss = self.loss_fn(outputs, y_tensor)
loss.backward()
self.optimizer.step()
self.optimizer.zero_grad()
def _fit_batch(
self,
X: np.ndarray,
y: np.ndarray,
use_dataloader: bool = False,
) -> None:
# Using a DataLoader can be very slow for large batch_size, since it fetches
# rows one at a time via `__getitem__` and then collates them into a single
# tensor. On the other hand, a DataLoader may provide better memory management
# and throughput in some use cases when `num_workers > 0`. In most cases we use
# `use_dataloader=False`, but we keep the DataLoader path for future reference.
if use_dataloader:
dataset = TensorDataset(
torch.tensor(X, dtype=torch.float32, device=self.device),
torch.tensor(y, dtype=torch.float32, device=self.device),
)
dataloader = DataLoader(
dataset,
batch_size=self.batch_size,
shuffle=True,
)
else:
shuffle_index = torch.randperm(X.shape[0])
X_shuffled = torch.tensor(
X[shuffle_index], dtype=torch.float32, device=self.device
)
y_shuffled = torch.tensor(
y[shuffle_index], dtype=torch.float32, device=self.device
)
dataloader = list(
zip(
torch.split(X_shuffled, self.batch_size),
torch.split(y_shuffled, self.batch_size),
strict=False,
)
)
for _ in range(self.epochs):
for batch_X, batch_y in dataloader:
outputs = self.weighted_layer(batch_X)
loss = self.loss_fn(outputs, batch_y)
loss.backward()
self.optimizer.step()
self.optimizer.zero_grad()
self._is_fitted = True
class QCMLRegressor(QCMLBase, RegressorMixin):
"""Scikit-learn wrapper for QCML Regression models."""
loss_fn: torch.nn.MSELoss | torch.nn.L1Loss | torch.nn.SmoothL1Loss
def __init__(
self,
*,
hilbert_space_dim: int = 8,
epochs: int = 1000,
random_state: int = 0,
lr: float = 0.1,
weights_lr: float | None = None,
opt_betas: tuple[float, float] | None = None,
loss: str = "L1",
groups: list[list[int]] | None = None,
device: str = "cpu",
batch_size: int | None = None,
dropout_rate: float = 0.0,
) -> None:
"""
Initialize the QCMLRegressor.
Parameters
----------
hilbert_space_dim : int, optional
The dimension of the Hilbert space, by default 8
epochs : int, optional
The number of epochs for training, by default 1000
random_state : int, optional
The random seed, by default 0
lr : float, optional
The learning rate, by default 0.1
weights_lr : float, optional
The learning rate for the weight layer, by default None, which uses the same learning rate as `lr`.
opt_betas : tuple[float, float], optional
The betas for the Adam optimizer, by default None, which uses (0.9, 0.999).
loss : str, optional
The loss function to use, by default 'L1'.
Options: 'L1', 'L2', 'SmoothL1'
groups : list[list[int]] | None, optional
The indices of groups of input features that should be assigned the same weight in weight layer.
This is a list of lists, where each list contains the indices of the inputs in that group.
This can be useful for one hot encoded features where you may want to assign same weight to all categories.
If None, all input weights are learned independently. By default, None
device : str, optional
The device to use for training, by default 'cpu'
batch_size : int, optional
The batch size for training, by default None. If None or not positive, no batching is performed.
dropout_rate : float, optional
The dropout rate for the model, by default 0.0. If 0.0, no dropout is applied.
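Examples
--------
An illustrative configuration (values are arbitrary) that ties the weights
of three one-hot-encoded columns at indices 2-4 into a single group:

>>> model = QCMLRegressor(hilbert_space_dim=16, epochs=100, groups=[[2, 3, 4]])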
"""
self.hilbert_space_dim = hilbert_space_dim
self.epochs = epochs
self.random_state = random_state
self.lr = lr
self.weights_lr = weights_lr if weights_lr is not None else lr
self.opt_betas = opt_betas
self.device = device
self.loss = loss
self.batch_size = batch_size
self.groups = groups
self.dropout_rate = dropout_rate
self.save_model_fn: SaveModelFn | None = None
self._is_fitted: bool = False
# If the model has been trained or loaded
# these will be set.
self.input_operator_count: int | None = None
self.output_operator_count: int | None = None
set_killswitch(self, on=["number_of_rows"])
def _fit_transform_targets(
self,
y: np.ndarray,
) -> np.ndarray:
self.scaler = StandardScaler()
y = self.scaler.fit_transform(y)
return y
def _fit(
self,
X: np.ndarray,
y: np.ndarray,
) -> Self:
# Default weighted layer initialization.
self.input_operator_count = X.shape[1]
self.output_operator_count = y.shape[1] if y.ndim > 1 else 1
assert self.input_operator_count is not None
assert self.output_operator_count is not None
self.weighted_layer = _init_weighted_layer(
hilbert_space_dim=self.hilbert_space_dim,
input_operator_count=self.input_operator_count,
output_operator_count=self.output_operator_count,
device=self.device,
groups=self.groups,
)
if self.dropout_rate > 0.0:
self.weighted_layer = DropoutLayer(
underlying_layer=self.weighted_layer,
dropout_rate=self.dropout_rate,
device=self.device,
)
# Initialize the loss function
if self.loss == "L2":
self.loss_fn = torch.nn.MSELoss()
elif self.loss == "L1":
self.loss_fn = torch.nn.L1Loss()
elif self.loss == "SmoothL1":
self.loss_fn = torch.nn.SmoothL1Loss()
else:
raise ValueError(f"Unsupported loss function: {self.loss}")
return self._train_loop(X, y)
def fit(self, X: np.ndarray, y: np.ndarray) -> "QCMLRegressor":
"""Fit Method.
----------
X : np.ndarray
The input features.
y : np.ndarray
The target values.
Returns
-------
QCMLRegressor
The fitted model.
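Examples
--------
An illustrative run on small synthetic data (shapes and values are arbitrary):

>>> import numpy as np
>>> X = np.random.rand(32, 4).astype(np.float32)
>>> y = X.sum(axis=1)
>>> model = QCMLRegressor(epochs=10).fit(X, y)  # doctest: +SKIP
>>> y_pred = model.predict(X)  # doctest: +SKIP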
"""
if y is None:
raise ValueError("Target values (y) must be provided and cannot be None.")
torch.manual_seed(self.random_state)
if y.ndim == 1:
y = y.reshape(-1, 1)
# === Fit the scaler ===
y = self._fit_transform_targets(y)
# === Fit the model ===
self._fit(X, y)
self._is_fitted = True
return self
def predict(self, X: np.ndarray) -> np.ndarray:
"""Predict Method.
Parameters
----------
X : np.ndarray
The input features.
Returns
-------
np.ndarray
The predicted values.
"""
if not self._is_fitted:
raise ValueError("Model has not been fitted yet.")
test_data = torch.tensor(X, dtype=torch.float32, device=self.device)
with torch.no_grad():
self.weighted_layer.eval()
y_pred = self.weighted_layer(test_data)
if len(y_pred) > 0:
y_pred = self.scaler.inverse_transform(y_pred.cpu().detach().numpy())
else:
y_pred = y_pred.cpu().detach().numpy()
return y_pred
def save(self) -> None:
"""Save the model.
Needs the save_model_fn to be set.
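Examples
--------
Assuming a `save_states_pickle` hook as sketched near ``SaveModelFn`` above:

>>> model.save_model_fn = save_states_pickle  # doctest: +SKIP
>>> model.save()  # doctest: +SKIP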
"""
if not self.save_model_fn:
raise ValueError("Save model function not set.")
# Model should have been fitted or loaded
if not self._is_fitted:
raise ValueError("Model has not been fitted yet.")
# Input operator count and output operator count should have been set
if not self.input_operator_count or not self.output_operator_count:
raise ValueError(
"Input operator count and output operator count have not been set."
)
self.save_model_fn( # type: ignore
scaler=self.scaler,
weighted_layer_state_dict=self.weighted_layer.state_dict(),
model_parameters={
"hilbert_space_dim": self.hilbert_space_dim,
"epochs": self.epochs,
"random_state": self.random_state,
"lr": self.lr,
"weights_lr": self.weights_lr,
"loss": self.loss,
"device": self.device,
"batch_size": self.batch_size,
"groups": self.groups,
"dropout_rate": self.dropout_rate,
"input_operator_count": self.input_operator_count,
"output_operator_count": self.output_operator_count,
"classes_": [],
},
)
@classmethod
def load(cls, load_states_fn: LoadStatesFn) -> Self:
"""Load the model.
This class method instantiates a new QCMLRegressor object
and loads the model state using a `load_states_fn`.
You can use the `load_states_pickle` function from the hooks module
as an example of how to implement a load_states_fn.
Parameters
----------
load_states_fn : LoadStatesFn
Function to load the states of the model.
Returns
-------
QCMLRegressor
The loaded model.
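Examples
--------
Assuming a `load_states_pickle` hook as sketched near ``LoadStatesFn``:

>>> model = QCMLRegressor.load(load_states_pickle)  # doctest: +SKIP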
"""
scaler, weighted_layer_state_dict, model_parameters = load_states_fn()
# Initialize the model with the model parameters from the
# deserialized state.
model = cls(
hilbert_space_dim=model_parameters["hilbert_space_dim"],
epochs=model_parameters["epochs"],
random_state=model_parameters["random_state"],
lr=model_parameters["lr"],
weights_lr=model_parameters["weights_lr"],
loss=model_parameters["loss"],
device=model_parameters["device"],
batch_size=model_parameters["batch_size"],
groups=model_parameters["groups"],
dropout_rate=model_parameters["dropout_rate"],
)
# Set the scaler and the weighted layer.
model.scaler = scaler
# Set the input and output operator count.
model.input_operator_count = model_parameters["input_operator_count"]
model.output_operator_count = model_parameters["output_operator_count"]
model.weighted_layer = _init_weighted_layer(
hilbert_space_dim=model.hilbert_space_dim,
input_operator_count=model.input_operator_count,
output_operator_count=model.output_operator_count,
device=model.device,
groups=model.groups,
)
if model.dropout_rate > 0.0:
model.weighted_layer = DropoutLayer(
underlying_layer=model.weighted_layer,
dropout_rate=model.dropout_rate,
device=model.device,
)
# Set the fitted flag.
model._is_fitted = True
# Load the state of the weighted layer.
model.weighted_layer.load_state_dict(weighted_layer_state_dict)
return model
class QCMLClassifier(QCMLBase, ClassifierMixin):
"""Scikit-learn wrapper for QCML Classification models."""
loss_fn: torch.nn.CrossEntropyLoss
def __init__(
self,
*,
hilbert_space_dim: int = 8,
epochs: int = 1000,
random_state: int = 0,
lr: float = 0.1,
weights_lr: float | None = None,
opt_betas: tuple[float, float] | None = None,
loss: str = "cross_entropy",
groups: list[list[int]] | None = None,
device: str = "cpu",
batch_size: int | None = None,
dropout_rate: float = 0.0,
) -> None:
"""
Initialize the QCMLClassifier.
Parameters
----------
hilbert_space_dim : int, optional
The dimension of the Hilbert space, by default 8
epochs : int, optional
The number of epochs for training, by default 1000
random_state : int, optional
The random seed, by default 0
lr : float, optional
The learning rate, by default 0.1
weights_lr : float, optional
The learning rate for the weight layer, by default None, which uses the same learning rate as `lr`.
opt_betas : tuple[float, float], optional
The betas for the Adam optimizer, by default None, which uses (0.9, 0.999).
loss : str, optional
The loss function to use, by default 'cross_entropy'. Currently only 'cross_entropy' is supported.
groups : list[list[int]] | None, optional
The indices of groups of input features that should be assigned the same weight in weight layer.
This is a list of lists, where each list contains the indices of the inputs in that group.
This can be useful for one hot encoded features where you may want to assign same weight to all categories.
If None, all input weights are learned independently. By default, None
device : str, optional
The device to use for training, by default 'cpu'
batch_size : int, optional
The batch size for training, by default None. If None or not positive, no batching is performed.
dropout_rate : float, optional
The dropout rate for the model, by default 0.0. If 0.0, no dropout is applied.
"""
self.hilbert_space_dim = hilbert_space_dim
self.epochs = epochs
self.random_state = random_state
self.lr = lr
self.weights_lr = weights_lr if weights_lr is not None else lr
self.opt_betas = opt_betas
self.device = device
self.loss = loss
self.batch_size = batch_size
self.groups = groups
self.dropout_rate = dropout_rate
self.save_model_fn: SaveModelFn | None = None
self._is_fitted: bool = False
# If the model has been trained or loaded
# these will be set.
self.input_operator_count: int | None = None
self.output_operator_count: int | None = None
self.classes_: list
set_killswitch(self, on=["number_of_rows"])
def _fit(
self,
X: np.ndarray,
y: np.ndarray,
classes: list | None = None,
) -> Self:
if classes is None:
classes = np.unique(y)
self.classes_ = np.array(classes)
class_to_index = {c: i for i, c in enumerate(self.classes_)}
y_indices = np.vectorize(class_to_index.get)(y)
y_onehot = np.eye(len(self.classes_))[y_indices]
# Default weighted layer initialization.
self.input_operator_count = X.shape[1]
self.output_operator_count = len(self.classes_)
assert self.input_operator_count is not None
assert self.output_operator_count is not None
self.weighted_layer = _init_weighted_layer(
hilbert_space_dim=self.hilbert_space_dim,
input_operator_count=self.input_operator_count,
output_operator_count=self.output_operator_count,
device=self.device,
groups=self.groups,
)
if self.dropout_rate > 0.0:
self.weighted_layer = DropoutLayer(
underlying_layer=self.weighted_layer,
dropout_rate=self.dropout_rate,
device=self.device,
)
# Initialize the loss function
if self.loss == "cross_entropy":
self.loss_fn = torch.nn.CrossEntropyLoss()
else:
raise ValueError(f"Unsupported loss function: {self.loss}")
return self._train_loop(X, y_onehot)
def fit(
self, X: np.ndarray, y: np.ndarray, classes: list | None = None
) -> "QCMLClassifier":
"""Fit Method.
----------
X : np.ndarray
The input features.
y : np.ndarray
The target values.
classes : list | None, optional
List of possible classes for the target values.
If None, the classes will be inferred from the target values. By default, None
Returns
-------
QCMLClassifier
The fitted model.
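Examples
--------
An illustrative run on synthetic data; passing `classes` fixes the label
order and may include classes absent from `y`:

>>> import numpy as np
>>> X = np.random.rand(20, 3).astype(np.float32)
>>> y = np.array(["a", "b"] * 10)
>>> clf = QCMLClassifier(epochs=10).fit(X, y, classes=["a", "b", "c"])  # doctest: +SKIP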
"""
if y is None:
raise ValueError("Target values (y) must be provided and cannot be None.")
torch.manual_seed(self.random_state)
# === Fit the model ===
self._fit(X, y, classes)
self._is_fitted = True
return self
def predict_proba(self, X: np.ndarray) -> np.ndarray:
"""Predict Method.
Parameters
----------
X : np.ndarray
The input features.
Returns
-------
np.ndarray
The predicted class probabilities, with one row per sample.
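Examples
--------
Illustrative usage on an already-fitted classifier `clf`:

>>> proba = clf.predict_proba(X)  # doctest: +SKIP
>>> proba.sum(axis=1)  # rows sum to 1 after the softmax  # doctest: +SKIP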
"""
if not self._is_fitted:
raise ValueError("Model has not been fitted yet.")
test_data = torch.tensor(X, dtype=torch.float32, device=self.device)
with torch.no_grad():
self.weighted_layer.eval()
y_pred_logits = self.weighted_layer(test_data)
y_pred_prob = torch.softmax(y_pred_logits, dim=1)
return y_pred_prob.cpu().detach().numpy()
def predict(self, X: np.ndarray) -> np.ndarray:
"""Predict Method.
Parameters
----------
X : np.ndarray
The input features.
Returns
-------
np.ndarray
The predicted class labels.
"""
y_pred_prob = self.predict_proba(X)
y_pred_indices = np.argmax(y_pred_prob, axis=1)
return self.classes_[y_pred_indices]
def save(self) -> None:
"""Save the model.
Needs the save_model_fn to be set.
"""
if not self.save_model_fn:
raise ValueError("Save model function not set.")
# Model should have been fitted or loaded
if not self._is_fitted:
raise ValueError("Model has not been fitted yet.")
# Input operator count and output operator count should have been set
if not self.input_operator_count or not self.output_operator_count:
raise ValueError(
"Input operator count and output operator count have not been set."
)
self.save_model_fn( # type: ignore
scaler=None,
weighted_layer_state_dict=self.weighted_layer.state_dict(),
model_parameters={
"hilbert_space_dim": self.hilbert_space_dim,
"epochs": self.epochs,
"random_state": self.random_state,
"lr": self.lr,
"weights_lr": self.weights_lr,
"loss": self.loss,
"device": self.device,
"batch_size": self.batch_size,
"groups": self.groups,
"dropout_rate": self.dropout_rate,
"input_operator_count": self.input_operator_count,
"output_operator_count": self.output_operator_count,
"classes_": self.classes_,
},
)
@classmethod
def load(cls, load_states_fn: LoadStatesFn) -> Self:
"""Load the model.
This class method instantiates a new QCMLClassifier object
and loads the model state using a `load_states_fn`.
You can use the `load_states_pickle` function from the hooks module
as an example of how to implement a load_states_fn.
Parameters
----------
load_states_fn : LoadStatesFn
Function to load the states of the model.
Returns
-------
QCMLClassifier
The loaded model.
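Examples
--------
Assuming a `load_states_pickle` hook as sketched near ``LoadStatesFn``:

>>> clf = QCMLClassifier.load(load_states_pickle)  # doctest: +SKIP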
"""
scaler, weighted_layer_state_dict, model_parameters = load_states_fn()
# Initialize the model with the model parameters from the
# deserialized state.
model = cls(
hilbert_space_dim=model_parameters["hilbert_space_dim"],
epochs=model_parameters["epochs"],
random_state=model_parameters["random_state"],
lr=model_parameters["lr"],
weights_lr=model_parameters["weights_lr"],
loss=model_parameters["loss"],
device=model_parameters["device"],
batch_size=model_parameters["batch_size"],
groups=model_parameters["groups"],
dropout_rate=model_parameters["dropout_rate"],
)
# Set the input and output operator count.
model.input_operator_count = model_parameters["input_operator_count"]
model.output_operator_count = model_parameters["output_operator_count"]
model.classes_ = model_parameters["classes_"]
model.weighted_layer = _init_weighted_layer(
hilbert_space_dim=model.hilbert_space_dim,
input_operator_count=model.input_operator_count,
output_operator_count=model.output_operator_count,
device=model.device,
groups=model.groups,
)
if model.dropout_rate > 0.0:
model.weighted_layer = DropoutLayer(
underlying_layer=model.weighted_layer,
dropout_rate=model.dropout_rate,
device=model.device,
)
# Set the fitted flag.
model._is_fitted = True
# Load the state of the weighted layer.
model.weighted_layer.load_state_dict(weighted_layer_state_dict)
return model