from __future__ import annotations
import math
from typing import Any, Generic, Literal, Sequence
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from ...types import TX, TY, TEstimator
from .reshaper import SkorchCNNReshaper, SkorchReshaper
[docs]
class AllowNan(nn.Module):
    """Wrap a loss so that NaN entries are neutralised before it is evaluated.

    At every position flagged as NaN, both ``y_pred`` and ``y_true`` are
    replaced with zero, so the wrapped loss receives identical values there.
    """

    def __init__(
        self, loss: nn.Module, *, has_nan: Literal["left", "right", "both"] = "both"
    ) -> None:
        """Store the wrapped loss and the NaN detection mode.

        Parameters
        ----------
        loss : nn.Module
            The loss function to call once NaN positions have been zeroed.
        has_nan : Literal['left', 'right', 'both'], optional
            Which argument is inspected for NaNs, by default 'both'.
            'left' inspects the first argument (``y_pred``),
            'right' inspects the second argument (``y_true``),
            'both' flags a position if either argument is NaN there.
        """
        super().__init__()
        self.loss = loss
        self.has_nan = has_nan

    def forward(self, y_pred: torch.Tensor, y_true: torch.Tensor) -> torch.Tensor:
        """Zero out flagged positions in both tensors and apply the wrapped loss.

        Raises
        ------
        ValueError
            If ``has_nan`` is not one of 'left', 'right' or 'both'.
        """
        mode = self.has_nan
        if mode == "left":
            mask = torch.isnan(y_pred)
        elif mode == "right":
            mask = torch.isnan(y_true)
        elif mode == "both":
            mask = torch.isnan(y_pred) | torch.isnan(y_true)
        else:
            raise ValueError(f"Unknown has_nan: {self.has_nan}")

        # The same mask is applied to both sides so they agree (at zero)
        # wherever a NaN was detected.
        clean_pred = torch.where(mask, torch.zeros_like(y_pred), y_pred)
        clean_true = torch.where(mask, torch.zeros_like(y_true), y_true)
        return self.loss(clean_pred, clean_true)
[docs]
class LNErrors(nn.Module):
    """Element-wise ``|y_true - y_pred| ** n`` errors (no mean is taken)."""

    def __init__(self, n: int = 2) -> None:
        """Store the exponent.

        Parameters
        ----------
        n : int, optional
            The exponent applied to the absolute error, by default 2
        """
        super().__init__()
        self.n = n

    def forward(self, y_pred: torch.Tensor, y_true: torch.Tensor) -> torch.Tensor:
        """Return the absolute error raised to the power ``n``, element-wise."""
        residual = y_true - y_pred
        return torch.pow(residual.abs(), self.n)
[docs]
class LogCoshErrors(nn.Module):
    """Log cosh errors.

    Loss = log(cosh(errors)), element-wise.

    With ``softplus=True`` the mathematically equivalent form
    ``x + softplus(-2x) - log(2)`` is evaluated and ``eps`` is not used.
    With ``softplus=False`` the loss is ``log(cosh(errors + eps))``.

    See also
    --------
    https://datascience.stackexchange.com/questions/96271/logcoshloss-on-pytorch
    """

    def __init__(self, *, softplus: bool = True, eps: float | None = None) -> None:
        """Configure how log(cosh(errors)) is evaluated.

        Parameters
        ----------
        softplus : bool, optional
            If True, uses softplus to get stable results, by default True
        eps : float | None, optional
            Offset added to the errors before ``cosh`` when ``softplus`` is
            False. If None, the machine epsilon of the error dtype is used.
            An explicit ``0.0`` is honoured. By default None
        """
        super().__init__()
        self.softplus = softplus
        self.eps = eps

    def forward(self, y_pred: torch.Tensor, y_true: torch.Tensor) -> torch.Tensor:
        """Return the element-wise log-cosh of ``y_pred - y_true``."""
        x = y_pred - y_true
        if self.softplus:
            # log(cosh(x)) == x + softplus(-2x) - log(2); avoids evaluating cosh.
            return x + F.softplus(-2.0 * x) - math.log(2.0)
        # `is None` rather than `or`: a caller-supplied eps of 0.0 is falsy
        # but is still a valid, deliberate choice.
        eps = torch.finfo(x.dtype).eps if self.eps is None else self.eps
        return torch.log(torch.cosh(x + eps))
