-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpredictive_analytics.py
More file actions
188 lines (151 loc) · 6.59 KB
/
predictive_analytics.py
File metadata and controls
188 lines (151 loc) · 6.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import numpy as np
import pandas as pd
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import logging
from typing import Dict, List, Tuple, Optional
import json
# Module-level logger, registered under the fixed name "predictive_analytics";
# the forecaster class and forecast_timeseries() below report through it.
log = logging.getLogger("predictive_analytics")
class TimeSeriesForecaster:
    """Univariate time series forecaster built on statsmodels.

    Wraps either an ARIMA(1, 1, 1) model or an additive Holt-Winters
    (exponential smoothing) model behind a common ``fit`` / ``forecast``
    interface. Fitting and forecasting failures are logged and reported as
    "no model" / empty results instead of being raised to the caller.
    """

    # Two-sided 95% normal quantile used for the residual-based fallback band.
    _Z_95 = 1.96

    def __init__(self, method="arima", seasonal_periods=24):
        """
        Initialize time series forecaster.
        Args:
            method: Forecasting method ('arima', 'exponential_smoothing')
            seasonal_periods: Number of periods in a season (e.g., 24 for hourly data)
        """
        self.method = method
        self.seasonal_periods = seasonal_periods
        self.model = None

    def fit(self, data: List[float]) -> None:
        """Fit the forecasting model on historical data.

        Args:
            data: Historical observations in chronological order. At least
                10 values are required; shorter input is rejected.

        After this call ``self.model`` holds the fitted results object, or
        ``None`` when the data was too short, the method is unknown, or the
        underlying fit raised.
        """
        # Drop any previously fitted model first so that a skipped or failed
        # refit can never leave stale forecasts behind.
        self.model = None

        if len(data) < 10:
            log.warning("Insufficient data for forecasting")
            return

        series = np.array(data)

        if self.method == "arima":
            try:
                # Simple ARIMA(1,1,1) - can be made more sophisticated
                self.model = ARIMA(series, order=(1, 1, 1)).fit()
            except Exception as e:
                log.warning(f"ARIMA fitting failed: {e}")
                return
        elif self.method == "exponential_smoothing":
            try:
                self.model = ExponentialSmoothing(
                    series,
                    seasonal_periods=self.seasonal_periods,
                    trend='add',
                    seasonal='add',
                ).fit()
            except Exception as e:
                log.warning(f"Exponential smoothing fitting failed: {e}")
                return
        else:
            log.warning(f"Unknown forecasting method: {self.method}")
            return

        # Only reached when a model was actually fitted.
        log.info(f"Time series forecaster fitted with method: {self.method}")

    def forecast(self, steps: int = 24) -> Tuple[List[float], List[float], List[float]]:
        """
        Generate forecast.
        Args:
            steps: Number of steps to forecast
        Returns:
            Tuple of (forecast_values, lower_bounds, upper_bounds). All three
            lists are empty when no model is fitted, when ``steps`` is not
            positive, or when the underlying model raises.
        """
        if self.model is None:
            log.warning("Model not fitted, cannot forecast")
            return [], [], []
        if steps <= 0:
            # Nothing to forecast; avoid handing a non-positive horizon to
            # the underlying model.
            return [], [], []

        try:
            if hasattr(self.model, "get_forecast"):
                # Prefer the model's own 95% interval when the results object
                # offers one. A plain std of the residuals is unreliable for
                # a differenced ARIMA fit, where the first residual is
                # typically on the scale of the raw series and inflates the
                # band.
                prediction = self.model.get_forecast(steps=steps)
                forecast_values = np.asarray(prediction.predicted_mean, dtype=float)
                interval = np.asarray(prediction.conf_int(alpha=0.05), dtype=float)
                lower_bounds = interval[:, 0]
                upper_bounds = interval[:, 1]
            else:
                # Fallback: constant-width band from the in-sample residual
                # spread (unit spread when the model exposes no residuals).
                forecast_values = np.asarray(self.model.forecast(steps), dtype=float)
                std_dev = np.std(self.model.resid) if hasattr(self.model, 'resid') else 1.0
                margin = self._Z_95 * std_dev
                lower_bounds = forecast_values - margin
                upper_bounds = forecast_values + margin
            return forecast_values.tolist(), lower_bounds.tolist(), upper_bounds.tolist()
        except Exception as e:
            log.warning(f"Forecasting failed: {e}")
            return [], [], []
