diff --git a/main/como/approx.py b/main/como/approx.py
deleted file mode 100644
index 441a489f..00000000
--- a/main/como/approx.py
+++ /dev/null
@@ -1,333 +0,0 @@
-from __future__ import annotations
-
-from collections.abc import Callable, Sequence
-from typing import Literal, NamedTuple
-
-import numpy as np
-import numpy.typing as npt
-
-
-class RegularizedArray(NamedTuple):
-    x: npt.NDArray[np.number]
-    y: npt.NDArray[np.number]
-    not_na: npt.NDArray[bool]
-    kept_na: bool
-
-
-class Approx(NamedTuple):
-    x: npt.NDArray[float]
-    y: npt.NDArray[float]
-
-
-def _coerce_to_float_array(a: Sequence[int | float | np.number]) -> npt.NDArray[float]:
-    """Coerce input to a 1D float array.
-
-    Args:
-        a: the array to coerce
-
-    Returns:
-        A floating point 1D array.
-    """
-    arr = np.asarray(a, dtype=float)
-    if arr.ndim != 1:
-        arr = arr.ravel()
-    return arr
-
-
-def _regularize_values(
-    x: npt.NDArray[np.number],
-    y: npt.NDArray[np.number],
-    *,
-    na_rm: bool,
-    ties: Callable[[np.ndarray], float] | Literal["mean", "first", "last", "min", "max", "median", "sum"] = "mean",
-) -> RegularizedArray:
-    """Reimplementation of R's regularize.values() used by approx().
-
-    - Removes NA pairs if na_rm is True (else requires x to have no NA).
-    - Sorts by x (stable).
-    - Collapses duplicate x via `ties` aggregator.
-
-    Args:
-        x: the x values to regularize
-        y: the corresponding y values
-        na_rm: should NA values be removed?
-        ties: how to handle duplicate x values; can be a string or a callable
-
-    Returns:
-        A NamedTuple with:
-        - x: sorted unique x
-        - y: aggregated y aligned with x
-        - not_na: boolean mask of y that are not NaN
-        - kept_na: True iff any NaN remained in y after regularization
-    """
-    if na_rm:
-        # Remove pairs where x or y is NA
-        keep = ~np.isnan(x) & ~np.isnan(y)
-        x = x[keep]
-        y = y[keep]
-        kept_na = False
-    else:
-        # R's C code errors if na_rm=False and any x is NA
-        if np.isnan(x).any():
-            raise ValueError("approx(x,y, .., na.rm=False): NA values in x are not allowed")
-        kept_na = np.isnan(y).any()
-
-    if x.size == 0:
-        return RegularizedArray(x=x, y=y, not_na=np.array([], dtype=bool), kept_na=kept_na)
-
-    # Use a stable sort (mergesort) to match R's order()
-    order = np.argsort(x, kind="mergesort")
-    x_sorted = x[order]
-    y_sorted = y[order]
-
-    # Handle the 'ties' argument
-    if callable(ties):
-        agg_fn = ties
-    else:
-        # fmt: off
-        if ties in ("mean", "avg", "average"):
-            agg_fn = np.mean
-        elif ties in ("first", "left"):
-            def agg_fn(a):
-                return a[0]
-        elif ties in ("last", "right"):
-            def agg_fn(a):
-                return a[-1]
-        elif ties == "min":
-            agg_fn = np.min
-        elif ties == "max":
-            agg_fn = np.max
-        elif ties == "median":
-            agg_fn = np.median
-        elif ties == "sum":
-            agg_fn = np.sum
-        else:
-            raise ValueError("Unsupported `ties`; use a callable or one of 'mean', 'first', 'last', 'min', 'max', 'median', 'sum'.")
-        # fmt: on
-
-    # Find unique x values and their indices/counts
-    unique_x, start_idx, counts = np.unique(x_sorted, return_index=True, return_counts=True)
-
-    # Aggregate y values for tied x values
-    y_agg = np.empty(unique_x.shape, dtype=float)
-    for k, (start, count) in enumerate(zip(start_idx, counts, strict=True)):
-        seg = y_sorted[start : start + count]
-        # Note: aggregators like np.mean will return NaN if `seg` contains NaN,
-        # matching R's default (mean(..., na.rm = FALSE)).
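-        # Worked example (illustrative, not from R's sources): for x = [1, 1, 2] and
-        # y = [10, 20, 30], the k=0 segment is [10, 20], so ties="mean" stores 15.0
-        # and ties="sum" would store 30.0; the k=1 segment is just [30].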
-        y_agg[k] = agg_fn(seg)
-
-    not_na = ~np.isnan(y_agg)
-    # kept_na reports whether any NaNs remain in the *aggregated* y values
-    kept_na_final = bool(np.any(~not_na))
-    return RegularizedArray(x=unique_x, y=y_agg, not_na=not_na, kept_na=kept_na_final)
-
-
-def approx(
-    x: Sequence[float],
-    y: Sequence[float] | None = None,
-    xout: Sequence[float] | None = None,
-    method: str | int = "linear",
-    n: int = 50,
-    yleft: float | None = None,
-    yright: float | None = None,
-    rule: int | Sequence[int] | npt.NDArray[int] = 1,
-    f: float = 0.0,
-    ties: Callable[[np.ndarray], float] | Literal["mean", "first", "last", "min", "max", "median", "sum"] = "mean",
-    na_rm: bool = True,
-) -> Approx:
-    """Faithful Python port of R's `approx` function.
-
-    This implementation aims to replicate the behavior of R's `approx`
-    function, including its handling of `ties`, `rule`, `na_rm`, and
-    `method`.
-
-    Args:
-        x: Coordinates, or y-values if `y` is None.
-        y: y-coordinates. If None, `x` is treated as `y` and `x` becomes `1..n`.
-        xout: Points at which to interpolate.
-        method: Interpolation method. "linear" (1) or "constant" (2).
-        n: If `xout` is not provided, interpolation happens at `n` equally spaced points spanning the range of `x`.
-        yleft: Value to use for extrapolation to the left. Defaults to `NA` (np.nan) if `rule` is 1.
-        yright: Value to use for extrapolation to the right. Defaults to `NA` (np.nan) if `rule` is 1.
-        rule: Extrapolation rule.
-            - 1: Return `np.nan` for points outside the `x` range.
-            - 2: Use `yleft` and `yright` for points outside the range.
-        f: For `method="constant"`, determines the split point. `f=0` is left-step, `f=1` is right-step, `f=0.5` is midpoint.
-        ties: How to handle duplicate `x` values. Can be 'mean', 'first', 'last', 'min', 'max', 'median', 'sum', or a callable function.
-        na_rm: If True, `NA` pairs are removed before interpolation. If False, `NA`s in `x` cause an error, `NA`s in `y` are propagated.
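-
-    Example:
-        A minimal sketch of the `xout` form (values follow from linear interpolation; this
-        example is illustrative, not taken from R's documentation):
-
-            >>> approx([1.0, 2.0, 3.0], [10.0, 20.0, 30.0], xout=[1.5, 2.5]).y
-            array([15., 25.])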
- - Returns: - `Approx` class with: - - 'x': numpy array of xout used - - 'y': numpy array of interpolated values - """ - # One-argument form: approx(y) -> x := 1..n, y := y - if y is None: - y_arr = _coerce_to_float_array(x) - x_arr = np.arange(1.0, y_arr.size + 1.0, dtype=float) - else: - x_arr = _coerce_to_float_array(x) - y_arr = _coerce_to_float_array(y) - if x_arr.size != y_arr.size: - raise ValueError("x and y must have same length") - - # --- Method normalization --- - # (matches R's pmatch() result: 1=linear, 2=constant) - if isinstance(method, str): - m = method.strip().lower() - if m.startswith("lin"): - method_code = 1 - elif m.startswith("const"): - method_code = 2 - else: - raise ValueError("invalid interpolation method") - else: - if method in (1, 2): - method_code = int(method) - else: - raise ValueError("invalid interpolation method") - - # --- Rule normalization --- - if isinstance(rule, Sequence | np.ndarray): - rule_list: list[int] = list(rule) # type: ignore[invalid-argument-type] # This is a valid argument type, not sure why ty is upset - if not (1 <= len(rule_list) <= 2): - raise ValueError("`rule` must have length 1 or 2") - if len(rule_list) == 1: - rule_list = [rule_list[0], rule_list[0]] - else: - rule_list = [rule, rule] - - # Sort by x, collapse ties, and handle NAs like R's regularize.values() - r: RegularizedArray = _regularize_values(x_arr, y_arr, na_rm=na_rm, ties=ties) - x_reg: npt.NDArray[float] = r.x - y_reg: npt.NDArray[float] = r.y - not_na_mask: npt.NDArray[bool] = r.not_na - # no_na is True if we don't have to worry about NAs in y_reg - no_na: bool = na_rm or (not r.kept_na) - # nx is the number of *valid* (non-NA) points for interpolation - nx: int = x_reg.size if no_na else int(np.count_nonzero(not_na_mask)) - - if np.isnan(nx): - raise ValueError("invalid length(x)") - - # R's C_ApproxTest logic - if nx <= 1: - if method_code == 1: - raise ValueError("need at least two non-NA values to interpolate") - if nx == 0: - raise ValueError("zero non-NA points") - - # set extrapolation values (yleft/yright) - # This logic matches R's C code (R_approxtest) - if no_na: - y_first: float = y_reg[0] - y_last: float = y_reg[-1] - else: - # Find first and last non-NA y values - y_valid: npt.NDArray[float] = y_reg[not_na_mask] - y_first: float = y_valid[0] - y_last: float = y_valid[-1] - - yleft_val: float = (np.nan if int(rule_list[0]) == 1 else y_first) if yleft is None else float(yleft) - yright_val: float = (np.nan if int(rule_list[1]) == 1 else y_last) if yright is None else float(yright) - - # fabricate xout if it is missing - if xout is None: - if n <= 0: - raise ValueError("'approx' requires n >= 1") - if no_na: - x_first: float = x_reg[0] - x_last: float = x_reg[nx - 1] - else: - x_valid: npt.NDArray[float] = x_reg[not_na_mask] - x_first: float = x_valid[0] - x_last: float = x_valid[-1] - xout_arr: npt.NDArray[float] = np.linspace(x_first, x_last, num=int(n), dtype=float) - else: - xout_arr: npt.NDArray[float] = _coerce_to_float_array(xout) - - # replicate R's C_ApproxTest checks - if method_code == 2 and (not np.isfinite(f) or f < 0.0 or f > 1.0): - raise ValueError("approx(): invalid f value; with `method=2`, `f` must be in the range [0,1] (inclusive) or NA") - if not no_na: - # R re-filters x and y here if NAs remained - x_reg: npt.NDArray[float] = x_reg[not_na_mask] - y_reg: npt.NDArray[float] = y_reg[not_na_mask] - - # perform interpolation - # this section is a vectorized form of the logic from R's approx1 and R_approxfun - yout: npt.NDArray[float] 
= np.empty_like(xout_arr, dtype=float) - mask_nan_xout: npt.NDArray[bool] = np.isnan(xout_arr) - yout[mask_nan_xout] = np.nan - mask_valid: npt.NDArray[bool] = ~mask_nan_xout - - if x_reg.size == 0: - # No valid points to interpolate against - yout[mask_valid] = np.nan - return Approx(x=xout_arr, y=yout) - - xv: npt.NDArray[float] = xout_arr[mask_valid] - left_mask: npt.NDArray[bool] = xv < x_reg[0] - right_mask: npt.NDArray[bool] = xv > x_reg[-1] - mid_mask: npt.NDArray[bool] = ~(left_mask | right_mask) - - res: npt.NDArray[float] = np.empty_like(xv, dtype=float) - res[left_mask] = yleft_val - res[right_mask] = yright_val - - if np.any(mid_mask): - xm: npt.NDArray[float] = xv[mid_mask] - - # Find indices - # j_right[k] = index of first x_reg > xm[k] - j_right: npt.NDArray[int] = np.searchsorted(x_reg, xm, side="right") - # j_left[k] = index of first x_reg >= xm[k] - j_left: npt.NDArray[int] = np.searchsorted(x_reg, xm, side="left") - - # Points that exactly match an x_reg value - eq_mask: npt.NDArray[bool] = j_left != j_right - # Points that fall between x_reg values - in_interval_mask: npt.NDArray[bool] = ~eq_mask - - res_mid: npt.NDArray[float] = np.empty_like(xm, dtype=float) - - if np.any(eq_mask): - # For exact matches, use the corresponding y_reg value - # R's C code uses the 'j_left' index here - res_mid[eq_mask] = y_reg[j_left[eq_mask]] # type: ignore[non-subscriptable] # we know this is of type npt.NDArray[float], not sure why ty is upset - - if np.any(in_interval_mask): - # R's C code sets i = j-1, where j is the 'right' index - j: npt.NDArray[float] = j_right[in_interval_mask] # type: ignore[non-subscriptable] # we know this is of type npt.NDArray[float], not sure why ty is upset - i: npt.NDArray[float] = j - 1 - - # Get the surrounding x and y values - xi: npt.NDArray[float] = x_reg[i] - xj: npt.NDArray[float] = x_reg[j] - yi: npt.NDArray[float] = y_reg[i] - yj: npt.NDArray[float] = y_reg[j] - - if method_code == 1: # linear - t: npt.NDArray[float] = (xm[in_interval_mask] - xi) / (xj - xi) - res_mid[in_interval_mask] = yi + (yj - yi) * t - else: # constant - # This matches R_approxfun's constant logic - if f == 0.0: - res_mid[in_interval_mask] = yi - elif f == 1.0: - res_mid[in_interval_mask] = yj - else: - # computes R's (1-f)*yi + f*yj - f1: float = float(1.0 - f) - f2: float = float(f) - part: npt.NDArray[float] = np.zeros_like(yi, dtype=float) - if f1 != 0.0: - part: npt.NDArray[float] = part + yi * f1 - if f2 != 0.0: - part: npt.NDArray[float] = part + yj * f2 - res_mid[in_interval_mask] = part - - res[mid_mask] = res_mid - - yout[mask_valid] = res - return Approx(x=xout_arr, y=yout) diff --git a/main/como/density.py b/main/como/density.py deleted file mode 100644 index 810ced26..00000000 --- a/main/como/density.py +++ /dev/null @@ -1,328 +0,0 @@ -from __future__ import annotations - -import sys -from collections.abc import Sequence -from typing import Literal, NamedTuple, cast - -import numpy as np -import numpy.typing as npt - -from como.approx import Approx, approx - - -class DensityResult(NamedTuple): - x: npt.NDArray[float] - y: npt.NDArray[float] - x_grid: npt.NDArray[float] - y_grid: npt.NDArray[float] - bw: float - n: int - - -NUMBER = int | float | np.number - - -def bin_distance(x: npt.NDArray[float], weights: npt.NDArray[float], lo: NUMBER, up: NUMBER, n: int) -> npt.NDArray[float]: - """Bin weighted distances.""" - ixmin: int = 0 - ixmax: int = n - 2 - delta: float = (up - lo) / (n - 1) - - y: npt.NDArray[float] = np.zeros((2 * n,), dtype=float) - for i in 
range(x.size):
-        i: int
-        xpos: float = (x[i] - lo) / delta
-        if xpos > sys.maxsize or xpos < -sys.maxsize:  # avoid integer overflows (taken from R's massdist.c)
-            continue
-        ix: int = np.floor(xpos).astype(int)
-        fx: float = xpos - ix
-        wi: float = weights[i]
-        if ixmin <= ix <= ixmax:
-            y[ix] += (1 - fx) * wi
-            y[ix + 1] += fx * wi
-        elif ix == -1:
-            y[0] += fx * wi
-        elif ix == ixmax + 1:
-            y[ix] += (1 - fx) * wi
-    return y
-
-
-def dnorm(x: float, mean: NUMBER = 0.0, sd: NUMBER = 1.0, log: bool = False, fast_dnorm: bool = False) -> float:
-    """Density function for the normal distribution.
-
-    This is a reproduction of R's `dnorm` function.
-    Neither SciPy nor NumPy is capable of producing KDE values that align with R's, and as a result,
-    a manual translation of R's implementation was necessary.
-
-    References:
-        1) Documentation: https://www.rdocumentation.org/packages/stats/versions/3.6.2/topics/Normal
-        2) Source code (2025-OCT-30): https://github.com/wch/r-source/blob/3f7e2528990351bc4b0d1f1b237714668ab4c0c5/src/nmath/dnorm.c
-
-    Args:
-        x: Value at which to evaluate the density.
-        mean: Mean of the normal distribution.
-        sd: Standard deviation of the normal distribution.
-        log: If True, return the log density.
-        fast_dnorm: If True, use the faster direct formula for all `x` (slightly less accurate in the far tail).
-
-    Returns:
-        The density of the normal distribution at `x`, or its log if `log=True`.
-    """
-    # Constants
-    m_ln2: float = 0.693147180559945309417232121458  # ln(2)
-    m_1_sqrt_2pi: float = 0.398942280401432677939946059934  # 1/sqrt(2pi)
-    m_ln_sqrt_2pi: float = 0.918938533204672741780329736406  # log(sqrt(2*pi))
-    r_d__0 = float(-np.inf) if log else 0.0  # R's R_D__0: (log_p ? ML_NEGINF : 0.)
-
-    # 11 bits exponent, where one bit is used to indicate special numbers (e.g. NaN and Infinity),
-    # so the maximum exponent is 10 bits wide (2^10 == 1024). The smallest normalized exponent is
-    # -1022; -1074 is the exponent of the smallest subnormal double (2**-1074), used here as a
-    # conservative underflow bound.
-    dbl_min_exp = -1074
-
-    # The mantissa is 52 bits wide, but because numbers are normalized the initial binary 1 is represented
-    # implicitly (the so-called "hidden bit"), which leaves us with the ability to represent a 53 bit wide mantissa.
-    dbl_mant_dig = 53
-
-    mean: float = float(mean)
-    sd: float = float(sd)
-
-    if np.isnan(x) or np.isnan(mean) or np.isnan(sd):
-        return x + mean + sd  # NaN propagates through the sum, matching R
-    if sd < 0.0:
-        return float(np.nan)
-    if not np.isfinite(sd):
-        return r_d__0
-    if not np.isfinite(x) and x == mean:
-        return float(np.nan)
-    if sd == 0.0:
-        return float(np.inf) if x == mean else r_d__0
-
-    # From this point on, dividing by `sd` is safe because we know it is not 0
-    z = (x - mean) / sd
-    if (not np.isfinite(z)) and (x == mean):
-        return float(np.nan)
-
-    if not np.isfinite(z):
-        return r_d__0
-
-    a = np.fabs(z)
-    if a >= 2 * np.sqrt(np.finfo(float).max):
-        return r_d__0
-    if log:
-        return -(m_ln_sqrt_2pi + 0.5 * a * a + np.log(sd))
-    if a < 5 or fast_dnorm:  # for `a < 5` the direct formula is fully accurate; `fast_dnorm` forces it everywhere
-        return m_1_sqrt_2pi * np.exp(-0.5 * a * a) / sd
-
-    # underflow boundary
-    boundary = np.sqrt(-2.0 * m_ln2 * (dbl_min_exp + 1 - dbl_mant_dig))
-    if a > boundary:
-        return 0.0
-
-    # Now, to get full accuracy, split x into two parts,
-    # x = x1+x2, such that |x2| <= 2^-16.
-    # Assuming that we are using IEEE doubles, that means that
-    # x1*x1 is error free for x<1024 (but we have x < 38.6 anyway).
-    # If we do not have IEEE this is still an improvement over the naive formula.
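-    # Illustrative numbers (mine, not R's): ldexp(a, 16) scales by 2**16, so for
-    # a = 3.141592653589793 we get a1 = rint(a * 65536) / 65536 = 3.1415863037109375
-    # and a2 = a - a1 ≈ 6.35e-6, small enough that a1*a1 is exact in a double.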
-    a1 = np.ldexp(np.rint(np.ldexp(a, 16)), -16)
-    a2 = a - a1
-    return (m_1_sqrt_2pi / sd) * (np.exp(-0.5 * a1 * a1) * np.exp((-a1 * a2) - (0.5 * a2 * a2)))
-
-
-def nrd0(x: npt.NDArray[float]) -> float:
-    """Calculate the nrd0 bandwidth from R's source.
-
-    This bandwidth calculation matches R's `bw.nrd0` (Silverman's rule of thumb).
-
-    References:
-        1) Documentation: https://www.rdocumentation.org/packages/stats/versions/3.6.2/topics/bandwidth
-        2) Source code (as of 2025-OCT-30, copied below): https://github.com/wch/r-source/blob/trunk/src/library/stats/R/bandwidths.R
-        ```R
-        bw.nrd0 <- function (x)
-        {
-            if(length(x) < 2L) stop("need at least 2 data points")
-            hi <- sd(x)
-            if(!(lo <- min(hi, IQR(x)/1.34)))# qnorm(.75) - qnorm(.25) = 1.34898
-                (lo <- hi) || (lo <- abs(x[1L])) || (lo <- 1.)
-            0.9 * lo * length(x)^(-0.2)
-        }
-        ```
-    """
-    if x.size < 2:
-        raise ValueError("need at least 2 data points")
-
-    hi = np.std(x, ddof=1)
-    q75, q25 = cast(tuple[float, float], cast(object, np.percentile(x, [75, 25])))
-    iqr_over_134 = (q75 - q25) / 1.34
-
-    # We are using a cascading series of checks on `lo` to make sure it is a non-zero value
-    lo = min(hi, iqr_over_134)
-    if lo == 0:
-        if hi != 0:
-            lo = hi
-        elif abs(x[0]) != 0:
-            lo = abs(x[0])
-        else:
-            lo = 1.0
-
-    return float(0.9 * lo * x.size ** (-1 / 5))
-
-
-def density(
-    x: Sequence[NUMBER] | npt.NDArray[NUMBER],
-    bw: NUMBER | Literal["nrd0"] = "nrd0",
-    adjust: NUMBER = 1,
-    kernel: Literal["gaussian", "epanechnikov", "rectangular", "triangular", "biweight", "cosine", "optcosine"] = "gaussian",
-    weights: Sequence[NUMBER] | None = None,
-    n: int = 512,
-    from_: NUMBER | None = None,
-    to_: NUMBER | None = None,
-    cut: int = 3,
-    ext: int = 4,
-    remove_na: bool = False,
-    kernel_only: bool = False,
-) -> DensityResult | float:
-    """Compute kernel density estimates (KDE) using the FFT method.
-
-    This is a reproduction of R's `density` function.
-    Neither SciPy nor NumPy is capable of producing KDE values that align with R's, and as a result,
-    a manual translation of R's KDE implementation was necessary.
-
-    Args:
-        x: Input data points.
-        bw: Bandwidth for the kernel. If "nrd0", uses R's nrd0 method.
-        adjust: Adjustment factor for the bandwidth.
-        kernel: Kernel type to use.
-        weights: Optional weights for each data point.
-        n: Number of points in the output grid.
-        from_: Start of the grid (calculated automatically if not provided).
-        to_: End of the grid (calculated automatically if not provided).
-        cut: Number of bandwidths to extend the grid on each side.
-        ext: Number of bandwidths to extend the grid for FFT calculation.
-        remove_na: Whether to remove NA values from `x`.
-        kernel_only: If True, return only the integral of the squared kernel (R's `give.Rkern`) as a float.
-
-    Returns:
-        DensityResult: A named tuple containing:
-            x: The x-coordinates of the density estimate.
-            y: The estimated density values at the x-coordinates.
-            x_grid: The grid of x-coordinates used for FFT calculation.
-            y_grid: The density values on the FFT grid.
-            bw: The bandwidth used.
-            n: The number of points in the output grid.
-        If `kernel_only=True`, a float is returned instead.
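-
-    Example:
-        A hypothetical call (exact densities depend on the data; only shapes are shown):
-
-            >>> res = density([1.0, 2.0, 3.0, 4.0, 5.0], n=100)
-            >>> (res.x.size, res.y.size, res.bw > 0)
-            (100, 100, True)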
- """ - if not isinstance(x, Sequence | np.ndarray) or (len(x) < 2 and bw == "nrd0"): - raise ValueError("Need at at least two points to select a bandwidth automatically using 'nrd0'") - if kernel_only: - if kernel == "gaussian": - return 1 / (2 * np.sqrt(np.pi)) - elif kernel == "epanechnikov": - return 3 / (5 * np.sqrt(5)) - elif kernel == "rectangular": - return np.sqrt(3) / 6 - elif kernel == "triangular": - return np.sqrt(6) / 9 - elif kernel == "biweight": - return 5 * np.sqrt(7) / 49 - elif kernel == "cosine": - return 3 / 4 * np.sqrt(1 / 3 - 2 / np.pi**2) - elif kernel == "optcosine": - return np.sqrt(1 - 8 / np.pi**2) * np.pi**2 / 16 - - if kernel != "gaussian": - raise NotImplementedError(f"Only 'gaussian' kernel is implemented; got '{kernel}'") - - x: npt.NDArray[float] = np.asarray(x, dtype=float) - - has_weights = weights is not None - weights: npt.NDArray[float] | None = np.asarray(weights, float) if weights is not None else None - if has_weights and (weights is not None and weights.size != x.size): - raise ValueError(f"The length of provided weights does not match the length of x: {weights.size} != {x.size}") - - x_na: npt.NDArray[np.bool_] = np.isnan(x) - if np.any(x_na): - if remove_na: - x: npt.NDArray[float] = x[~x_na] - if has_weights and weights is not None: - true_d = weights.sum().astype(float) == 1 - weights = weights[~x_na] - if true_d: - weights = weights / weights.sum() - else: - raise ValueError("NA values found in 'x'. Set 'remove_na=True' to ignore them.") - - nx = n - x_finite = np.isfinite(x) - if np.any(~x_finite): - x = x[x_finite] - nx = x.size - if not has_weights: - weights: npt.NDArray[float] = np.full(shape=nx, fill_value=1 / nx, dtype=float) - total_mass = nx / n - else: - weights = np.asarray(weights) - if not np.all(np.isfinite(weights)): - raise ValueError("Non-finite values found in 'weights'.") - if np.any(weights < 0): - raise ValueError("Negative values found in 'weights'.") - wsum: float = weights.sum() - if np.any(~x_finite): - weights = weights[x_finite] - total_mass = float(weights.sum() / wsum) - else: - total_mass = float(1) - - n_user: int = n - n = max(n, 512) - if n > 512: # round n up to the next power of 2 (i.e., 2^8=512, 2^9=1024) - n: int = int(2 ** np.ceil(np.log2(n))) - - if isinstance(bw, str) and bw != "nrd0": - raise TypeError("Bandwidth 'bw' must be a number or 'nrd0'.") - elif isinstance(bw, str) and bw == "nrd0": - bw_calc = nrd0(x) - else: - bw_calc = float(bw) - if not np.isfinite(bw_calc): - raise ValueError("Calculated bandwidth 'bw' is not finite.") - bw_calc *= adjust - - if bw_calc <= 0: - raise ValueError("Bandwidth 'bw' must be positive.") - - # have to use `... if ... 
else` because `0` is falsey, resulting in the right-half being used instead of the user-provided value - from_ = float(from_ if from_ is not None else min(x) - cut * bw_calc) - to_ = float(to_ if to_ is not None else max(x) + cut * bw_calc) - - if not np.isfinite(from_): - raise ValueError("'from_' is not finite.") - if not np.isfinite(to_): - raise ValueError("'to_' is not finite.") - - lo = float(from_ - ext * bw_calc) - up = float(to_ + ext * bw_calc) - - y: npt.NDArray[float] = bin_distance(x, weights, lo, up, n) * total_mass - kords: npt.NDArray[float] = np.linspace(start=0, stop=((2 * n - 1) / (n - 1) * (up - lo)), num=2 * n, dtype=float) - kords[n + 1 : 2 * n] = -kords[n:1:-1] # mirror/negate tail: R's kords[n:2] will index from the reverse if `n`>2 - - # Initial diverge here (inside dnorm calculation) - kords: npt.NDArray[float] = np.asarray([dnorm(i, sd=bw_calc) for i in kords], dtype=float) - - fft_y: npt.NDArray[np.complex128] = np.fft.fft(y) - conj_fft_kords: npt.NDArray[np.complex128] = np.conjugate(np.fft.fft(kords)) - # Must multiply by `kords.size` because R does not produce a normalize inverse FFT, but NumPy normalizes by `1/size` - kords: npt.NDArray[np.complex128] = np.fft.ifft(fft_y * conj_fft_kords) * kords.size - kords: npt.NDArray[float] = (np.maximum(0, kords.real)[0:n]) / y.size # for values of kords, get 0 or kords[i], whichever is larger - xords: npt.NDArray[float] = np.linspace(lo, up, num=n, dtype=float) - - # xp=known x-coords, fp=known y-cords, x=unknown x-coords; returns interpolated (e.g., unknown) y-coords - interp_x: npt.NDArray[float] = np.linspace(from_, to_, num=n_user) - interp_y: Approx = approx(xords, kords, interp_x) - - return DensityResult( - x=interp_x, - y=interp_y.y, - x_grid=xords, - y_grid=kords, - bw=bw_calc, - n=n, - ) diff --git a/main/como/peak_finder.py b/main/como/peak_finder.py deleted file mode 100644 index 230db014..00000000 --- a/main/como/peak_finder.py +++ /dev/null @@ -1,191 +0,0 @@ -import re -from collections.abc import Sequence -from typing import Literal, cast, overload - -import numpy as np -import numpy.typing as npt -import pandas as pd - -from como.utils import _num_rows - - -def _validate_args( - x: npt.NDArray[np.number], - nups: int, - ndowns: int, - zero: Literal["0", "+", "-"], - min_peak_height: float, - min_peak_distance: int, - threshold: float, -): - """Validate input arguments to `find_peaks` function. - - Function created to reduce the complexity of `find_peaks` (ruff linting rule C901) - """ - if x.ndim != 1: - raise ValueError(f"Expected a 1D array, got {x.ndim}D array instead.") - if np.any(np.isnan(x)): - raise ValueError("Input x contains NaNs") - if nups < 0: - raise ValueError("Argument 'nups' must be non-negative") - if ndowns < 0: - raise ValueError("Argument 'ndowns' must be non-negative") - if zero not in {"0", "+", "-"}: - raise ValueError("Argument 'zero' must be '0', '+', or '-'") - if min_peak_height < 0: - raise ValueError("Argument 'min_peak_height' must be non-negative") - if min_peak_distance < 0: - raise ValueError("Argument 'minpeakdistance' must be non-negative") - if threshold < 0: - raise ValueError("Argument 'threshold' must be non-negative") - - -def _encode_signs(x: npt.NDArray[np.number], zero: str) -> str: - """Encode the signs of the differences in `x` into a string. 
-
-    Function created to reduce the complexity of `find_peaks` (ruff linting rule C901)
-    """
-    diff_signs = np.sign(np.diff(x))
-    chars = ["+" if d > 0 else "-" if d < 0 else "0" for d in diff_signs]
-    x_chars = "".join(chars)
-    if zero != "0":
-        x_chars = x_chars.replace("0", zero)
-    return x_chars
-
-
-@overload
-def _enforce_minimum_peak_distance(df: pd.DataFrame, min_peak_distance: int, inplace: Literal[True] = True) -> None: ...
-
-
-@overload
-def _enforce_minimum_peak_distance(df: pd.DataFrame, min_peak_distance: int, inplace: Literal[False] = False) -> pd.DataFrame: ...
-
-
-def _enforce_minimum_peak_distance(df: pd.DataFrame, min_peak_distance: int, inplace: bool = True) -> None | pd.DataFrame:
-    """Enforce a minimum distance between identified peaks.
-
-    Function created to reduce the complexity of `find_peaks` (ruff linting rule C901)
-
-    Modifies `df` in place when `inplace=True`; otherwise returns a filtered copy.
-    """
-    good_peaks = np.ones(_num_rows(df), dtype=bool)
-    for i in range(_num_rows(df)):
-        if not good_peaks[i]:
-            continue
-        dpos = np.abs(df.at[i, "peak_idx"] - df["peak_idx"].values)
-        close = (dpos > 0) & (dpos < min_peak_distance)
-        good_peaks[close] = False
-
-    if inplace:
-        df.drop(df.index[~good_peaks], inplace=True)
-        df.reset_index(drop=True, inplace=True)
-        return None
-    return cast(pd.DataFrame, df.iloc[good_peaks, :].reset_index(drop=True))
-
-
-def find_peaks(
-    x: Sequence[float] | Sequence[int] | npt.NDArray[np.number],
-    nups: int = 1,
-    ndowns: int | None = None,
-    zero: Literal["0", "+", "-"] = "0",
-    peak_pattern: str | None = None,
-    min_peak_height: float = 0.02,  # default for R's zFPKM `peak_parameters`
-    min_peak_distance: int = 1,  # default for R's zFPKM `peak_parameters`
-    threshold: float = 0.0,
-    npeaks: int = 0,
-    sortstr: bool = False,
-) -> pd.DataFrame:
-    """Identify peaks in a given time series.
-
-    This function is modelled after R's `pracma::findpeaks` function.
-    SciPy's `scipy.signal.find_peaks` provides different results than `pracma::findpeaks`,
-    resulting in the requirement for this translation.
-
-    References:
-        1) pracma::findpeaks: https://rdrr.io/cran/pracma/man/findpeaks.html
-
-    Args:
-        x: numerical vector taken as a time series (no NA values allowed)
-        nups: minimum number of increasing steps before a peak is reached
-        ndowns: minimum number of decreasing steps after the peak (defaults to the same value as `nups`)
-        zero: can be '+', '-', or '0'; how to interpret succeeding steps of the same value: increasing, decreasing, or special
-        peak_pattern: define a peak as a regular pattern, such as the default pattern `[+]{1,}[-]{1,}`.
-            If a pattern is provided, parameters `nups` and `ndowns` are not taken into account
-        min_peak_height: the minimum (absolute) height a peak has to have before being recognized
-        min_peak_distance: the minimum distance (in indices) between peaks before they are counted
-        threshold: the minimum difference in height between a peak and its surrounding values
-        npeaks: the number of peaks to return (<=0 returns all)
-        sortstr: should the peaks be returned in decreasing order of their peak height?
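-
-    Example:
-        An illustrative sketch with a single triangular peak (values follow from the logic
-        above, not from pracma's documentation):
-
-            >>> df = find_peaks([0.0, 1.0, 2.0, 1.0, 0.0], min_peak_height=0.0)
-            >>> float(df.at[0, "height"]), int(df.at[0, "peak_idx"])
-            (2.0, 2)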
- - Returns: - A dataframe with the columns: - height: the height of the peak - peak_idx: the index (from `x`) of the identified peak - start_idx: the starting index (from `x`) of the identified peak - end_idx: the ending index (from `x`) of the identified peak - If the dataframe is empty, no peaks could be identified - """ - x = np.asarray(x, dtype=float) if not isinstance(x, np.ndarray) else x - npeaks: int = max(npeaks, 0) - ndowns: int = ndowns or nups - peak_pattern: str = peak_pattern or rf"[+]{{{nups},}}[-]{{{ndowns},}}" - _validate_args( - x=x, - nups=nups, - ndowns=ndowns, - zero=zero, - min_peak_height=min_peak_height, - min_peak_distance=min_peak_distance, - threshold=threshold, - ) - - # find peaks by regex matching the sign pattern - derivative_chars = _encode_signs(x=x, zero=zero) - matches: list[re.Match[str]] = list(re.finditer(peak_pattern, derivative_chars)) - if not matches: - return pd.DataFrame(columns=["height", "peak_idx", "start_idx", "end_idx"]) - - pattern_start_index: npt.NDArray[int] = np.asarray([m.start() for m in matches], dtype=int) - pattern_end_index: npt.NDArray[int] = np.asarray([m.end() for m in matches], dtype=int) - - num_matches: int = len(pattern_start_index) - peak_index: npt.NDArray[int] = np.zeros(num_matches, dtype=int) - peak_height: npt.NDArray[float] = np.zeros(num_matches, dtype=float) - - # for each match region, find the local max - for i in range(num_matches): - segment: npt.NDArray[np.uint64] = x[pattern_start_index[i] : pattern_end_index[i] + 1] - segment_max_idx: np.uint64 = np.argmax(segment) - peak_index[i] = pattern_start_index[i] + segment_max_idx - peak_height[i] = segment[segment_max_idx] - - # filter for values that are too low or below the threshold difference - x_left: float = x[pattern_start_index] - x_right: float = x[np.minimum(pattern_end_index, x.size - 1)] - valid_peaks: list[int] = list(np.where((peak_height >= min_peak_height) & ((peak_height - np.maximum(x_left, x_right)) >= threshold))[0].astype(int)) # noqa: E501 # fmt: skip - - if len(valid_peaks) == 0: - return pd.DataFrame(columns=["height", "peak_idx", "start_idx", "end_idx"]) - - out_df: pd.DataFrame = pd.DataFrame( - data={ - "height": peak_height[valid_peaks], - "peak_idx": peak_index[valid_peaks], - "start_idx": pattern_start_index[valid_peaks], - "end_idx": pattern_end_index[valid_peaks], - } - ) - - # sort by peak height in descending order (largest to smallest) - if sortstr or min_peak_distance > 1: - out_df.sort_values(by=["height"], ascending=False, ignore_index=True, inplace=True) - - # if provided, enforce a minimum distance between the peaks (removes smaller peaks next to larger ones) - if min_peak_distance > 1 and len(out_df) > 1: - _enforce_minimum_peak_distance(df=out_df, min_peak_distance=min_peak_distance) - - # if provided, limit the number of peaks returned - if 0 < npeaks < _num_rows(out_df): - out_df = cast(pd.DataFrame, out_df.iloc[:npeaks, :].reset_index(drop=True)) - - return out_df diff --git a/tests/unit/test_approx.py b/tests/unit/test_approx.py deleted file mode 100644 index 772dbc77..00000000 --- a/tests/unit/test_approx.py +++ /dev/null @@ -1,237 +0,0 @@ -import numpy as np -import pytest - -from como.approx import _coerce_to_float_array, _regularize_values, approx - - -class TestCoerceToFloatArray: - """Tests for the _coerce_to_float_array helper function.""" - - def test_converts_int_list_to_array(self): - result = _coerce_to_float_array([1, 2, 3]) - assert isinstance(result, np.ndarray) - assert result.dtype == float - 
np.testing.assert_array_equal(result, [1.0, 2.0, 3.0]) - - def test_converts_float_list_to_array(self): - result = _coerce_to_float_array([1.0, 2.0, 3.0]) - assert isinstance(result, np.ndarray) - assert result.dtype == float - np.testing.assert_array_equal(result, [1.0, 2.0, 3.0]) - - -class TestRegularizeValues: - """Tests for the _regularize_values function.""" - - def test_removes_na_pairs_when_na_rm_true(self): - x: list[float] = np.array([1.0, 2.0, np.nan, 4.0]) - y: list[float] = np.array([10.0, np.nan, 30.0, 40.0]) - result = _regularize_values(x, y, na_rm=True, ties="mean") - np.testing.assert_array_equal(result.x, [1.0, 4.0]) - np.testing.assert_array_equal(result.y, [10.0, 40.0]) - - def test_raises_error_with_na_in_x_when_na_rm_false(self): - x: list[float] = np.array([1.0, np.nan, 3.0]) - y: list[float] = np.array([10.0, 20.0, 30.0]) - with pytest.raises(ValueError, match="NA values in x are not allowed"): - _regularize_values(x, y, na_rm=False, ties="mean") - - def test_sorts_by_x(self): - x: list[float] = np.array([3.0, 1.0, 2.0]) - y: list[float] = np.array([30.0, 10.0, 20.0]) - result = _regularize_values(x, y, na_rm=True, ties="mean") - np.testing.assert_array_equal(result.x, [1.0, 2.0, 3.0]) - np.testing.assert_array_equal(result.y, [10.0, 20.0, 30.0]) - - def test_aggregates_duplicates_with_mean(self): - x: list[float] = np.array([1.0, 1.0, 2.0]) - y: list[float] = np.array([10.0, 20.0, 30.0]) - result = _regularize_values(x, y, na_rm=True, ties="mean") - np.testing.assert_array_equal(result.x, [1.0, 2.0]) - np.testing.assert_array_equal(result.y, [15.0, 30.0]) - - def test_aggregates_duplicates_with_first(self): - x: list[float] = np.array([1.0, 1.0, 2.0]) - y: list[float] = np.array([10.0, 20.0, 30.0]) - result = _regularize_values(x, y, na_rm=True, ties="first") - np.testing.assert_array_equal(result.x, [1.0, 2.0]) - np.testing.assert_array_equal(result.y, [10.0, 30.0]) - - def test_aggregates_duplicates_with_last(self): - x: list[float] = np.array([1.0, 1.0, 2.0]) - y: list[float] = np.array([10.0, 20.0, 30.0]) - result = _regularize_values(x, y, na_rm=True, ties="last") - np.testing.assert_array_equal(result.x, [1.0, 2.0]) - np.testing.assert_array_equal(result.y, [20.0, 30.0]) - - def test_handles_empty_arrays(self): - x: list[float] = np.array([]) - y: list[float] = np.array([]) - result = _regularize_values(x, y, na_rm=True, ties="mean") - assert result.x.size == 0 - assert result.y.size == 0 - assert result.not_na.size == 0 - - def test_callable_ties_function(self): - x: list[float] = np.array([1.0, 1.0, 2.0]) - y: list[float] = np.array([10.0, 20.0, 30.0]) - result = _regularize_values(x, y, na_rm=True, ties=np.sum) - np.testing.assert_array_equal(result.x, [1.0, 2.0]) - np.testing.assert_array_equal(result.y, [30.0, 30.0]) - - -class TestApprox: - """Tests for the main approx function.""" - - def test_basic_linear_interpolation(self): - x: list[float] = [1.0, 2.0, 3.0] - y: list[float] = [10.0, 20.0, 30.0] - xout: list[float] = [1.5, 2.5] - result = approx(x, y, xout=xout) - np.testing.assert_array_almost_equal(result.y, [15.0, 25.0]) - - def test_one_argument_form(self): - y: list[float] = [10.0, 20.0, 30.0] - result = approx(y, xout=[1.5, 2.5]) - np.testing.assert_array_almost_equal(result.y, [15.0, 25.0]) - - def test_default_n_points(self): - x: list[float] = [1.0, 5.0] - y: list[float] = [10.0, 50.0] - result = approx(x, y) - assert len(result.x) == 50 - assert len(result.y) == 50 - - def test_custom_n_points(self): - x: list[float] = [1.0, 5.0] - y: 
list[float] = [10.0, 50.0] - result = approx(x, y, n=10) - assert len(result.x) == 10 - - def test_extrapolation_rule_1(self): - x: list[float] = [1.0, 2.0, 3.0] - y: list[float] = [10.0, 20.0, 30.0] - xout: list[float] = [0.5, 3.5] - result = approx(x, y, xout=xout, rule=1) - assert np.isnan(result.y[0]) - assert np.isnan(result.y[1]) - - def test_extrapolation_rule_2(self): - x: list[float] = [1.0, 2.0, 3.0] - y: list[float] = [10.0, 20.0, 30.0] - xout: list[float] = [0.5, 3.5] - result = approx(x, y, xout=xout, rule=2) - assert result.y[0] == 10.0 - assert result.y[1] == 30.0 - - def test_yleft_yright_parameters(self): - x: list[float] = [1.0, 2.0, 3.0] - y: list[float] = [10.0, 20.0, 30.0] - xout: list[float] = [0.5, 3.5] - result = approx(x, y, xout=xout, yleft=5.0, yright=35.0) - assert result.y[0] == 5.0 - assert result.y[1] == 35.0 - - def test_constant_method_f_0(self): - x: list[float] = [1.0, 2.0, 3.0] - y: list[float] = [10.0, 20.0, 30.0] - xout: list[float] = [1.5] - result = approx(x, y, xout=xout, method="constant", f=0.0) - assert result.y[0] == 10.0 - - def test_constant_method_f_1(self): - x: list[float] = [1.0, 2.0, 3.0] - y: list[float] = [10.0, 20.0, 30.0] - xout: list[float] = [1.5] - result = approx(x, y, xout=xout, method="constant", f=1.0) - assert result.y[0] == 20.0 - - def test_constant_method_f_05(self): - x: list[float] = [1.0, 2.0, 3.0] - y: list[float] = [10.0, 20.0, 30.0] - xout: list[float] = [1.5] - result = approx(x, y, xout=xout, method="constant", f=0.5) - assert result.y[0] == 15.0 - - def test_exact_match_returns_exact_value(self): - x: list[float] = [1.0, 2.0, 3.0] - y: list[float] = [10.0, 20.0, 30.0] - xout: list[float] = [2.0] - result = approx(x, y, xout=xout) - assert result.y[0] == 20.0 - - def test_na_in_xout_returns_nan(self): - x: list[float] = [1.0, 2.0, 3.0] - y: list[float] = [10.0, 20.0, 30.0] - xout: list[float] = [np.nan, 2.0] - result = approx(x, y, xout=xout) - assert np.isnan(result.y[0]) - assert result.y[1] == 20.0 - - def test_method_numeric_codes(self): - x: list[float] = [1.0, 2.0] - y: list[float] = [10.0, 20.0] - xout: list[float] = [1.5] - result1 = approx(x, y, xout=xout, method=1) - result2 = approx(x, y, xout=xout, method=2, f=0.0) - assert result1.y[0] == 15.0 - assert result2.y[0] == 10.0 - - def test_raises_error_different_length_xy(self): - x: list[float] = [1.0, 2.0] - y: list[float] = [10.0] - with pytest.raises(ValueError, match="x and y must have same length"): - approx(x, y) - - def test_raises_error_invalid_method(self): - x: list[float] = [1.0, 2.0] - y: list[float] = [10.0, 20.0] - with pytest.raises(ValueError, match="invalid interpolation method"): - approx(x, y, method="invalid") - - def test_raises_error_invalid_method_code(self): - x: list[float] = [1.0, 2.0] - y: list[float] = [10.0, 20.0] - with pytest.raises(ValueError, match="invalid interpolation method"): - approx(x, y, method=3) - - def test_raises_error_invalid_f(self): - x: list[float] = [1.0, 2.0] - y: list[float] = [10.0, 20.0] - with pytest.raises(ValueError, match="invalid f value"): - approx(x, y, method="constant", f=2.0) - - def test_raises_error_need_two_points_for_linear(self): - x: list[float] = [1.0] - y: list[float] = [10.0] - with pytest.raises(ValueError, match="need at least two non-NA values"): - approx(x, y, method="linear") - - def test_raises_error_zero_points(self): - x: list[float] = [] - y: list[float] = [] - with pytest.raises(ValueError, match="zero non-NA points"): - approx(x, y, method="constant") - - def 
test_handles_ties_mean(self): - x: list[float] = [1.0, 1.0, 2.0] - y: list[float] = [10.0, 20.0, 30.0] - xout: list[float] = [1.5] - result = approx(x, y, xout=xout, ties="mean") - assert result.y[0] == 22.5 - - def test_rule_as_list(self): - x: list[float] = [1.0, 2.0, 3.0] - y: list[float] = [10.0, 20.0, 30.0] - xout: list[float] = [0.5, 3.5] - result = approx(x, y, xout=xout, rule=[1, 2]) - assert np.isnan(result.y[0]) - assert result.y[1] == 30.0 - - def test_na_rm_false_with_na_in_y(self): - x: list[float] = [1.0, 2.0, 3.0] - y: list[float] = [10.0, np.nan, 30.0] - xout: list[float] = [2.5] - result = approx(x, y, xout=xout, na_rm=False) - # After filtering out NA, should interpolate between 1.0->10.0 and 3.0->30.0 - assert result.y[0] == 25.0 diff --git a/tests/unit/test_density.py b/tests/unit/test_density.py deleted file mode 100644 index c4d88721..00000000 --- a/tests/unit/test_density.py +++ /dev/null @@ -1,226 +0,0 @@ -from typing import Literal, cast - -import numpy as np -import numpy.typing as npt -import pytest -from numpy.testing import assert_allclose, assert_array_equal - -from como.density import DensityResult, bin_distance, density, dnorm, nrd0 - -KERNEL_TYPE = Literal["gaussian", "epanechnikov", "rectangular", "triangular", "biweight", "cosine", "optcosine"] - - -class TestBinDistance: - def test_basic_binning(self): - x: npt.NDArray[float] = np.array([0.5, 1.5, 2.5]) - weights: npt.NDArray[float] = np.array([1.0, 1.0, 1.0]) - result: npt.NDArray[float] = bin_distance(x, weights, lo=0, up=3, n=4) - - assert result.shape == (8,) - assert np.all(result >= 0) - - def test_weighted_binning(self): - x: npt.NDArray[float] = np.array([1.0, 2.0]) - weights: npt.NDArray[float] = np.array([2.0, 1.0]) - result: npt.NDArray[float] = bin_distance(x, weights, lo=0, up=3, n=4) - - assert result.shape == (8,) - assert result.sum() > 0 - - def test_empty_array(self): - x: npt.NDArray[float] = np.array([]) - weights: npt.NDArray[float] = np.array([]) - result: npt.NDArray[float] = bin_distance(x, weights, lo=0, up=1, n=2) - - assert result.shape == (4,) - assert_array_equal(result, np.zeros(4)) - - def test_out_of_bounds_handling(self): - x: npt.NDArray[float] = np.array([10.0]) - weights: npt.NDArray[float] = np.array([1.0]) - result: npt.NDArray[float] = bin_distance(x, weights, lo=0, up=1, n=2) - - assert result.shape == (4,) - - -class TestDnorm: - def test_standard_normal(self): - # dnorm(0) for standard normal should be ~0.3989 - result: float = dnorm(0.0, mean=0.0, sd=1.0) - expected: float = 1.0 / np.sqrt(2 * np.pi) - assert_allclose(result, expected, rtol=1e-10) - - def test_with_mean_and_sd(self): - result: float = dnorm(5.0, mean=5.0, sd=2.0) - expected: float = 1.0 / (2.0 * np.sqrt(2 * np.pi)) - assert_allclose(result, expected, rtol=1e-10) - - def test_log_density(self): - result: float = dnorm(0.0, mean=0.0, sd=1.0, log=True) - expected: float = np.log(1.0 / np.sqrt(2 * np.pi)) - assert_allclose(result, expected, rtol=1e-10) - - def test_nan_inputs(self): - assert np.isnan(dnorm(np.nan)) - assert np.isnan(dnorm(0.0, mean=np.nan)) - assert np.isnan(dnorm(0.0, sd=np.nan)) - - def test_negative_sd(self): - result: float = dnorm(0.0, sd=-1.0) - assert np.isnan(result) - - def test_infinite_sd(self): - result: float = dnorm(0.0, sd=np.inf) - assert result == 0.0 or result == -np.inf - - def test_zero_sd_at_mean(self): - result: float = dnorm(5.0, mean=5.0, sd=0.0) - assert np.isinf(result) - - def test_zero_sd_away_from_mean(self): - result: float = dnorm(5.0, mean=3.0, sd=0.0, 
log=False) - assert result == 0.0 - - def test_fast_dnorm_flag(self): - result_fast: float = dnorm(1.0, fast_dnorm=True) - result_slow: float = dnorm(1.0, fast_dnorm=False) - assert_allclose(result_fast, result_slow, rtol=1e-5) - - def test_large_values(self): - result: float = dnorm(100.0, mean=0.0, sd=1.0) - assert result >= 0.0 - assert result < 1e-10 - - -class TestNrd0: - def test_basic_calculation(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) - result: float = nrd0(x) - assert result > 0 - assert np.isfinite(result) - - def test_with_constant_values(self): - x: npt.NDArray[float] = np.array([5.0, 5.0, 5.0, 5.0]) - result: float = nrd0(x) - assert result > 0 - assert np.isfinite(result) - - def test_single_nonzero_value(self): - x: npt.NDArray[float] = np.array([0.0, 7.0]) - result: float = nrd0(x) - assert result > 0 - - def test_all_zeros(self): - # if the input array is changed from a shape of `(3,)`, the result will change - # this is because `nrd0` takes the length of the input into account when calculating bandwidth - x: npt.NDArray[float] = np.array([0.0, 0.0, 0.0], dtype=float) - result: float = nrd0(x) - assert result == 0.7224674055842076 - - def test_insufficient_data(self): - with pytest.raises(ValueError, match="need at least 2 data points"): - nrd0(np.array([1.0])) - - def test_empty_array(self): - with pytest.raises(ValueError, match="need at least 2 data points"): - nrd0(np.array([])) - - -class TestDensity: - def test_basic_density(self): - x: npt.NDArray[float] = np.random.normal(0, 1, 100) - result: DensityResult = density(x) - - assert isinstance(result, DensityResult) - assert len(result.x) == 512 - assert len(result.y) == 512 - assert result.bw > 0 - assert np.all(result.y >= 0) - - def test_custom_bandwidth(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) - result: DensityResult = density(x, bw=0.5) - - assert_allclose(result.bw, 0.5, rtol=1e-10) - - def test_with_weights(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) - weights: npt.NDArray[float] = np.array([0.1, 0.2, 0.4, 0.2, 0.1]) - result: DensityResult = density(x, weights=weights, n=5) - - assert len(result.x) == 5 - assert np.all(result.y >= 0) - - def test_custom_grid_range(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) - result: DensityResult = density(x, from_=0, to_=6, n=100) - assert result.x[0] == 0 - assert result.x[-1] == 6 - assert len(result.x) == 100 - - def test_different_kernels(self): - x: npt.NDArray[float] = np.random.normal(0, 1, 50) - - for kernel in ["epanechnikov", "rectangular", "triangular", "biweight", "cosine", "optcosine"]: - with pytest.raises(NotImplementedError, match=f"Only 'gaussian' kernel is implemented; got '{kernel}'"): - density(x, kernel=cast(KERNEL_TYPE, kernel)) - - def test_kernel_only_mode(self): - result: npt.NDArray[float] = density([1, 2, 3], kernel="gaussian", kernel_only=True) - expected: float = 1 / (2 * np.sqrt(np.pi)) - assert isinstance(result, float) - assert_allclose(result, expected, rtol=1e-10) - - def test_na_removal(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, np.nan, 4.0, 5.0]) - result: DensityResult = density(x, remove_na=True) - - assert len(result.x) == 512 - assert np.all(np.isfinite(result.y)) - - def test_na_without_removal(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, np.nan, 4.0, 5.0]) - - with pytest.raises(ValueError, match="NA values found"): - density(x, remove_na=False) - - def test_insufficient_data_for_nrd0(self): - with 
pytest.raises(ValueError, match="at least two points"): - density([1.0], bw="nrd0") - - def test_adjust_parameter(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) - result1: DensityResult = density(x, bw=1.0, adjust=1.0) - result2: DensityResult = density(x, bw=1.0, adjust=2.0) - - assert_allclose(result2.bw, 2.0 * result1.bw, rtol=1e-10) - - def test_invalid_bandwidth_string(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, 3.0]) - - with pytest.raises(TypeError, match="must be a number or 'nrd0'"): - density(x, bw="invalid") - - def test_negative_weights(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, 3.0]) - weights: npt.NDArray[float] = np.array([1.0, -1.0, 1.0]) - - with pytest.raises(ValueError, match="Negative values found"): - density(x, weights=weights) - - def test_infinite_values_in_x(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, np.inf, 4.0, 5.0]) - result: DensityResult = density(x) - - assert np.all(np.isfinite(result.y)) - - def test_result_named_tuple_fields(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) - result: DensityResult = density(x, n=100) - - assert hasattr(result, "x") - assert hasattr(result, "y") - assert hasattr(result, "x_grid") - assert hasattr(result, "y_grid") - assert hasattr(result, "bw") - assert hasattr(result, "n") diff --git a/tests/unit/test_peak_finder.py b/tests/unit/test_peak_finder.py deleted file mode 100644 index 7d2d730d..00000000 --- a/tests/unit/test_peak_finder.py +++ /dev/null @@ -1,204 +0,0 @@ -import numpy as np -import numpy.typing as npt -import pandas as pd -import pytest - -from como.peak_finder import ( - _encode_signs, - _enforce_minimum_peak_distance, - _validate_args, - find_peaks, -) - - -class TestValidateArgs: - def test_multidimensional_array_raises_error(self): - x: npt.NDArray[float] = np.array([[1, 2], [3, 4]]) - with pytest.raises(ValueError, match="Expected a 1D array, got 2D array instead"): - _validate_args(x, 1, 1, "0", 0.0, 1, 0.0) - - def test_nan_values_raise_error(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, np.nan, 4.0]) - with pytest.raises(ValueError, match="Input x contains NaNs"): - _validate_args(x, 1, 1, "0", 0.0, 1, 0.0) - - def test_negative_nups_raises_error(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, 3.0]) - with pytest.raises(ValueError, match="Argument 'nups' must be non-negative"): - _validate_args(x, -1, 1, "0", 0.0, 1, 0.0) - - def test_negative_ndowns_raises_error(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, 3.0]) - with pytest.raises(ValueError, match="Argument 'ndowns' must be non-negative"): - _validate_args(x, 1, -1, "0", 0.0, 1, 0.0) - - def test_invalid_zero_raises_error(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, 3.0]) - with pytest.raises(ValueError, match="Argument 'zero' must be '0', '\\+', or '-'"): - _validate_args(x, 1, 1, "x", 0.0, 1, 0.0) # type: ignore[invalid-type-argument] - - def test_negative_min_peak_height_raises_error(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, 3.0]) - with pytest.raises(ValueError, match="Argument 'min_peak_height' must be non-negative"): - _validate_args(x, 1, 1, "0", -1.0, 1, 0.0) - - def test_negative_min_peak_distance_raises_error(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, 3.0]) - with pytest.raises(ValueError, match="Argument 'minpeakdistance' must be non-negative"): - _validate_args(x, 1, 1, "0", 0.0, -1, 0.0) - - def test_negative_threshold_raises_error(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, 3.0]) - with 
pytest.raises(ValueError, match="Argument 'threshold' must be non-negative"): - _validate_args(x, 1, 1, "0", 0.0, 1, -1.0) - - def test_valid_args_does_not_raise(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, 3.0]) - _validate_args(x, 1, 1, "0", 0.0, 1, 0.0) - - -class TestEncodeSigns: - def test_increasing_sequence(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, 3.0, 4.0]) - result: str = _encode_signs(x, "0") - assert result == "+++" - - def test_decreasing_sequence(self): - x: npt.NDArray[float] = np.array([4.0, 3.0, 2.0, 1.0]) - result: str = _encode_signs(x, "0") - assert result == "---" - - def test_flat_sequence_with_zero(self): - x: npt.NDArray[float] = np.array([1.0, 1.0, 1.0]) - result: str = _encode_signs(x, "0") - assert result == "00" - - def test_flat_sequence_with_plus(self): - x: npt.NDArray[float] = np.array([1.0, 1.0, 1.0]) - result: str = _encode_signs(x, "+") - assert result == "++" - - def test_flat_sequence_with_minus(self): - x: npt.NDArray[float] = np.array([1.0, 1.0, 1.0]) - result: str = _encode_signs(x, "-") - assert result == "--" - - def test_flat_sequence_with_dollarsign(self): - x: npt.NDArray[float] = np.array([1.0, 1.0, 1.0]) - result: str = _encode_signs(x, "$") - assert result == "$$" - - def test_mixed_sequence(self): - x: npt.NDArray[float] = np.array([1.0, 2.0, 2.0, 3.0, 2.0]) - result: str = _encode_signs(x, "0") - assert result == "+0+-" - - -class TestEnforceMinimumPeakDistance: - def test_inplace_removes_close_peaks(self): - df: pd.DataFrame = pd.DataFrame( - { - "height": [10.0, 8.0, 5.0], - "peak_idx": [0, 2, 10], - "start_idx": [0, 1, 9], - "end_idx": [1, 3, 11], - } - ) - _enforce_minimum_peak_distance(df, min_peak_distance=5, inplace=True) - assert len(df) == 2 - assert df["peak_idx"].tolist() == [0, 10] - - def test_not_inplace_returns_new_dataframe(self): - df: pd.DataFrame = pd.DataFrame( - { - "height": [10.0, 8.0, 5.0], - "peak_idx": [0, 2, 10], - "start_idx": [0, 1, 9], - "end_idx": [1, 3, 11], - } - ) - result = _enforce_minimum_peak_distance(df, min_peak_distance=5, inplace=False) - assert len(result) == 2 - assert len(df) == 3 # original unchanged - assert result["peak_idx"].tolist() == [0, 10] - - def test_all_peaks_sufficiently_spaced(self): - df: pd.DataFrame = pd.DataFrame( - { - "height": [10.0, 8.0], - "peak_idx": [0, 10], - "start_idx": [0, 9], - "end_idx": [1, 11], - } - ) - _enforce_minimum_peak_distance(df, min_peak_distance=5, inplace=True) - assert len(df) == 2 - - -class TestFindPeaks: - def test_simple_peak(self): - x: npt.NDArray[float] = np.asarray([0.0, 1.0, 2.0, 3.0, 2.0, 1.0, 0.0], dtype=float) - result: pd.DataFrame = find_peaks(x, min_peak_height=0.0) - assert len(result) == 1 - assert result.iloc[0]["peak_idx"] == 3 - assert result.iloc[0]["height"] == 3.0 - - def test_multiple_peaks(self): - x: npt.NDArray[float] = np.asarray([0.0, 1.0, 0.0, 2.0, 0.0, 3.0, 0.0], dtype=float) - result: pd.DataFrame = find_peaks(x, min_peak_height=0.0) - assert len(result) == 3 - - def test_no_peaks(self): - x: npt.NDArray[float] = np.asarray([1.0, 2.0, 3.0, 4.0, 5.0], dtype=float) - result: pd.DataFrame = find_peaks(x) - assert len(result) == 0 - assert list(result.columns) == ["height", "peak_idx", "start_idx", "end_idx"] - - def test_min_peak_height_filter(self): - x: npt.NDArray[float] = np.asarray([0.0, 1.0, 0.0, 5.0, 0.0], dtype=float) - result: pd.DataFrame = find_peaks(x, min_peak_height=2.0) - assert len(result) == 1 - assert result.iloc[0]["height"] == 5.0 - - def test_nups_and_ndowns(self): - x: 
npt.NDArray[float] = np.asarray([0.0, 1.0, 2.0, 3.0, 2.0, 1.0, 0.0], dtype=float) - result: pd.DataFrame = find_peaks(x, nups=2, ndowns=2, min_peak_height=0.0) - assert len(result) == 1 - - def test_npeaks_limits_output(self): - x: npt.NDArray[float] = np.asarray([0.0, 5.0, 0.0, 3.0, 0.0, 1.0, 0.0], dtype=float) - result: pd.DataFrame = find_peaks(x, npeaks=2, min_peak_height=0.0) - assert len(result) == 2 - - def test_sortstr_orders_by_height(self): - x: npt.NDArray[float] = np.asarray([0.0, 1.0, 0.0, 5.0, 0.0, 3.0, 0.0], dtype=float) - result: pd.DataFrame = find_peaks(x, sortstr=True, min_peak_height=0.0) - heights = result["height"].tolist() - assert heights == sorted(heights, reverse=True) - - def test_min_peak_distance(self): - x: npt.NDArray[float] = np.asarray([0.0, 10.0, 0.0, 8.0, 0.0], dtype=float) - result: pd.DataFrame = find_peaks(x, min_peak_distance=3, min_peak_height=0.0) - assert len(result) == 1 - assert result.iloc[0]["height"] == 10.0 - - def test_threshold_filter(self): - x: npt.NDArray[float] = np.asarray([1.0, 2.0, 1.5, 5.0, 1.0], dtype=float) - result: pd.DataFrame = find_peaks(x, threshold=2.0, min_peak_height=0.0) - assert len(result) == 1 - assert result.iloc[0]["height"] == 5.0 - - def test_accepts_list_input(self): - x: npt.NDArray[float] = np.asarray([0.0, 1.0, 2.0, 1.0, 0.0], dtype=float) - result: pd.DataFrame = find_peaks(x, min_peak_height=0.0) - assert len(result) == 1 - - def test_accepts_numpy_array(self): - x: npt.NDArray[float] = np.array([0.0, 1.0, 2.0, 1.0, 0.0]) - result: pd.DataFrame = find_peaks(x, min_peak_height=0.0) - assert len(result) == 1 - - def test_custom_peak_pattern(self): - x: npt.NDArray[float] = np.asarray([0.0, 1.0, 2.0, 3.0, 2.0, 1.0, 0.0], dtype=float) - result: pd.DataFrame = find_peaks(x, peak_pattern=r"[+]{3,}[-]{3,}", min_peak_height=0.0) - assert len(result) == 1
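Taken together, the removed modules formed a small R-compatibility layer: `density` reproduces R's FFT-based KDE, `approx` R's interpolator, and `find_peaks` pracma's peak detection (its defaults mirror zFPKM's `peak_parameters`). A hypothetical sketch of how they composed before removal (reconstructed from the deleted code above, not from the repository's documentation):

```python
import numpy as np

from como.density import density
from como.peak_finder import find_peaks

# Bimodal sample standing in for log2-expression values
rng = np.random.default_rng(0)
expr = np.concatenate([rng.normal(-2.0, 1.0, 500), rng.normal(3.0, 1.0, 500)])

kde = density(expr, n=512)  # R-style gaussian KDE
peaks = find_peaks(kde.y, min_peak_height=0.02, min_peak_distance=1)  # zFPKM-style defaults
print(kde.x[peaks["peak_idx"].to_numpy()])  # x-locations of the density peaks
```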