-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda_function.py
More file actions
159 lines (126 loc) · 6.08 KB
/
Copy pathlambda_function.py
File metadata and controls
159 lines (126 loc) · 6.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from collections import defaultdict
import os
import tempfile
import causalbench
from helper_services.causal_analysis_helper import run_causal_analysis
import math
from helper_services.causal_recommendation_helper import run_causal_recommendation
from helper_services.g2s_causal_recommendation_helper import run_g2s_causal_recommendation
from helper_services.download_helper import download_files
from helper_services.report_helper import generate_report
from helper_services.hp_dtype_helper import get_hp_dtypes
from helper_services.mail_helper import send_email
import numpy as np
def build_email_body(causal_analysis_results, event):
    """Compose the plain-text body of the results email.

    Args:
        causal_analysis_results: Mapping of group name to group data. The
            special key ``'_metadata'`` may hold ``experiment_count``,
            ``insufficient_data`` and ``insufficient_data_reason``. Every
            other entry is read for an ``'effects'`` mapping of
            hyperparameter name to numeric effect.
        event: The invocation payload; ``'outcome_column'`` (default
            ``'Time.Duration'``) and ``'filters'`` are read from it.

    Returns:
        The email body as one newline-joined string. It always ends with a
        single blank line followed by the pointer to the PDF attachment.
    """
    outcome_column = event.get('outcome_column', 'Time.Duration')
    filters = event.get('filters', None)
    # `or {}` also covers an explicit None stored under '_metadata'.
    metadata = causal_analysis_results.get('_metadata') or {}
    experiment_count = metadata.get('experiment_count', 0)

    lines = [
        "CausalBench+ Causal Analysis Report",
        "",
        f"Outcome metric: {outcome_column}",
        f"Experiments: Effects on {outcome_column} ({experiment_count} experiments)",
    ]
    if filters:
        if isinstance(filters, dict):
            filter_str = ", ".join(f"{k}={v}" for k, v in filters.items())
        else:
            filter_str = str(filters)
        lines.append(f"Filters applied: {filter_str}")
    lines.append("")

    if metadata.get('insufficient_data', False):
        lines.append("INSUFFICIENT DATA: Causal effects could not be computed.")
        reason = metadata.get('insufficient_data_reason', None)
        if reason:
            lines.append(f"Reason: {reason}")
        lines.append("")
        lines.append("To get results, run more experiments with varied hyperparameter configurations.")
        lines.append("Minimum requirements: ≥ 2 data points per variable, ≥ 2 unique values per hyperparameter.")
    else:
        # Strongest finite effect seen per hyperparameter across all groups.
        strongest = {}
        for group, group_data in causal_analysis_results.items():
            if group == "_metadata" or not isinstance(group_data, dict):
                continue
            for hp, effect in (group_data.get("effects") or {}).items():
                # bool is a subclass of int but is never an effect size.
                if isinstance(effect, bool) or not isinstance(effect, (int, float)):
                    continue
                if not math.isfinite(effect):
                    continue
                # Keep the largest magnitude so the ranking below does not
                # depend on the order in which groups happen to be visited.
                if hp not in strongest or abs(effect) > abs(strongest[hp]):
                    strongest[hp] = effect
        if strongest:
            top_effects = sorted(
                strongest.items(), key=lambda item: abs(item[1]), reverse=True
            )[:3]
            lines.append("Top causal effects:")
            for hp, effect in top_effects:
                # Drop the leading namespace (e.g. 'HP.') when there is one.
                hp_name = hp.split(".", 1)[1] if "." in hp else hp
                # Negative numbers already carry their own sign.
                sign = "+" if effect >= 0 else ""
                lines.append(f" {hp_name}: {sign}{effect:.4f}")

    # Exactly one blank line before the footer, whichever branch ran.
    if lines[-1] != "":
        lines.append("")
    lines.append("Full results are in the attached PDF report.")
    return "\n".join(lines)
def configure_env():
    """
    Directory setup to ensure isolation.

    Creates a fresh temporary directory and repoints the process at it:
    TMPDIR/TEMP/TMP are set to the directory itself, HOME/USERPROFILE to a
    ``home`` subdirectory and MPLCONFIGDIR to an ``mplconfig`` subdirectory.
    Both subdirectories are created so every exported path exists on disk.

    NOTE(review): mkdtemp() chooses its parent through the tempfile module's
    default directory, which honours TMPDIR. Because this function overrides
    TMPDIR and clears the cached default, a repeat call in the same process
    appears to create the new directory inside the previous one, and nothing
    here removes earlier directories - confirm that is acceptable for
    long-lived processes.
    """
    # fake temporary directory
    temp_dir = tempfile.mkdtemp()
    # Clear tempfile's cached default so the next lookup re-reads the
    # environment variables set just below.
    tempfile.tempdir = None
    for var_name in ("TMPDIR", "TEMP", "TMP"):
        os.environ[var_name] = temp_dir

    # fake home directory
    home_dir = os.path.join(temp_dir, "home")
    os.makedirs(home_dir, exist_ok=True)
    os.environ["HOME"] = home_dir
    os.environ["USERPROFILE"] = home_dir

    # fake mpl config directory (created up front, like the home directory)
    mpl_config_dir = os.path.join(temp_dir, "mplconfig")
    os.makedirs(mpl_config_dir, exist_ok=True)
    os.environ["MPLCONFIGDIR"] = mpl_config_dir
