diff --git a/pytensor_distributions/extgenpareto.py b/pytensor_distributions/extgenpareto.py new file mode 100644 index 0000000..8015f90 --- /dev/null +++ b/pytensor_distributions/extgenpareto.py @@ -0,0 +1,186 @@ +"""Extended Generalized Pareto Distribution (ExtGPD). + +The ExtGPD is a transformation of the GPD that provides a smooth connection +between the upper tail and the main body of the distribution. This avoids +explicit threshold selection and helps model the entire distribution. + +The CDF is G(x) = H(x)^kappa where H is the GPD CDF. When kappa=1, this +reduces to the standard GPD. + +Parameters +---------- +mu : location parameter (threshold) +sigma : scale parameter (sigma > 0) +xi : shape parameter controlling upper tail behavior + xi > 0: heavy tail (Pareto-like) + xi = 0: exponential tail + xi < 0: bounded upper tail +kappa : lower tail parameter (kappa > 0) + Controls behavior near the lower bound. When kappa=1, reduces to GPD. + +References +---------- +.. [1] Naveau, P., Huser, R., Ribereau, P., and Hannart, A. (2016). + Modeling jointly low, moderate, and heavy rainfall intensities without + a threshold selection. Water Resources Research, 52(4), 2753-2769. + +.. [2] Papastathopoulos, I. and Tawn, J. A. (2013). + Extended generalised Pareto models for tail estimation. + Journal of Statistical Planning and Inference, 143(1), 131-143. +""" + +import pytensor.tensor as pt + +from pytensor_distributions.genpareto import _gpd_isf, _log1p_ratio, _safe_mul, _upper_bound +from pytensor_distributions.helper import ( + continuous_entropy, + continuous_kurtosis, + continuous_mean, + continuous_mode, + continuous_skewness, + continuous_variance, + ppf_bounds_cont, +) + +# --- Core distribution functions --- + + +def logpdf(x, mu, sigma, xi, kappa): + """Log-probability density function. + + log g(x) = log(kappa) + (kappa-1)*log(H(x)) + log(h(x)) + where H is the GPD CDF and h is the GPD PDF. + + Uses _log1p_ratio to compute log(h) and H without branching on xi. + """ + scaled = (x - mu) / sigma + t = _safe_mul(xi, scaled) + lr = _log1p_ratio(xi, scaled) # = log1p(t)/xi + + # GPD survival and CDF via _log1p_ratio + S = pt.exp(-lr) + H = 1 - S + + # GPD log-PDF: -log(sigma) - log1p(t) - log1p(t)/xi + log_h = -pt.log(sigma) - pt.log1p(t) - lr + + # ExtGPD log-PDF + logp_expr = pt.log(kappa) + (kappa - 1) * pt.log(H) + log_h + + return pt.switch( + pt.and_(pt.and_(pt.ge(scaled, 0), pt.gt(1 + t, 0)), pt.gt(H, 0)), + logp_expr, + -pt.inf, + ) + + +def pdf(x, mu, sigma, xi, kappa): + return pt.exp(logpdf(x, mu, sigma, xi, kappa)) + + +def cdf(x, mu, sigma, xi, kappa): + """Cumulative distribution function: G(x) = H(x)^kappa.""" + from pytensor_distributions.genpareto import cdf as _gpd_cdf + + H = _gpd_cdf(x, mu, sigma, xi) + return pt.pow(H, kappa) + + +def logcdf(x, mu, sigma, xi, kappa): + """Log cumulative distribution function: kappa * log(H(x)).""" + scaled = (x - mu) / sigma + t = _safe_mul(xi, scaled) + + # H via _log1p_ratio + H = 1 - pt.exp(-_log1p_ratio(xi, scaled)) + logcdf_expr = kappa * pt.log(H) + + return pt.switch( + pt.lt(scaled, 0), + -pt.inf, + pt.switch(pt.le(1 + t, 0), 0.0, logcdf_expr), + ) + + +def sf(x, mu, sigma, xi, kappa): + return 1 - cdf(x, mu, sigma, xi, kappa) + + +def logsf(x, mu, sigma, xi, kappa): + return pt.log1p(-cdf(x, mu, sigma, xi, kappa)) + + +def ppf(q, mu, sigma, xi, kappa): + """Percent-point function (quantile function, inverse CDF). + + G^{-1}(p) = H^{-1}(p^{1/kappa}), computed via the GPD inverse + survival function with v_gpd = 1 - p^{1/kappa} = -expm1(log(p)/kappa). + """ + v_gpd = -pt.expm1(pt.log(q) / kappa) + x_val = _gpd_isf(v_gpd, mu, sigma, xi) + return ppf_bounds_cont(x_val, q, mu, _upper_bound(mu, sigma, xi)) + + +def isf(p, mu, sigma, xi, kappa): + return ppf(1 - p, mu, sigma, xi, kappa) + + +def rvs(mu, sigma, xi, kappa, size=None, random_state=None): + # v_gpd = 1 - u^{1/kappa} = -expm1(log(u)/kappa) avoids catastrophic + # cancellation when u^{1/kappa} is close to 1. + u = pt.random.uniform(size=size, rng=random_state) + v_gpd = -pt.expm1(pt.log(u) / kappa) + return _gpd_isf(v_gpd, mu, sigma, xi) + + +# --- Moments and descriptive statistics --- + + +def mean(mu, sigma, xi, kappa): + lower = mu + upper = _upper_bound(mu, sigma, xi) + upper = pt.switch(pt.isinf(upper), ppf(0.9999, mu, sigma, xi, kappa), upper) + return continuous_mean(lower, upper, logpdf, mu, sigma, xi, kappa) + + +def median(mu, sigma, xi, kappa): + return ppf(0.5, mu, sigma, xi, kappa) + + +def mode(mu, sigma, xi, kappa): + lower = mu + upper = _upper_bound(mu, sigma, xi) + upper = pt.switch(pt.isinf(upper), ppf(0.999, mu, sigma, xi, kappa), upper) + return continuous_mode(lower, upper, logpdf, mu, sigma, xi, kappa) + + +def var(mu, sigma, xi, kappa): + lower = mu + upper = _upper_bound(mu, sigma, xi) + upper = pt.switch(pt.isinf(upper), ppf(0.9999, mu, sigma, xi, kappa), upper) + return continuous_variance(lower, upper, logpdf, mu, sigma, xi, kappa) + + +def std(mu, sigma, xi, kappa): + return pt.sqrt(var(mu, sigma, xi, kappa)) + + +def entropy(mu, sigma, xi, kappa): + lower = mu + upper = _upper_bound(mu, sigma, xi) + upper = pt.switch(pt.isinf(upper), ppf(0.9999, mu, sigma, xi, kappa), upper) + return continuous_entropy(lower, upper, logpdf, mu, sigma, xi, kappa) + + +def skewness(mu, sigma, xi, kappa): + lower = mu + upper = _upper_bound(mu, sigma, xi) + upper = pt.switch(pt.isinf(upper), ppf(0.9999, mu, sigma, xi, kappa), upper) + return continuous_skewness(lower, upper, logpdf, mu, sigma, xi, kappa) + + +def kurtosis(mu, sigma, xi, kappa): + lower = mu + upper = _upper_bound(mu, sigma, xi) + upper = pt.switch(pt.isinf(upper), ppf(0.9999, mu, sigma, xi, kappa), upper) + return continuous_kurtosis(lower, upper, logpdf, mu, sigma, xi, kappa) diff --git a/pytensor_distributions/genpareto.py b/pytensor_distributions/genpareto.py new file mode 100644 index 0000000..3cc7f60 --- /dev/null +++ b/pytensor_distributions/genpareto.py @@ -0,0 +1,211 @@ +"""Generalized Pareto Distribution (GPD). + +The GPD is used for modeling exceedances over a threshold in extreme value +analysis. It is the natural distribution for peaks-over-threshold modeling, +complementing the GEV distribution which is used for block maxima. + +Parameters +---------- +mu : location parameter (threshold) +sigma : scale parameter (sigma > 0) +xi : shape parameter controlling tail behavior + xi > 0: heavy tail (Pareto-like) + xi = 0: exponential tail + xi < 0: bounded upper tail +""" + +import pytensor.tensor as pt + +from pytensor_distributions.helper import cdf_bounds, ppf_bounds_cont + +# --- Numerical primitives --- + + +def _safe_mul(a, b): + """Multiply a * b, mapping NaN from IEEE 754's 0 * inf to 0. + + PyTensor's constant folding is inconsistent: it may simplify 0 * inf + to 0 for some operations (eval, log1p) but leave it as NaN for others + (abs, division). Materialising the product through a switch ensures + every downstream consumer sees the same, clean value. + """ + t = a * b + return pt.switch(pt.isnan(t), 0.0, t) + + +def _exprel(t): + """Compute exprel(t) = (exp(t) - 1) / t, stable at t = 0. + + Limit: exprel(0) = 1. + + Used in ppf / isf / rvs to give a single formula for all xi. + """ + taylor = 1 + t * (0.5 + t * (1 / 6 + t / 24)) + direct = pt.expm1(t) / t + return pt.switch(pt.abs(t) < 1e-6, taylor, direct) + + +def _log1p_ratio(xi, z): + """Compute log1p(xi * z) / xi, stable at xi = 0 where the limit is z. + + Used in logpdf / sf / logsf / cdf / logcdf to give a single formula for + all xi. + + For small xi * z, uses z * Taylor(xi * z) to avoid 0/0. + For large xi * z, uses log1p(xi * z) / xi to avoid the inf * 0 + indeterminate form that would arise from z * (log1p(xi * z) / (xi * z)). + """ + t = _safe_mul(xi, z) + taylor = z * (1 + t * (-0.5 + t * (1 / 3 + t * (-0.25)))) + direct = pt.log1p(t) / xi + return pt.switch(pt.abs(t) < 1e-6, taylor, direct) + + +def _gpd_isf(v, mu, sigma, xi): + """GPD inverse survival function: x such that P(X > x) = v.""" + log_v = pt.log(v) + t = -xi * log_v + return mu + sigma * (-log_v) * _exprel(t) + + +def _upper_bound(mu, sigma, xi): + """Upper bound of the GPD support.""" + return pt.switch(pt.lt(xi, 0), mu - sigma / xi, pt.inf) + + +# --- Core distribution functions --- + + +def logpdf(x, mu, sigma, xi): + """Log-probability density function. + + Uses _log1p_ratio to express (1 + 1/xi)*log1p(xi*z) = log1p(t) + + log1p(t)/xi where t = xi*z, giving a single formula for all xi. + """ + scaled = (x - mu) / sigma + t = _safe_mul(xi, scaled) + + logp_expr = -pt.log(sigma) - pt.log1p(t) - _log1p_ratio(xi, scaled) + + return pt.switch( + pt.and_(pt.ge(scaled, 0), pt.gt(1 + t, 0)), + logp_expr, + -pt.inf, + ) + + +def pdf(x, mu, sigma, xi): + return pt.exp(logpdf(x, mu, sigma, xi)) + + +def cdf(x, mu, sigma, xi): + """Cumulative distribution function. + + CDF = 1 - exp(-log1p(xi*z)/xi) via _log1p_ratio. + """ + scaled = (x - mu) / sigma + cdf_expr = 1 - pt.exp(-_log1p_ratio(xi, scaled)) + return cdf_bounds(cdf_expr, x, mu, _upper_bound(mu, sigma, xi)) + + +def logcdf(x, mu, sigma, xi): + scaled = (x - mu) / sigma + t = _safe_mul(xi, scaled) + logcdf_expr = pt.log1p(-pt.exp(-_log1p_ratio(xi, scaled))) + + return pt.switch( + pt.lt(scaled, 0), + -pt.inf, + pt.switch(pt.le(1 + t, 0), 0.0, logcdf_expr), + ) + + +def sf(x, mu, sigma, xi): + """Survival function: S(x) = exp(-log1p(xi*z)/xi) via _log1p_ratio.""" + scaled = (x - mu) / sigma + sf_expr = pt.exp(-_log1p_ratio(xi, scaled)) + upper = _upper_bound(mu, sigma, xi) + return pt.switch(pt.lt(x, mu), 1.0, pt.switch(pt.ge(x, upper), 0.0, sf_expr)) + + +def logsf(x, mu, sigma, xi): + """Log survival function: -log1p(xi*z)/xi via _log1p_ratio.""" + scaled = (x - mu) / sigma + logsf_expr = -_log1p_ratio(xi, scaled) + upper = _upper_bound(mu, sigma, xi) + return pt.switch(pt.lt(x, mu), 0.0, pt.switch(pt.ge(x, upper), -pt.inf, logsf_expr)) + + +def ppf(q, mu, sigma, xi): + """Percent-point function (quantile function, inverse CDF). + + Uses _exprel for unified handling of all xi values including xi = 0. + """ + log_surv = pt.log1p(-q) + t = -xi * log_surv + x_val = mu + sigma * (-log_surv) * _exprel(t) + return ppf_bounds_cont(x_val, q, mu, _upper_bound(mu, sigma, xi)) + + +def isf(p, mu, sigma, xi): + return ppf(1 - p, mu, sigma, xi) + + +def rvs(mu, sigma, xi, size=None, random_state=None): + u = pt.random.uniform(size=size, rng=random_state) + log_surv = pt.log1p(-u) + t = -xi * log_surv + return mu + sigma * (-log_surv) * _exprel(t) + + +# --- Moments and descriptive statistics --- + + +def mean(mu, sigma, xi): + shape = pt.broadcast_arrays(mu, sigma, xi)[0] + return pt.switch(pt.lt(xi, 1), mu + sigma / (1 - xi), pt.full_like(shape, pt.inf)) + + +def median(mu, sigma, xi): + # sigma * (2^xi - 1)/xi = sigma * log(2) * exprel(xi * log(2)). + return mu + sigma * pt.log(2) * _exprel(xi * pt.log(2)) + + +def mode(mu, sigma, xi): + shape = pt.broadcast_arrays(mu, sigma, xi)[0] + return pt.full_like(shape, mu) + + +def var(mu, sigma, xi): + shape = pt.broadcast_arrays(mu, sigma, xi)[0] + return pt.switch( + pt.lt(xi, 0.5), + sigma**2 / ((1 - xi) ** 2 * (1 - 2 * xi)), + pt.full_like(shape, pt.inf), + ) + + +def std(mu, sigma, xi): + return pt.sqrt(var(mu, sigma, xi)) + + +def entropy(mu, sigma, xi): + return pt.log(sigma) + xi + 1 + + +def skewness(mu, sigma, xi): + shape = pt.broadcast_arrays(mu, sigma, xi)[0] + return pt.switch( + pt.lt(xi, 1.0 / 3), + 2 * (1 + xi) * pt.sqrt(1 - 2 * xi) / (1 - 3 * xi), + pt.full_like(shape, pt.nan), + ) + + +def kurtosis(mu, sigma, xi): + shape = pt.broadcast_arrays(mu, sigma, xi)[0] + return pt.switch( + pt.lt(xi, 0.25), + 3 * (1 - 2 * xi) * (2 * xi**2 + xi + 3) / ((1 - 3 * xi) * (1 - 4 * xi)) - 3, + pt.full_like(shape, pt.nan), + ) diff --git a/tests/test_extgenpareto.py b/tests/test_extgenpareto.py new file mode 100644 index 0000000..e67401e --- /dev/null +++ b/tests/test_extgenpareto.py @@ -0,0 +1,278 @@ +"""Test Extended Generalized Pareto Distribution (ExtGPD). + +Since there is no scipy equivalent, tests compare against numpy reference +implementations built on scipy.stats.genpareto (the base GPD). +""" + +import numpy as np +import pytensor.tensor as pt +import pytest +from numpy.testing import assert_allclose +from scipy import stats +from scipy.stats import kstest + +from pytensor_distributions import extgenpareto as ExtGPD +from pytensor_distributions import genpareto as GPD +from tests.helper_scipy import make_params + +# --- Reference implementations using scipy/numpy --- + + +def _ref_cdf(x, mu, sigma, xi, kappa): + H = stats.genpareto.cdf(x, c=xi, loc=mu, scale=sigma) + return np.power(np.maximum(H, 0), kappa) + + +def _ref_logpdf(x, mu, sigma, xi, kappa): + H = stats.genpareto.cdf(x, c=xi, loc=mu, scale=sigma) + log_h = stats.genpareto.logpdf(x, c=xi, loc=mu, scale=sigma) + with np.errstate(divide="ignore", invalid="ignore"): + result = np.log(kappa) + (kappa - 1) * np.log(H) + log_h + return np.where(H <= 0, -np.inf, result) + + +def _ref_ppf(q, mu, sigma, xi, kappa): + p_gpd = np.power(q, 1 / kappa) + return stats.genpareto.ppf(p_gpd, c=xi, loc=mu, scale=sigma) + + +def _ref_isf(v, mu, sigma, xi): + return stats.genpareto.isf(v, c=xi, loc=mu, scale=sigma) + + +# --- Test parameter sets --- + +_TEST_PARAMS = [ + # (mu, sigma, xi, kappa) + (0.0, 1.0, 0.0, 2.0), # Exponential base, kappa=2 + (0.0, 1.0, 0.2, 2.0), # Heavy tail, kappa=2 + (1.0, 0.5, -0.3, 5.0), # Bounded upper tail, kappa=5 + (0.0, 2.0, 0.1, 3.0), # Moderate tail, kappa=3 +] + +_KAPPA_ONE_PARAMS = [ + # (mu, sigma, xi) where kappa=1 should reduce to GPD + (0.0, 1.0, 0.0), + (0.0, 1.0, 0.2), + (1.0, 0.5, -0.3), +] + + +# --- Tests --- + + +class TestExtGPDLogpdf: + """Test log-PDF against numpy reference.""" + + @pytest.mark.parametrize("mu,sigma,xi,kappa", _TEST_PARAMS) + def test_logpdf(self, mu, sigma, xi, kappa): + x = np.array([mu + 0.01, mu + 0.1, mu + 0.5, mu + 1.0, mu + 2.0, mu + 5.0]) + p_params = make_params(mu, sigma, xi, kappa, dtype="float64") + expected = _ref_logpdf(x, mu, sigma, xi, kappa) + actual = ExtGPD.logpdf(x, *p_params).eval() + + mask = np.isfinite(expected) + assert_allclose(actual[mask], expected[mask], rtol=1e-10) + + @pytest.mark.parametrize("mu,sigma,xi,kappa", _TEST_PARAMS) + def test_pdf_matches_exp_logpdf(self, mu, sigma, xi, kappa): + x = np.array([mu + 0.1, mu + 0.5, mu + 1.0, mu + 2.0]) + p_params = make_params(mu, sigma, xi, kappa, dtype="float64") + pdf_val = ExtGPD.pdf(x, *p_params).eval() + logpdf_val = ExtGPD.logpdf(x, *p_params).eval() + assert_allclose(pdf_val, np.exp(logpdf_val), rtol=1e-12) + + @pytest.mark.parametrize("mu,sigma,xi,kappa", _TEST_PARAMS) + def test_below_support(self, mu, sigma, xi, kappa): + x = np.array([mu - 1.0, mu - 0.01]) + p_params = make_params(mu, sigma, xi, kappa, dtype="float64") + actual = ExtGPD.logpdf(x, *p_params).eval() + assert np.all(np.isneginf(actual)) + + +class TestExtGPDCdf: + """Test CDF against numpy reference.""" + + @pytest.mark.parametrize("mu,sigma,xi,kappa", _TEST_PARAMS) + def test_cdf(self, mu, sigma, xi, kappa): + x = np.array([mu - 1.0, mu, mu + 0.1, mu + 0.5, mu + 1.0, mu + 5.0]) + p_params = make_params(mu, sigma, xi, kappa, dtype="float64") + expected = _ref_cdf(x, mu, sigma, xi, kappa) + actual = ExtGPD.cdf(x, *p_params).eval() + assert_allclose(actual, expected, rtol=1e-10, atol=1e-15) + + @pytest.mark.parametrize("mu,sigma,xi,kappa", _TEST_PARAMS) + def test_cdf_bounds(self, mu, sigma, xi, kappa): + p_params = make_params(mu, sigma, xi, kappa, dtype="float64") + # Below support → 0 + assert ExtGPD.cdf(mu - 1.0, *p_params).eval() == 0.0 + # At lower bound → 0 + assert ExtGPD.cdf(mu, *p_params).eval() == 0.0 + + +class TestExtGPDSf: + """Test survival function.""" + + @pytest.mark.parametrize("mu,sigma,xi,kappa", _TEST_PARAMS) + def test_sf_plus_cdf_equals_one(self, mu, sigma, xi, kappa): + x = np.array([mu + 0.1, mu + 0.5, mu + 1.0, mu + 2.0]) + p_params = make_params(mu, sigma, xi, kappa, dtype="float64") + cdf_val = ExtGPD.cdf(x, *p_params).eval() + sf_val = ExtGPD.sf(x, *p_params).eval() + assert_allclose(cdf_val + sf_val, 1.0, atol=1e-14) + + +class TestExtGPDLogcdf: + """Test log-CDF.""" + + @pytest.mark.parametrize("mu,sigma,xi,kappa", _TEST_PARAMS) + def test_logcdf_matches_log_cdf(self, mu, sigma, xi, kappa): + x = np.array([mu + 0.1, mu + 0.5, mu + 1.0, mu + 2.0]) + p_params = make_params(mu, sigma, xi, kappa, dtype="float64") + logcdf_val = ExtGPD.logcdf(x, *p_params).eval() + cdf_val = ExtGPD.cdf(x, *p_params).eval() + assert_allclose(logcdf_val, np.log(cdf_val), rtol=1e-12) + + +class TestExtGPDPpf: + """Test quantile function.""" + + @pytest.mark.parametrize("mu,sigma,xi,kappa", _TEST_PARAMS) + def test_ppf(self, mu, sigma, xi, kappa): + q = np.array([0.01, 0.1, 0.25, 0.5, 0.75, 0.9, 0.99]) + p_params = make_params(mu, sigma, xi, kappa, dtype="float64") + expected = _ref_ppf(q, mu, sigma, xi, kappa) + actual = ExtGPD.ppf(q, *p_params).eval() + assert_allclose(actual, expected, rtol=1e-10) + + @pytest.mark.parametrize("mu,sigma,xi,kappa", _TEST_PARAMS) + def test_ppf_cdf_roundtrip(self, mu, sigma, xi, kappa): + # Use ppf to get points guaranteed to be in the interior of support + q_vals = np.array([0.1, 0.3, 0.5, 0.7, 0.9]) + p_params = make_params(mu, sigma, xi, kappa, dtype="float64") + x_orig = ExtGPD.ppf(q_vals, *p_params).eval() + q_recovered = ExtGPD.cdf(x_orig, *p_params).eval() + assert_allclose(q_recovered, q_vals, rtol=1e-10) + + @pytest.mark.parametrize("mu,sigma,xi,kappa", _TEST_PARAMS) + def test_ppf_bounds(self, mu, sigma, xi, kappa): + p_params = make_params(mu, sigma, xi, kappa, dtype="float64") + # q=0 → lower bound + assert_allclose(ExtGPD.ppf(0.0, *p_params).eval(), mu, atol=1e-15) + # q out of bounds → NaN + assert np.isnan(ExtGPD.ppf(-0.1, *p_params).eval()) + assert np.isnan(ExtGPD.ppf(1.1, *p_params).eval()) + + +class TestExtGPDIsf: + """Test inverse survival function.""" + + @pytest.mark.parametrize("mu,sigma,xi,kappa", _TEST_PARAMS) + def test_isf_matches_ppf(self, mu, sigma, xi, kappa): + p = np.array([0.01, 0.1, 0.25, 0.5, 0.75, 0.9, 0.99]) + p_params = make_params(mu, sigma, xi, kappa, dtype="float64") + isf_val = ExtGPD.isf(p, *p_params).eval() + ppf_val = ExtGPD.ppf(1 - p, *p_params).eval() + assert_allclose(isf_val, ppf_val, rtol=1e-12) + + +class TestExtGPDKappaOne: + """When kappa=1, ExtGPD should reduce to GPD.""" + + @pytest.mark.parametrize("mu,sigma,xi", _KAPPA_ONE_PARAMS) + def test_logpdf(self, mu, sigma, xi): + kappa = 1.0 + x = np.array([mu + 0.1, mu + 0.5, mu + 1.0, mu + 2.0]) + gpd_params = make_params(mu, sigma, xi, dtype="float64") + ext_params = make_params(mu, sigma, xi, kappa, dtype="float64") + assert_allclose( + ExtGPD.logpdf(x, *ext_params).eval(), + GPD.logpdf(x, *gpd_params).eval(), + rtol=1e-10, + ) + + @pytest.mark.parametrize("mu,sigma,xi", _KAPPA_ONE_PARAMS) + def test_cdf(self, mu, sigma, xi): + kappa = 1.0 + x = np.array([mu + 0.1, mu + 0.5, mu + 1.0, mu + 2.0]) + gpd_params = make_params(mu, sigma, xi, dtype="float64") + ext_params = make_params(mu, sigma, xi, kappa, dtype="float64") + assert_allclose( + ExtGPD.cdf(x, *ext_params).eval(), + GPD.cdf(x, *gpd_params).eval(), + rtol=1e-10, + ) + + @pytest.mark.parametrize("mu,sigma,xi", _KAPPA_ONE_PARAMS) + def test_ppf(self, mu, sigma, xi): + kappa = 1.0 + q = np.array([0.1, 0.25, 0.5, 0.75, 0.9]) + gpd_params = make_params(mu, sigma, xi, dtype="float64") + ext_params = make_params(mu, sigma, xi, kappa, dtype="float64") + assert_allclose( + ExtGPD.ppf(q, *ext_params).eval(), + GPD.ppf(q, *gpd_params).eval(), + rtol=1e-10, + ) + + @pytest.mark.parametrize("mu,sigma,xi", _KAPPA_ONE_PARAMS) + def test_median(self, mu, sigma, xi): + kappa = 1.0 + gpd_params = make_params(mu, sigma, xi, dtype="float64") + ext_params = make_params(mu, sigma, xi, kappa, dtype="float64") + assert_allclose( + ExtGPD.median(*ext_params).eval(), + GPD.median(*gpd_params).eval(), + rtol=1e-8, + ) + + +class TestExtGPDRvs: + """Test random variate generation.""" + + @pytest.mark.parametrize("mu,sigma,xi,kappa", _TEST_PARAMS) + def test_rvs_element_wise(self, mu, sigma, xi, kappa): + """Test that rvs produces same values as reference (same seed).""" + p_params = make_params(mu, sigma, xi, kappa, dtype="float64") + rng_p = pt.random.default_rng(42) + actual = ExtGPD.rvs(*p_params, size=20, random_state=rng_p).eval() + + # Reference: same uniform samples, same ISF-based transformation + # rvs uses v_gpd = -expm1(log(u)/kappa), then _gpd_isf + rng_n = np.random.default_rng(42) + u = rng_n.uniform(size=20) + v_gpd = -np.expm1(np.log(u) / kappa) + expected = _ref_isf(v_gpd, mu, sigma, xi) + + assert_allclose(actual, expected, rtol=1e-10) + + @pytest.mark.parametrize("mu,sigma,xi,kappa", _TEST_PARAMS) + def test_rvs_in_support(self, mu, sigma, xi, kappa): + p_params = make_params(mu, sigma, xi, kappa, dtype="float64") + rng = pt.random.default_rng(123) + samples = ExtGPD.rvs(*p_params, size=1000, random_state=rng).eval() + assert np.all(samples >= mu) + if xi < 0: + upper = mu - sigma / xi + assert np.all(samples <= upper + 1e-10) + + @pytest.mark.parametrize("mu,sigma,xi,kappa", _TEST_PARAMS) + def test_rvs_ks(self, mu, sigma, xi, kappa): + """Kolmogorov-Smirnov test for distributional correctness.""" + p_params = make_params(mu, sigma, xi, kappa, dtype="float64") + rng = pt.random.default_rng(0) + samples = ExtGPD.rvs(*p_params, size=5000, random_state=rng).eval() + + stat, pvalue = kstest(samples, lambda x: _ref_cdf(x, mu, sigma, xi, kappa)) + assert pvalue > 0.01, f"KS test failed: stat={stat}, p={pvalue}" + + +class TestExtGPDMedian: + """Test median.""" + + @pytest.mark.parametrize("mu,sigma,xi,kappa", _TEST_PARAMS) + def test_median(self, mu, sigma, xi, kappa): + p_params = make_params(mu, sigma, xi, kappa, dtype="float64") + expected = _ref_ppf(0.5, mu, sigma, xi, kappa) + actual = ExtGPD.median(*p_params).eval() + assert_allclose(actual, expected, rtol=1e-8) diff --git a/tests/test_genpareto.py b/tests/test_genpareto.py new file mode 100644 index 0000000..b0cee46 --- /dev/null +++ b/tests/test_genpareto.py @@ -0,0 +1,39 @@ +"""Test Generalized Pareto Distribution against scipy implementation.""" + +import pytest +from scipy import stats + +from pytensor_distributions import genpareto as GenPareto +from tests.helper_scipy import make_params, run_distribution_tests + + +@pytest.mark.parametrize( + "params, sp_params", + [ + # xi=0: exponential case + ([0.0, 1.0, 0.0], {"c": 0.0, "loc": 0.0, "scale": 1.0}), + # xi>0: heavy tail + ([0.0, 2.0, 0.1], {"c": 0.1, "loc": 0.0, "scale": 2.0}), + # xi<0: bounded upper tail + ([1.0, 0.5, -0.3], {"c": -0.3, "loc": 1.0, "scale": 0.5}), + # Moderate heavy tail + ([0.0, 1.0, 0.2], {"c": 0.2, "loc": 0.0, "scale": 1.0}), + ], +) +def test_genpareto_vs_scipy(params, sp_params): + """Test Generalized Pareto distribution against scipy.""" + p_params = make_params(*params, dtype="float64") + mu, sigma, xi = params + if xi < 0: + support = (mu, mu - sigma / xi) + else: + support = (mu, float("inf")) + + run_distribution_tests( + p_dist=GenPareto, + sp_dist=stats.genpareto, + p_params=p_params, + sp_params=sp_params, + support=support, + name="genpareto", + )