diff --git a/ptgp/gp/svgp.py b/ptgp/gp/svgp.py index cdfa52b..67eabca 100644 --- a/ptgp/gp/svgp.py +++ b/ptgp/gp/svgp.py @@ -230,7 +230,7 @@ def predict_marginal(self, X, incl_lik=False): fmean, fvar = conditional_unwhitened(A, Kmn, Knn_diag, self.q_mu, self.q_sqrt) fmean = fmean + self.mean(X) if incl_lik: - return self.likelihood.predict_mean_and_var(fmean, fvar) + return self.likelihood.at(X).predict_mean_and_var(fmean, fvar) return fmean, fvar def predict_joint(self, X): diff --git a/ptgp/gp/unapproximated.py b/ptgp/gp/unapproximated.py index 0b37af0..a2e3f9f 100644 --- a/ptgp/gp/unapproximated.py +++ b/ptgp/gp/unapproximated.py @@ -18,6 +18,10 @@ class Unapproximated: Mean function (default: ``Zero()``). sigma : tensor or PyMC random variable Observation noise standard deviation. + x : tensor, optional + The design matrix ``sigma`` was built against, for heteroskedastic + noise. Pass it so sigma can be re-rooted onto the test inputs at predict + time. """ extra_vars = () @@ -25,11 +29,11 @@ class Unapproximated: default_objective = staticmethod(marginal_log_likelihood) predict_needs_data = True - def __init__(self, kernel, mean=None, sigma=None): + def __init__(self, kernel, mean=None, sigma=None, x=None): """Store the kernel and mean; build a Gaussian likelihood from sigma.""" self.kernel = kernel self.mean = mean if mean is not None else Zero() - self.likelihood = Gaussian(sigma) + self.likelihood = Gaussian(sigma, x=x) def predict_marginal(self, X_new, X_train, y_train, incl_lik=False): """Posterior marginal mean and variance at each point in X_new. @@ -64,6 +68,5 @@ def predict_marginal(self, X_new, X_train, y_train, incl_lik=False): fvar = Kss_diag - pt.sum(Kns * (Knn_inv @ Kns), axis=0) if incl_lik: - sigma_new = self.likelihood.sigma_at(X_train, X_new) - return fmean, fvar + sigma_new**2 + return self.likelihood.at(X_new).predict_mean_and_var(fmean, fvar) return fmean, fvar diff --git a/ptgp/gp/vfe.py b/ptgp/gp/vfe.py index 3c2550a..a3a5f44 100644 --- a/ptgp/gp/vfe.py +++ b/ptgp/gp/vfe.py @@ -22,12 +22,16 @@ class VFE: Observation noise standard deviation. inducing_variable : InducingVariables Inducing point locations. + x : tensor, optional + The design matrix ``sigma`` was built against, for heteroskedastic + noise. Pass it so sigma can be re-rooted onto the test inputs at predict + time. """ default_objective = staticmethod(collapsed_elbo) predict_needs_data = True - def __init__(self, kernel, mean=None, sigma=None, inducing_variable=None): + def __init__(self, kernel, mean=None, sigma=None, inducing_variable=None, x=None): """Store the kernel, mean, and inducing variable; build a Gaussian likelihood from sigma.""" if not hasattr(inducing_variable, "Z"): raise TypeError( @@ -37,7 +41,7 @@ def __init__(self, kernel, mean=None, sigma=None, inducing_variable=None): ) self.kernel = kernel self.mean = mean if mean is not None else Zero() - self.likelihood = Gaussian(sigma) + self.likelihood = Gaussian(sigma, x=x) self.inducing_variable = inducing_variable @property @@ -87,6 +91,5 @@ def predict_marginal(self, X_new, X_train, y_train, incl_lik=False): fvar = Kss_diag - pt.sum(Kus * ((Kuu_inv - Sigma_inv) @ Kus), axis=0) if incl_lik: - sigma_new = self.likelihood.sigma_at(X_train, X_new) - return fmean, fvar + sigma_new**2 + return self.likelihood.at(X_new).predict_mean_and_var(fmean, fvar) return fmean, fvar diff --git a/ptgp/likelihoods/__init__.py b/ptgp/likelihoods/__init__.py index f51989b..2f0cc9e 100644 --- a/ptgp/likelihoods/__init__.py +++ b/ptgp/likelihoods/__init__.py @@ -1,8 +1,40 @@ -from ptgp.likelihoods.base import Likelihood +from ptgp.likelihoods.base import ( + GPData, + LikelihoodOp, + LikelihoodType, + LikelihoodVariable, + at, + gp_data, + op_of, + param, + predict_log_density, + predict_mean_and_var, + variational_expectation, +) from ptgp.likelihoods.bernoulli import Bernoulli from ptgp.likelihoods.gaussian import Gaussian from ptgp.likelihoods.negative_binomial import NegativeBinomial from ptgp.likelihoods.poisson import Poisson from ptgp.likelihoods.student_t import StudentT -__all__ = ["Likelihood", "Gaussian", "Bernoulli", "StudentT", "Poisson", "NegativeBinomial"] +__all__ = [ + # Family helpers — build a LikelihoodVariable. + "Gaussian", + "Bernoulli", + "StudentT", + "Poisson", + "NegativeBinomial", + # Graph types. + "LikelihoodOp", + "LikelihoodType", + "LikelihoodVariable", + "GPData", + "gp_data", + # Purely functional API — operate on a likelihood node via ``owner.op``. + "op_of", + "param", + "at", + "variational_expectation", + "predict_mean_and_var", + "predict_log_density", +] diff --git a/ptgp/likelihoods/base.py b/ptgp/likelihoods/base.py index 24771d6..39776de 100644 --- a/ptgp/likelihoods/base.py +++ b/ptgp/likelihoods/base.py @@ -1,70 +1,183 @@ import numpy as np import pytensor.tensor as pt +from pytensor.graph.basic import Apply, Variable +from pytensor.graph.op import Op +from pytensor.graph.replace import graph_replace +from pytensor.graph.rewriting.basic import node_rewriter +from pytensor.graph.type import Type +from pytensor.tensor.rewriting.basic import register_specialize -class Likelihood: - """Base class for all PTGP likelihoods. - Subclasses must implement ``_log_prob(f, y)`` which returns the symbolic - log-likelihood log p(y|f). +def inv_probit(x): + """Probit inverse link Phi(x), clamped to (jitter, 1 - jitter).""" + jitter = 1e-3 + return 0.5 * (1.0 + pt.erf(x / pt.sqrt(2.0))) * (1.0 - 2.0 * jitter) + jitter - Gaussian likelihoods override ``variational_expectation`` and - ``predict_mean_and_var`` with closed-form expressions. Non-Gaussian - likelihoods inherit the default Gauss-Hermite quadrature implementations. - Parameters - ---------- - invlink : callable, optional - Inverse link function mapping latent f to the natural parameter. - Subclasses set a default (e.g. exp for Poisson, sigmoid for Bernoulli). - n_points : int - Number of Gauss-Hermite quadrature points (default 20). +def inv_cloglog(x): + """Complementary log-log inverse link, clamped to (jitter, 1 - jitter).""" + jitter = 1e-3 + return (1.0 - pt.exp(-pt.exp(x))) * (1.0 - 2.0 * jitter) + jitter + + +LINKS = { + "log": pt.exp, + "logit": pt.sigmoid, + "probit": inv_probit, + "cloglog": inv_cloglog, +} + + +class LikelihoodVariable(Variable): + """A likelihood that lives in the PyTensor graph *and* carries the accessor API. + + It is the output of a :class:`LikelihoodOp` node, so ``owner.op`` recovers + the family (with behaviour) and ``clone`` / ``graph_replace`` carry it. It + also exposes the methods we used to put on a holder, each dispatched through + ``owner.op`` (the free functions below): + + - parameter access — ``lik.sigma`` / ``lik.nu`` / ... (the node's inputs), + - :meth:`at` — re-root the parameters onto new design inputs, + - :meth:`variational_expectation` / :meth:`predict_mean_and_var` / + :meth:`predict_log_density`. + """ + + def at(self, X): + """Re-root onto design matrix ``X`` (no-op without a design matrix).""" + return at(self, X) + + def variational_expectation(self, y, mu, var): + """E_{q(f)}[log p(y|f)] where q(f) = N(mu, var).""" + return variational_expectation(self, y, mu, var) + + def predict_mean_and_var(self, mu, var): + """Predictive mean and variance under q(f) = N(mu, var).""" + return predict_mean_and_var(self, mu, var) + + def predict_log_density(self, y, mu, var): + """Predictive log-density log E_{q(f)}[p(y|f)] at test points.""" + return predict_log_density(self, y, mu, var) + + def __getattr__(self, name): + # The guard keeps __getattr__ from recursing on its own attribute access + # and lets dunders and PyTensor internals fall through as AttributeError. + if name.startswith("_") or name in ("owner", "type", "index", "tag", "name", "auto_name"): + raise AttributeError(name) + owner = self.__dict__.get("owner") + if owner is None: + raise AttributeError(name) + op = owner.op + if name in op.param_names: + return param(self, name) + if name in ("n_points", "link", "param_names", "has_data"): + return getattr(op, name) + raise AttributeError(name) + + +class LikelihoodType(Type): + """Type of a likelihood handle. Carries no runtime value — the node it comes + from is a structural marker, never consumed by the loss or compiled.""" + + def filter(self, value, strict=False, allow_downcast=None): + return value + + def make_variable(self, name=None): + return LikelihoodVariable(self, None, name=name) + + def __eq__(self, other): + return type(self) is type(other) + + def __hash__(self): + return hash(type(self)) + + +_LIK_TYPE = LikelihoodType() + + +class LikelihoodOp(Op): + """Structural marker keeping a likelihood inside the PyTensor graph. + + The Apply node consumes the design matrix (optional; input 0 when present) + and the parameters as its *inputs*, and emits a single :class:`LikelihoodType` + handle (a :class:`LikelihoodVariable`) as output. It computes nothing — it is + a real graph node purely so ``graph_replace`` / ``fgraph.clone`` / PyMC + conditioning carry the likelihood (and re-root its parameters). The Op *type* + (the subclass) encodes the family; behaviour is recoverable via ``owner.op``. + + Because the parameters are *inputs* and the handle output is never consumed + by the loss, the node is never on the differentiation path nor in a compiled + loss graph — no gradient, view, shape, or strip machinery is needed. + + Subclasses set the class attributes ``param_names`` and ``allowed_links`` + and implement the behaviour methods. """ - n_points: int = 20 - invlink = None + __props__ = ("param_names", "n_points", "link", "has_data") + param_names = () + allowed_links = () + + def __init__(self, n_points=20, link=None, has_data=False): + self.param_names = type(self).param_names + self.n_points = n_points + self.link = link if link is not None else self.default_link + if self.link is not None and self.link not in self.allowed_links: + raise ValueError( + f"{type(self).__name__} does not support link {self.link!r}; " + f"choose from {self.allowed_links}." + ) + self.has_data = has_data + + @property + def default_link(self): + """Default link name: the first of ``allowed_links`` (``None`` if empty).""" + return self.allowed_links[0] if self.allowed_links else None + + def _invlink(self, f): + """Map the latent ``f`` to the natural parameter via the inverse link.""" + return LINKS[self.link](f) + + def make_node(self, *args): + args = [pt.as_tensor_variable(a) for a in args] + return Apply(self, args, [_LIK_TYPE()]) - def _log_prob(self, f, y): + def perform(self, node, inputs, outputs): + outputs[0][0] = None + + def _log_prob(self, f, y, *params): """Symbolic log p(y|f). Subclasses must implement.""" raise NotImplementedError - def _conditional_mean(self, f): + def _conditional_mean(self, f, *params): """E[y|f]. Subclasses must implement.""" raise NotImplementedError - def _conditional_variance(self, f): + def _conditional_variance(self, f, *params): """Var[y|f]. Subclasses must implement.""" raise NotImplementedError - def variational_expectation(self, y, mu, var): - """E_{q(f)}[log p(y|f)] where q(f) = N(mu, var). - - Default: Gauss-Hermite quadrature. - """ - return self._gauss_hermite(lambda f, y: self._log_prob(f, y), y, mu, var) - - def predict_mean_and_var(self, mu, var): - """Predictive mean and variance: E_{q(f)}[E[y|f]], E_{q(f)}[Var[y|f]] + Var_{q(f)}[E[y|f]]. + def variational_expectation(self, params, y, mu, var): + """E_{q(f)}[log p(y|f)] where q(f) = N(mu, var). Default: quadrature.""" + return self._gauss_hermite(lambda f, yy: self._log_prob(f, yy, *params), y, mu, var) - Default: Gauss-Hermite quadrature. - """ + def predict_mean_and_var(self, params, mu, var): + """Predictive mean and variance. Default: Gauss-Hermite quadrature.""" E_mean = self._gauss_hermite( - lambda f, _: self._conditional_mean(f), pt.zeros_like(mu), mu, var + lambda f, _: self._conditional_mean(f, *params), pt.zeros_like(mu), mu, var ) E_mean_sq = self._gauss_hermite( - lambda f, _: self._conditional_mean(f) ** 2, pt.zeros_like(mu), mu, var + lambda f, _: self._conditional_mean(f, *params) ** 2, pt.zeros_like(mu), mu, var ) E_var = self._gauss_hermite( - lambda f, _: self._conditional_variance(f), pt.zeros_like(mu), mu, var + lambda f, _: self._conditional_variance(f, *params), pt.zeros_like(mu), mu, var ) return E_mean, E_var + E_mean_sq - E_mean**2 - def predict_log_density(self, y, mu, var): - """log E_{q(f)}[p(y|f)] — predictive log-density at test points. - - Default: Gauss-Hermite quadrature in log-space for numerical stability. - """ - return self._gauss_hermite_logspace(lambda f, y: self._log_prob(f, y), y, mu, var) + def predict_log_density(self, params, y, mu, var): + """log E_{q(f)}[p(y|f)]. Default: log-space Gauss-Hermite quadrature.""" + return self._gauss_hermite_logspace( + lambda f, yy: self._log_prob(f, yy, *params), y, mu, var + ) def _gauss_hermite(self, func, y, mu, var): """E_{q(f)}[func(f, y)] via Gauss-Hermite quadrature.""" @@ -72,27 +185,131 @@ def _gauss_hermite(self, func, y, mu, var): gh_points = pt.as_tensor_variable(gh_points) gh_weights = pt.as_tensor_variable(gh_weights / np.sqrt(np.pi)) - # f = mu + sqrt(2 * var) * t_j, shape (N, n_points) sd = pt.sqrt(var)[:, None] F = mu[:, None] + pt.sqrt(2.0) * sd * gh_points[None, :] - - # func(f, y) at each quadrature point, shape (N, n_points) vals = func(F, y[:, None]) - return pt.sum(vals * gh_weights[None, :], axis=1) def _gauss_hermite_logspace(self, func, y, mu, var): - """log E_{q(f)}[exp(func(f, y))] via Gauss-Hermite quadrature. - - Uses logsumexp for numerical stability. - """ + """log E_{q(f)}[exp(func(f, y))] via quadrature, using logsumexp.""" gh_points, gh_weights = np.polynomial.hermite.hermgauss(self.n_points) gh_points = pt.as_tensor_variable(gh_points) log_weights = pt.as_tensor_variable(np.log(gh_weights / np.sqrt(np.pi))) sd = pt.sqrt(var)[:, None] F = mu[:, None] + pt.sqrt(2.0) * sd * gh_points[None, :] - log_vals = func(F, y[:, None]) + log_weights[None, :] - return pt.logsumexp(log_vals, axis=1) + + +class GPData(Op): + """Opaque identity barrier marking the design matrix in a parameter graph. + + The design matrix is routed through this Op before it reaches the + parameters (``sigma = f(gp_data(X))``), which keeps a *stable* handle to it: + rewrites cannot see through the barrier, so an algebraic cancellation + (e.g. ``exp(log(x)) -> x``) cannot dissolve the path from the parameter back + to ``X`` and sever the handle. It is also the likelihood node's design-matrix + input (input 0), so it is structurally findable and clone-safe. + + It carries no value of its own (identity passthrough) and is stripped at the + *specialize* stage (:func:`_strip_gp_data`) — i.e. only when building the + final executable, after re-rooting, never during the canonicalization passes + a model graph might go through while the handle still matters. + """ + + __props__ = () + view_map = {0: [0]} + + def make_node(self, x): + x = pt.as_tensor_variable(x) + return Apply(self, [x], [x.type()]) + + def perform(self, node, inputs, outputs): + outputs[0][0] = inputs[0] + + def infer_shape(self, fgraph, node, input_shapes): + return input_shapes + + def pullback(self, inputs, outputs, cotangents): + return list(cotangents) # identity + + +_gp_data_op = GPData() + + +def gp_data(x): + """Wrap the design matrix ``x`` in an opaque :class:`GPData` barrier.""" + return _gp_data_op(pt.as_tensor_variable(x)) + + +@register_specialize +@node_rewriter([GPData]) +def _strip_gp_data(fgraph, node): + """Elide the design-matrix barrier when building the final executable. + + Registered at the *specialize* stage (not canonicalize), so it does not fire + during the canonicalization passes the barrier is meant to survive — it only + drops the identity Op once all graph transforms (incl. re-rooting) are done. + """ + return [node.inputs[0]] + + +def to_inputs(params, x): + """Node inputs for ``params`` and an optional design matrix ``x``. + + Without ``x`` the inputs are just the parameters. With ``x``, it is wrapped in + a :class:`GPData` barrier and the parameters are re-expressed over it, so the + barrier becomes input 0 — the data demarcator that :func:`at` re-roots. + """ + params = [pt.as_tensor_variable(p) for p in params] + if x is None: + return params + x = pt.as_tensor_variable(x) + xd = gp_data(x) + return [xd, *graph_replace(params, {x: xd}, strict=False)] + + +def _params_of(node): + """The parameter inputs of a likelihood node (after the optional X slot).""" + off = 1 if node.op.has_data else 0 + return list(node.inputs[off:]) + + +def op_of(lik): + """The :class:`LikelihoodOp` behind a likelihood node (``owner.op``).""" + return lik.owner.op + + +def param(lik, name): + """The named parameter input of a likelihood node (e.g. ``"sigma"``).""" + node = lik.owner + return _params_of(node)[node.op.param_names.index(name)] + + +def at(lik, X): + """Re-root a likelihood node onto design matrix ``X``. + + A no-op for a likelihood with no design matrix (``op.has_data`` is False). + Otherwise returns a new handle whose parameter inputs are re-expressed over + ``X`` (RV-safe via ``graph_replace``). + """ + node = lik.owner + if not node.op.has_data: + return lik + return graph_replace([lik], {node.inputs[0]: X}, strict=False)[0] + + +def variational_expectation(lik, y, mu, var): + """E_{q(f)}[log p(y|f)] for a likelihood node, dispatched via ``owner.op``.""" + return lik.owner.op.variational_expectation(_params_of(lik.owner), y, mu, var) + + +def predict_mean_and_var(lik, mu, var): + """Predictive mean and variance for a likelihood node.""" + return lik.owner.op.predict_mean_and_var(_params_of(lik.owner), mu, var) + + +def predict_log_density(lik, y, mu, var): + """Predictive log-density for a likelihood node.""" + return lik.owner.op.predict_log_density(_params_of(lik.owner), y, mu, var) diff --git a/ptgp/likelihoods/bernoulli.py b/ptgp/likelihoods/bernoulli.py index 583f7a4..4a3deb1 100644 --- a/ptgp/likelihoods/bernoulli.py +++ b/ptgp/likelihoods/bernoulli.py @@ -1,48 +1,44 @@ import pytensor.tensor as pt -from ptgp.likelihoods.base import Likelihood +from ptgp.likelihoods.base import LikelihoodOp, inv_probit -def inv_probit(x): - """Probit link: Phi(x) = 0.5 * (1 + erf(x / sqrt(2))), clamped to (jitter, 1-jitter).""" - jitter = 1e-3 - return 0.5 * (1.0 + pt.erf(x / pt.sqrt(2.0))) * (1.0 - 2.0 * jitter) + jitter +class BernoulliOp(LikelihoodOp): + """Bernoulli likelihood Op. Closed-form predictive for the probit link.""" - -class Bernoulli(Likelihood): - """Bernoulli likelihood: p(y=1|f) = invlink(f). - - Default link is probit. Variational expectation via Gauss-Hermite quadrature. - - Parameters - ---------- - invlink : callable, optional - Inverse link function (default: probit). Use ``pt.sigmoid`` for logit link. - n_points : int - Number of Gauss-Hermite quadrature points (default 20). - """ - - def __init__(self, invlink=None, n_points=20): - self.invlink = invlink or inv_probit - self.n_points = n_points + param_names = () + allowed_links = ("probit", "logit", "cloglog") def _log_prob(self, f, y): - p = self.invlink(f) + p = self._invlink(f) return y * pt.log(p) + (1.0 - y) * pt.log(1.0 - p) def _conditional_mean(self, f): - return self.invlink(f) + return self._invlink(f) def _conditional_variance(self, f): - p = self.invlink(f) + p = self._invlink(f) return p * (1.0 - p) - def predict_mean_and_var(self, mu, var): - """Closed-form for probit link: p = Phi(mu / sqrt(1 + var)). - - Falls back to quadrature for other link functions. - """ - if self.invlink is inv_probit: + def predict_mean_and_var(self, params, mu, var): + if self.link == "probit": p = inv_probit(mu / pt.sqrt(1.0 + var)) return p, p - p**2 - return super().predict_mean_and_var(mu, var) + return super().predict_mean_and_var(params, mu, var) + + +def Bernoulli(link=None, n_points=20): + """Build a Bernoulli likelihood p(y=1|f) = invlink(f). + + Returns a :class:`~ptgp.likelihoods.base.LikelihoodVariable`. The probit link + (default) has a closed-form predictive; other links fall back to quadrature. + + Parameters + ---------- + link : str, optional + Inverse link name, one of ``"probit"`` (default), ``"logit"``, or + ``"cloglog"``. + n_points : int + Number of Gauss-Hermite quadrature points (default 20). + """ + return BernoulliOp(n_points=n_points, link=link)() diff --git a/ptgp/likelihoods/gaussian.py b/ptgp/likelihoods/gaussian.py index ef2cb8a..6b74818 100644 --- a/ptgp/likelihoods/gaussian.py +++ b/ptgp/likelihoods/gaussian.py @@ -1,66 +1,52 @@ import pytensor.tensor as pt -from pytensor.graph.basic import Constant -from pytensor.graph.replace import graph_replace -from pytensor.graph.traversal import ancestors -from pytensor.tensor.type import TensorType - -from ptgp.likelihoods.base import Likelihood +from ptgp.likelihoods.base import LikelihoodOp, to_inputs LOG2PI = pt.log(2.0 * pt.pi) -class Gaussian(Likelihood): - """Gaussian likelihood p(y|f) = N(y; f, sigma^2). +class GaussianOp(LikelihoodOp): + """Gaussian likelihood Op: closed-form expectations (no quadrature).""" - All methods have closed-form expressions (no quadrature needed). + param_names = ("sigma",) - Parameters - ---------- - sigma : tensor - Observation noise standard deviation. May be a scalar (homoskedastic) - or a vector built against a symbolic ``X`` (heteroskedastic). At - predict time, :meth:`sigma_at` substitutes the training ``X`` for - ``X_new`` in sigma's graph; mismatches surface as a strict-mode - ``graph_replace`` error. - """ + def _log_prob(self, f, y, sigma): + return -0.5 * (LOG2PI + pt.log(sigma**2) + (y - f) ** 2 / sigma**2) - def __init__(self, sigma): - self.sigma = pt.as_tensor_variable(sigma) + def _conditional_mean(self, f, sigma): + return f - def sigma_at(self, X_train, X_new): - """Return ``sigma`` with ``X_train`` substituted by ``X_new``. + def _conditional_variance(self, f, sigma): + return pt.ones_like(f) * sigma**2 - Sigma is treated as data-dependent iff it has any free symbolic - tensor input in its graph (covers both raw ``pt.matrix`` X and - ``pm.Data`` X — the latter is a SharedVariable but its type is - ``TensorType``). For data-independent sigma the call is a no-op. - For data-dependent sigma, strict-mode ``graph_replace`` raises a - clear error if ``X_train`` is not in sigma's graph. - """ - has_data_dep = any( - v.owner is None and not isinstance(v, Constant) and isinstance(v.type, TensorType) - for v in ancestors([self.sigma]) - ) - if not has_data_dep: - return self.sigma - return graph_replace(self.sigma, {X_train: X_new}) + def variational_expectation(self, params, y, mu, var): + (sigma,) = params + return -0.5 * (LOG2PI + pt.log(sigma**2) + ((y - mu) ** 2 + var) / sigma**2) - def _log_prob(self, f, y): - return -0.5 * (LOG2PI + pt.log(self.sigma**2) + (y - f) ** 2 / self.sigma**2) + def predict_mean_and_var(self, params, mu, var): + (sigma,) = params + return mu, var + sigma**2 - def _conditional_mean(self, f): - return f + def predict_log_density(self, params, y, mu, var): + (sigma,) = params + total_var = var + sigma**2 + return -0.5 * (LOG2PI + pt.log(total_var) + (y - mu) ** 2 / total_var) - def _conditional_variance(self, f): - return pt.ones_like(f) * self.sigma**2 - def variational_expectation(self, y, mu, var): - return -0.5 * (LOG2PI + pt.log(self.sigma**2) + ((y - mu) ** 2 + var) / self.sigma**2) +def Gaussian(sigma, x=None): + """Build a Gaussian likelihood p(y|f) = N(y; f, sigma^2). - def predict_mean_and_var(self, mu, var): - return mu, var + self.sigma**2 + Returns a :class:`~ptgp.likelihoods.base.LikelihoodVariable` — a graph node + exposing ``.sigma``, ``.at``, ``.predict_mean_and_var``, etc. - def predict_log_density(self, y, mu, var): - total_var = var + self.sigma**2 - return -0.5 * (LOG2PI + pt.log(total_var) + (y - mu) ** 2 / total_var) + Parameters + ---------- + sigma : tensor + Observation noise standard deviation. Scalar (homoskedastic) or a vector + built against a design matrix ``X`` (heteroskedastic). + x : tensor, optional + The design matrix ``sigma`` was built against; pass it for + heteroskedastic noise so ``.at`` can re-root sigma onto test inputs. + """ + op = GaussianOp(has_data=x is not None) + return op(*to_inputs([sigma], x)) diff --git a/ptgp/likelihoods/negative_binomial.py b/ptgp/likelihoods/negative_binomial.py index 788e3d5..8cfad3e 100644 --- a/ptgp/likelihoods/negative_binomial.py +++ b/ptgp/likelihoods/negative_binomial.py @@ -1,33 +1,16 @@ import pytensor.tensor as pt -from ptgp.likelihoods.base import Likelihood +from ptgp.likelihoods.base import LikelihoodOp, to_inputs -class NegativeBinomial(Likelihood): - """Negative binomial likelihood: p(y|f) = NB(y; invlink(f), alpha). +class NegativeBinomialOp(LikelihoodOp): + """Negative binomial likelihood Op. Expectations via quadrature.""" - Parameterized as NB(y; mu, alpha) where mu = invlink(f) and alpha is the - overdispersion parameter. Variance is mu + mu^2 / alpha. - Default link is log (invlink=exp). + param_names = ("alpha",) + allowed_links = ("log", "cloglog") - Parameters - ---------- - alpha : tensor or PyMC random variable - Overdispersion parameter. - invlink : callable, optional - Inverse link function (default: exp). - n_points : int - Number of Gauss-Hermite quadrature points (default 20). - """ - - def __init__(self, alpha, invlink=None, n_points=20): - self.alpha = alpha - self.invlink = invlink or pt.exp - self.n_points = n_points - - def _log_prob(self, f, y): - mu = self.invlink(f) - alpha = self.alpha + def _log_prob(self, f, y, alpha): + mu = self._invlink(f) return ( pt.gammaln(y + alpha) - pt.gammaln(alpha) @@ -36,9 +19,31 @@ def _log_prob(self, f, y): + y * pt.log(mu / (alpha + mu)) ) - def _conditional_mean(self, f): - return self.invlink(f) + def _conditional_mean(self, f, alpha): + return self._invlink(f) + + def _conditional_variance(self, f, alpha): + mu = self._invlink(f) + return mu + mu**2 / alpha - def _conditional_variance(self, f): - mu = self.invlink(f) - return mu + mu**2 / self.alpha + +def NegativeBinomial(alpha, link=None, n_points=20, x=None): + """Build a negative binomial likelihood NB(y; invlink(f), alpha). + + Returns a :class:`~ptgp.likelihoods.base.LikelihoodVariable`. Variance is + ``mu + mu**2 / alpha`` with ``mu = invlink(f)``; all links use quadrature. + + Parameters + ---------- + alpha : tensor or PyMC random variable + Overdispersion parameter. + link : str, optional + Inverse link name, one of ``"log"`` (default) or ``"cloglog"``. + n_points : int + Number of Gauss-Hermite quadrature points (default 20). + x : tensor, optional + The design matrix ``alpha`` was built against, for a heteroskedastic + parameter re-rooted onto test inputs via ``.at``. + """ + op = NegativeBinomialOp(n_points=n_points, link=link, has_data=x is not None) + return op(*to_inputs([alpha], x)) diff --git a/ptgp/likelihoods/poisson.py b/ptgp/likelihoods/poisson.py index d434ff4..5466754 100644 --- a/ptgp/likelihoods/poisson.py +++ b/ptgp/likelihoods/poisson.py @@ -1,41 +1,42 @@ import pytensor.tensor as pt -from ptgp.likelihoods.base import Likelihood +from ptgp.likelihoods.base import LikelihoodOp -class Poisson(Likelihood): - """Poisson likelihood: p(y|f) = Poisson(y; invlink(f)). +class PoissonOp(LikelihoodOp): + """Poisson likelihood Op. Closed-form variational expectation for log link.""" - Default link is log (invlink=exp). Has a closed-form variational - expectation with the log link; falls back to quadrature for other links. - - Parameters - ---------- - invlink : callable, optional - Inverse link function (default: exp). - n_points : int - Number of Gauss-Hermite quadrature points (default 20). - """ - - def __init__(self, invlink=None, n_points=20): - self.invlink = invlink or pt.exp - self.n_points = n_points + param_names = () + allowed_links = ("log",) def _log_prob(self, f, y): - lam = self.invlink(f) + lam = self._invlink(f) return y * pt.log(lam) - lam - pt.gammaln(y + 1.0) def _conditional_mean(self, f): - return self.invlink(f) + return self._invlink(f) def _conditional_variance(self, f): - return self.invlink(f) + return self._invlink(f) - def variational_expectation(self, y, mu, var): - """Closed-form for log link: E_q[y*f - exp(f) - log(y!)]. - - Falls back to quadrature for other link functions. - """ - if self.invlink is pt.exp: + def variational_expectation(self, params, y, mu, var): + if self.link == "log": return y * mu - pt.exp(mu + var / 2.0) - pt.gammaln(y + 1.0) - return super().variational_expectation(y, mu, var) + return super().variational_expectation(params, y, mu, var) + + +def Poisson(link=None, n_points=20): + """Build a Poisson likelihood p(y|f) with rate ``invlink(f)``. + + Returns a :class:`~ptgp.likelihoods.base.LikelihoodVariable`. The log link + (the default and only supported link) has a closed-form variational + expectation. + + Parameters + ---------- + link : str, optional + Inverse link name; only ``"log"`` is supported (the default). + n_points : int + Number of Gauss-Hermite quadrature points (default 20). + """ + return PoissonOp(n_points=n_points, link=link)() diff --git a/ptgp/likelihoods/student_t.py b/ptgp/likelihoods/student_t.py index 247dd19..7f4974c 100644 --- a/ptgp/likelihoods/student_t.py +++ b/ptgp/likelihoods/student_t.py @@ -1,30 +1,14 @@ import pytensor.tensor as pt -from ptgp.likelihoods.base import Likelihood +from ptgp.likelihoods.base import LikelihoodOp, to_inputs -class StudentT(Likelihood): - """Student-T likelihood p(y|f) = StudentT(y; f, sigma, nu). +class StudentTOp(LikelihoodOp): + """Student-T likelihood Op. Expectations via Gauss-Hermite quadrature.""" - Variational expectation via Gauss-Hermite quadrature. + param_names = ("nu", "sigma") - Parameters - ---------- - nu : tensor or PyMC random variable - Degrees of freedom. - sigma : tensor or PyMC random variable - Scale parameter. - n_points : int - Number of Gauss-Hermite quadrature points (default 20). - """ - - def __init__(self, nu, sigma, n_points=20): - self.nu = nu - self.sigma = sigma - self.n_points = n_points - - def _log_prob(self, f, y): - nu, sigma = self.nu, self.sigma + def _log_prob(self, f, y, nu, sigma): z = (y - f) / sigma return ( pt.gammaln((nu + 1.0) / 2.0) @@ -33,8 +17,30 @@ def _log_prob(self, f, y): - 0.5 * (nu + 1.0) * pt.log1p(z**2 / nu) ) - def _conditional_mean(self, f): + def _conditional_mean(self, f, nu, sigma): return f - def _conditional_variance(self, f): - return pt.ones_like(f) * self.sigma**2 * self.nu / (self.nu - 2.0) + def _conditional_variance(self, f, nu, sigma): + return pt.ones_like(f) * sigma**2 * nu / (nu - 2.0) + + +def StudentT(nu, sigma, n_points=20, x=None): + """Build a Student-T likelihood p(y|f) with scale sigma and degrees of freedom nu. + + Returns a :class:`~ptgp.likelihoods.base.LikelihoodVariable`. Variational + expectation via Gauss-Hermite quadrature. + + Parameters + ---------- + nu : tensor or PyMC random variable + Degrees of freedom. + sigma : tensor or PyMC random variable + Scale parameter. + n_points : int + Number of Gauss-Hermite quadrature points (default 20). + x : tensor, optional + The design matrix ``nu``/``sigma`` were built against, for + heteroskedastic parameters re-rooted onto test inputs via ``.at``. + """ + op = StudentTOp(n_points=n_points, has_data=x is not None) + return op(*to_inputs([nu, sigma], x)) diff --git a/tests/likelihoods/test_base.py b/tests/likelihoods/test_base.py index d87edea..2d1d7e7 100644 --- a/tests/likelihoods/test_base.py +++ b/tests/likelihoods/test_base.py @@ -1,10 +1,12 @@ """Tests for the likelihood base class — configurable inverse-link behavior.""" import numpy as np +import pymc as pm import pytensor import pytensor.tensor as pt +import pytest -from ptgp.likelihoods import Bernoulli, Poisson +from ptgp.likelihoods import Bernoulli, Gaussian, NegativeBinomial, Poisson, StudentT def _eval(*tensors): @@ -12,6 +14,18 @@ def _eval(*tensors): return f() +def _hetero(X): + return 0.5 + 0.1 * X[:, 0] ** 2 + + +def _provider(arr, kind): + if kind == "symbolic": + return pt.matrix("Xtr", shape=(None, arr.shape[1])) + if kind == "baked": + return pt.as_tensor_variable(arr) + return pm.Data("Xtr", arr) + + class TestConfigurableLink: def test_bernoulli_logit_link(self): """Bernoulli with logit link should differ from probit but still be valid.""" @@ -24,7 +38,7 @@ def test_bernoulli_logit_link(self): ) ) ve_logit = _eval( - Bernoulli(invlink=pt.sigmoid).variational_expectation( + Bernoulli(link="logit").variational_expectation( pt.as_tensor_variable(y), pt.as_tensor_variable(mu), pt.as_tensor_variable(var) ) ) @@ -32,17 +46,161 @@ def test_bernoulli_logit_link(self): assert np.all(ve_probit < 0) and np.all(ve_logit < 0) assert not np.allclose(ve_probit, ve_logit) - def test_poisson_custom_link_uses_quadrature(self): - """Poisson with non-exp link should fall back to quadrature and still work.""" - mu, var = np.array([1.0]), np.array([0.1]) - y = np.array([2.0]) - - def softplus(f): - return pt.log1p(pt.exp(f)) - + def test_bernoulli_cloglog_link_uses_quadrature(self): + """cloglog has no closed-form predictive, so it routes through quadrature.""" + mu, var, y = np.array([0.0, 1.0]), np.array([0.5, 0.5]), np.array([1.0, 0.0]) ve = _eval( - Poisson(invlink=softplus).variational_expectation( + Bernoulli(link="cloglog").variational_expectation( pt.as_tensor_variable(y), pt.as_tensor_variable(mu), pt.as_tensor_variable(var) ) ) - assert np.isfinite(ve).all() + assert np.all(ve < 0) + + def test_unsupported_link_raises(self): + with pytest.raises(ValueError, match="does not support link"): + Poisson(link="logit") + + +class TestCloneReplaceData: + @pytest.mark.parametrize("kind", ["symbolic", "pm_data"]) + def test_reroot_matches_direct_build(self, kind): + """Re-rooting yields the parameter evaluated at the new inputs, for the + variable-length data providers used in prediction. ``train`` and ``new`` + differ in length to exercise re-rooting onto a different number of + points (a symbolic placeholder or pm.Data is ``(None, D)``).""" + train = np.linspace(-1.0, 1.0, 5)[:, None] + new = np.array([[0.0], [1.0], [2.0]]) + X_new = pt.matrix("X_new", shape=(None, 1)) + with pm.Model(): + Xtr = _provider(train, kind) + rerooted = Gaussian(_hetero(Xtr), x=Xtr).at(X_new).sigma + got = pytensor.function([X_new], rerooted)(new) + np.testing.assert_allclose(got, _hetero(new)) + + def test_baked_constant_x_reroots_at_matching_length(self): + """A frozen numpy constant as ``x`` re-roots cleanly only at its own + length: its static row dimension propagates into the parameter graph at + build time, and re-rooting (which uses ``graph_replace`` to preserve the + identity of any RV hyperparameters) cannot un-specialize it afterward. + Variable-length re-rooting requires a symbolic placeholder or pm.Data.""" + arr = np.linspace(-1.0, 1.0, 5)[:, None] + same_len = np.linspace(2.0, 3.0, 5)[:, None] + X_new = pt.matrix("X_new", shape=(None, 1)) + Xc = pt.as_tensor_variable(arr) # one node, reused as both param input and x= + rerooted = Gaussian(_hetero(Xc), x=Xc).at(X_new).sigma + got = pytensor.function([X_new], rerooted)(same_len) + np.testing.assert_allclose(got, _hetero(same_len)) + + def test_reroot_matches_multidim_input(self): + new = np.array([[0.0, 1.0], [2.0, 3.0]]) + X = pt.matrix("X", shape=(None, 2)) + X_new = pt.matrix("X_new", shape=(None, 2)) + rerooted = Gaussian(0.1 + pt.sum(X**2, axis=1), x=X).at(X_new).sigma + got = pytensor.function([X_new], rerooted)(new) + np.testing.assert_allclose(got, 0.1 + np.sum(new**2, axis=1)) + + def test_reroots_all_tensor_params(self): + X = pt.matrix("X", shape=(None, 1)) + new = np.array([[1.0], [2.0], [3.0]]) + out = StudentT(nu=3.0 + X[:, 0] ** 2, sigma=_hetero(X), x=X).at(pt.as_tensor_variable(new)) + nu_val, sigma_val = _eval(out.nu, out.sigma) + np.testing.assert_allclose(nu_val, 3.0 + new[:, 0] ** 2) + np.testing.assert_allclose(sigma_val, _hetero(new)) + + def test_negative_binomial_alpha(self): + X = pt.matrix("X", shape=(None, 1)) + new = np.array([[0.0], [2.0]]) + out = NegativeBinomial(alpha=_hetero(X), x=X).at(pt.as_tensor_variable(new)) + np.testing.assert_allclose(_eval(out.alpha), _hetero(new)) + + def test_leaves_scalar_params_untouched(self): + """A data-independent parameter keeps its value through re-rooting (it + shares the likelihood node, so its output object is rebuilt, but the + value it computes is unchanged).""" + X = pt.matrix("X", shape=(None, 1)) + lik = StudentT(nu=4.0, sigma=_hetero(X), x=X) + out = lik.at(pt.as_tensor_variable(np.array([[0.0], [1.0], [2.0]]))) + np.testing.assert_allclose(_eval(out.nu), 4.0) + + def test_returns_copy_leaving_original_untouched(self): + X = pt.matrix("X", shape=(None, 1)) + lik = Gaussian(_hetero(X), x=X) + original = lik.sigma + out = lik.at(pt.matrix("X_new", shape=(None, 1))) + assert lik.sigma is original + assert out.sigma is not original + + def test_preserves_non_parameter_attributes(self): + X = pt.matrix("X", shape=(None, 1)) + lik = StudentT(nu=_hetero(X), sigma=0.5, n_points=33, x=X) + assert lik.at(pt.matrix("X_new", shape=(None, 1))).n_points == 33 + + def test_scalar_hyperparam_survives_reroot(self): + """A parameter mixing data with a scalar placeholder re-roots only the + data; the scalar leaf keeps its identity and flows through to the + result (graph_replace preserves sibling leaves).""" + X = pt.matrix("X", shape=(None, 1)) + hyp = pt.scalar("hyp") + new = np.array([[0.0], [1.0], [2.0]]) + X_new = pt.matrix("X_new", shape=(None, 1)) + out = Gaussian(hyp + 0.05 * X[:, 0] ** 2, x=X).at(X_new) + got = pytensor.function([X_new, hyp], out.sigma)(new, 2.0) + np.testing.assert_allclose(got, 2.0 + 0.05 * new[:, 0] ** 2) + + def test_eager_numpy_parameter_is_not_rerooted(self): + """Pure-numpy arithmetic collapses the data dependence to a fixed vector; + with no ``x=`` the likelihood carries no design matrix, so ``at`` is a + no-op and the value is preserved.""" + arr = np.linspace(-1.0, 1.0, 5)[:, None] + expected = 0.1 + 0.05 * arr[:, 0] ** 2 + lik = Gaussian(pt.as_tensor_variable(expected)) + out = lik.at(pt.matrix("X_new", shape=(None, 1))) + assert out.sigma is lik.sigma # at did nothing (no design matrix) + np.testing.assert_allclose(_eval(out.sigma), expected) + + def test_no_x_means_not_rerooted(self): + """Omitting ``x=`` is an explicit opt-out: the likelihood carries no + design matrix, so ``at`` leaves the parameter untouched.""" + sigma = 0.1 + pt.sum(pt.matrix("Xtr", shape=(None, 3)) ** 2, axis=1) + lik = Gaussian(sigma) + out = lik.at(pt.matrix("X_new", shape=(None, 3))) + assert out.sigma is lik.sigma + + def test_one_dimensional_X_new_is_supported(self): + """With an explicit, identity-keyed handle there is no parameter/data + ambiguity, so the replacement inputs need not be 2D — a 1D X_new + substitutes cleanly.""" + X = pt.vector("X", shape=(None,)) + new = np.array([0.0, 1.0, 2.0]) + X_new = pt.vector("X_new", shape=(None,)) + out = Gaussian(0.1 + 0.05 * X**2, x=X).at(X_new) + got = pytensor.function([X_new], out.sigma)(new) + np.testing.assert_allclose(got, 0.1 + 0.05 * new**2) + + +class TestGradientThroughLikelihoodNode: + """The likelihood node is a pass-through, so gradients must flow through it + to the parameters' hyperparameters — for both homoskedastic (no design + matrix) and heteroskedastic nodes.""" + + def _ve_grad(self, lik, wrt, givens): + y = pt.as_tensor_variable(np.array([0.5, -0.3])) + mu = pt.as_tensor_variable(np.array([0.0, 0.2])) + var = pt.as_tensor_variable(np.array([1.0, 0.5])) + ve = pt.sum(lik.variational_expectation(y, mu, var)) + g = pytensor.grad(ve, wrt) + return pytensor.function(list(givens), g) + + def test_grad_homoskedastic(self): + alpha = pt.scalar("alpha") + lik = Gaussian(alpha) # node, has_data=False + val = self._ve_grad(lik, alpha, [alpha])(0.7) + assert np.isfinite(val) + + def test_grad_heteroskedastic(self): + X = pt.matrix("X", shape=(None, 1)) + alpha = pt.scalar("alpha") + lik = Gaussian(alpha + 0.05 * X[:, 0] ** 2, x=X) # node, has_data=True + fn = self._ve_grad(lik, alpha, [X, alpha]) + val = fn(np.array([[0.0], [1.0]]), 0.7) + assert np.isfinite(val) diff --git a/tests/likelihoods/test_bernoulli.py b/tests/likelihoods/test_bernoulli.py index ca75fa8..9ffa2e5 100644 --- a/tests/likelihoods/test_bernoulli.py +++ b/tests/likelihoods/test_bernoulli.py @@ -9,6 +9,7 @@ from gpjax.likelihoods import Bernoulli as GPJaxBernoulli from ptgp.likelihoods import Bernoulli +from ptgp.likelihoods.base import LikelihoodOp ATOL = 1e-5 @@ -42,13 +43,20 @@ def test_ve_against_gpjax(self): np.testing.assert_allclose(ve, gpjax_ve, atol=ATOL) - def test_predict_mean_and_var_closed_form(self): - mu, var = np.array([0.0, 2.0, -2.0]), np.array([0.1, 0.5, 1.0]) - pm_val, pv_val = _eval( - *Bernoulli().predict_mean_and_var(pt.as_tensor_variable(mu), pt.as_tensor_variable(var)) + def test_probit_closed_form_predict_matches_quadrature(self): + """The probit closed-form predictive should agree with the base quadrature + path it overrides — a correctness check, not just bounds.""" + mu, var = np.array([0.0, 0.7, -1.2]), np.array([0.2, 0.5, 1.0]) + op = Bernoulli(n_points=64).owner.op + m_closed, v_closed = op.predict_mean_and_var( + [], pt.as_tensor_variable(mu), pt.as_tensor_variable(var) + ) + m_quad, v_quad = LikelihoodOp.predict_mean_and_var( + op, [], pt.as_tensor_variable(mu), pt.as_tensor_variable(var) ) - assert np.all(pm_val >= 0.0) and np.all(pm_val <= 1.0) - assert np.all(pv_val >= 0.0) and np.all(pv_val <= 0.25) + mc, vc, mq, vq = _eval(m_closed, v_closed, m_quad, v_quad) + np.testing.assert_allclose(mc, mq, atol=1e-4) + np.testing.assert_allclose(vc, vq, atol=1e-4) def test_ve_negative(self): mu, var = np.array([0.0, 2.0, -2.0]), np.array([0.1, 0.5, 1.0]) diff --git a/tests/likelihoods/test_functional_api.py b/tests/likelihoods/test_functional_api.py new file mode 100644 index 0000000..8a364be --- /dev/null +++ b/tests/likelihoods/test_functional_api.py @@ -0,0 +1,124 @@ +"""The likelihood works in a purely functional way: a likelihood node carries +its Op, design-matrix input, and parameters, so it can be used without the +stateful holder at all. Everything dispatches through ``owner.op``.""" + +import numpy as np +import pymc as pm +import pytensor +import pytensor.tensor as pt + +from pytensor.graph.fg import FunctionGraph + +from ptgp.likelihoods import ( + Gaussian, + op_of, + param, + predict_mean_and_var, + variational_expectation, + at, +) +from ptgp.likelihoods.gaussian import GaussianOp +from pytensor.graph.replace import clone_replace + + +def _eval(*tensors): + f = pytensor.function([], list(tensors) if len(tensors) > 1 else tensors[0]) + return f() + + +def test_likelihood_survives_clone_and_reroots_on_the_clone(): + X = pt.matrix("X", shape=(None, 1)) + lik = Gaussian(0.1 + 0.05 * X[:, 0] ** 2, x=X) + assert isinstance(lik.owner.op, GaussianOp) + X_new = pt.matrix("X_new", shape=(None, 1)) + X_new_test = np.array([[0.0], [1.0], [2.0]]) + mu = pt.as_tensor_variable(np.zeros(3)) + var = pt.as_tensor_variable(np.ones(3)) + + _, orig_var = lik.at(X_new).predict_mean_and_var(mu, var) + orig_eval = orig_var.eval({X_new: X_new_test}) + + [cloned_lik] = clone_replace([lik]) + assert cloned_lik is not lik + assert isinstance(cloned_lik.owner.op, GaussianOp) + _, cloned_var = predict_mean_and_var(at(cloned_lik, X_new), mu, var) + cloned_eval = cloned_var.eval({X_new: X_new_test}) + + expected = 1.0 + (0.1 + 0.05 * X_new_test[:, 0] ** 2) ** 2 # var + sigma(X_new)**2 + np.testing.assert_allclose(orig_eval, expected) + np.testing.assert_allclose(cloned_eval, expected) + + +def test_node_built_without_holder_is_fully_usable(): + """Build a likelihood node directly from the Op — no holder — and drive it + through the functional API: param access, re-rooting, and prediction all + come from ``owner.op``.""" + X = pt.matrix("X", shape=(None, 1)) + sigma = 0.1 + 0.05 * X[:, 0] ** 2 + lik = GaussianOp(has_data=True)(X, sigma) # a node, via the Op directly + + assert isinstance(op_of(lik), GaussianOp) + assert param(lik, "sigma") is sigma # the parameter is the node's input + + new = np.array([[0.0], [1.0], [2.0]]) + rerooted = at(lik, pt.as_tensor_variable(new)) + np.testing.assert_allclose(_eval(param(rerooted, "sigma")), 0.1 + 0.05 * new[:, 0] ** 2) + + +def test_functional_matches_holder(): + """The free functions and the holder methods are the same computation.""" + X = pt.matrix("X", shape=(None, 1)) + mu = pt.as_tensor_variable(np.array([0.0, 1.0])) + var = pt.as_tensor_variable(np.array([0.5, 0.2])) + new = pt.as_tensor_variable(np.array([[0.5], [1.5]])) + + lik = Gaussian(0.1 + 0.05 * X[:, 0] ** 2, x=X) # a LikelihoodVariable + + hm, hv = lik.at(new).predict_mean_and_var(mu, var) # method API + fm, fv = predict_mean_and_var(at(lik, new), mu, var) # functional API + np.testing.assert_allclose(_eval(hm), _eval(fm)) + np.testing.assert_allclose(_eval(hv), _eval(fv)) + + +def test_functional_variational_expectation_preserves_rv_identity(): + """The functional path is RV-safe: an RV hyperparameter in the parameter + survives re-rooting, so it can be evaluated inside a model context.""" + with pm.Model(): + alpha = pm.HalfNormal("alpha") + X = pt.matrix("X", shape=(None, 1)) + lik = GaussianOp(has_data=True)(X, alpha + 0.05 * X[:, 0] ** 2) + new = pt.as_tensor_variable(np.array([[0.0], [2.0]])) + ve = variational_expectation( + at(lik, new), + pt.as_tensor_variable(np.array([0.0, 0.0])), + pt.as_tensor_variable(np.array([0.0, 0.0])), + pt.as_tensor_variable(np.array([1.0, 1.0])), + ) + # alpha is still a free input of the re-rooted expectation graph + val = pytensor.function([alpha], ve)(0.3) + assert np.isfinite(val).all() + + +def test_design_matrix_handle_survives_rewrite(): + """Regression — the "don't lose the handle" case: when the design matrix is an + intermediate node (X = log(x)) and a parameter exps it (sigma = exp(X)), an + algebraic rewrite would cancel exp(log(x)) -> x and sever our handle to X. The + design matrix is routed through an opaque gp_data barrier, so canonicalization + cannot see through it: X survives the rewrite and re-rooting still reaches it. + """ + from pytensor.graph.rewriting.utils import rewrite_graph + from pytensor.graph.traversal import ancestors + + x = pt.matrix("x", shape=(None, 1)) + X = pt.log(x) # design matrix is an intermediate node, not a leaf + lik = Gaussian(pt.exp(X[:, 0]), x=X) # sigma = exp(log(x)) — would cancel without a barrier + sigma = lik.owner.inputs[1] # the parameter, routed through the gp_data barrier + + rewritten = rewrite_graph(sigma, include=("canonicalize",)) + # The barrier blocked the exp(log(x)) -> x cancellation, so X (=log(x)) survives. + assert X in set(ancestors([rewritten])) + + # And re-rooting the (held) likelihood still reaches the design matrix. + X_new = pt.matrix("X_new", shape=(None, 1)) + got = lik.at(X_new).sigma.eval({X_new: np.array([[2.0], [3.0]])}) + np.testing.assert_allclose(got, np.exp([2.0, 3.0])) diff --git a/tests/likelihoods/test_gaussian.py b/tests/likelihoods/test_gaussian.py index 3593739..24f4b11 100644 --- a/tests/likelihoods/test_gaussian.py +++ b/tests/likelihoods/test_gaussian.py @@ -43,18 +43,18 @@ def test_ve_against_gpjax(self): np.testing.assert_allclose(ve, gpjax_ve, atol=ATOL) - def test_zero_var_matches_log_prob(self): + def test_zero_var_collapses_to_log_likelihood(self): + """At zero latent variance the VE reduces to the pointwise log N(y; mu, sigma^2).""" mu, y, sigma = np.array([0.0, 1.0]), np.array([0.1, 0.9]), 0.3 - lik = Gaussian(sigma) ve = _eval( - lik.variational_expectation( + Gaussian(sigma).variational_expectation( pt.as_tensor_variable(y), pt.as_tensor_variable(mu), pt.as_tensor_variable(np.zeros(2)), ) ) - lp = _eval(lik._log_prob(pt.as_tensor_variable(mu), pt.as_tensor_variable(y))) - np.testing.assert_allclose(ve, lp, atol=1e-12) + expected = -0.5 * (np.log(2 * np.pi * sigma**2) + (y - mu) ** 2 / sigma**2) + np.testing.assert_allclose(ve, expected, atol=1e-12) def test_predict_mean_and_var(self): mu, var, sigma = np.array([1.0, 2.0]), np.array([0.5, 1.0]), 0.3 diff --git a/tests/likelihoods/test_negative_binomial.py b/tests/likelihoods/test_negative_binomial.py index 854129e..61d79ca 100644 --- a/tests/likelihoods/test_negative_binomial.py +++ b/tests/likelihoods/test_negative_binomial.py @@ -29,6 +29,19 @@ def test_quadrature_convergence(self): ) np.testing.assert_allclose(ve_20, ve_50, atol=1e-6) + def test_predict_variance_includes_overdispersion(self): + """At zero latent variance the predictive variance is mu + mu**2 / alpha, + exceeding the Poisson mean-equals-variance.""" + mu, var, alpha = np.array([0.5, 1.0]), np.zeros(2), 3.0 + lam = np.exp(mu) + pm, pv = _eval( + *NegativeBinomial(alpha=alpha).predict_mean_and_var( + pt.as_tensor_variable(mu), pt.as_tensor_variable(var) + ) + ) + np.testing.assert_allclose(pm, lam, atol=1e-10) + np.testing.assert_allclose(pv, lam + lam**2 / alpha, atol=1e-10) + def test_converges_to_poisson(self): """NB with large alpha should approach Poisson.""" mu, var = np.array([0.5, 1.0]), np.array([0.1, 0.3]) @@ -36,9 +49,10 @@ def test_converges_to_poisson(self): # Use quadrature for both so comparison is apples-to-apples poisson_lik = Poisson(n_points=50) + pop = poisson_lik.owner.op ve_poisson = _eval( - poisson_lik._gauss_hermite( - poisson_lik._log_prob, + pop._gauss_hermite( + pop._log_prob, pt.as_tensor_variable(y), pt.as_tensor_variable(mu), pt.as_tensor_variable(var), diff --git a/tests/likelihoods/test_poisson.py b/tests/likelihoods/test_poisson.py index 6835f0a..1988c38 100644 --- a/tests/likelihoods/test_poisson.py +++ b/tests/likelihoods/test_poisson.py @@ -4,6 +4,8 @@ import pytensor import pytensor.tensor as pt +from scipy.special import gammaln + from ptgp.likelihoods import Poisson @@ -23,11 +25,12 @@ def test_closed_form_matches_quadrature(self): pt.as_tensor_variable(y), pt.as_tensor_variable(mu), pt.as_tensor_variable(var) ) ) - # Use base class quadrature via _gauss_hermite directly + # Use base-op quadrature via _gauss_hermite directly lik = Poisson(n_points=50) + op = lik.owner.op ve_quad = _eval( - lik._gauss_hermite( - lik._log_prob, + op._gauss_hermite( + op._log_prob, pt.as_tensor_variable(y), pt.as_tensor_variable(mu), pt.as_tensor_variable(var), @@ -46,3 +49,21 @@ def test_ve_values(self): ) ) np.testing.assert_allclose(ve, np.array([-1.0]), atol=1e-12) + + def test_quadrature_predict_collapses_at_zero_variance(self): + """At zero latent variance the base quadrature predictives reduce to the + conditional moments and the pointwise log-density at f = mu.""" + mu, y, var = np.array([0.5, 1.0]), np.array([1.0, 3.0]), np.zeros(2) + lam = np.exp(mu) + lik = Poisson() + pm, pv = _eval( + *lik.predict_mean_and_var(pt.as_tensor_variable(mu), pt.as_tensor_variable(var)) + ) + pld = _eval( + lik.predict_log_density( + pt.as_tensor_variable(y), pt.as_tensor_variable(mu), pt.as_tensor_variable(var) + ) + ) + np.testing.assert_allclose(pm, lam, atol=1e-10) + np.testing.assert_allclose(pv, lam, atol=1e-10) + np.testing.assert_allclose(pld, y * mu - lam - gammaln(y + 1.0), atol=1e-10) diff --git a/tests/likelihoods/test_student_t.py b/tests/likelihoods/test_student_t.py index 2a47a8e..3417257 100644 --- a/tests/likelihoods/test_student_t.py +++ b/tests/likelihoods/test_student_t.py @@ -43,3 +43,33 @@ def test_quadrature_convergence(self): ) ) np.testing.assert_allclose(ve_20, ve_50, atol=1e-6) + + def test_predict_variance_uses_studentt_formula(self): + """At zero latent variance the predictive variance is the Student-T + variance sigma**2 * nu / (nu - 2).""" + mu, var = np.array([0.5, -1.0]), np.zeros(2) + nu, sigma = 5.0, 0.7 + pm, pv = _eval( + *StudentT(nu=nu, sigma=sigma).predict_mean_and_var( + pt.as_tensor_variable(mu), pt.as_tensor_variable(var) + ) + ) + np.testing.assert_allclose(pm, mu, atol=1e-6) + np.testing.assert_allclose(pv, sigma**2 * nu / (nu - 2.0), atol=1e-6) + + def test_heavier_tails_than_gaussian_for_outlier(self): + """A far-from-mean observation is penalized less under Student-T than + Gaussian at matched scale — the heavy-tail behavior.""" + mu, var, y = np.array([0.0]), np.array([0.1]), np.array([10.0]) + sigma = 1.0 + ve_studentt = _eval( + StudentT(nu=3.0, sigma=sigma).variational_expectation( + pt.as_tensor_variable(y), pt.as_tensor_variable(mu), pt.as_tensor_variable(var) + ) + ) + ve_gaussian = _eval( + Gaussian(sigma).variational_expectation( + pt.as_tensor_variable(y), pt.as_tensor_variable(mu), pt.as_tensor_variable(var) + ) + ) + assert ve_studentt > ve_gaussian diff --git a/tests/test_heteroskedastic_predict.py b/tests/test_heteroskedastic_predict.py index d6d407b..229e586 100644 --- a/tests/test_heteroskedastic_predict.py +++ b/tests/test_heteroskedastic_predict.py @@ -2,11 +2,11 @@ import pymc as pm import pytensor import pytensor.tensor as pt -import pytest -from ptgp.gp import VFE, Unapproximated +from ptgp.gp import SVGP, VFE, Unapproximated, init_variational_params from ptgp.inducing import Points from ptgp.kernels import ExpQuad +from ptgp.likelihoods import Gaussian from ptgp.mean import Zero from ptgp.objectives import collapsed_elbo @@ -33,7 +33,7 @@ def test_incl_lik_adds_pointwise_noise(self): X_train, y_train, X_new = _data() X = pt.matrix("X", shape=(None, 1)) gp = Unapproximated( - kernel=ExpQuad(input_dim=1, ls=1.0), mean=Zero(), sigma=_hetero_sigma(X) + kernel=ExpQuad(input_dim=1, ls=1.0), mean=Zero(), sigma=_hetero_sigma(X), x=X ) X_new_t = pt.matrix("X_new", shape=(None, 1)) y_t = pt.vector("y", shape=(None,)) @@ -47,18 +47,6 @@ def test_incl_lik_adds_pointwise_noise(self): expected_sigma_new = 0.1 + 0.05 * X_new[:, 0] ** 2 np.testing.assert_allclose(yv, v + expected_sigma_new**2, atol=1e-10) - def test_wrong_X_raises(self): - """sigma built against X_a, predict called with X_b — ancestor check fires.""" - X_a = pt.matrix("X_a", shape=(None, 1)) - X_b = pt.matrix("X_b", shape=(None, 1)) - gp = Unapproximated( - kernel=ExpQuad(input_dim=1, ls=1.0), mean=Zero(), sigma=_hetero_sigma(X_a) - ) - X_new_t = pt.matrix("X_new", shape=(None, 1)) - y_t = pt.vector("y", shape=(None,)) - with pytest.raises(ValueError, match=r"Some replacements were not used"): - gp.predict_marginal(X_new_t, X_b, y_t, incl_lik=True) - class TestVFEHeteroskedastic: def _build(self, X): @@ -66,7 +54,7 @@ def _build(self, X): return VFE( kernel=ExpQuad(input_dim=1, ls=1.0), mean=Zero(), - sigma=_hetero_sigma(X), + sigma=_hetero_sigma(X), x=X, inducing_variable=Points(pt.as_tensor_variable(Z)), ) @@ -86,14 +74,30 @@ def test_incl_lik_adds_pointwise_noise(self): expected_sigma_new = 0.1 + 0.05 * X_new[:, 0] ** 2 np.testing.assert_allclose(yv, v + expected_sigma_new**2, atol=1e-10) - def test_wrong_X_raises(self): - X_a = pt.matrix("X_a", shape=(None, 1)) - X_b = pt.matrix("X_b", shape=(None, 1)) - vfe = self._build(X_a) + +class TestSVGPHeteroskedastic: + def test_incl_lik_adds_pointwise_noise(self): + X_train, y_train, X_new = _data() + M = 6 + Z = np.linspace(-1.5, 1.5, M)[:, None].astype(np.float64) + vp = init_variational_params(M) + X = pt.matrix("X", shape=(None, 1)) + svgp = SVGP( + kernel=ExpQuad(input_dim=1, ls=1.0), + mean=Zero(), + likelihood=Gaussian(_hetero_sigma(X), x=X), + inducing_variable=Points(pt.as_tensor_variable(Z)), + variational_params=vp, + ) X_new_t = pt.matrix("X_new", shape=(None, 1)) - y_t = pt.vector("y", shape=(None,)) - with pytest.raises(ValueError, match=r"Some replacements were not used"): - vfe.predict_marginal(X_new_t, X_b, y_t, incl_lik=True) + + _, fvar = svgp.predict_marginal(X_new_t) + _, yvar = svgp.predict_marginal(X_new_t, incl_lik=True) + fn = pytensor.function([X_new_t, *vp.extra_vars], [fvar, yvar], on_unused_input="ignore") + v, yv = fn(X_new, *vp.extra_init) + + expected_sigma_new = 0.1 + 0.05 * X_new[:, 0] ** 2 + np.testing.assert_allclose(yv - v, expected_sigma_new**2, atol=1e-10) class TestScalarSigmaUnaffected: @@ -145,10 +149,11 @@ def test_vfe_scalar(self): def test_pm_data_sigma_is_detected_as_data_dependent(): - """sigma built against a pm.Data SharedVariable must still be treated - as data-dependent. The heuristic filters by type (TensorType), not by - class (SharedVariable) — without this, pm.Data is silently invisible - to sigma_at and predictions use the wrong noise. + """sigma built against a pm.Data SharedVariable must still be re-rooted. + + The design matrix handle is keyed by identity (the ``x=`` argument), so a + pm.Data SharedVariable works exactly like a symbolic placeholder — passing + it as ``x=`` lets at swap it for the test inputs. """ X_train_arr = np.linspace(-1.5, 1.5, 10)[:, None].astype(np.float64) X_new_arr = np.linspace(-2.0, 2.0, 5)[:, None].astype(np.float64) @@ -158,7 +163,7 @@ class (SharedVariable) — without this, pm.Data is silently invisible gp = Unapproximated( kernel=ExpQuad(input_dim=1, ls=1.0), mean=Zero(), - sigma=_hetero_sigma(X), + sigma=_hetero_sigma(X), x=X, ) X_new_t = pt.matrix("X_new", shape=(None, 1)) y_t = pt.vector("y", shape=(None,)) @@ -182,7 +187,7 @@ def test_collapsed_elbo_with_heteroskedastic_sigma(): vfe = VFE( kernel=ExpQuad(input_dim=1, ls=1.0), mean=Zero(), - sigma=_hetero_sigma(X), + sigma=_hetero_sigma(X), x=X, inducing_variable=Points(pt.as_tensor_variable(Z)), ) elbo = collapsed_elbo(vfe, X, y).elbo diff --git a/tests/test_rewrites.py b/tests/test_rewrites.py index 345dcc1..0035258 100644 --- a/tests/test_rewrites.py +++ b/tests/test_rewrites.py @@ -25,10 +25,6 @@ # Install the rewrites under test (side-effect import). import ptgp.rewrites # noqa: F401 -# --------------------------------------------------------------------------- -# Helpers for the structural rewrites below. -# --------------------------------------------------------------------------- - def _has_op(graph, op_type): fg = FunctionGraph(outputs=[graph] if not isinstance(graph, list) else graph, clone=False) @@ -40,11 +36,6 @@ def _has_op(graph, op_type): return False -# --------------------------------------------------------------------------- -# Det(L @ L.T) -> (prod(diag(L)))**2 rewrite (Rule H). -# --------------------------------------------------------------------------- - - def test_det_of_LLT_takes_diag_product_when_L_lower_triangular(): """``Det(L @ L.T)`` lowers to ``(prod(diag(L)))**2`` — no Det Apply remains.""" from pytensor.tensor.linalg.summary import det @@ -70,11 +61,6 @@ def test_det_of_LLT_diag_product_is_numerically_correct(): np.testing.assert_allclose(f(L_val), expected, atol=1e-10) -# --------------------------------------------------------------------------- -# ExtractDiag(A @ A.T) -> sum(A**2, axis=-1) rewrite (Rule I). -# --------------------------------------------------------------------------- - - def test_diag_of_AAT_to_row_norms_squared_eliminates_dot(): """``ExtractDiag(A @ A.T)`` lowers to ``sum(A**2, axis=-1)`` — no ExtractDiag.""" A = pt.dmatrix("A") @@ -111,11 +97,6 @@ def test_diag_of_ATA_is_numerically_correct(): np.testing.assert_allclose(f(A_val), np.diag(A_val.T @ A_val), atol=1e-12) -# --------------------------------------------------------------------------- -# MatrixInverse(PSD A) -> cho_solve(L, eye) rewrite (Rule J). -# --------------------------------------------------------------------------- - - def test_matrix_inverse_of_LTL_uses_solve_triangular(): """``MatrixInverse(L.T @ L)`` lowers to two solve_triangular calls — no fresh Cholesky.""" L = pta.assume(pt.dmatrix("L"), lower_triangular=True) @@ -181,12 +162,10 @@ def test_matrix_inverse_lowering_is_numerically_correct(): np.testing.assert_allclose(f(K_val), np.linalg.inv(K_val), atol=1e-10) -# --------------------------------------------------------------------------- # Joint-graph cubic-op floor pinning (was: Blocker A / merge_composites_*). # These pin the invariant the deleted composite-merge passes were claimed to # enforce; they pass without those passes because pytensor 3.0.3's fusion # pipeline already arrives at the floor. -# --------------------------------------------------------------------------- def _gp_mll_like_joint(): diff --git a/tests/test_svgp_scipy_ref.py b/tests/test_svgp_scipy_ref.py index 3c8cb58..52167c6 100644 --- a/tests/test_svgp_scipy_ref.py +++ b/tests/test_svgp_scipy_ref.py @@ -32,9 +32,6 @@ ATOL = 1e-5 -# ---- Reference ELBO machinery (numpy + scipy, no PTGP internals) ---------- - - def _matern52_numpy(X1, X2, ls, eta): """Matern52 kernel in numpy: k(r) = eta^2 (1 + sqrt(5)r + 5r^2/3) exp(-sqrt(5)r).""" sqd = np.sum(X1**2, axis=-1)[:, None] + np.sum(X2**2, axis=-1)[None, :] - 2.0 * X1 @ X2.T @@ -98,9 +95,6 @@ def _fixed_config(rng, N=40, M=8, x_range=(-2.0, 2.0)): return X, Z, q_mu, L -# ---- StudentT ------------------------------------------------------------- - - def _studentt_logprob(nu, sigma): """Return a numpy log_prob(f, y) for StudentT(y; f, sigma, nu).""" @@ -237,9 +231,6 @@ def test_elbo_match(self): np.testing.assert_allclose(e_ptgp, e_ref, atol=ATOL) -# ---- NegativeBinomial ----------------------------------------------------- - - def _negbinom_logprob(alpha): """Return a numpy log_prob(f, y) for NegativeBinomial(y; exp(f), alpha).""" diff --git a/tests/test_utils.py b/tests/test_utils.py index a549a62..f2696cf 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -8,10 +8,6 @@ from ptgp.utils import _build_index_labels, check_init -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - def _make_simple_model_and_objective(): """1-D GP on synthetic data; returns (fun, theta0, X, y, model).""" @@ -42,11 +38,6 @@ def _make_simple_model_and_objective(): return fun, theta0, X, y, model -# --------------------------------------------------------------------------- -# check_init tests -# --------------------------------------------------------------------------- - - class TestCheckInit: def test_returns_true_on_valid_init(self): fun, theta0, X, y, model = _make_simple_model_and_objective() @@ -119,11 +110,6 @@ def test_top_k_capped_at_theta_size(self, capsys): assert len(table_lines) == theta0.size -# --------------------------------------------------------------------------- -# _build_index_labels unit tests -# --------------------------------------------------------------------------- - - class TestBuildIndexLabels: def test_returns_none_without_model(self): result = _build_index_labels(5, model=None, extra_vars=None, extra_init=None)