[docs]
class XTanhErrors(nn.Module):
    """XTanh errors.

    Loss = x * tanh(x), where x = y_pred - y_true (element-wise).

    See also
    --------
    https://github.com/tuantle/regression-losses-pytorch
    """

    def forward(self, y_pred: torch.Tensor, y_true: torch.Tensor) -> torch.Tensor:
        """Return the element-wise ``x * tanh(x)`` of the prediction error."""
        residual = y_pred - y_true
        squashed = torch.tanh(residual)
        return residual * squashed
[docs]
class XSigmoidErrors(nn.Module):
    """XSigmoid errors.

    Loss = x * (2 * sigmoid(x) - 1), where x = y_pred - y_true (element-wise).

    See also
    --------
    https://github.com/tuantle/regression-losses-pytorch
    """

    def forward(self, y_pred: torch.Tensor, y_true: torch.Tensor) -> torch.Tensor:
        """Return the element-wise ``x * (2 * sigmoid(x) - 1)`` of the error."""
        residual = y_pred - y_true
        # 2 * sigmoid(x) - 1 maps the error into (-1, 1) with the sign of x.
        gate = 2 * torch.sigmoid(residual) - 1
        return residual * gate
[docs]
class AlgebraicErrors(nn.Module):
    """Algebraic errors.

    Loss = x^2 / sqrt(1 + x^2), where x = y_pred - y_true (element-wise).

    See also
    --------
    https://github.com/tuantle/regression-losses-pytorch
    """

    def forward(self, y_pred: torch.Tensor, y_true: torch.Tensor) -> torch.Tensor:
        """Return the element-wise ``x^2 / sqrt(1 + x^2)`` of the error."""
        residual = y_pred - y_true
        squared = residual.pow(2)
        denominator = torch.sqrt(1 + squared)
        return squared / denominator
[docs]
class AsymmetricLoss(nn.Module):
    """Asymmetrically weighted loss for a single weight ``t``.

    Elements where the prediction is greater than the target cost
    ``t * loss_pred_grater(y_pred, y_true)``; all other elements cost
    ``(1 - t) * loss_pred_less(y_pred, y_true)``. The mean over all elements
    is returned.

    When both losses are absolute errors this is the pinball (quantile) loss
    whose minimiser is the ``(1 - t)``-quantile of the target.
    """

    def __init__(
        self,
        *,
        t: float,
        loss_pred_grater: nn.Module | None = None,
        loss_pred_less: nn.Module | None = None,
    ) -> None:
        """Store the weight and the two element-wise losses.

        Parameters
        ----------
        t : float
            Weight applied where ``y_pred > y_true``; ``1 - t`` is applied
            elsewhere.
        loss_pred_grater : nn.Module | None, optional
            Element-wise loss used where the prediction is greater than the
            target. If None, ``LNErrors(1)`` is used. By default None
        loss_pred_less : nn.Module | None, optional
            Element-wise loss used where the prediction is not greater than
            the target. If None, ``LNErrors(1)`` is used. By default None
        """
        super().__init__()
        # Defaults are built per instance; a module instance used as a default
        # argument would be created once and shared by every AsymmetricLoss.
        self.loss_pred_grater = (
            LNErrors(1) if loss_pred_grater is None else loss_pred_grater
        )
        self.loss_pred_less = LNErrors(1) if loss_pred_less is None else loss_pred_less
        self.t = t

    def forward(self, y_pred: torch.Tensor, y_true: torch.Tensor) -> torch.Tensor:
        """Return the mean asymmetrically weighted loss as a 0-dim tensor."""
        over = self.t * self.loss_pred_grater(y_pred, y_true)
        under = (1 - self.t) * self.loss_pred_less(y_pred, y_true)
        # Select by the sign of the error. Taking max(over, under) of two
        # non-negative losses ignores the sign and degenerates to
        # max(t, 1 - t) * loss, i.e. a symmetric loss.
        return torch.mean(torch.where(y_pred > y_true, over, under))
def _reshape_y_pred(
    y_pred: torch.Tensor, y_true_shape: torch.Size, n_ts: int
) -> torch.Tensor:
    """Unflatten ``[B, Ts * Ny]`` predictions into ``[B, Ts, Ny]``.

    Only acts when both ``y_pred`` and the target are 2-dimensional; any
    other input is returned unchanged.

    Raises
    ------
    ValueError
        If both are 2-dimensional but ``y_pred.shape[1]`` differs from
        ``n_ts * y_true_shape[1]``.
    """
    is_flat_case = y_pred.ndim == 2 and len(y_true_shape) == 2
    if not is_flat_case:
        return y_pred

    n_outputs = y_true_shape[1]
    if y_pred.shape[1] != n_ts * n_outputs:
        raise ValueError(
            "Got y_pred.ndim == 2 and y_true.ndim == 2, but "
            f"y_pred.shape[1] = {y_pred.shape[1]} != "
            f"n_ts * y_true.shape[1] = {n_ts} * {y_true_shape[1]}"
        )
    # [B, Ts * Ny] -> [B, Ts, Ny]
    return y_pred.reshape(y_pred.shape[0], n_ts, -1)
