-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsimulation.py
More file actions
345 lines (279 loc) · 14.8 KB
/
simulation.py
File metadata and controls
345 lines (279 loc) · 14.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
import integrator
from typing import Optional, Union, Iterable
import numpy as np
import pandas as pd
import casadi as cas
import name_and_value_fun
from base import Model
from data_type import VariableFullCodeCollectionType, VariableCodeType, VariableCodeCollectionType
from data_type import VariableMixCodeCollectionType, SimulationResultType, ValueArray
from integrator import Integrator
from name_and_value_fun import get_full_var_codes, assign_values_to_model, assign_symbols_to_model
from results import SnapshotManager
class Simulation:
def __init__(self,
model: Model,
integrator: Integrator,
p_est: VariableMixCodeCollectionType = (),
p_invar: VariableMixCodeCollectionType = (),
p_var: VariableMixCodeCollectionType = (),
log_param: bool = False,
initialization: bool = True):
self.model = model
self.integrator = integrator
self.p_est = get_full_var_codes(model.vars, p_est)
self.p_var = get_full_var_codes(model.vars, p_var)
self.p_invar = get_full_var_codes(model.vars, p_invar)
self.log_param = log_param
self.initialization = initialization
self.n_est = len(self.p_est)
self.n_variant = len(self.p_var)
self.n_invariant = len(self.p_invar)
# Store the symbolic output of sequential simulation or simulation with events
self.simulation_results: Optional[SimulationResultType] = None
self.snapshots = SnapshotManager(self.model)
self.snapshots.take_snapshot('default')
self.time_points = np.array([])
self.working_tols = [] # Adopted tolerances in all the event-simulations
def simulate_one_event(self, p_var: Iterable, is_symbol: bool, t0=0., tf=1., t_grid: np.ndarray = None, batch_i=0):
"""Simulate with only one event happening,
Applicable for both cases with symbolic and numerical inputs
"""
print('Current t0: {:.3f}'.format(t0))
# Assign current parameter input to model
assign_values_to_model(self.model, self.p_var, p_var)
# Adjust the values of t_event_start_time variable in the model
for submodel in self.model.submodels:
t_event_start_name = f'{submodel.name}.t_event_start'
if t_event_start_name in submodel.vars:
assign_values_to_model(submodel, [(t_event_start_name, 0)], [t0])
# Collect initial values
xzp0 = self.model.collect_var_val()
# Set integration time (start, end, grids)
if t_grid is None:
# The update part can be saved if T is defined as a parameter in model
self.integrator.options['t0'] = t0
self.integrator.options['tf'] = tf
ti_grid = None
else:
# Get ti_grid, the time points in during the event i and i+1
id1 = np.where(t_grid > t0)[0]
id2 = np.where(t_grid < tf)[0]
id4grid = sorted(list(set(id1).intersection(set(id2))))
ti_grid = np.concatenate([[t0], t_grid[id4grid], [tf]])
self.integrator.options['grid'] = ti_grid
self.integrator.options['output_t0'] = True
if is_symbol:
working_tol = None
if len(self.working_tols) > 0:
working_tol = self.working_tols[batch_i]
self.integrator.options['abstol'] = working_tol
self.integrator.options['reltol'] = working_tol
self.integrator.update_dae_fun()
sol = self.integrator.dae_fun(x0=xzp0['x'], z0=xzp0['z'], p=xzp0['p'])
else:
self.integrator.update_dae_fun()
sol, working_tol = integrate_with_adaptive_tolerance(self.integrator,
xzp0['x'],
xzp0['z'],
xzp0['p'],
max_tol=1e-4)
print('Current tf: {:.3f}'.format(tf))
# Transpose to guarantee each row corresponds to one time point
xf = sol['xf'].T
zf = sol['zf'].T
self.model.load_var_trajectory(xf, 'x', batch_i=batch_i)
self.model.load_var_trajectory(zf, 'z', batch_i=batch_i)
if t_grid is None:
self.model.load_var_values(xf, 'x')
self.model.load_var_values(zf, 'z')
else:
# Only store the results at the last time point in variable.val
self.model.load_var_values(xf[-1, :], 'x')
self.model.load_var_values(zf[-1, :], 'z')
return ti_grid, working_tol
def get_simulation_vector_fun(self, simulation_output: dict[VariableCodeType, list[cas.MX]]) \
-> cas.Function:
"""
:simulation_results, the dictionary of trajectory.
the key of outer level are the names of responses, the keys of inner level are the names of times
:fun, casadi function for one simulation and all response variable.
"""
vec_output = [cas.vcat(trajectory) for trajectory in simulation_output.values()]
vec_output = cas.hcat(vec_output)
p_est = cas.MX.sym('p_est', self.n_est)
fun = cas.Function('fun', [p_est], [vec_output])
return fun
def _assign_p_invariant_and_initialize(self, p_invar: Iterable[float], p_est: Iterable[Union[float, cas.MX]]):
"""Preprocessing before builiding the simulation function
"""
# Empty the initial variable trajectory
for var in self.model.vars.values():
var.trajectory_batch = 0
# Assign initial control parameters and initial conditions
assign_values_to_model(self.model, self.p_invar, p_invar)
if p_est is not None :
if self.log_param and isinstance(p_est[0], cas.MX):
assign_values_to_model(self.model, self.p_est, cas.exp(p_est)) # Apply exponential function
else:
assign_values_to_model(self.model, self.p_est, p_est)
# Do initialization (from known to get unknown)
if self.initialization:
self.integrator.initializer.initialize(update_fixed=True)
def get_output_trajectory_symbol(self, response_names: Iterable[VariableCodeType]) -> dict[VariableCodeType, cas.MX]:
"""Get the trajectorys of a group of variables
"""
# Desired output variable
vars = {}
for var_code in response_names:
# list[cas.MX], each element corresponds the result at one time point
var = self.model.get_var_component_trajectory(var_code)
vars[var_code] = var
return vars
def get_output_trajectory_numerical(self, response_names: Iterable[VariableCodeType]) -> pd.DataFrame:
"""Get numerical values of the trajectories of a group of variables
"""
# Desired output variable
response_vars = {}
for var_code in response_names:
# list[cas.DM], each element corresponds the result at one time point
response_var = self.model.get_var_component_trajectory(var_code)
response_vars[var_code] = cas.vcat(response_var).full().flatten()
response_vars = pd.DataFrame(response_vars, index=self.time_points)
return response_vars
def get_output_endpoint_symbol(self, response_names: Iterable[VariableCodeType]) -> dict[VariableCodeType, cas.MX]:
"""Get the trajectorys of a group of variables
"""
# Desired output variable
output_list = name_and_value_fun.get_variable_collection_values(self.model.vars, response_names)
output = dict(zip(response_names, output_list))
return output
class SimulationDyn(Simulation):
def simulation_with_events(self,
t_event: Iterable[float] = (0., 1.),
p_invar: Iterable[float] = (),
p_var: Iterable[tuple[float, ...]] = (()),
p_est: Optional[cas.MX] = None):
"""
N_grids is the number of grid points in each time interval (it should be at least 2),
N_grids is None implies that we want to generate symbolic results instead of float values.
single_output is to address the case when we only want to generate the end point value for the simulation
"""
# Assign invariant parameters, estimation parameters to model and conduct initialization
self._assign_p_invariant_and_initialize(p_invar, p_est)
initial_integrator_options = self.integrator.options.copy()
for i in range(len(t_event)-1):
self.simulate_one_event(p_var[i],
is_symbol=True,
t0=t_event[i],
tf=t_event[i + 1],
batch_i=i)
# Recover the integrator options to default options
self.integrator.options = initial_integrator_options
self.integrator.update_dae_fun()
def simulation_with_events_numerical(self,
t_event: Iterable[float] = (0., 1.),
p_invar: Iterable[float] = (),
p_var: Iterable[Iterable[float]] = (()),
t_grid: np.ndarray = np.array([0, 1.]),
p_est: Optional[Iterable[float]] = None,
):
"""
N_grids is the number of grid points in each time interval (it should be at least 2),
N_grids is None implies that we want to generate symbolic results instead of float values.
single_output is to address the case when we only want to generate the end point value for the simulation
t_grid include the time points in the whole horizon
"""
# Assign invariant parameters, estimation parameters to model and conduct initialization
self._assign_p_invariant_and_initialize(p_invar, p_est)
time_points = []
self.working_tols = []
initial_integrator_options = self.integrator.options.copy()
for i in range(len(t_event)-1):
ti_grid, working_tol = self.simulate_one_event(p_var[i],
is_symbol=False,
t0=t_event[i],
tf=t_event[i+1],
t_grid=t_grid,
batch_i=i)
time_points.append(ti_grid)
self.working_tols.append(working_tol)
# Recover the integrator options to default options
self.integrator.options = initial_integrator_options
self.integrator.update_dae_fun()
self.time_points = np.concatenate(time_points)
class SimulationSS(Simulation):
def simulation_steady_state(self, p_invar: Iterable[float], p_est: Optional[cas.MX] = None):
"""
N_grids is the number of grid points in each time interval (it should be at least 2),
N_grids is None implies that we want to generate symbolic results instead of float values.
single_output is to address the case when we only want to generate the end point value for the simulation
"""
# Assign invariant parameters, estimation parameters to model and conduct initialization
self._assign_p_invariant_and_initialize(p_invar, p_est)
self.integrator.initializer.initialize(update_fixed=True)
xzp0 = self.model.collect_var_val()
sol = self.integrator.dae_fun(x0=xzp0['x'], z0=xzp0['z'], p=xzp0['p'])
xf = sol['xf'].T
zf = sol['zf'].T
self.model.load_var_values(xf, 'x')
self.model.load_var_values(zf, 'z')
def simulation_SS_numerical(self,
p_invar: Iterable[float] = (),
t_grid: np.ndarray = np.array([0, 1.]),
p_est: Optional[Iterable[float]] = None,
):
"""
N_grids is the number of grid points in each time interval (it should be at least 2),
N_grids is None implies that we want to generate symbolic results instead of float values.
single_output is to address the case when we only want to generate the end point value for the simulation
t_grid include the time points in the whole horizon
"""
# Assign invariant parameters, estimation parameters to model and conduct initialization
self._assign_p_invariant_and_initialize(p_invar, p_est)
time_points = []
self.working_tols = []
initial_integrator_options = self.integrator.options.copy()
ti_grid, working_tol = self.simulate_one_event((),
is_symbol=False,
t0=t_grid[0],
tf=t_grid[-1],
t_grid=t_grid,
batch_i=0)
time_points.append(ti_grid)
self.working_tols.append(working_tol)
# Recover the integrator options to default options
self.integrator.options = initial_integrator_options
self.integrator.update_dae_fun()
self.time_points = np.concatenate(time_points)
def integrate_with_adaptive_tolerance(integrator: Integrator, x0, z0, p, max_tol=1e-4):
"""Integrate a DAE with tolerance adjusted to guarantee convergence.
Only used in simulation for numerical results
"""
# Set integrator tolerances
default_tol = integrator.options['abstol']
working_tol = default_tol
solution_stat = False
sol = None
# Solve adaptively
while integrator.options['abstol'] <= max_tol:
try:
sol = integrator.dae_fun(x0=x0, z0=z0, p=p)
solution_stat = True
break
except Exception as err:
print(f"When tolerance={integrator.options['abstol']}:", err)
# Increase the tolerance
working_tol = integrator.options['abstol'] * 10
integrator.options['abstol'] = working_tol
integrator.options['reltol'] = working_tol
# Update integrator
integrator.update_dae_fun()
if not solution_stat:
raise ValueError(err)
# If not at default tolerance, reset it back
if integrator.options['abstol'] != default_tol:
integrator.options['abstol'] = default_tol
integrator.options['reltol'] = default_tol
integrator.update_dae_fun()
return sol, working_tol