## This code contains the base classes used in generating synthetic data
from linearmodels.iv import IV2SLS
from dowhy import CausalModel
from dowhy import datasets as dset
from sklearn.linear_model import LogisticRegression
import statsmodels.api as sm
import statsmodels.formula.api as smf
import numpy as np
import pandas as pd
from pathlib import Path
import matplotlib.pyplot as plt
class DataGenerator:
    """
    Base class for generating synthetic data

    Attributes:
        n_observations (int): number of observations
        n_continuous_covars (int): number of continuous covariates
        n_binary_covars (int): number of binary covariates
        n_covars (int): total number of covariates (continuous + binary)
        n_treatments (int): number of treatments
        true_effect (float): true effect size
        seed (int): random seed for reproducibility
        data (pd.DataFrame): generated data
        info (dict): dictionary to store additional information about the data
        method (str): the causal inference method associated with the synthetic data
        mean (np.ndarray): mean of the continuous covariates
        covar (np.ndarray): covariance matrix of the continuous covariates
        heterogeneity (bool): whether or not the treatment effects are heterogeneous
    """

    def __init__(self, n_observations, n_continuous_covars, n_binary_covars=2, mean=None,
                 covar=None, n_treatments=1, true_effect=0, seed=111, heterogeneity=0):
        np.random.seed(seed)
        self.n_observations = n_observations
        self.n_continuous_covars = n_continuous_covars
        self.n_covars = n_continuous_covars + n_binary_covars
        self.n_treatments = n_treatments
        self.n_binary_covars = n_binary_covars
        self.data = None
        self.seed = seed
        self.true_effect = true_effect
        self.method = None
        self.mean = mean
        self.covar = covar
        if mean is None:
            self.mean = np.random.randint(3, 20, size=self.n_continuous_covars)
        if self.covar is None:
            self.covar = np.identity(self.n_continuous_covars)
        self.heterogeneity = heterogeneity
    def generate_data(self):
        """
        Generate the synthetic data.

        Returns:
            pd.DataFrame: the generated data
        """
        raise NotImplementedError("Invoke the method in the subclass")

    def save_data(self, folder, filename):
        """
        Save the generated data as a CSV file.

        Args:
            folder (str): path to the folder where the data is saved
            filename (str): name of the file
        """
        if self.data is None:
            raise ValueError("Data not generated yet. Please generate data first.")
        path = Path(folder)
        path.mkdir(parents=True, exist_ok=True)
        if not filename.endswith('.csv'):
            filename += '.csv'
        self.data.to_csv(path / filename, index=False)

    def test_data(self, print_=False):
        """
        Test the generated data using the appropriate method.
        """
        raise NotImplementedError("This method should be overridden by subclasses")

    def generate_covariates(self):
        """
        Generate covariates. Continuous covariates are drawn from a multivariate normal
        distribution and binary covariates from a binomial distribution. The continuous
        covariates are then discretized by truncation to integers.
        """
        X_c = np.random.multivariate_normal(mean=self.mean, cov=self.covar,
                                            size=self.n_observations)
        p = np.random.uniform(0.3, 0.7)
        X_b = np.random.binomial(1, p, size=(self.n_observations, self.n_binary_covars)).astype(int)
        covariates = np.hstack((X_c, X_b))
        covariates = covariates.astype(int)
        return covariates
class MultiTreatRCTGenerator(DataGenerator):
    """
    Base class for generating synthetic data for multi-treatment RCTs

    Additional Attributes:
        true_effect_vec (np.ndarray): the treatment effect for each arm, indexed by the
            treatment indicator D (entry 0 corresponds to the control arm and is zero).
    """

    def __init__(self, n_observations, n_continuous_covars, n_treatments, n_binary_covars=2,
                 mean=None, covar=None, true_effect=1.0, true_effect_vec=None,
                 seed=111, heterogeneity=0):
        super().__init__(n_observations, n_continuous_covars, n_binary_covars=n_binary_covars,
                         mean=mean, covar=covar, true_effect=true_effect, seed=seed,
                         heterogeneity=heterogeneity, n_treatments=n_treatments)
        self.method = "MultiTreatRCT"
        self.true_effect_vec = true_effect_vec
        ## if true_effect_vec is None, all treatment arms get the same effect
        ## (the control arm, index 0, gets an effect of zero)
        if true_effect_vec is None:
            self.true_effect_vec = np.zeros(n_treatments + 1)
            for i in range(1, n_treatments + 1):
                self.true_effect_vec[i] = self.true_effect

    def generate_data(self):
        X = self.generate_covariates()
        cols = [f"X{i+1}" for i in range(self.n_covars)]
        df = pd.DataFrame(X, columns=cols)
        ## D = 0 is the control arm; D = 1, ..., n_treatments are the treatment arms
        df['D'] = np.random.randint(0, self.n_treatments + 1, size=self.n_observations)
        vec = np.random.uniform(0, 1, size=self.n_covars)
        intercept = np.random.normal(50, 3)
        noise = np.random.normal(0, 1, size=self.n_observations)
        # Apply the appropriate treatment effect per treatment arm
        treatment_effects = np.array(self.true_effect_vec)
        df['treat_effect'] = treatment_effects[df['D']]
        df['Y'] = intercept + X.dot(vec) + df['treat_effect'] + noise
        df.drop(columns='treat_effect', inplace=True)
        self.data = df
        return df

    def test_data(self, print_=False):
        if self.data is None:
            raise ValueError("Data not generated yet. Please generate data first.")
        model = smf.ols('Y ~ C(D)', data=self.data).fit()
        result = model.summary()
        if print_:
            print(result)
        return result
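
## Illustrative helper (our addition, not part of the original class): the per-arm treatment
## effects implied by the C(D) regression in `test_data` are the coefficients on the treatment
## dummies, each measured relative to the control arm D = 0.
def multi_arm_effects(df):
    """Return the estimated effect of each treatment arm relative to the control arm."""
    model = smf.ols("Y ~ C(D)", data=df).fit()
    return {name: coef for name, coef in model.params.items() if name.startswith("C(D)")}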
# Front-Door Criterion Generator
class FrontDoorGenerator(DataGenerator):
    """
    Generates synthetic data satisfying the front-door criterion:
    D -> M -> Y, with a latent confounder D <- U -> Y.
    """

    def __init__(self, n_observations, n_continuous_covars=2, n_binary_covars=2,
                 mean=None, covar=None, seed=111, true_effect=2.0, heterogeneity=0):
        super().__init__(n_observations, n_continuous_covars, n_binary_covars=n_binary_covars,
                         mean=mean, covar=covar, seed=seed, true_effect=true_effect,
                         n_treatments=1, heterogeneity=heterogeneity)
        self.method = "FrontDoor"

    def generate_data(self):
        X = self.generate_covariates()
        cols = [f"X{i+1}" for i in range(self.n_covars)]
        df = pd.DataFrame(X, columns=cols)
        # Latent confounder
        U = np.random.normal(0, 1, self.n_observations)
        # Treatment depends on U and X; the latent index is thresholded at its mean so
        # that roughly half of the observations are treated
        vec_d = np.random.uniform(0.5, 1.5, size=self.n_covars)
        lin_d = X @ vec_d + 0.8 * U + np.random.normal(0, 1, self.n_observations)
        df['D'] = (lin_d > lin_d.mean()).astype(int)
        # Mediator depends on D and X
        vec_m = np.random.uniform(0.5, 1.5, size=self.n_covars)
        df['M'] = X @ vec_m + df['D'] * 1.5 + np.random.normal(0, 1, self.n_observations)
        # Outcome depends on M, U and X; the implied total effect of D on Y is
        # 1.5 * true_effect (the D -> M coefficient times the M -> Y coefficient)
        vec_y = np.random.uniform(0.5, 1.5, size=self.n_covars)
        df['Y'] = 50 + self.true_effect * df['M'] + 1.0 * U + X @ vec_y + np.random.normal(0, 1, self.n_observations)
        self.data = df
        return df

    def test_data(self, print_=False):
        if self.data is None:
            raise ValueError("Data not generated yet. Please generate data first.")
        model_m = smf.ols("M ~ D", data=self.data).fit()
        model_y = smf.ols("Y ~ M + D", data=self.data).fit()
        if print_:
            print("Regression: M ~ D")
            print(model_m.summary())
            print("\nRegression: Y ~ M + D")
            print(model_y.summary())
        return {"M~D": model_m.summary(), "Y~M+D": model_y.summary()}
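
## Illustrative helper (our addition, not part of the original class): under the linear
## front-door structure generated above, the effect of D on Y can be recovered as the product
## of the D -> M coefficient and the M -> Y (given D) coefficient, adjusting for the observed
## covariates in both stages.
def frontdoor_estimate(df, covariate_cols):
    """Front-door point estimate: (D -> M coefficient) * (M -> Y | D coefficient)."""
    covs = " + ".join(covariate_cols)
    coef_dm = smf.ols(f"M ~ D + {covs}", data=df).fit().params["D"]
    coef_my = smf.ols(f"Y ~ M + D + {covs}", data=df).fit().params["M"]
    return coef_dm * coef_my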
class ObservationalDataGenerator(DataGenerator):
    """
    Generate synthetic data for observational studies.

    Additional Attributes:
        propensity (np.ndarray): the true propensity score for each observation
        weights (np.ndarray): the inverse-propensity weight for each observation
    """

    def __init__(self, n_observations, n_continuous_covars, n_binary_covars=2, mean=None, covar=None,
                 true_effect=1.0, seed=111, heterogeneity=0):
        super().__init__(n_observations, n_continuous_covars, n_binary_covars=n_binary_covars, mean=mean, covar=covar,
                         true_effect=true_effect, seed=seed, heterogeneity=heterogeneity)

    def generate_data(self):
        X = self.generate_covariates()
        cols = [f"X{i+1}" for i in range(self.n_covars)]
        df = pd.DataFrame(X, columns=cols)
        X_norm = (X - X.mean(axis=0)) / X.std(axis=0)
        vec1 = np.random.normal(0, 0.5, size=self.n_covars)
        lin = X_norm @ vec1 + np.random.normal(0, 1, self.n_observations)
        ## the propensity score
        ps = 1 / (1 + np.exp(-lin))
        ## clip for numerical stability
        ps = np.clip(ps, 1e-3, 1 - 1e-3)
        df['D'] = np.random.binomial(1, ps).astype(int)
        vec2 = np.random.normal(0, 0.5, size=self.n_covars)
        intercept = np.random.normal(50, 3)
        noise = np.random.normal(0, 1, size=self.n_observations)
        df['Y'] = intercept + X @ vec2 + self.true_effect * df['D'] + noise
        self.propensity = ps
        self.weights = np.where(df['D'] == 1, 1 / ps, 1 / (1 - ps))
        self.data = df
        return self.data
class PSMGenerator(ObservationalDataGenerator):
    """
    Generate synthetic data for Propensity Score Matching (PSM)
    """

    def __init__(self, n_observations, n_continuous_covars, n_binary_covars=2, mean=None, covar=None,
                 true_effect=1.0, seed=111, heterogeneity=0):
        super().__init__(n_observations, n_continuous_covars, n_binary_covars=n_binary_covars, mean=mean, covar=covar,
                         true_effect=true_effect, seed=seed, heterogeneity=heterogeneity)
        self.method = "PSM"

    def test_data(self, print_=False):
        """
        Test the generated data
        """
        if self.data is None:
            raise ValueError("Data not generated yet. Please generate data first.")
        lr = LogisticRegression(solver='lbfgs')
        X = self.data[[f"X{i+1}" for i in range(self.n_covars)]]
        lr.fit(X, self.data['D'])
        ps_hat = lr.predict_proba(X)[:, 1]
        treated = self.data[self.data['D'] == 1]
        control = self.data[self.data['D'] == 0]
        ## nearest-neighbour matching (with replacement) on the estimated propensity scores;
        ## positional indexing into ps_hat works because the data uses the default RangeIndex
        match_idxs = [np.abs(ps_hat[control.index] - ps_hat[i]).argmin() for i in treated.index]
        matches = control.iloc[match_idxs]
        att = treated['Y'].mean() - matches['Y'].mean()
        result = f"Estimated ATT (matching): {att:.3f} | True: {self.true_effect}"
        if print_:
            print(result)
        return result
class PSWGenerator(ObservationalDataGenerator):
    """
    Generate synthetic data for Propensity Score Weighting (PSW)
    """

    def __init__(self, n_observations, n_continuous_covars, n_binary_covars=2, mean=None, covar=None,
                 true_effect=1.0, seed=111, heterogeneity=0):
        super().__init__(n_observations, n_continuous_covars, n_binary_covars=n_binary_covars, mean=mean, covar=covar,
                         true_effect=true_effect, seed=seed, heterogeneity=heterogeneity)
        self.method = "PSW"

    def test_data(self, print_=False):
        """
        Test the generated data
        """
        if self.data is None:
            raise ValueError("Data not generated yet. Please generate data first.")
        df = self.data.copy()
        D = df['D']
        Y = df['Y']
        treated = D == 1
        control = D == 0
        ## ATT weights: 1 for treated units, the odds of treatment for control units
        w = np.zeros(self.n_observations)
        w[control] = self.propensity[control] / (1 - self.propensity[control])
        w[treated] = 1
        Y1 = Y[treated].mean()
        Y0_weighted = np.average(Y[control], weights=w[control])
        att = Y1 - Y0_weighted
        ## Horvitz-Thompson style IPW estimate of the ATE
        ate = np.average(Y * D / self.propensity - (1 - D) * Y / (1 - self.propensity))
        result = (f"Estimated ATT (IPW): {att:.3f} | True: {self.true_effect}\n"
                  f"Estimated ATE (IPW): {ate:.3f} | True: {self.true_effect}")
        if print_:
            print(result)
        return result
class RCTGenerator(DataGenerator):
    """
    Generate synthetic data for Randomized Controlled Trials (RCT)
    """

    def __init__(self, n_observations, n_continuous_covars, n_binary_covars=2, mean=None,
                 covar=None, true_effect=1.0, seed=111, heterogeneity=0):
        super().__init__(n_observations, n_continuous_covars, n_binary_covars=n_binary_covars,
                         mean=mean, covar=covar, true_effect=true_effect, seed=seed,
                         heterogeneity=heterogeneity)
        self.method = "RCT"

    def generate_data(self):
        X = self.generate_covariates()
        cols = [f"X{i+1}" for i in range(self.n_covars)]
        df = pd.DataFrame(X, columns=cols)
        df['D'] = np.random.binomial(1, 0.5, size=self.n_observations)
        vec = np.random.uniform(0, 1, size=self.n_covars)
        intercept = np.random.normal(50, 3)
        noise = np.random.normal(0, 1, size=self.n_observations)
        df['Y'] = intercept + X.dot(vec) + self.true_effect * df['D'] + noise
        self.data = df
        return df

    def test_data(self, print_=False):
        if self.data is None:
            raise ValueError("Data not generated yet. Please generate data first.")
        model = smf.ols('Y ~ D', data=self.data).fit()
        if print_:
            print(model.summary())
        est = model.params['D']
        conf_int = model.conf_int().loc['D']
        result = (f"TRUE ATE: {self.true_effect:.3f}, ESTIMATED ATE: {est:.3f}, "
                  f"95% CI: [{conf_int[0]:.3f}, {conf_int[1]:.3f}]")
        return result
class IVGenerator(DataGenerator):
    """
    Generate synthetic data for Instrumental Variables (IV) analysis. We assume two forms:

    1. Encouragement Design:
        Z -> D -> Y
    In this setting, the encouragement (Z) is randomized. For instance, consider the
    administering of vaccines. We cannot force people to take vaccines, but we can encourage
    them to do so. We could run a vaccine awareness campaign, randomly pick participants, and
    inform them about the benefits of the vaccine. A participant in the encouraged group can
    either comply (take the vaccine) or not comply (not take the vaccine). Likewise, a
    participant in the control group can comply (not take the vaccine) or defy (take the vaccine).

    2. Classical IV:
             U
            /  \
           v    v
      Z -> D -> Y
    This is the classical setting where an unobserved confounder (U) affects both the
    treatment (D) and the outcome (Y).

    Additional Attributes:
        alpha (float): the effect of the instrument on the treatment (Z on D)
        encouragement (bool): whether or not this is an encouragement design
        beta_d (float): effect of the unobserved confounder (U) on the treatment (D)
        beta_y (float): effect of the unobserved confounder (U) on the outcome (Y)
    """

    def __init__(self, n_observations, n_continuous_covars, n_binary_covars=2, mean=None, beta_d=1.0,
                 beta_y=1.5, covar=None, true_effect=1.0, seed=111, heterogeneity=0, alpha=0.5,
                 encouragement=False):
        super().__init__(n_observations, n_continuous_covars, n_binary_covars=n_binary_covars, mean=mean,
                         covar=covar, true_effect=true_effect, seed=seed, heterogeneity=heterogeneity)
        self.method = "IV"
        self.alpha = alpha
        self.encouragement = encouragement
        self.beta_d = beta_d
        self.beta_y = beta_y

    def generate_data(self):
        X = self.generate_covariates()
        mean = np.random.randint(8, 13)
        Z = np.random.normal(mean, 2, size=self.n_observations).astype(int)
        U = np.random.normal(0, 1, size=self.n_observations)
        vec1 = np.random.normal(0, 0.5, size=self.n_covars)
        intercept1 = np.random.normal(30, 2)
        D = self.alpha * Z + X @ vec1 + np.random.normal(size=self.n_observations) + intercept1
        if self.encouragement:
            ## binary take-up: units above the average latent index take the treatment
            D = (D > np.mean(D)).astype(int)
        else:
            ## classical setting: the unobserved confounder also shifts the treatment
            D = D + self.beta_d * U
            D = D.astype(int)
        intercept2 = np.random.normal(50, 3)
        vec2 = np.random.normal(0, 0.5, size=self.n_covars)
        Y = self.true_effect * D + X @ vec2 + np.random.normal(size=self.n_observations) + intercept2
        if not self.encouragement:
            Y = Y + self.beta_y * U
        df = pd.DataFrame(X, columns=[f"X{i+1}" for i in range(self.n_covars)])
        df['Z'] = Z
        df['D'] = D
        df['Y'] = Y
        self.data = df
        return self.data

    def test_data(self, print_=False):
        if self.data is None:
            raise ValueError("Data not generated yet.")
        model = IV2SLS.from_formula('Y ~ 1 + [D ~ Z]', data=self.data).fit()
        est = model.params['D']
        conf_int = model.conf_int().loc['D']
        result = (f"TRUE LATE: {self.true_effect:.3f}, ESTIMATED LATE: {est:.3f}, "
                  f"95% CI: [{conf_int[0]:.3f}, {conf_int[1]:.3f}]")
        if print_:
            print(result)
        return result
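
## Illustrative helper (our addition, not part of the original class): with a single instrument,
## the 2SLS estimate used in `test_data` reduces to the Wald ratio cov(Y, Z) / cov(D, Z). Since Z
## is generated independently of X and U above, the simple ratio is consistent for the true effect.
def wald_iv_estimate(df):
    """Wald (ratio) estimate of the effect of D on Y using the instrument Z."""
    cov_yz = np.cov(df["Y"], df["Z"])[0, 1]
    cov_dz = np.cov(df["D"], df["Z"])[0, 1]
    return cov_yz / cov_dz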
class RDDGenerator(DataGenerator):
    """
    Generate synthetic data for a (sharp) Regression Discontinuity Design (RDD).

    Additional Attributes:
        cutoff (float): the cutoff for treatment assignment
        bandwidth (float): the bandwidth around the cutoff used when estimating the treatment effect
        plot (bool): whether or not to plot the data
    """

    def __init__(self, n_observations, n_continuous_covars, n_binary_covars=2, mean=None, plot=False,
                 covar=None, true_effect=1.0, seed=111, heterogeneity=0, cutoff=10, bandwidth=0.1):
        super().__init__(n_observations, n_continuous_covars, n_binary_covars=n_binary_covars,
                         mean=mean, covar=covar, true_effect=true_effect, seed=seed,
                         heterogeneity=heterogeneity)
        self.cutoff = cutoff
        self.bandwidth = bandwidth
        self.method = "RDD"
        self.plot = plot

    def generate_data(self):
        X = self.generate_covariates()
        cols = [f"X{i+1}" for i in range(self.n_covars)]
        df = pd.DataFrame(X, columns=cols)
        df['running_X'] = np.random.normal(0, 2, size=self.n_observations) + self.cutoff
        df['D'] = (df['running_X'] >= self.cutoff).astype(int)
        intercept = 10
        coeffs = np.random.normal(0, 0.1, size=self.n_covars)
        ## slope of the line below the threshold
        m_below = 1.5
        ## slope of the line above the threshold
        m_above = 0.8
        df['running_centered'] = df['running_X'] - self.cutoff
        # Use the centered running variable for the slopes
        df["Y"] = (intercept + self.true_effect * df['D'] + m_below * df['running_centered'] * (1 - df['D']) +
                   m_above * df['running_centered'] * df['D'] + X @ coeffs + np.random.normal(0, 0.5, size=self.n_observations))
        if self.plot:
            plt.figure(figsize=(10, 6))
            plt.scatter(df[df['D'] == 0]['running_X'], df[df['D'] == 0]['Y'],
                        alpha=0.5, label='Control', color='blue')
            plt.scatter(df[df['D'] == 1]['running_X'], df[df['D'] == 1]['Y'],
                        alpha=0.5, label='Treatment', color='red')
            plt.axvline(self.cutoff, color='black', linestyle='--', label='Cutoff')
            plt.legend()
            plt.show()
        self.data = df[[c for c in df.columns if c != 'running_centered']]
        return self.data

    def test_data(self, print_=False):
        if self.data is None:
            raise ValueError("Data not generated yet.")
        df = self.data.copy()
        df['running_adj'] = df['running_X'].astype(float) - self.cutoff
        ## local linear regression within the bandwidth, with separate slopes on each side of the cutoff
        df = df[np.abs(df['running_adj']) <= self.bandwidth].copy()
        model = smf.ols('Y ~ D + running_adj + D:running_adj', data=df).fit()
        est = model.params['D']
        conf_int = model.conf_int().loc['D']
        result = (f"TRUE LATE: {self.true_effect:.3f}, ESTIMATED LATE: {est:.3f}, "
                  f"95% CI: [{conf_int[0]:.3f}, {conf_int[1]:.3f}]")
        if print_:
            print(result)
        return result
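
## Illustrative helper (our addition, not part of the original class): the RDD estimate in
## `test_data` depends on the chosen bandwidth; sweeping over a few bandwidths is a common
## robustness check. The default bandwidths below are arbitrary illustrative choices.
def rdd_bandwidth_sweep(df, cutoff, bandwidths=(0.1, 0.5, 1.0, 2.0)):
    """Re-estimate the RDD effect within several bandwidths around the cutoff."""
    estimates = {}
    for bw in bandwidths:
        local = df[np.abs(df["running_X"] - cutoff) <= bw].copy()
        local["running_adj"] = local["running_X"] - cutoff
        fit = smf.ols("Y ~ D + running_adj + D:running_adj", data=local).fit()
        estimates[bw] = fit.params["D"]
    return estimates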
class DiDGenerator(DataGenerator):
    """
    Generate synthetic data for Difference-in-Differences (DiD) analysis

    Additional Attributes:
        n_periods (int): number of time periods
    """

    def __init__(self, n_observations, n_continuous_covars, n_binary_covars=2, n_periods=2,
                 mean=None, covar=None, true_effect=1.0, seed=111, heterogeneity=0):
        super().__init__(n_observations, n_continuous_covars, n_binary_covars=n_binary_covars,
                         mean=mean, covar=covar, true_effect=true_effect,
                         seed=seed, heterogeneity=heterogeneity)
        self.method = "DiD"
        self.n_periods = n_periods

    def canonical_did_model(self):
        """
        The classical DiD setting with two periods (pre- and post-treatment) and two groups
        (treatment and control).
        """
        ## fraction of observations that receive the treatment
        frac_treated = np.random.uniform(0.35, 0.65)
        n_treated = int(frac_treated * self.n_observations)
        unit_ids = np.arange(self.n_observations)
        treatment_status = np.zeros(self.n_observations, dtype=int)
        treatment_status[:n_treated] = 1
        np.random.shuffle(treatment_status)
        X = self.generate_covariates()
        cols = [f"X{i+1}" for i in range(self.n_covars)]
        covar_df = pd.DataFrame(X, columns=cols)
        vec = np.random.normal(0, 0.1, size=self.n_covars)
        intercept = np.random.normal(50, 3)
        ## baseline difference between the treatment and control groups (present in both periods)
        treat_effect = np.random.normal(0, 1)
        ## common shift between the pre and post periods
        time_effect = np.random.normal(0, 1)
        covar_term = X @ vec
        pre_noise = np.random.normal(0, 1, self.n_observations)
        pre_outcome = intercept + covar_term + pre_noise + treat_effect * treatment_status
        pre_data = pd.DataFrame({'unit_id': unit_ids, 'post': 0, 'D': treatment_status,
                                 'Y': pre_outcome})
        post_noise = np.random.normal(0, 1, self.n_observations)
        post_outcome = (intercept + time_effect + covar_term + self.true_effect * treatment_status
                        + treat_effect * treatment_status + post_noise)
        post_data = pd.DataFrame({'unit_id': unit_ids, 'post': 1, 'D': treatment_status,
                                  'Y': post_outcome})
        df = pd.concat([pre_data, post_data], ignore_index=True)
        df = df.merge(covar_df, left_on="unit_id", right_index=True)
        return df[['unit_id', 'post', 'D', 'Y'] + cols]

    def twfe_model(self):
        """
        Generate panel data for a Two-Way Fixed Effects (TWFE) DiD model. This generalizes the
        two-period DiD to multiple periods with staggered treatment timing.
        """
        ## fraction of observations that receive the treatment
        frac_treated = np.random.uniform(0.35, 0.65)
        unit_ids = np.arange(1, self.n_observations + 1)
        time_periods = np.arange(0, self.n_periods)
        df = pd.DataFrame([(i, t) for i in unit_ids for t in time_periods],
                          columns=["unit", "time"])
        X = self.generate_covariates()
        for j in range(self.n_covars):
            df[f"X{j+1}"] = np.repeat(X[:, j], self.n_periods)
        ## Assign treatment timing
        n_treated = int(frac_treated * self.n_observations)
        treated_units = np.random.choice(unit_ids, size=n_treated, replace=False)
        treatment_start = {unit: np.random.randint(1, self.n_periods) for unit in treated_units}
        df["treat_post"] = df.apply(lambda row: int(row["unit"] in treatment_start and
                                                    row["time"] >= treatment_start[row["unit"]]), axis=1)
        ## Unit fixed effects
        unit_effects = dict(zip(unit_ids, np.random.normal(0, 1.0, self.n_observations)))
        ## Time fixed effects
        time_effects = dict(zip(time_periods, np.random.normal(0, 1, len(time_periods))))
        df["unit_fe"] = df["unit"].map(unit_effects)
        df["time_fe"] = df["time"].map(time_effects)
        covar_effects = np.random.normal(0, 0.1, self.n_covars)
        X_matrix = df[[f"X{j+1}" for j in range(self.n_covars)]].values
        covar_term = X_matrix @ covar_effects
        intercept = np.random.normal(50, 3)
        noise = np.random.normal(0, 1, len(df))
        df["Y"] = intercept + covar_term + df["unit_fe"] + df["time_fe"] + self.true_effect * df["treat_post"] + noise
        final_df = df[["unit", "time", "treat_post", "Y"] + [f"X{j+1}" for j in range(self.n_covars)]]
        final_df = final_df.rename(columns={"time": "year", "treat_post": "D"})
        return final_df

    def generate_data(self):
        if self.n_periods == 2:
            self.data = self.canonical_did_model()
        else:
            self.data = self.twfe_model()
        return self.data

    def test_data(self, print_=False):
        if self.data is None:
            raise ValueError("Data not generated yet.")
        if self.n_periods == 2:
            if print_:
                print("Testing canonical DiD model")
            ## canonical 2x2 DiD: the ATT is the coefficient on the interaction term
            model = smf.ols('Y ~ D * post', data=self.data).fit()
            estimated_att = model.params['D:post']
            conf_int = model.conf_int().loc['D:post']
        else:
            if print_:
                print("Testing TWFE model")
            ## TWFE regression with unit and year fixed effects
            model = smf.ols('Y ~ D + C(unit) + C(year)', data=self.data).fit()
            estimated_att = model.params['D']
            conf_int = model.conf_int().loc['D']
        result = (f"TRUE ATT: {self.true_effect:.3f}, ESTIMATED ATT: {estimated_att:.3f}, "
                  f"95% CI: [{conf_int[0]:.3f}, {conf_int[1]:.3f}]")
        if print_:
            print(result)
        return result
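
## Minimal usage sketch (our addition, not part of the original module): generate data with a
## couple of the generators above and run their built-in checks. The sample sizes, effect sizes,
## and seed below are arbitrary illustrative choices.
if __name__ == "__main__":
    rct = RCTGenerator(n_observations=5000, n_continuous_covars=3, true_effect=2.0, seed=42)
    rct.generate_data()
    print(rct.test_data())

    did = DiDGenerator(n_observations=2000, n_continuous_covars=3, n_periods=2,
                       true_effect=1.5, seed=42)
    did.generate_data()
    did.test_data(print_=True)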