def _get_ts_axis(y_pred_shape: torch.Size, y_true_shape: torch.Size) -> int:
    """Locate the axis of ``y_pred`` that enumerates the ``ts`` outputs.

    The batch axis (axis 0) of both shapes is ignored. The first remaining
    ``y_pred`` axis whose size does not occur among the remaining ``y_true``
    sizes is returned (as an index into the full ``y_pred`` shape). If every
    size also occurs in ``y_true`` (e.g. Ts == Ny), -1 is returned.
    """
    pred_dims = y_pred_shape[1:]
    true_dims = y_true_shape[1:]
    # +1 converts the position within the batch-less shape back to an axis
    # of the full shape; -1 is the fallback when no size is unique to y_pred.
    return next(
        (pos + 1 for pos, size in enumerate(pred_dims) if size not in true_dims),
        -1,
    )
[docs]
class AsymmetricLosses(nn.Module):
    """Asymmetric loss with multioutput support.

    Holds one ``AsymmetricLoss`` per entry of ``ts`` and averages their
    values. Each forward pass also records ``y_true_shape_`` and ``ts_axis_``
    on the module.
    """

    def __init__(
        self, *, ts: Sequence[float] | int, loss: nn.Module | None = None
    ) -> None:
        """Build one ``AsymmetricLoss`` per ``t``.

        Parameters
        ----------
        ts : Sequence[float] | int
            The list of `t` to use for fitting the data or the number of `t` to use.
            If `ts` is an integer,
            `np.linspace(1 / (ts * 2), 1 - 1 / (ts * 2), ts)` is used.
        loss : nn.Module | None, optional
            The element-wise loss shared by all `t`. If None, ``LNErrors(1)``
            is used. By default None

        Raises
        ------
        ValueError
            If `ts` is an integer smaller than 1 or an empty sequence.
        """
        super().__init__()
        if isinstance(ts, int):
            # ts == 0 would otherwise surface as a ZeroDivisionError below.
            if ts < 1:
                raise ValueError(f"ts must be a positive integer, got {ts}")
            # tolist() yields plain Python floats instead of np.float64 items.
            ts = np.linspace(1 / (ts * 2), 1 - 1 / (ts * 2), ts).tolist()
        elif len(ts) == 0:
            # An empty list would only fail later, inside torch.stack.
            raise ValueError("ts must not be empty")
        if loss is None:
            # Built per instance rather than shared through a default argument.
            loss = LNErrors(1)
        self.ts = ts
        self.loss = loss
        self.losses = nn.ModuleList(
            [
                AsymmetricLoss(t=t, loss_pred_grater=loss, loss_pred_less=loss)
                for t in ts
            ]
        )

    def forward(self, y_pred: torch.Tensor, y_true: torch.Tensor) -> torch.Tensor:
        """Return the mean of the per-``t`` losses.

        Parameters
        ----------
        y_pred : torch.Tensor
            The predicted values.
            [B, Ts * Ny] or [B, Ts, Ny] or [B, Ny, Ts]
        y_true : torch.Tensor
            The true values.
            [B, Ny] or [B] (if Ny == 1)

        Returns
        -------
        torch.Tensor
            0-dim tensor with the loss.

        Raises
        ------
        ValueError
            If the detected ``ts`` axis of ``y_pred`` does not have exactly
            ``len(ts)`` entries.
        """
        y_pred = _reshape_y_pred(y_pred, y_true.shape, len(self.ts))
        # Stored on the module so that they can be read back after a forward pass.
        self.y_true_shape_ = y_true.shape
        self.ts_axis_ = _get_ts_axis(y_pred.shape, y_true.shape)

        n_found = y_pred.shape[self.ts_axis_]
        if n_found != len(self.losses):
            # zip() below would silently drop the surplus slices or losses.
            raise ValueError(
                f"y_pred has {n_found} entries along axis {self.ts_axis_}, "
                f"but {len(self.losses)} values of t were configured"
            )

        per_t_losses = [
            loss(y_pred_q, y_true)
            for y_pred_q, loss in zip(
                torch.moveaxis(y_pred, self.ts_axis_, 0), self.losses
            )
        ]
        return torch.stack(per_t_losses).mean()
