"""Data.
========
This module contains convenience utilities to
manage and handle training data.
"""
# Copyright (C) 2018 Steven H. Berguin
# This work is licensed under the MIT License.
from __future__ import annotations  # needed for `X | Y` union syntax on Python 3.9
from dataclasses import dataclass
from functools import cached_property
import numpy as np


def mini_batches(
X: np.ndarray,
batch_size: int | None,
shuffle: bool = True,
random_state: int | None = None,
) -> list[tuple[int, ...]]:
r"""Create randomized mini-batches.
:param X: training data input :math:`X\in\mathbb{R}^{n_x\times m}`
:param batch_size: mini batch size (if None, single batch with all
data)
:param shuffle: swhether to huffle data points or not
:param random_state: random seed (useful to make runs repeatable)
:return: list of tuples containing training data indices allocated
to each batch
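
    A minimal sketch of the batching behavior (the batch sizes are
    deterministic even though the index order is random)::

        >>> batches = mini_batches(np.zeros((2, 10)), batch_size=4, random_state=0)
        >>> [len(b) for b in batches]
        [4, 4, 2]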
"""
rng = np.random.default_rng(random_state)
batches = []
m = X.shape[1]
if not batch_size:
batch_size = m
batch_size = min(batch_size, m)
    # Step 1: shuffle the indices
    indices: list[int] = rng.permutation(m).tolist() if shuffle else list(range(m))
    # Step 2: partition the shuffled indices into complete mini-batches
    num_complete_minibatches = m // batch_size
    for k in range(num_complete_minibatches):
        batches.append(tuple(indices[k * batch_size : (k + 1) * batch_size]))
    # Step 3: handle the end case (last mini-batch < batch_size)
    if m % batch_size != 0:
        batches.append(tuple(indices[num_complete_minibatches * batch_size :]))
return batches


def avg(array: np.ndarray) -> np.ndarray:
"""Compute mean and reshape as column array.
:param array: array of shape (-1, m)
:return: column array corresponding to mean of each row
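
    For example::

        >>> avg(np.array([[1.0, 3.0], [2.0, 4.0]]))
        array([[2.],
               [3.]])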
"""
return np.mean(array, axis=1).reshape((-1, 1))


def std(array: np.ndarray) -> np.ndarray:
"""Compute standard deviation and reshape as column array.
:param array: array of shape (-1, m)
:return: column array corresponding to std dev of each row
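
    For example::

        >>> std(np.array([[1.0, 3.0], [2.0, 4.0]]))
        array([[1.],
               [1.]])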
"""
return np.std(array, axis=1).reshape((-1, 1))


def _safe_divide(
    value: np.ndarray,
    eps: float = float(np.finfo(float).eps),
) -> np.ndarray:
    """Replace zeros by a small number to avoid dividing by zero.

    Returns a new array rather than mutating ``value`` in place, so
    callers can safely pass cached statistics such as ``std_x``.
    """
    return np.where(value == 0.0, eps, value)


def normalize(data: np.ndarray, mu: np.ndarray, sigma: np.ndarray) -> np.ndarray:
"""Center data about mean and normalize by standard deviation.
:param data: data to be normalized, array of shape (-1, m)
:param mu: mean of the data, array of shape (-1, 1)
:param sigma: std deviation of the data, array of shape (-1, 1)
:return: normalized data, array of shape (-1, m)
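
    A minimal round-trip check (illustrative values)::

        >>> x = np.array([[1.0, 2.0, 3.0]])
        >>> x_norm = normalize(x, avg(x), std(x))
        >>> bool(np.allclose(denormalize(x_norm, avg(x), std(x)), x))
        True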
"""
return (data - mu) / _safe_divide(sigma)


def denormalize(data: np.ndarray, mu: np.ndarray, sigma: np.ndarray) -> np.ndarray:
"""Undo normalization.
:param data: normalized data, array of shape (-1, m)
:param mu: mean of the data, array of shape (-1, 1)
:param sigma: std deviation of the data, array of shape (-1, 1)
:return: denormalized data, array of shape (-1, m)
"""
return sigma * data + mu


def normalize_partials(
partials: np.ndarray | None,
sigma_x: np.ndarray,
sigma_y: np.ndarray,
) -> np.ndarray | None:
r"""Normalize partials.
:param partials: training data partials to be normalized
:math:`J\in\mathbb{R}^{n_y\times n_x \times m}`
:param sigma_x: std dev of training data factors :math:`\sigma_x`,
array of shape (-1, 1)
:param sigma_y: std dev of training data responses :math:`\sigma_y`,
array of shape (-1, 1)
:return: normalized partials, array of shape (n_y, n_x, m)
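
    A quick shape check (unit standard deviations assumed)::

        >>> J = np.ones((2, 3, 5))
        >>> normalize_partials(J, np.ones((3, 1)), np.ones((2, 1))).shape
        (2, 3, 5)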
"""
if partials is None:
return partials
n_y, n_x, _ = partials.shape
sigma_x = sigma_x.T.reshape((1, n_x, 1))
sigma_y = sigma_y.reshape((n_y, 1, 1))
return partials * sigma_x / _safe_divide(sigma_y)


def denormalize_partials(
partials: np.ndarray,
sigma_x: np.ndarray,
sigma_y: np.ndarray,
) -> np.ndarray:
r"""Undo normalization of partials.
:param partials: normalized training data partials
:math:`\bar{J}\in\mathbb{R}^{n_y\times n_x \times m}`
:param sigma_x: std dev of training data factors :math:`\sigma_x`,
array of shape (-1, 1)
:param sigma_y: std dev of training data responses :math:`\sigma_y`,
array of shape (-1, 1)
:return: denormalized partials, array of shape (n_y, n_x, m)
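
    A minimal round-trip check (illustrative values)::

        >>> J = np.ones((2, 3, 5))
        >>> sx, sy = np.full((3, 1), 2.0), np.full((2, 1), 4.0)
        >>> J_bar = normalize_partials(J, sx, sy)
        >>> bool(np.allclose(denormalize_partials(J_bar, sx, sy), J))
        True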
"""
n_y, n_x, _ = partials.shape
sigma_x = sigma_x.T.reshape((1, n_x, 1))
sigma_y = sigma_y.reshape((n_y, 1, 1))
return partials * sigma_y / _safe_divide(sigma_x)


@dataclass
class Dataset:
"""Store training data and associated metadata for easy access.
:param X: training data outputs, array of shape (n_x, m)
:param Y: training data outputs, array of shape (n_y, m)
:param J: training data Jacobians, array of shape (n_y, n_x, m)
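
    For example::

        >>> d = Dataset(X=np.ones((2, 4)), Y=np.zeros((1, 4)))
        >>> (d.n_x, d.n_y, d.m)
        (2, 1, 4)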
"""
X: np.ndarray
Y: np.ndarray
J: np.ndarray | None = None
Y_weights: np.ndarray | float = 1.0
J_weights: np.ndarray | float = 1.0

    def __post_init__(self) -> None:  # noqa: D105
        if self.X.shape[1] != self.Y.shape[1]:
            msg = "X and Y must have the same number of examples"
            raise ValueError(msg)
        n_y, n_x, m = self.n_y, self.n_x, self.m
        # Broadcast weights without `*=`: in-place multiply would reject
        # broadcastable shapes such as (n_y, 1) and mutate caller arrays.
        self.Y_weights = np.ones((n_y, m)) * self.Y_weights
        self.J_weights = np.ones((n_y, n_x, m)) * self.J_weights
if self.J is not None and self.J.shape != (n_y, n_x, m):
msg = f"J must be of shape ({n_y}, {n_x}, {m})"
raise ValueError(msg)

    def set_weights(
        self,
        beta: np.ndarray | float = 1.0,
        gamma: np.ndarray | float = 1.0,
    ) -> None:
        """Prioritize certain data points more than others.

        Rationale: this can be used to reward the optimizer more in
        certain regions.

        :param beta: multiplier(s) on Y
        :param gamma: multiplier(s) on J
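
        For example, doubling the weight on every response::

            >>> d = Dataset(np.ones((1, 3)), np.ones((1, 3)))
            >>> d.set_weights(beta=2.0)
            >>> d.Y_weights
            array([[2., 2., 2.]])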
"""
self.Y_weights = beta * np.ones((self.n_y, self.m))
self.J_weights = gamma * np.ones((self.n_y, self.n_x, self.m))

    @property
    def m(self) -> int:
        """Return number of training examples."""
        return int(self.X.shape[1])

    @property
    def n_x(self) -> int:
        """Return number of inputs."""
        return int(self.X.shape[0])

    @property
    def n_y(self) -> int:
        """Return number of outputs."""
        return int(self.Y.shape[0])

    @cached_property
    def avg_x(self) -> np.ndarray:
        """Return mean of input data as array of shape (n_x, 1)."""
        return avg(self.X)

    @cached_property
    def avg_y(self) -> np.ndarray:
        """Return mean of output data as array of shape (n_y, 1)."""
        return avg(self.Y)

    @cached_property
    def std_x(self) -> np.ndarray:
        """Return standard dev of input data, array of shape (n_x, 1)."""
        return std(self.X)

    @cached_property
    def std_y(self) -> np.ndarray:
        """Return standard dev of output data, array of shape (n_y, 1)."""
        return std(self.Y)

    def mini_batches(
        self,
        batch_size: int | None,
        shuffle: bool = True,
        random_state: int | None = None,
    ) -> list[Dataset]:
        """Break up data into multiple batches and return a list of Datasets.

        :param batch_size: mini-batch size (if None, a single batch with
            all data)
        :param shuffle: whether to shuffle the data points or not
        :param random_state: random seed (useful to make runs repeatable)
        :return: list of Datasets representing the data broken up in batches
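
        For example, the batch sizes for 10 points with ``batch_size=4``::

            >>> d = Dataset(np.zeros((2, 10)), np.zeros((1, 10)))
            >>> [batch.m for batch in d.mini_batches(batch_size=4)]
            [4, 4, 2]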
"""
X = self.X
Y = self.Y
J = self.J
Y_weights = np.ones(Y.shape) * self.Y_weights
batches = mini_batches(X, batch_size, shuffle, random_state)
if J is None:
return [
Dataset(X[:, b], Y[:, b], Y_weights=Y_weights[:, b]) for b in batches
]
J_weights = np.ones(J.shape) * self.J_weights
return [
Dataset(X[:, b], Y[:, b], J[:, :, b], Y_weights[:, b], J_weights[:, :, b])
for b in batches
]

    def normalize(self) -> Dataset:
"""Return normalized Dataset."""
X_norm = normalize(self.X, self.avg_x, self.std_x)
Y_norm = normalize(self.Y, self.avg_y, self.std_y)
J_norm = normalize_partials(self.J, self.std_x, self.std_y)
        # Carry the data point weights over to the normalized copy
        return Dataset(X_norm, Y_norm, J_norm, self.Y_weights, self.J_weights)
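

# A minimal end-to-end sketch (synthetic data, assumed for illustration):
#
#     rng = np.random.default_rng(0)
#     X = rng.random((2, 100))                  # 2 inputs, 100 examples
#     Y = np.sum(X * X, axis=0, keepdims=True)  # 1 output
#     data = Dataset(X, Y).normalize()
#     for batch in data.mini_batches(batch_size=32, random_state=0):
#         ...  # e.g. one optimizer step per normalized mini-batch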