def forecast_timeseries(filepath: str, hours_ahead: int = 24, method: str = "arima") -> Dict[str, Dict]:
    """
    Forecast timeseries data from JSONL file.

    Each non-blank line is parsed as one JSON record. Records are grouped by
    ``"<entity_id>_<metric_name>"``, ordered by ``timeframe.from``, and their
    ``points`` (``value`` / ``timestamp`` pairs) are concatenated into one
    series per group. Groups with fewer than 10 values are skipped.

    Args:
        filepath: Path to timeseries JSONL file
        hours_ahead: Hours to forecast ahead
        method: Forecasting method
    Returns:
        Dictionary with forecast data by entity. Each entry holds the
        ``historical`` timestamps/values and a ``forecast`` section with
        timestamps, values, ``lower_bound`` and ``upper_bound``. An empty
        dictionary is returned when the file is missing, holds no records,
        or any other error occurs (the error is logged).
    """
    # Future points are spaced one hour apart; timestamps are treated as
    # epoch milliseconds.
    ms_per_hour = 3600 * 1000

    try:
        # Load timeseries data
        data = []
        with open(filepath, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    # Blank lines are not records; parsing one would raise
                    # and discard every series in the file.
                    continue
                data.append(json.loads(line))
        if not data:
            return {}

        # Group by entity and metric
        grouped_data = {}
        for record in data:
            key = f"{record['entity_id']}_{record['metric_name']}"
            grouped_data.setdefault(key, []).append(record)

        forecast_results = {}
        for key, records in grouped_data.items():
            # Sort by timeframe
            records.sort(key=lambda x: x['timeframe']['from'])

            # Extract values; a record with a missing or empty 'points'
            # entry simply contributes nothing to the series.
            values = []
            timestamps = []
            for record in records:
                for point in record.get('points') or ():
                    values.append(point['value'])
                    timestamps.append(point['timestamp'])

            if len(values) < 10:
                log.warning(f"Insufficient data for {key}")
                continue

            # Fit forecaster
            forecaster = TimeSeriesForecaster(method=method)
            forecaster.fit(values)

            # Generate forecast
            forecast_values, lower_bounds, upper_bounds = forecaster.forecast(steps=hours_ahead)
            if not forecast_values:
                continue

            # Generate future timestamps (assuming hourly data). The series
            # has at least 10 points here, so a last timestamp always exists.
            # The count follows the forecast actually returned, so the
            # timestamp and value lists stay aligned.
            last_timestamp = timestamps[-1]
            future_timestamps = [
                last_timestamp + (i + 1) * ms_per_hour
                for i in range(len(forecast_values))
            ]
            forecast_results[key] = {
                'historical': {
                    'timestamps': timestamps,
                    'values': values
                },
                'forecast': {
                    'timestamps': future_timestamps,
                    'values': forecast_values,
                    'lower_bound': lower_bounds,
                    'upper_bound': upper_bounds
                }
            }
        return forecast_results
    except FileNotFoundError:
        log.warning(f"Timeseries file not found: {filepath}")
        return {}
    except Exception as e:
        log.warning(f"Forecasting failed: {e}")
        return {}
if __name__ == "__main__":
    # Demo run: fit an ARIMA forecaster on synthetic noisy data and print
    # the first predicted point together with its interval.
    import random

    # 50 samples scattered around 100 with a standard deviation of 5.
    sample_series = []
    for _ in range(50):
        sample_series.append(100 + random.gauss(0, 5))

    demo = TimeSeriesForecaster(method="arima")
    demo.fit(sample_series)
    predicted, lows, highs = demo.forecast(steps=10)

    print(f"Generated {len(predicted)} forecast points")
    # The forecast is empty when fitting or forecasting failed.
    if predicted:
        first_value, first_low, first_high = predicted[0], lows[0], highs[0]
        print(f"First forecast value: {first_value:.2f}")
        print(f"Confidence interval: [{first_low:.2f}, {first_high:.2f}]")