[docs]
class SkorchReshaperProbaMixin(Generic[TEstimator]):
    """skorch wrapper mixin that reduces per-``t`` predictions to statistics.

    ``predict`` collapses the ``ts`` axis of the wrapped estimator's output
    with a numpy reduction (by default a central value, optionally together
    with a spread measure).
    """

    estimator: TEstimator
    m_type: Literal["mean", "median", "nanmean", "nanmedian"]
    """M-statistics type to return from `predict` by default"""
    var_type: Literal["var", "std", "ptp", "nanvar", "nanstd"]
    """Variance type to return from `predict` by default"""

    def __init__(
        self,
        estimator: TEstimator,
        *args: Any,
        m_type: Literal["mean", "median", "nanmean", "nanmedian"] = "mean",
        var_type: Literal["var", "std", "ptp", "nanvar", "nanstd"] = "std",
        **kwargs: Any,
    ) -> None:
        """Forward everything else to the next class in the MRO, then store
        the default statistic names."""
        super().__init__(estimator, *args, **kwargs)  # type: ignore
        self.m_type = m_type
        self.var_type = var_type

    @staticmethod
    def _get(
        y: TY,
        type_: Literal[
            "mean",
            "median",
            "var",
            "std",
            "ptp",
            "nanmean",
            "nanmedian",
            "nanvar",
            "nanstd",
        ],
        *,
        axis: int,
    ) -> TY:
        """Apply the numpy function named ``type_`` to ``y`` along ``axis``.

        Raises
        ------
        ValueError
            If numpy has no attribute called ``type_``.
        """
        if not hasattr(np, type_):
            raise ValueError(f"Unknown type: {type_}")
        reducer = getattr(np, type_)
        return reducer(y, axis=axis)

    def predict(
        self,
        X: TX,
        *,
        return_std: bool = False,
        type_: Literal[
            "mean",
            "median",
            "nanmean",
            "nanmedian",
            "var",
            "std",
            "ptp",
            "nanvar",
            "nanstd",
        ]
        | tuple[
            Literal["mean", "median", "nanmean", "nanmedian"],
            Literal["var", "std", "ptp", "nanvar", "nanstd"],
        ]
        | None = None,
        **predict_params: Any,
    ) -> TY | tuple[TY, TY]:
        """Predict and reduce the ``ts`` axis to one or two statistics.

        Parameters
        ----------
        X : TX
            Input passed on to the parent ``predict``.
        return_std : bool, optional
            If True, a pair of statistics is returned, by default False
        type_ : str | tuple[str, str] | None, optional
            Name(s) of the numpy reduction(s). A tuple always yields a pair.
            A single name replaces the first statistic only (the second is
            ``var_type`` when a pair is requested). None falls back to
            ``m_type`` (and ``var_type``). By default None
        **predict_params : Any
            Passed on to the parent ``predict``.

        Returns
        -------
        TY | tuple[TY, TY]
            A single reduced array, or a pair of them when ``return_std`` is
            True or ``type_`` is a tuple.
        """
        # The axis and target shape were recorded by the criterion.
        ts_axis = self.estimator.criterion.ts_axis_
        raw = super().predict(X, **predict_params)  # type: ignore
        stacked = _reshape_y_pred(
            raw,
            self.estimator.criterion.y_true_shape_,
            len(self.estimator.criterion.ts),
        )

        wants_pair = return_std or isinstance(type_, tuple)
        if not wants_pair:
            return self._get(stacked, type_ or self.m_type, axis=ts_axis)

        if type_ is None:
            kinds = (self.m_type, self.var_type)
        elif isinstance(type_, str):
            kinds = (type_, self.var_type)
        else:
            kinds = type_
        first = self._get(stacked, kinds[0], axis=ts_axis)
        second = self._get(stacked, kinds[1], axis=ts_axis)
        return first, second