def _select_recommend_dims(effects, limits):
    """Return the hyperparameters that have both a usable effect and limits.

    Args:
        effects: Mapping of effect key (e.g. ``'HP.<name>'``) to effect value.
        limits: Mapping of hyperparameter name to a dict with ``'min'`` and
            ``'max'`` entries.

    Returns:
        A ``defaultdict(dict)`` keyed by hyperparameter name whose values
        carry ``'strength'``, ``'min_val'`` and ``'max_val'``.
    """
    dimensions = defaultdict(dict)
    for key, strength in effects.items():
        # Remove 'HP.' prefix; keys without a dot are used unchanged.
        name = key.split(".", 1)[1] if "." in key else key
        if name not in limits:
            continue
        try:
            usable = math.isfinite(strength) and strength != 0
        except TypeError:
            # Non-numeric effect (e.g. None): nothing to recommend on.
            usable = False
        if not usable:
            continue
        dimensions[name]['strength'] = strength
        dimensions[name]['min_val'] = limits[name]['min']
        dimensions[name]['max_val'] = limits[name]['max']
    return dimensions


def handler(event, context):
    """Run causal analysis and recommendation, then mail and return results.

    Keys read from ``event``: ``jwt_token``, ``zip_urls``, ``outcome_column``
    (default ``'Time.Duration'``), ``candidate_hyperparameters``,
    ``hyperparameter_limits``, ``unique_id``, ``run_ids``, ``filters`` and
    ``user_email``.

    Args:
        event: Invocation payload (a dict-like object supporting ``.get``).
        context: Runtime context object; not used.

    Returns:
        ``{"analysis_results": causal_analysis_results}`` where each group
        has had its ``'data'`` entry removed and ``'recommend_dims'`` (plus
        ``'recommendations'`` when they could be computed) added.
    """
    # configure the environment variables
    configure_env()

    # set JWT token
    causalbench.services.auth.__access_token = event.get('jwt_token', None)

    # `or` also covers keys that are present but explicitly None.
    zip_urls = event.get('zip_urls') or []
    limits = event.get('hyperparameter_limits') or {}

    # maximum recommended points
    max_points = max(math.ceil(np.sqrt(len(zip_urls))), 50)

    # outcome column
    outcome_column = event.get('outcome_column', 'Time.Duration')

    # download zip files
    download_dir, _downloaded_files = download_files(zip_urls=zip_urls)

    # find all hyperparameter data types
    hp_dtypes = get_hp_dtypes(download_dir)

    # find all causal effects
    causal_analysis_results, download_dir = run_causal_analysis(
        download_dir=download_dir,
        data_types=hp_dtypes,
        outcome_column=outcome_column,
        candidates=event.get('candidate_hyperparameters', None)
    )

    # find all causal recommendations
    for group, group_data in causal_analysis_results.items():
        if group == "_metadata":
            continue

        dimensions = _select_recommend_dims(group_data.get("effects") or {}, limits)
        group_data['recommend_dims'] = [f'{var}' for var in dimensions]

        try:
            if dimensions:
                cols = ["HP." + dim for dim in dimensions]
                sample_frame = group_data["data"][cols + ["outcome"]].copy()
                group_data['recommendations'] = run_g2s_causal_recommendation(sample_frame, dimensions, hp_dtypes, max_points)
            else:
                print(f"Skipping Causal Recommendation for {group} as len(dimensions) == 0.")
        except Exception as e:
            # Best effort: one failing group must not abort the whole report.
            print(f"Error during causal recommendation: {e}")
        finally:
            # 'recommendations' is absent when the step was skipped or failed,
            # so read it with .get() instead of indexing.
            print(f"Causal Recommendation {group_data.get('recommendations')}!")
            # Drop the per-group data before the results are reported and
            # returned; tolerate groups that never had the entry.
            group_data.pop('data', None)

    _yaml_filepath, pdf_filepath, xlsx_filepath = generate_report(outcome_column, causal_analysis_results, event.get('unique_id'), event.get('run_ids'), event.get('filters'))

    attachments = [pdf_filepath]
    # Attach the spreadsheet only when a path was returned and the file exists.
    if xlsx_filepath and os.path.exists(xlsx_filepath):
        attachments.append(xlsx_filepath)

    try:
        send_email(event.get('user_email'), "[CausalBench] Causal Analysis Results", build_email_body(causal_analysis_results, event), attachments=attachments)
    except Exception as e:
        # Mail delivery is best effort; the results are still returned.
        print(f"Error sending email: {e}")

    response = {
        "analysis_results": causal_analysis_results
    }
    return response