From 482b1b250b2df71c82a7c395bb67af685a2d9006 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 4 Jan 2026 22:19:08 +0000 Subject: [PATCH 1/3] Refactor estimators.py to reduce module size Split large estimators.py (~1812 lines) into smaller focused modules: - estimators.py (~975 lines): DifferenceInDifferences, MultiPeriodDiD - twfe.py (~355 lines): TwoWayFixedEffects - synthetic_did.py (~540 lines): SyntheticDiD Backward compatibility maintained via re-exports from estimators.py. All 100 estimator tests pass. Updated CLAUDE.md and TODO.md. --- CLAUDE.md | 7 +- TODO.md | 8 +- diff_diff/estimators.py | 886 +------------------------------------ diff_diff/synthetic_did.py | 540 ++++++++++++++++++++++ diff_diff/twfe.py | 357 +++++++++++++++ 5 files changed, 933 insertions(+), 865 deletions(-) create mode 100644 diff_diff/synthetic_did.py create mode 100644 diff_diff/twfe.py diff --git a/CLAUDE.md b/CLAUDE.md index 280d0b2..2514be3 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -37,8 +37,13 @@ mypy diff_diff - **`diff_diff/estimators.py`** - Core estimator classes implementing DiD methods: - `DifferenceInDifferences` - Basic 2x2 DiD with formula or column-name interface - - `TwoWayFixedEffects` - Panel DiD with unit and time fixed effects (within-transformation) - `MultiPeriodDiD` - Event-study style DiD with period-specific treatment effects + - Re-exports `TwoWayFixedEffects` and `SyntheticDiD` for backward compatibility + +- **`diff_diff/twfe.py`** - Two-Way Fixed Effects estimator: + - `TwoWayFixedEffects` - Panel DiD with unit and time fixed effects (within-transformation) + +- **`diff_diff/synthetic_did.py`** - Synthetic DiD estimator: - `SyntheticDiD` - Synthetic control combined with DiD (Arkhangelsky et al. 2021) - **`diff_diff/staggered.py`** - Staggered adoption DiD estimators: diff --git a/TODO.md b/TODO.md index fe53a99..cec6d86 100644 --- a/TODO.md +++ b/TODO.md @@ -78,7 +78,9 @@ Current line counts (target: < 1000 lines per module): | File | Lines | Status | |------|-------|--------| | `staggered.py` | 1822 | Consider splitting | -| `estimators.py` | 1812 | Consider splitting | +| `estimators.py` | ~975 | OK (refactored) | +| `twfe.py` | ~355 | OK (new) | +| `synthetic_did.py` | ~540 | OK (new) | | `honest_did.py` | 1491 | Acceptable | | `utils.py` | 1350 | Acceptable | | `power.py` | 1350 | Acceptable | @@ -86,8 +88,10 @@ Current line counts (target: < 1000 lines per module): | `visualization.py` | 1388 | Acceptable | | `bacon.py` | 1027 | OK | +**Completed splits:** +- ~~`estimators.py` → `twfe.py`, `synthetic_did.py` (keep base classes in estimators.py)~~ - Done in 1.0.1 + **Potential splits:** -- `estimators.py` → `twfe.py`, `synthetic_did.py` (keep base classes in estimators.py) - `staggered.py` → `staggered_bootstrap.py` (move bootstrap logic) --- diff --git a/diff_diff/estimators.py b/diff_diff/estimators.py index ad3d123..c82431c 100644 --- a/diff_diff/estimators.py +++ b/diff_diff/estimators.py @@ -1,26 +1,27 @@ """ Difference-in-Differences estimators with sklearn-like API. + +This module contains the core DiD estimators: +- DifferenceInDifferences: Basic 2x2 DiD estimator +- MultiPeriodDiD: Event-study style DiD with period-specific treatment effects + +Additional estimators are in separate modules: +- TwoWayFixedEffects: See diff_diff.twfe +- SyntheticDiD: See diff_diff.synthetic_did + +For backward compatibility, all estimators are re-exported from this module. """ -import warnings -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple import numpy as np import pandas as pd -from numpy.linalg import LinAlgError -if TYPE_CHECKING: - from diff_diff.bacon import BaconDecompositionResults - -from diff_diff.results import DiDResults, MultiPeriodDiDResults, PeriodEffect, SyntheticDiDResults +from diff_diff.results import DiDResults, MultiPeriodDiDResults, PeriodEffect from diff_diff.utils import ( compute_confidence_interval, compute_p_value, - compute_placebo_effects, compute_robust_se, - compute_sdid_estimator, - compute_synthetic_weights, - compute_time_weights, validate_binary, wild_bootstrap_se, ) @@ -615,342 +616,6 @@ def print_summary(self) -> None: print(self.summary()) -class TwoWayFixedEffects(DifferenceInDifferences): - """ - Two-Way Fixed Effects (TWFE) estimator for panel DiD. - - Extends DifferenceInDifferences to handle panel data with unit - and time fixed effects. - - Parameters - ---------- - robust : bool, default=True - Whether to use heteroskedasticity-robust standard errors. - cluster : str, optional - Column name for cluster-robust standard errors. - If None, automatically clusters at the unit level (the `unit` - parameter passed to `fit()`). This differs from - DifferenceInDifferences where cluster=None means no clustering. - alpha : float, default=0.05 - Significance level for confidence intervals. - - Notes - ----- - This estimator uses the regression: - - Y_it = α_i + γ_t + β*(D_i × Post_t) + X_it'δ + ε_it - - where α_i are unit fixed effects and γ_t are time fixed effects. - - Warning: TWFE can be biased with staggered treatment timing - and heterogeneous treatment effects. Consider using - more robust estimators (e.g., Callaway-Sant'Anna) for - staggered designs. - """ - - def fit( # type: ignore[override] - self, - data: pd.DataFrame, - outcome: str, - treatment: str, - time: str, - unit: str, - covariates: Optional[List[str]] = None - ) -> DiDResults: - """ - Fit Two-Way Fixed Effects model. - - Parameters - ---------- - data : pd.DataFrame - Panel data. - outcome : str - Name of outcome variable column. - treatment : str - Name of treatment indicator column. - time : str - Name of time period column. - unit : str - Name of unit identifier column. - covariates : list, optional - List of covariate column names. - - Returns - ------- - DiDResults - Estimation results. - """ - # Validate unit column exists - if unit not in data.columns: - raise ValueError(f"Unit column '{unit}' not found in data") - - # Check for staggered treatment timing and warn if detected - self._check_staggered_treatment(data, treatment, time, unit) - - # Use unit-level clustering if not specified (use local variable to avoid mutation) - cluster_var = self.cluster if self.cluster is not None else unit - - # Demean data (within transformation for fixed effects) - data_demeaned = self._within_transform(data, outcome, unit, time, covariates) - - # Create treatment × post interaction - # For staggered designs, we'd need to identify treatment timing per unit - # For now, assume standard 2-period design - data_demeaned["_treatment_post"] = ( - data_demeaned[treatment] * data_demeaned[time] - ) - - # Extract variables for regression - y = data_demeaned[f"{outcome}_demeaned"].values - X_list = [data_demeaned["_treatment_post"].values] - - if covariates: - for cov in covariates: - X_list.append(data_demeaned[f"{cov}_demeaned"].values) - - X = np.column_stack([np.ones(len(y))] + X_list) - - # Fit OLS on demeaned data - coefficients, residuals, fitted, r_squared = self._fit_ols(X, y) - - # ATT is the coefficient on treatment_post (index 1) - att_idx = 1 - att = coefficients[att_idx] - - # Degrees of freedom adjustment for fixed effects - n_units = data[unit].nunique() - n_times = data[time].nunique() - df = len(y) - X.shape[1] - n_units - n_times + 2 - - # Compute standard errors and inference - cluster_ids = data[cluster_var].values - if self.inference == "wild_bootstrap": - # Wild cluster bootstrap for few-cluster inference - bootstrap_results = wild_bootstrap_se( - X, y, residuals, cluster_ids, - coefficient_index=att_idx, - n_bootstrap=self.n_bootstrap, - weight_type=self.bootstrap_weights, - alpha=self.alpha, - seed=self.seed, - return_distribution=False - ) - self._bootstrap_results = bootstrap_results - se = bootstrap_results.se - p_value = bootstrap_results.p_value - conf_int = (bootstrap_results.ci_lower, bootstrap_results.ci_upper) - t_stat = bootstrap_results.t_stat_original - vcov = compute_robust_se(X, residuals, cluster_ids) - else: - # Standard cluster-robust SE - vcov = compute_robust_se(X, residuals, cluster_ids) - se = np.sqrt(vcov[att_idx, att_idx]) - t_stat = att / se - p_value = compute_p_value(t_stat, df=df) - conf_int = compute_confidence_interval(att, se, self.alpha, df=df) - - # Count observations - treated_units = data[data[treatment] == 1][unit].unique() - n_treated = len(treated_units) - n_control = n_units - n_treated - - # Determine inference method and bootstrap info - inference_method = "analytical" - n_bootstrap_used = None - n_clusters_used = None - if self._bootstrap_results is not None: - inference_method = "wild_bootstrap" - n_bootstrap_used = self._bootstrap_results.n_bootstrap - n_clusters_used = self._bootstrap_results.n_clusters - - self.results_ = DiDResults( - att=att, - se=se, - t_stat=t_stat, - p_value=p_value, - conf_int=conf_int, - n_obs=len(y), - n_treated=n_treated, - n_control=n_control, - alpha=self.alpha, - coefficients={"ATT": float(att)}, - vcov=vcov, - residuals=residuals, - fitted_values=fitted, - r_squared=r_squared, - inference_method=inference_method, - n_bootstrap=n_bootstrap_used, - n_clusters=n_clusters_used, - ) - - self.is_fitted_ = True - return self.results_ - - def _within_transform( - self, - data: pd.DataFrame, - outcome: str, - unit: str, - time: str, - covariates: Optional[List[str]] = None - ) -> pd.DataFrame: - """ - Apply within transformation to remove unit and time fixed effects. - - This implements the standard two-way within transformation: - y_it - y_i. - y_.t + y_.. - - Parameters - ---------- - data : pd.DataFrame - Panel data. - outcome : str - Outcome variable name. - unit : str - Unit identifier column. - time : str - Time period column. - covariates : list, optional - Covariate column names. - - Returns - ------- - pd.DataFrame - Data with demeaned variables. - """ - data = data.copy() - variables = [outcome] + (covariates or []) - - for var in variables: - # Unit means - unit_means = data.groupby(unit)[var].transform("mean") - # Time means - time_means = data.groupby(time)[var].transform("mean") - # Grand mean - grand_mean = data[var].mean() - - # Within transformation - data[f"{var}_demeaned"] = data[var] - unit_means - time_means + grand_mean - - return data - - def _check_staggered_treatment( - self, - data: pd.DataFrame, - treatment: str, - time: str, - unit: str, - ) -> None: - """ - Check for staggered treatment timing and warn if detected. - - Identifies if different units start treatment at different times, - which can bias TWFE estimates when treatment effects are heterogeneous. - """ - # Find first treatment time for each unit - treated_obs = data[data[treatment] == 1] - if len(treated_obs) == 0: - return # No treated observations - - # Get first treatment time per unit - first_treat_times = treated_obs.groupby(unit)[time].min() - unique_treat_times = first_treat_times.unique() - - if len(unique_treat_times) > 1: - n_groups = len(unique_treat_times) - warnings.warn( - f"Staggered treatment timing detected: {n_groups} treatment cohorts " - f"start treatment at different times. TWFE can be biased when treatment " - f"effects are heterogeneous across time. Consider using:\n" - f" - CallawaySantAnna estimator for robust estimates\n" - f" - TwoWayFixedEffects.decompose() to diagnose the decomposition\n" - f" - bacon_decompose() to see weight on 'forbidden' comparisons", - UserWarning, - stacklevel=3, - ) - - def decompose( - self, - data: pd.DataFrame, - outcome: str, - unit: str, - time: str, - first_treat: str, - weights: str = "approximate", - ) -> "BaconDecompositionResults": - """ - Perform Goodman-Bacon decomposition of TWFE estimate. - - Decomposes the TWFE estimate into a weighted average of all possible - 2x2 DiD comparisons, revealing which comparisons drive the estimate - and whether problematic "forbidden comparisons" are involved. - - Parameters - ---------- - data : pd.DataFrame - Panel data with unit and time identifiers. - outcome : str - Name of outcome variable column. - unit : str - Name of unit identifier column. - time : str - Name of time period column. - first_treat : str - Name of column indicating when each unit was first treated. - Use 0 (or np.inf) for never-treated units. - weights : str, default="approximate" - Weight calculation method: - - "approximate": Fast simplified formula (default). Good for - diagnostic purposes where relative weights are sufficient. - - "exact": Variance-based weights from Goodman-Bacon (2021) - Theorem 1. Use for publication-quality decompositions. - - Returns - ------- - BaconDecompositionResults - Decomposition results showing: - - TWFE estimate and its weighted-average breakdown - - List of all 2x2 comparisons with estimates and weights - - Total weight by comparison type (clean vs forbidden) - - Examples - -------- - >>> twfe = TwoWayFixedEffects() - >>> decomp = twfe.decompose( - ... data, outcome='y', unit='id', time='t', first_treat='treat_year' - ... ) - >>> decomp.print_summary() - >>> # Check weight on forbidden comparisons - >>> if decomp.total_weight_later_vs_earlier > 0.2: - ... print("Warning: significant forbidden comparison weight") - - Notes - ----- - This decomposition is essential for understanding potential TWFE bias - in staggered adoption designs. The three comparison types are: - - 1. **Treated vs Never-treated**: Clean comparisons using never-treated - units as controls. These are always valid. - - 2. **Earlier vs Later treated**: Uses later-treated units as controls - before they receive treatment. These are valid. - - 3. **Later vs Earlier treated**: Uses already-treated units as controls. - These "forbidden comparisons" can introduce bias when treatment - effects are dynamic (changing over time since treatment). - - See Also - -------- - bacon_decompose : Standalone decomposition function - BaconDecomposition : Class-based decomposition interface - CallawaySantAnna : Robust estimator that avoids forbidden comparisons - """ - from diff_diff.bacon import BaconDecomposition - - decomp = BaconDecomposition(weights=weights) - return decomp.fit(data, outcome, unit, time, first_treat) - - class MultiPeriodDiD(DifferenceInDifferences): """ Multi-Period Difference-in-Differences estimator. @@ -1296,519 +961,16 @@ def summary(self) -> str: return self.results_.summary() -class SyntheticDiD(DifferenceInDifferences): - """ - Synthetic Difference-in-Differences (SDID) estimator. - - Combines the strengths of Difference-in-Differences and Synthetic Control - methods by re-weighting control units to better match treated units' - pre-treatment trends. - - This method is particularly useful when: - - You have few treated units (possibly just one) - - Parallel trends assumption may be questionable - - Control units are heterogeneous and need reweighting - - You want robustness to pre-treatment differences - - Parameters - ---------- - lambda_reg : float, default=0.0 - L2 regularization for unit weights. Larger values shrink weights - toward uniform. Useful when n_pre_periods < n_control_units. - zeta : float, default=1.0 - Regularization for time weights. Larger values give more uniform - time weights (closer to standard DiD). - alpha : float, default=0.05 - Significance level for confidence intervals. - n_bootstrap : int, default=200 - Number of bootstrap replications for standard error estimation. - Set to 0 to use placebo-based inference instead. - seed : int, optional - Random seed for reproducibility. If None (default), results - will vary between runs. - - Attributes - ---------- - results_ : SyntheticDiDResults - Estimation results after calling fit(). - is_fitted_ : bool - Whether the model has been fitted. +# Re-export estimators from submodules for backward compatibility +# These can also be imported directly from their respective modules: +# - from diff_diff.twfe import TwoWayFixedEffects +# - from diff_diff.synthetic_did import SyntheticDiD +from diff_diff.synthetic_did import SyntheticDiD # noqa: E402 +from diff_diff.twfe import TwoWayFixedEffects # noqa: E402 - Examples - -------- - Basic usage with panel data: - - >>> import pandas as pd - >>> from diff_diff import SyntheticDiD - >>> - >>> # Panel data with units observed over multiple time periods - >>> # Treatment occurs at period 5 for treated units - >>> data = pd.DataFrame({ - ... 'unit': [...], # Unit identifier - ... 'period': [...], # Time period - ... 'outcome': [...], # Outcome variable - ... 'treated': [...] # 1 if unit is ever treated, 0 otherwise - ... }) - >>> - >>> # Fit SDID model - >>> sdid = SyntheticDiD() - >>> results = sdid.fit( - ... data, - ... outcome='outcome', - ... treatment='treated', - ... unit='unit', - ... time='period', - ... post_periods=[5, 6, 7, 8] - ... ) - >>> - >>> # View results - >>> results.print_summary() - >>> print(f"ATT: {results.att:.3f} (SE: {results.se:.3f})") - >>> - >>> # Examine unit weights - >>> weights_df = results.get_unit_weights_df() - >>> print(weights_df.head(10)) - - Notes - ----- - The SDID estimator (Arkhangelsky et al., 2021) computes: - - τ̂ = (Ȳ_treated,post - Σ_t λ_t * Y_treated,t) - - Σ_j ω_j * (Ȳ_j,post - Σ_t λ_t * Y_j,t) - - Where: - - ω_j are unit weights (sum to 1, non-negative) - - λ_t are time weights (sum to 1, non-negative) - - Unit weights ω are chosen to match pre-treatment outcomes: - min ||Σ_j ω_j * Y_j,pre - Y_treated,pre||² - - This interpolates between: - - Standard DiD (uniform weights): ω_j = 1/N_control - - Synthetic Control (exact matching): concentrated weights - - References - ---------- - Arkhangelsky, D., Athey, S., Hirshberg, D. A., Imbens, G. W., & Wager, S. - (2021). Synthetic Difference-in-Differences. American Economic Review, - 111(12), 4088-4118. - """ - - def __init__( - self, - lambda_reg: float = 0.0, - zeta: float = 1.0, - alpha: float = 0.05, - n_bootstrap: int = 200, - seed: Optional[int] = None - ): - super().__init__(robust=True, cluster=None, alpha=alpha) - self.lambda_reg = lambda_reg - self.zeta = zeta - self.n_bootstrap = n_bootstrap - self.seed = seed - - self._unit_weights = None - self._time_weights = None - - def fit( # type: ignore[override] - self, - data: pd.DataFrame, - outcome: str, - treatment: str, - unit: str, - time: str, - post_periods: Optional[List[Any]] = None, - covariates: Optional[List[str]] = None - ) -> SyntheticDiDResults: - """ - Fit the Synthetic Difference-in-Differences model. - - Parameters - ---------- - data : pd.DataFrame - Panel data with observations for multiple units over multiple - time periods. - outcome : str - Name of the outcome variable column. - treatment : str - Name of the treatment group indicator column (0/1). - Should be 1 for all observations of treated units - (both pre and post treatment). - unit : str - Name of the unit identifier column. - time : str - Name of the time period column. - post_periods : list, optional - List of time period values that are post-treatment. - If None, uses the last half of periods. - covariates : list, optional - List of covariate column names. Covariates are residualized - out before computing the SDID estimator. - - Returns - ------- - SyntheticDiDResults - Object containing the ATT estimate, standard error, - unit weights, and time weights. - - Raises - ------ - ValueError - If required parameters are missing or data validation fails. - """ - # Validate inputs - if outcome is None or treatment is None or unit is None or time is None: - raise ValueError( - "Must provide 'outcome', 'treatment', 'unit', and 'time'" - ) - - # Check columns exist - required_cols = [outcome, treatment, unit, time] - if covariates: - required_cols.extend(covariates) - - missing = [c for c in required_cols if c not in data.columns] - if missing: - raise ValueError(f"Missing columns: {missing}") - - # Validate treatment is binary - validate_binary(data[treatment].values, "treatment") - - # Get all unique time periods - all_periods = sorted(data[time].unique()) - - if len(all_periods) < 2: - raise ValueError("Need at least 2 time periods") - - # Determine pre and post periods - if post_periods is None: - mid = len(all_periods) // 2 - post_periods = list(all_periods[mid:]) - pre_periods = list(all_periods[:mid]) - else: - post_periods = list(post_periods) - pre_periods = [p for p in all_periods if p not in post_periods] - - if len(post_periods) == 0: - raise ValueError("Must have at least one post-treatment period") - if len(pre_periods) == 0: - raise ValueError("Must have at least one pre-treatment period") - - # Validate post_periods are in data - for p in post_periods: - if p not in all_periods: - raise ValueError(f"Post-period '{p}' not found in time column") - - # Identify treated and control units - # Treatment indicator should be constant within unit - unit_treatment = data.groupby(unit)[treatment].first() - treated_units = unit_treatment[unit_treatment == 1].index.tolist() - control_units = unit_treatment[unit_treatment == 0].index.tolist() - - if len(treated_units) == 0: - raise ValueError("No treated units found") - if len(control_units) == 0: - raise ValueError("No control units found") - - # Residualize covariates if provided - working_data = data.copy() - if covariates: - working_data = self._residualize_covariates( - working_data, outcome, covariates, unit, time - ) - - # Create outcome matrices - # Shape: (n_periods, n_units) - Y_pre_control, Y_post_control, Y_pre_treated, Y_post_treated = \ - self._create_outcome_matrices( - working_data, outcome, unit, time, - pre_periods, post_periods, treated_units, control_units - ) - - # Compute unit weights (synthetic control weights) - # Average treated outcomes across treated units - Y_pre_treated_mean = np.mean(Y_pre_treated, axis=1) - - unit_weights = compute_synthetic_weights( - Y_pre_control, - Y_pre_treated_mean, - lambda_reg=self.lambda_reg - ) - - # Compute time weights - time_weights = compute_time_weights( - Y_pre_control, - Y_pre_treated_mean, - zeta=self.zeta - ) - - # Compute SDID estimate - Y_post_treated_mean = np.mean(Y_post_treated, axis=1) - - att = compute_sdid_estimator( - Y_pre_control, - Y_post_control, - Y_pre_treated_mean, - Y_post_treated_mean, - unit_weights, - time_weights - ) - - # Compute pre-treatment fit (RMSE) - synthetic_pre = Y_pre_control @ unit_weights - pre_fit_rmse = np.sqrt(np.mean((Y_pre_treated_mean - synthetic_pre) ** 2)) - - # Compute standard errors - if self.n_bootstrap > 0: - se, placebo_effects = self._bootstrap_se( - working_data, outcome, unit, time, - pre_periods, post_periods, treated_units, control_units - ) - else: - # Use placebo-based inference - placebo_effects = compute_placebo_effects( - Y_pre_control, - Y_post_control, - Y_pre_treated_mean, - unit_weights, - time_weights, - control_units - ) - se = np.std(placebo_effects, ddof=1) if len(placebo_effects) > 1 else 0.0 - - # Compute test statistics - if se > 0: - t_stat = att / se - # Use placebo distribution for p-value if available - if len(placebo_effects) > 0: - # Two-sided p-value from placebo distribution - p_value = np.mean(np.abs(placebo_effects) >= np.abs(att)) - p_value = max(p_value, 1.0 / (len(placebo_effects) + 1)) - else: - p_value = compute_p_value(t_stat) - else: - t_stat = 0.0 - p_value = 1.0 - - # Confidence interval - conf_int = compute_confidence_interval(att, se, self.alpha) - - # Create weight dictionaries - unit_weights_dict = { - unit_id: w for unit_id, w in zip(control_units, unit_weights) - } - time_weights_dict = { - period: w for period, w in zip(pre_periods, time_weights) - } - - # Store results - self.results_ = SyntheticDiDResults( - att=att, - se=se, - t_stat=t_stat, - p_value=p_value, - conf_int=conf_int, - n_obs=len(data), - n_treated=len(treated_units), - n_control=len(control_units), - unit_weights=unit_weights_dict, - time_weights=time_weights_dict, - pre_periods=pre_periods, - post_periods=post_periods, - alpha=self.alpha, - lambda_reg=self.lambda_reg, - pre_treatment_fit=pre_fit_rmse, - placebo_effects=placebo_effects if len(placebo_effects) > 0 else None - ) - - self._unit_weights = unit_weights - self._time_weights = time_weights - self.is_fitted_ = True - - return self.results_ - - def _create_outcome_matrices( - self, - data: pd.DataFrame, - outcome: str, - unit: str, - time: str, - pre_periods: List[Any], - post_periods: List[Any], - treated_units: List[Any], - control_units: List[Any] - ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: - """ - Create outcome matrices for SDID estimation. - - Returns - ------- - tuple - (Y_pre_control, Y_post_control, Y_pre_treated, Y_post_treated) - Each is a 2D array with shape (n_periods, n_units) - """ - # Pivot data to wide format - pivot = data.pivot(index=time, columns=unit, values=outcome) - - # Extract submatrices - Y_pre_control = pivot.loc[pre_periods, control_units].values - Y_post_control = pivot.loc[post_periods, control_units].values - Y_pre_treated = pivot.loc[pre_periods, treated_units].values - Y_post_treated = pivot.loc[post_periods, treated_units].values - - return ( - Y_pre_control.astype(float), - Y_post_control.astype(float), - Y_pre_treated.astype(float), - Y_post_treated.astype(float) - ) - - def _residualize_covariates( - self, - data: pd.DataFrame, - outcome: str, - covariates: List[str], - unit: str, - time: str - ) -> pd.DataFrame: - """ - Residualize outcome by regressing out covariates. - - Uses two-way fixed effects to partial out covariates. - """ - data = data.copy() - - # Create design matrix with covariates - X = data[covariates].values.astype(float) - - # Add unit and time dummies - unit_dummies = pd.get_dummies(data[unit], prefix='u', drop_first=True) - time_dummies = pd.get_dummies(data[time], prefix='t', drop_first=True) - - X_full = np.column_stack([ - np.ones(len(data)), - X, - unit_dummies.values, - time_dummies.values - ]) - - y = data[outcome].values.astype(float) - - # Fit and get residuals - coeffs = np.linalg.lstsq(X_full, y, rcond=None)[0] - residuals = y - X_full @ coeffs - - # Add back the mean for interpretability - data[outcome] = residuals + np.mean(y) - - return data - - def _bootstrap_se( - self, - data: pd.DataFrame, - outcome: str, - unit: str, - time: str, - pre_periods: List[Any], - post_periods: List[Any], - treated_units: List[Any], - control_units: List[Any] - ) -> Tuple[float, np.ndarray]: - """ - Compute bootstrap standard error. - - Uses block bootstrap at the unit level. - """ - rng = np.random.default_rng(self.seed) - - all_units = treated_units + control_units - n_units = len(all_units) - - bootstrap_estimates = [] - - for _ in range(self.n_bootstrap): - # Sample units with replacement - sampled_units = rng.choice(all_units, size=n_units, replace=True) - - # Create bootstrap sample - boot_data = pd.concat([ - data[data[unit] == u].assign(**{unit: f"{u}_{i}"}) - for i, u in enumerate(sampled_units) - ], ignore_index=True) - - # Identify treated/control in bootstrap sample - boot_treated = [ - f"{u}_{i}" for i, u in enumerate(sampled_units) - if u in treated_units - ] - boot_control = [ - f"{u}_{i}" for i, u in enumerate(sampled_units) - if u in control_units - ] - - if len(boot_treated) == 0 or len(boot_control) == 0: - continue - - try: - # Create matrices - Y_pre_c, Y_post_c, Y_pre_t, Y_post_t = self._create_outcome_matrices( - boot_data, outcome, unit, time, - pre_periods, post_periods, boot_treated, boot_control - ) - - # Compute weights - Y_pre_t_mean = np.mean(Y_pre_t, axis=1) - Y_post_t_mean = np.mean(Y_post_t, axis=1) - - w = compute_synthetic_weights(Y_pre_c, Y_pre_t_mean, self.lambda_reg) - t_w = compute_time_weights(Y_pre_c, Y_pre_t_mean, self.zeta) - - # Compute estimate - tau = compute_sdid_estimator( - Y_pre_c, Y_post_c, Y_pre_t_mean, Y_post_t_mean, w, t_w - ) - bootstrap_estimates.append(tau) - - except (ValueError, LinAlgError, KeyError): - # Skip failed bootstrap iterations (e.g., singular matrices, - # missing data in resampled units, or invalid weight computations) - continue - - bootstrap_estimates = np.array(bootstrap_estimates) - - # Warn if too many bootstrap iterations failed - n_successful = len(bootstrap_estimates) - failure_rate = 1 - (n_successful / self.n_bootstrap) - if failure_rate > 0.05: - warnings.warn( - f"Only {n_successful}/{self.n_bootstrap} bootstrap iterations succeeded " - f"({failure_rate:.1%} failure rate). Standard errors may be unreliable. " - f"This can occur with small samples, near-singular weight matrices, " - f"or insufficient pre-treatment periods.", - UserWarning, - stacklevel=2, - ) - - se = np.std(bootstrap_estimates, ddof=1) if len(bootstrap_estimates) > 1 else 0.0 - - return se, bootstrap_estimates - - def get_params(self) -> Dict[str, Any]: - """Get estimator parameters.""" - return { - "lambda_reg": self.lambda_reg, - "zeta": self.zeta, - "alpha": self.alpha, - "n_bootstrap": self.n_bootstrap, - "seed": self.seed, - } - - def set_params(self, **params) -> "SyntheticDiD": - """Set estimator parameters.""" - for key, value in params.items(): - if hasattr(self, key): - setattr(self, key, value) - else: - raise ValueError(f"Unknown parameter: {key}") - return self +__all__ = [ + "DifferenceInDifferences", + "MultiPeriodDiD", + "TwoWayFixedEffects", + "SyntheticDiD", +] diff --git a/diff_diff/synthetic_did.py b/diff_diff/synthetic_did.py new file mode 100644 index 0000000..3b2b77e --- /dev/null +++ b/diff_diff/synthetic_did.py @@ -0,0 +1,540 @@ +""" +Synthetic Difference-in-Differences estimator. +""" + +import warnings +from typing import Any, Dict, List, Optional, Tuple + +import numpy as np +import pandas as pd +from numpy.linalg import LinAlgError + +from diff_diff.estimators import DifferenceInDifferences +from diff_diff.results import SyntheticDiDResults +from diff_diff.utils import ( + compute_confidence_interval, + compute_p_value, + compute_placebo_effects, + compute_sdid_estimator, + compute_synthetic_weights, + compute_time_weights, + validate_binary, +) + + +class SyntheticDiD(DifferenceInDifferences): + """ + Synthetic Difference-in-Differences (SDID) estimator. + + Combines the strengths of Difference-in-Differences and Synthetic Control + methods by re-weighting control units to better match treated units' + pre-treatment trends. + + This method is particularly useful when: + - You have few treated units (possibly just one) + - Parallel trends assumption may be questionable + - Control units are heterogeneous and need reweighting + - You want robustness to pre-treatment differences + + Parameters + ---------- + lambda_reg : float, default=0.0 + L2 regularization for unit weights. Larger values shrink weights + toward uniform. Useful when n_pre_periods < n_control_units. + zeta : float, default=1.0 + Regularization for time weights. Larger values give more uniform + time weights (closer to standard DiD). + alpha : float, default=0.05 + Significance level for confidence intervals. + n_bootstrap : int, default=200 + Number of bootstrap replications for standard error estimation. + Set to 0 to use placebo-based inference instead. + seed : int, optional + Random seed for reproducibility. If None (default), results + will vary between runs. + + Attributes + ---------- + results_ : SyntheticDiDResults + Estimation results after calling fit(). + is_fitted_ : bool + Whether the model has been fitted. + + Examples + -------- + Basic usage with panel data: + + >>> import pandas as pd + >>> from diff_diff import SyntheticDiD + >>> + >>> # Panel data with units observed over multiple time periods + >>> # Treatment occurs at period 5 for treated units + >>> data = pd.DataFrame({ + ... 'unit': [...], # Unit identifier + ... 'period': [...], # Time period + ... 'outcome': [...], # Outcome variable + ... 'treated': [...] # 1 if unit is ever treated, 0 otherwise + ... }) + >>> + >>> # Fit SDID model + >>> sdid = SyntheticDiD() + >>> results = sdid.fit( + ... data, + ... outcome='outcome', + ... treatment='treated', + ... unit='unit', + ... time='period', + ... post_periods=[5, 6, 7, 8] + ... ) + >>> + >>> # View results + >>> results.print_summary() + >>> print(f"ATT: {results.att:.3f} (SE: {results.se:.3f})") + >>> + >>> # Examine unit weights + >>> weights_df = results.get_unit_weights_df() + >>> print(weights_df.head(10)) + + Notes + ----- + The SDID estimator (Arkhangelsky et al., 2021) computes: + + τ̂ = (Ȳ_treated,post - Σ_t λ_t * Y_treated,t) + - Σ_j ω_j * (Ȳ_j,post - Σ_t λ_t * Y_j,t) + + Where: + - ω_j are unit weights (sum to 1, non-negative) + - λ_t are time weights (sum to 1, non-negative) + + Unit weights ω are chosen to match pre-treatment outcomes: + min ||Σ_j ω_j * Y_j,pre - Y_treated,pre||² + + This interpolates between: + - Standard DiD (uniform weights): ω_j = 1/N_control + - Synthetic Control (exact matching): concentrated weights + + References + ---------- + Arkhangelsky, D., Athey, S., Hirshberg, D. A., Imbens, G. W., & Wager, S. + (2021). Synthetic Difference-in-Differences. American Economic Review, + 111(12), 4088-4118. + """ + + def __init__( + self, + lambda_reg: float = 0.0, + zeta: float = 1.0, + alpha: float = 0.05, + n_bootstrap: int = 200, + seed: Optional[int] = None + ): + super().__init__(robust=True, cluster=None, alpha=alpha) + self.lambda_reg = lambda_reg + self.zeta = zeta + self.n_bootstrap = n_bootstrap + self.seed = seed + + self._unit_weights = None + self._time_weights = None + + def fit( # type: ignore[override] + self, + data: pd.DataFrame, + outcome: str, + treatment: str, + unit: str, + time: str, + post_periods: Optional[List[Any]] = None, + covariates: Optional[List[str]] = None + ) -> SyntheticDiDResults: + """ + Fit the Synthetic Difference-in-Differences model. + + Parameters + ---------- + data : pd.DataFrame + Panel data with observations for multiple units over multiple + time periods. + outcome : str + Name of the outcome variable column. + treatment : str + Name of the treatment group indicator column (0/1). + Should be 1 for all observations of treated units + (both pre and post treatment). + unit : str + Name of the unit identifier column. + time : str + Name of the time period column. + post_periods : list, optional + List of time period values that are post-treatment. + If None, uses the last half of periods. + covariates : list, optional + List of covariate column names. Covariates are residualized + out before computing the SDID estimator. + + Returns + ------- + SyntheticDiDResults + Object containing the ATT estimate, standard error, + unit weights, and time weights. + + Raises + ------ + ValueError + If required parameters are missing or data validation fails. + """ + # Validate inputs + if outcome is None or treatment is None or unit is None or time is None: + raise ValueError( + "Must provide 'outcome', 'treatment', 'unit', and 'time'" + ) + + # Check columns exist + required_cols = [outcome, treatment, unit, time] + if covariates: + required_cols.extend(covariates) + + missing = [c for c in required_cols if c not in data.columns] + if missing: + raise ValueError(f"Missing columns: {missing}") + + # Validate treatment is binary + validate_binary(data[treatment].values, "treatment") + + # Get all unique time periods + all_periods = sorted(data[time].unique()) + + if len(all_periods) < 2: + raise ValueError("Need at least 2 time periods") + + # Determine pre and post periods + if post_periods is None: + mid = len(all_periods) // 2 + post_periods = list(all_periods[mid:]) + pre_periods = list(all_periods[:mid]) + else: + post_periods = list(post_periods) + pre_periods = [p for p in all_periods if p not in post_periods] + + if len(post_periods) == 0: + raise ValueError("Must have at least one post-treatment period") + if len(pre_periods) == 0: + raise ValueError("Must have at least one pre-treatment period") + + # Validate post_periods are in data + for p in post_periods: + if p not in all_periods: + raise ValueError(f"Post-period '{p}' not found in time column") + + # Identify treated and control units + # Treatment indicator should be constant within unit + unit_treatment = data.groupby(unit)[treatment].first() + treated_units = unit_treatment[unit_treatment == 1].index.tolist() + control_units = unit_treatment[unit_treatment == 0].index.tolist() + + if len(treated_units) == 0: + raise ValueError("No treated units found") + if len(control_units) == 0: + raise ValueError("No control units found") + + # Residualize covariates if provided + working_data = data.copy() + if covariates: + working_data = self._residualize_covariates( + working_data, outcome, covariates, unit, time + ) + + # Create outcome matrices + # Shape: (n_periods, n_units) + Y_pre_control, Y_post_control, Y_pre_treated, Y_post_treated = \ + self._create_outcome_matrices( + working_data, outcome, unit, time, + pre_periods, post_periods, treated_units, control_units + ) + + # Compute unit weights (synthetic control weights) + # Average treated outcomes across treated units + Y_pre_treated_mean = np.mean(Y_pre_treated, axis=1) + + unit_weights = compute_synthetic_weights( + Y_pre_control, + Y_pre_treated_mean, + lambda_reg=self.lambda_reg + ) + + # Compute time weights + time_weights = compute_time_weights( + Y_pre_control, + Y_pre_treated_mean, + zeta=self.zeta + ) + + # Compute SDID estimate + Y_post_treated_mean = np.mean(Y_post_treated, axis=1) + + att = compute_sdid_estimator( + Y_pre_control, + Y_post_control, + Y_pre_treated_mean, + Y_post_treated_mean, + unit_weights, + time_weights + ) + + # Compute pre-treatment fit (RMSE) + synthetic_pre = Y_pre_control @ unit_weights + pre_fit_rmse = np.sqrt(np.mean((Y_pre_treated_mean - synthetic_pre) ** 2)) + + # Compute standard errors + if self.n_bootstrap > 0: + se, placebo_effects = self._bootstrap_se( + working_data, outcome, unit, time, + pre_periods, post_periods, treated_units, control_units + ) + else: + # Use placebo-based inference + placebo_effects = compute_placebo_effects( + Y_pre_control, + Y_post_control, + Y_pre_treated_mean, + unit_weights, + time_weights, + control_units + ) + se = np.std(placebo_effects, ddof=1) if len(placebo_effects) > 1 else 0.0 + + # Compute test statistics + if se > 0: + t_stat = att / se + # Use placebo distribution for p-value if available + if len(placebo_effects) > 0: + # Two-sided p-value from placebo distribution + p_value = np.mean(np.abs(placebo_effects) >= np.abs(att)) + p_value = max(p_value, 1.0 / (len(placebo_effects) + 1)) + else: + p_value = compute_p_value(t_stat) + else: + t_stat = 0.0 + p_value = 1.0 + + # Confidence interval + conf_int = compute_confidence_interval(att, se, self.alpha) + + # Create weight dictionaries + unit_weights_dict = { + unit_id: w for unit_id, w in zip(control_units, unit_weights) + } + time_weights_dict = { + period: w for period, w in zip(pre_periods, time_weights) + } + + # Store results + self.results_ = SyntheticDiDResults( + att=att, + se=se, + t_stat=t_stat, + p_value=p_value, + conf_int=conf_int, + n_obs=len(data), + n_treated=len(treated_units), + n_control=len(control_units), + unit_weights=unit_weights_dict, + time_weights=time_weights_dict, + pre_periods=pre_periods, + post_periods=post_periods, + alpha=self.alpha, + lambda_reg=self.lambda_reg, + pre_treatment_fit=pre_fit_rmse, + placebo_effects=placebo_effects if len(placebo_effects) > 0 else None + ) + + self._unit_weights = unit_weights + self._time_weights = time_weights + self.is_fitted_ = True + + return self.results_ + + def _create_outcome_matrices( + self, + data: pd.DataFrame, + outcome: str, + unit: str, + time: str, + pre_periods: List[Any], + post_periods: List[Any], + treated_units: List[Any], + control_units: List[Any] + ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: + """ + Create outcome matrices for SDID estimation. + + Returns + ------- + tuple + (Y_pre_control, Y_post_control, Y_pre_treated, Y_post_treated) + Each is a 2D array with shape (n_periods, n_units) + """ + # Pivot data to wide format + pivot = data.pivot(index=time, columns=unit, values=outcome) + + # Extract submatrices + Y_pre_control = pivot.loc[pre_periods, control_units].values + Y_post_control = pivot.loc[post_periods, control_units].values + Y_pre_treated = pivot.loc[pre_periods, treated_units].values + Y_post_treated = pivot.loc[post_periods, treated_units].values + + return ( + Y_pre_control.astype(float), + Y_post_control.astype(float), + Y_pre_treated.astype(float), + Y_post_treated.astype(float) + ) + + def _residualize_covariates( + self, + data: pd.DataFrame, + outcome: str, + covariates: List[str], + unit: str, + time: str + ) -> pd.DataFrame: + """ + Residualize outcome by regressing out covariates. + + Uses two-way fixed effects to partial out covariates. + """ + data = data.copy() + + # Create design matrix with covariates + X = data[covariates].values.astype(float) + + # Add unit and time dummies + unit_dummies = pd.get_dummies(data[unit], prefix='u', drop_first=True) + time_dummies = pd.get_dummies(data[time], prefix='t', drop_first=True) + + X_full = np.column_stack([ + np.ones(len(data)), + X, + unit_dummies.values, + time_dummies.values + ]) + + y = data[outcome].values.astype(float) + + # Fit and get residuals + coeffs = np.linalg.lstsq(X_full, y, rcond=None)[0] + residuals = y - X_full @ coeffs + + # Add back the mean for interpretability + data[outcome] = residuals + np.mean(y) + + return data + + def _bootstrap_se( + self, + data: pd.DataFrame, + outcome: str, + unit: str, + time: str, + pre_periods: List[Any], + post_periods: List[Any], + treated_units: List[Any], + control_units: List[Any] + ) -> Tuple[float, np.ndarray]: + """ + Compute bootstrap standard error. + + Uses block bootstrap at the unit level. + """ + rng = np.random.default_rng(self.seed) + + all_units = treated_units + control_units + n_units = len(all_units) + + bootstrap_estimates = [] + + for _ in range(self.n_bootstrap): + # Sample units with replacement + sampled_units = rng.choice(all_units, size=n_units, replace=True) + + # Create bootstrap sample + boot_data = pd.concat([ + data[data[unit] == u].assign(**{unit: f"{u}_{i}"}) + for i, u in enumerate(sampled_units) + ], ignore_index=True) + + # Identify treated/control in bootstrap sample + boot_treated = [ + f"{u}_{i}" for i, u in enumerate(sampled_units) + if u in treated_units + ] + boot_control = [ + f"{u}_{i}" for i, u in enumerate(sampled_units) + if u in control_units + ] + + if len(boot_treated) == 0 or len(boot_control) == 0: + continue + + try: + # Create matrices + Y_pre_c, Y_post_c, Y_pre_t, Y_post_t = self._create_outcome_matrices( + boot_data, outcome, unit, time, + pre_periods, post_periods, boot_treated, boot_control + ) + + # Compute weights + Y_pre_t_mean = np.mean(Y_pre_t, axis=1) + Y_post_t_mean = np.mean(Y_post_t, axis=1) + + w = compute_synthetic_weights(Y_pre_c, Y_pre_t_mean, self.lambda_reg) + t_w = compute_time_weights(Y_pre_c, Y_pre_t_mean, self.zeta) + + # Compute estimate + tau = compute_sdid_estimator( + Y_pre_c, Y_post_c, Y_pre_t_mean, Y_post_t_mean, w, t_w + ) + bootstrap_estimates.append(tau) + + except (ValueError, LinAlgError, KeyError): + # Skip failed bootstrap iterations (e.g., singular matrices, + # missing data in resampled units, or invalid weight computations) + continue + + bootstrap_estimates = np.array(bootstrap_estimates) + + # Warn if too many bootstrap iterations failed + n_successful = len(bootstrap_estimates) + failure_rate = 1 - (n_successful / self.n_bootstrap) + if failure_rate > 0.05: + warnings.warn( + f"Only {n_successful}/{self.n_bootstrap} bootstrap iterations succeeded " + f"({failure_rate:.1%} failure rate). Standard errors may be unreliable. " + f"This can occur with small samples, near-singular weight matrices, " + f"or insufficient pre-treatment periods.", + UserWarning, + stacklevel=2, + ) + + se = np.std(bootstrap_estimates, ddof=1) if len(bootstrap_estimates) > 1 else 0.0 + + return se, bootstrap_estimates + + def get_params(self) -> Dict[str, Any]: + """Get estimator parameters.""" + return { + "lambda_reg": self.lambda_reg, + "zeta": self.zeta, + "alpha": self.alpha, + "n_bootstrap": self.n_bootstrap, + "seed": self.seed, + } + + def set_params(self, **params) -> "SyntheticDiD": + """Set estimator parameters.""" + for key, value in params.items(): + if hasattr(self, key): + setattr(self, key, value) + else: + raise ValueError(f"Unknown parameter: {key}") + return self diff --git a/diff_diff/twfe.py b/diff_diff/twfe.py new file mode 100644 index 0000000..6a30c7d --- /dev/null +++ b/diff_diff/twfe.py @@ -0,0 +1,357 @@ +""" +Two-Way Fixed Effects estimator for panel Difference-in-Differences. +""" + +import warnings +from typing import TYPE_CHECKING, List, Optional + +import numpy as np +import pandas as pd + +if TYPE_CHECKING: + from diff_diff.bacon import BaconDecompositionResults + +from diff_diff.estimators import DifferenceInDifferences +from diff_diff.results import DiDResults +from diff_diff.utils import ( + compute_confidence_interval, + compute_p_value, + compute_robust_se, + wild_bootstrap_se, +) + + +class TwoWayFixedEffects(DifferenceInDifferences): + """ + Two-Way Fixed Effects (TWFE) estimator for panel DiD. + + Extends DifferenceInDifferences to handle panel data with unit + and time fixed effects. + + Parameters + ---------- + robust : bool, default=True + Whether to use heteroskedasticity-robust standard errors. + cluster : str, optional + Column name for cluster-robust standard errors. + If None, automatically clusters at the unit level (the `unit` + parameter passed to `fit()`). This differs from + DifferenceInDifferences where cluster=None means no clustering. + alpha : float, default=0.05 + Significance level for confidence intervals. + + Notes + ----- + This estimator uses the regression: + + Y_it = α_i + γ_t + β*(D_i × Post_t) + X_it'δ + ε_it + + where α_i are unit fixed effects and γ_t are time fixed effects. + + Warning: TWFE can be biased with staggered treatment timing + and heterogeneous treatment effects. Consider using + more robust estimators (e.g., Callaway-Sant'Anna) for + staggered designs. + """ + + def fit( # type: ignore[override] + self, + data: pd.DataFrame, + outcome: str, + treatment: str, + time: str, + unit: str, + covariates: Optional[List[str]] = None + ) -> DiDResults: + """ + Fit Two-Way Fixed Effects model. + + Parameters + ---------- + data : pd.DataFrame + Panel data. + outcome : str + Name of outcome variable column. + treatment : str + Name of treatment indicator column. + time : str + Name of time period column. + unit : str + Name of unit identifier column. + covariates : list, optional + List of covariate column names. + + Returns + ------- + DiDResults + Estimation results. + """ + # Validate unit column exists + if unit not in data.columns: + raise ValueError(f"Unit column '{unit}' not found in data") + + # Check for staggered treatment timing and warn if detected + self._check_staggered_treatment(data, treatment, time, unit) + + # Use unit-level clustering if not specified (use local variable to avoid mutation) + cluster_var = self.cluster if self.cluster is not None else unit + + # Demean data (within transformation for fixed effects) + data_demeaned = self._within_transform(data, outcome, unit, time, covariates) + + # Create treatment × post interaction + # For staggered designs, we'd need to identify treatment timing per unit + # For now, assume standard 2-period design + data_demeaned["_treatment_post"] = ( + data_demeaned[treatment] * data_demeaned[time] + ) + + # Extract variables for regression + y = data_demeaned[f"{outcome}_demeaned"].values + X_list = [data_demeaned["_treatment_post"].values] + + if covariates: + for cov in covariates: + X_list.append(data_demeaned[f"{cov}_demeaned"].values) + + X = np.column_stack([np.ones(len(y))] + X_list) + + # Fit OLS on demeaned data + coefficients, residuals, fitted, r_squared = self._fit_ols(X, y) + + # ATT is the coefficient on treatment_post (index 1) + att_idx = 1 + att = coefficients[att_idx] + + # Degrees of freedom adjustment for fixed effects + n_units = data[unit].nunique() + n_times = data[time].nunique() + df = len(y) - X.shape[1] - n_units - n_times + 2 + + # Compute standard errors and inference + cluster_ids = data[cluster_var].values + if self.inference == "wild_bootstrap": + # Wild cluster bootstrap for few-cluster inference + bootstrap_results = wild_bootstrap_se( + X, y, residuals, cluster_ids, + coefficient_index=att_idx, + n_bootstrap=self.n_bootstrap, + weight_type=self.bootstrap_weights, + alpha=self.alpha, + seed=self.seed, + return_distribution=False + ) + self._bootstrap_results = bootstrap_results + se = bootstrap_results.se + p_value = bootstrap_results.p_value + conf_int = (bootstrap_results.ci_lower, bootstrap_results.ci_upper) + t_stat = bootstrap_results.t_stat_original + vcov = compute_robust_se(X, residuals, cluster_ids) + else: + # Standard cluster-robust SE + vcov = compute_robust_se(X, residuals, cluster_ids) + se = np.sqrt(vcov[att_idx, att_idx]) + t_stat = att / se + p_value = compute_p_value(t_stat, df=df) + conf_int = compute_confidence_interval(att, se, self.alpha, df=df) + + # Count observations + treated_units = data[data[treatment] == 1][unit].unique() + n_treated = len(treated_units) + n_control = n_units - n_treated + + # Determine inference method and bootstrap info + inference_method = "analytical" + n_bootstrap_used = None + n_clusters_used = None + if self._bootstrap_results is not None: + inference_method = "wild_bootstrap" + n_bootstrap_used = self._bootstrap_results.n_bootstrap + n_clusters_used = self._bootstrap_results.n_clusters + + self.results_ = DiDResults( + att=att, + se=se, + t_stat=t_stat, + p_value=p_value, + conf_int=conf_int, + n_obs=len(y), + n_treated=n_treated, + n_control=n_control, + alpha=self.alpha, + coefficients={"ATT": float(att)}, + vcov=vcov, + residuals=residuals, + fitted_values=fitted, + r_squared=r_squared, + inference_method=inference_method, + n_bootstrap=n_bootstrap_used, + n_clusters=n_clusters_used, + ) + + self.is_fitted_ = True + return self.results_ + + def _within_transform( + self, + data: pd.DataFrame, + outcome: str, + unit: str, + time: str, + covariates: Optional[List[str]] = None + ) -> pd.DataFrame: + """ + Apply within transformation to remove unit and time fixed effects. + + This implements the standard two-way within transformation: + y_it - y_i. - y_.t + y_.. + + Parameters + ---------- + data : pd.DataFrame + Panel data. + outcome : str + Outcome variable name. + unit : str + Unit identifier column. + time : str + Time period column. + covariates : list, optional + Covariate column names. + + Returns + ------- + pd.DataFrame + Data with demeaned variables. + """ + data = data.copy() + variables = [outcome] + (covariates or []) + + for var in variables: + # Unit means + unit_means = data.groupby(unit)[var].transform("mean") + # Time means + time_means = data.groupby(time)[var].transform("mean") + # Grand mean + grand_mean = data[var].mean() + + # Within transformation + data[f"{var}_demeaned"] = data[var] - unit_means - time_means + grand_mean + + return data + + def _check_staggered_treatment( + self, + data: pd.DataFrame, + treatment: str, + time: str, + unit: str, + ) -> None: + """ + Check for staggered treatment timing and warn if detected. + + Identifies if different units start treatment at different times, + which can bias TWFE estimates when treatment effects are heterogeneous. + """ + # Find first treatment time for each unit + treated_obs = data[data[treatment] == 1] + if len(treated_obs) == 0: + return # No treated observations + + # Get first treatment time per unit + first_treat_times = treated_obs.groupby(unit)[time].min() + unique_treat_times = first_treat_times.unique() + + if len(unique_treat_times) > 1: + n_groups = len(unique_treat_times) + warnings.warn( + f"Staggered treatment timing detected: {n_groups} treatment cohorts " + f"start treatment at different times. TWFE can be biased when treatment " + f"effects are heterogeneous across time. Consider using:\n" + f" - CallawaySantAnna estimator for robust estimates\n" + f" - TwoWayFixedEffects.decompose() to diagnose the decomposition\n" + f" - bacon_decompose() to see weight on 'forbidden' comparisons", + UserWarning, + stacklevel=3, + ) + + def decompose( + self, + data: pd.DataFrame, + outcome: str, + unit: str, + time: str, + first_treat: str, + weights: str = "approximate", + ) -> "BaconDecompositionResults": + """ + Perform Goodman-Bacon decomposition of TWFE estimate. + + Decomposes the TWFE estimate into a weighted average of all possible + 2x2 DiD comparisons, revealing which comparisons drive the estimate + and whether problematic "forbidden comparisons" are involved. + + Parameters + ---------- + data : pd.DataFrame + Panel data with unit and time identifiers. + outcome : str + Name of outcome variable column. + unit : str + Name of unit identifier column. + time : str + Name of time period column. + first_treat : str + Name of column indicating when each unit was first treated. + Use 0 (or np.inf) for never-treated units. + weights : str, default="approximate" + Weight calculation method: + - "approximate": Fast simplified formula (default). Good for + diagnostic purposes where relative weights are sufficient. + - "exact": Variance-based weights from Goodman-Bacon (2021) + Theorem 1. Use for publication-quality decompositions. + + Returns + ------- + BaconDecompositionResults + Decomposition results showing: + - TWFE estimate and its weighted-average breakdown + - List of all 2x2 comparisons with estimates and weights + - Total weight by comparison type (clean vs forbidden) + + Examples + -------- + >>> twfe = TwoWayFixedEffects() + >>> decomp = twfe.decompose( + ... data, outcome='y', unit='id', time='t', first_treat='treat_year' + ... ) + >>> decomp.print_summary() + >>> # Check weight on forbidden comparisons + >>> if decomp.total_weight_later_vs_earlier > 0.2: + ... print("Warning: significant forbidden comparison weight") + + Notes + ----- + This decomposition is essential for understanding potential TWFE bias + in staggered adoption designs. The three comparison types are: + + 1. **Treated vs Never-treated**: Clean comparisons using never-treated + units as controls. These are always valid. + + 2. **Earlier vs Later treated**: Uses later-treated units as controls + before they receive treatment. These are valid. + + 3. **Later vs Earlier treated**: Uses already-treated units as controls. + These "forbidden comparisons" can introduce bias when treatment + effects are dynamic (changing over time since treatment). + + See Also + -------- + bacon_decompose : Standalone decomposition function + BaconDecomposition : Class-based decomposition interface + CallawaySantAnna : Robust estimator that avoids forbidden comparisons + """ + from diff_diff.bacon import BaconDecomposition + + decomp = BaconDecomposition(weights=weights) + return decomp.fit(data, outcome, unit, time, first_treat) From ab874c406bc0be1295bdc886d2eaffcf930ebc38 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 4 Jan 2026 22:22:28 +0000 Subject: [PATCH 2/3] Update documentation for estimators module refactoring - README.md: Use `from diff_diff import TwoWayFixedEffects` (preferred) - docs/api/estimators.rst: Document new module structure (twfe.py, synthetic_did.py) with backward-compatible import instructions --- README.md | 2 +- docs/api/estimators.rst | 34 ++++++++++++++++++++++++++-------- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 9666c1a..e69e988 100644 --- a/README.md +++ b/README.md @@ -539,7 +539,7 @@ Works with `DifferenceInDifferences` and `TwoWayFixedEffects` estimators. ### Two-Way Fixed Effects (Panel Data) ```python -from diff_diff.estimators import TwoWayFixedEffects +from diff_diff import TwoWayFixedEffects twfe = TwoWayFixedEffects() results = twfe.fit( diff --git a/docs/api/estimators.rst b/docs/api/estimators.rst index 9a650dc..7a394a3 100644 --- a/docs/api/estimators.rst +++ b/docs/api/estimators.rst @@ -3,6 +3,20 @@ Estimators Core estimator classes for Difference-in-Differences analysis. +The main estimators module (``diff_diff.estimators``) contains the base classes +``DifferenceInDifferences`` and ``MultiPeriodDiD``. Additional estimators are +organized in separate modules for maintainability: + +- ``diff_diff.twfe`` - ``TwoWayFixedEffects`` estimator +- ``diff_diff.synthetic_did`` - ``SyntheticDiD`` estimator + +All estimators are re-exported from ``diff_diff.estimators`` and ``diff_diff`` +for backward compatibility, so you can import any of them using: + +.. code-block:: python + + from diff_diff import DifferenceInDifferences, TwoWayFixedEffects, MultiPeriodDiD, SyntheticDiD + .. module:: diff_diff.estimators DifferenceInDifferences @@ -24,23 +38,25 @@ Basic 2x2 DiD estimator. ~DifferenceInDifferences.get_params ~DifferenceInDifferences.set_params -TwoWayFixedEffects ------------------- +MultiPeriodDiD +-------------- -Panel DiD with unit and time fixed effects. +Event study estimator with period-specific treatment effects. -.. autoclass:: diff_diff.TwoWayFixedEffects +.. autoclass:: diff_diff.MultiPeriodDiD :members: :undoc-members: :show-inheritance: :inherited-members: -MultiPeriodDiD --------------- +TwoWayFixedEffects +------------------ -Event study estimator with period-specific treatment effects. +Panel DiD with unit and time fixed effects. -.. autoclass:: diff_diff.MultiPeriodDiD +.. module:: diff_diff.twfe + +.. autoclass:: diff_diff.TwoWayFixedEffects :members: :undoc-members: :show-inheritance: @@ -51,6 +67,8 @@ SyntheticDiD Synthetic control combined with DiD (Arkhangelsky et al. 2021). +.. module:: diff_diff.synthetic_did + .. autoclass:: diff_diff.SyntheticDiD :members: :undoc-members: From ff2ae0f5b8eb0f4f69a06c065d0301fbb6df7b3c Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 4 Jan 2026 22:46:05 +0000 Subject: [PATCH 3/3] Bump version to 1.0.2 --- TODO.md | 2 +- diff_diff/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/TODO.md b/TODO.md index cec6d86..b4b2ae1 100644 --- a/TODO.md +++ b/TODO.md @@ -89,7 +89,7 @@ Current line counts (target: < 1000 lines per module): | `bacon.py` | 1027 | OK | **Completed splits:** -- ~~`estimators.py` → `twfe.py`, `synthetic_did.py` (keep base classes in estimators.py)~~ - Done in 1.0.1 +- ~~`estimators.py` → `twfe.py`, `synthetic_did.py` (keep base classes in estimators.py)~~ - Done in 1.0.2 **Potential splits:** - `staggered.py` → `staggered_bootstrap.py` (move bootstrap logic) diff --git a/diff_diff/__init__.py b/diff_diff/__init__.py index 47d3238..f348367 100644 --- a/diff_diff/__init__.py +++ b/diff_diff/__init__.py @@ -85,7 +85,7 @@ plot_sensitivity, ) -__version__ = "1.0.0" +__version__ = "1.0.2" __all__ = [ # Estimators "DifferenceInDifferences",