[docs]
class SkorchReshaperProba(  # type: ignore
    SkorchReshaperProbaMixin[TEstimator],
    SkorchReshaper[TEstimator],
    Generic[TEstimator],
):
    """`SkorchReshaper` combined with `SkorchReshaperProbaMixin`."""

    def __init__(
        self,
        estimator: TEstimator,
        m_type: Literal["mean", "median", "nanmean", "nanmedian"] = "mean",
        var_type: Literal["var", "std", "ptp", "nanvar", "nanstd"] = "std",
    ) -> None:
        """skorch wrapper that reshapes tabular data for NNs
        and supports quantile predictions.

        X: [B, F] -> [B, F]
        y: [B] -> [B, 1] or [B, NY] -> [B, NY] where
        B: batch, F: features, NY: number of outputs

        Use `AsymmetricLosses` for NeuralNet.criterion.
        `estimator.module` may return [B, Ts * NY] or [B, Ts, NY] or [B, NY, Ts].
        If NY = Ts, assumes that [B, NY, Ts] is returned.

        Parameters
        ----------
        estimator : TEstimator
            The estimator to wrap.
        m_type : Literal['mean', 'median', 'nanmean', 'nanmedian'], optional
            M-statistics type to return from `predict` by default, by default "mean"
        var_type : Literal['var', 'std', 'ptp', 'nanvar', 'nanstd'], optional
            Variance type to return from `predict` by default, by default "std"

        Examples
        --------
        >>> from sklearn_utilities.torch.skorch import SkorchReshaperProba, AsymmetricLosses
        >>> from skorch import NeuralNet
        >>> import torch.nn as nn
        >>> net = nn.Sequential(nn.LazyLinear(10), nn.GELU(), nn.LazyLinear(5))
        >>> est = SkorchReshaperProba(NeuralNet(module=net, criterion=AsymmetricLosses(ts=5)))
        >>> est.fit(X_train, Y_train)
        >>> y_pred, y_std = est.predict(X_test, return_std=True)
        """
        # m_type / var_type are passed by keyword so the mixin's __init__
        # consumes them and only `estimator` travels further up the MRO.
        super().__init__(estimator, m_type=m_type, var_type=var_type)
[docs]
class SkorchCNNReshaperProba(  # type: ignore
    SkorchReshaperProbaMixin[TEstimator],
    SkorchCNNReshaper[TEstimator],
    Generic[TEstimator],
):
    """`SkorchCNNReshaper` combined with `SkorchReshaperProbaMixin`."""

    def __init__(
        self,
        estimator: TEstimator,
        *,
        window_size: int | None,
        m_type: Literal["mean", "median", "nanmean", "nanmedian"] = "mean",
        var_type: Literal["var", "std", "ptp", "nanvar", "nanstd"] = "std",
    ) -> None:
        """skorch wrapper that reshapes tabular data for CNNs using sliding windows
        and supports quantile predictions.

        X: [B, F] -> [B - H + 1, 1, H, F] if window_size is not None (for Conv2d)
           [B, F] -> [B, 1, F] if window_size is None (for Conv1d)
        y: [B] -> [B - H + 1, 1] or [B, NY] -> [B - H + 1, NY] where
        C = 1: channels, B: batch, H: window, F: features, NY: number of outputs

        Use `AsymmetricLosses` for NeuralNet.criterion.
        `estimator.module` may return [B, Ts * NY] or [B, Ts, NY] or [B, NY, Ts].
        If NY = Ts, assumes that [B, NY, Ts] is returned.

        Parameters
        ----------
        estimator : TEstimator
            The estimator to wrap.
        window_size : int | None
            The size of the sliding window (keyword-only, no default).
            Make sure that CNN kernel size is equal or larger than this.
            If None, no sliding window is applied.
        m_type : Literal['mean', 'median', 'nanmean', 'nanmedian'], optional
            M-statistics type to return from `predict` by default, by default "mean"
        var_type : Literal['var', 'std', 'ptp', 'nanvar', 'nanstd'], optional
            Variance type to return from `predict` by default, by default "std"

        Examples
        --------
        >>> from sklearn_utilities.torch.skorch import SkorchCNNReshaperProba, AsymmetricLosses
        >>> from skorch import NeuralNet
        >>> import torch.nn as nn
        >>> net = nn.Sequential(nn.Conv1d(1, 10, 3), nn.GELU(), nn.Flatten(), nn.LazyLinear(5))
        >>> est = SkorchCNNReshaperProba(
        ...     NeuralNet(module=net, criterion=AsymmetricLosses(ts=5)), window_size=None
        ... )
        >>> est.fit(X_train, Y_train)
        >>> y_pred, y_std = est.predict(X_test, return_std=True)
        """
        # m_type / var_type are consumed by the mixin's __init__; window_size
        # is forwarded on through its **kwargs to the next class in the MRO.
        super().__init__(
            estimator, m_type=m_type, var_type=var_type, window_size=window_size
        )