diff --git a/src/eaa/task_managers/base.py b/src/eaa/task_managers/base.py index b186c21..6b4457c 100644 --- a/src/eaa/task_managers/base.py +++ b/src/eaa/task_managers/base.py @@ -86,6 +86,8 @@ def build_db(self, *args, **kwargs): def build_agent(self, *args, **kwargs): """Build the assistant(s).""" + if self.llm_config is None: + return if not isinstance(self.llm_config, LLMConfig): raise ValueError( "`llm_config` must be an instance of `LLMConfig`. The type of this " diff --git a/src/eaa/task_managers/tuning/bo.py b/src/eaa/task_managers/tuning/bo.py new file mode 100644 index 0000000..9e5733a --- /dev/null +++ b/src/eaa/task_managers/tuning/bo.py @@ -0,0 +1,120 @@ +import logging +from typing import Optional, Callable + +import torch + +from eaa.task_managers.base import BaseTaskManager +from eaa.tools.base import BaseTool +from eaa.tools.bo import BayesianOptimizationTool +from eaa.api.llm_config import LLMConfig + +logger = logging.getLogger(__name__) + + +class BayesianOptimizationTaskManager(BaseTaskManager): + + def __init__( + self, + llm_config: LLMConfig = None, + tools: list[BaseTool] = [], + bayesian_optimization_tool: BayesianOptimizationTool = None, + initial_points: Optional[torch.Tensor] = None, + n_initial_points: int = 20, + objective_function: Callable = None, + message_db_path: Optional[str] = None, + build: bool = True, + *args, **kwargs + ) -> None: + """Bayesian optimization task manager. + + Parameters + ---------- + llm_config : LLMConfig, optional + The configuration for the LLM. + tools : list[BaseTool], optional + A list of tools for the agent. This should NOT include the + `BayesianOptimizationTool`. + bayesian_optimization_tool : BayesianOptimizationTool + The Bayesian optimization tool to use. + initial_points : torch.Tensor, optional + A (n_points, n_features) tensor giving the initial points where + the objective function should be evaluated to initialize the + Gaussian process model. If None, random initial points will be + generated. + n_initial_points : int, optional + The number of initial points to generate if `initial_points` is None. + objective_function : Callable + The objective function to be maximized. This function should take + a single argument, which is a (n_points, n_features) tensor of + points to evaluate the objective function at. It should return + a (n_points, n_objectives) tensor of objective function values. + message_db_path : Optional[str] + If provided, the entire chat history will be stored in + a SQLite database at the given path. This is essential + if you want to use the WebUI, which polls the database + for new messages. + build : bool, optional + Whether to build the internal state of the task manager. + """ + if bayesian_optimization_tool is None: + raise ValueError( + "Bayesian optimization tool should be explicitly passed to " + "`bayesian_optimization_tool`." + ) + if objective_function is None: + raise ValueError("`objective_function` is required.") + + self.bayesian_optimization_tool = bayesian_optimization_tool + + for tool in tools: + if isinstance(tool, BayesianOptimizationTool): + raise ValueError( + "`BayesianOptimizationTool` should not be included in `tools`. " + "Instead, pass it to `bayesian_optimization_tool`." + ) + + self.objective_function = objective_function + + self.initial_points = initial_points + self.n_initial_points = n_initial_points + + super().__init__( + llm_config=llm_config, + tools=tools, + message_db_path=message_db_path, + build=build, + *args, **kwargs + ) + + def run( + self, + n_iterations: int = 50, + *args, **kwargs + ) -> None: + """Run Bayesian optimization. Upon the second or later call, + this function continues from the last iteration. + + Parameters + ---------- + n_iterations : int, optional + The number of iterations to run. + """ + if len(self.bayesian_optimization_tool.xs_untransformed) == 0: + if self.initial_points is None: + xs_init = self.bayesian_optimization_tool.get_random_initial_points(n_points=self.n_initial_points) + else: + xs_init = self.initial_points + logger.info(f"Initial points (shape: {xs_init.shape}):\n{xs_init}") + + for x in xs_init: + x = x[None, :] + y = self.objective_function(x) + self.bayesian_optimization_tool.update(x, y) + self.bayesian_optimization_tool.build() + + for i in range(n_iterations): + candidates = self.bayesian_optimization_tool.suggest(n_suggestions=1) + logger.info(f"Candidate suggested: {candidates[0]}") + y = self.objective_function(candidates) + logger.info(f"Objective function value: {y.item()}") + self.bayesian_optimization_tool.update(candidates, y) diff --git a/src/eaa/task_managers/tuning/bo_mic_optics.py b/src/eaa/task_managers/tuning/bo_mic_optics.py new file mode 100644 index 0000000..4ce5fc1 --- /dev/null +++ b/src/eaa/task_managers/tuning/bo_mic_optics.py @@ -0,0 +1,148 @@ +import logging +from typing import Optional + +import torch +import numpy as np +from PIL import Image + +from eaa.task_managers.tuning.bo import BayesianOptimizationTaskManager +from eaa.task_managers.imaging.feature_tracking import FeatureTrackingTaskManager +from eaa.tools.base import BaseTool +from eaa.tools.bo import BayesianOptimizationTool +from eaa.api.llm_config import LLMConfig + +logger = logging.getLogger(__name__) + + +class MicroscopyOpticsTuningBOTaskManager( + BayesianOptimizationTaskManager, + FeatureTrackingTaskManager +): + def __init__( + self, + llm_config: LLMConfig = None, + image_acquisition_tool: BaseTool = None, + parameter_setting_tool: BaseTool = None, + bayesian_optimization_tool: BayesianOptimizationTool = None, + initial_points: Optional[torch.Tensor] = None, + n_initial_points: int = 20, + image_acquisition_kwargs: dict = {}, + feature_tracking_kwargs: dict = {}, + message_db_path: Optional[str] = None, + *args, **kwargs + ): + """The Bayesian optimization task manager for microscopy optics tuning. + + Parameters + ---------- + llm_config : LLMConfig, optional + The configuration for the LLM. + bayesian_optimization_tool : BayesianOptimizationTool + The Bayesian optimization tool to use. + initial_points : torch.Tensor, optional + A (n_points, n_features) tensor giving the initial points where + the objective function should be evaluated to initialize the + Gaussian process model. If None, random initial points will be + generated. + n_initial_points : int, optional + The number of initial points to generate if `initial_points` is None. + image_acquisition_kwargs : dict, optional + The arguments of the image acquisition tool that should be used + when acquiring images for evaluating the objective function. + feature_tracking_kwargs : dict, optional + The arguments of the feature tracking task manager's `run` method. + message_db_path : Optional[str] + If provided, the entire chat history will be stored in + a SQLite database at the given path. This is essential + if you want to use the WebUI, which polls the database + for new messages. + build : bool, optional + Whether to build the internal state of the task manager. + """ + if bayesian_optimization_tool is None: + raise ValueError( + "Bayesian optimization tool should be explicitly passed to " + "`bayesian_optimization_tool`." + ) + if image_acquisition_tool is None: + raise ValueError( + "Image acquisition tool should be explicitly passed to " + "`image_acquisition_tool`." + ) + if parameter_setting_tool is None: + raise ValueError( + "Parameter setting tool should be explicitly passed to " + "`parameter_setting_tool`." + ) + + self.image_acquisition_tool = image_acquisition_tool + self.parameter_setting_tool = parameter_setting_tool + self.image_acquisition_kwargs = image_acquisition_kwargs + self.feature_tracking_kwargs = feature_tracking_kwargs + + BayesianOptimizationTaskManager.__init__( + self, + llm_config=llm_config, + tools=[], + bayesian_optimization_tool=bayesian_optimization_tool, + initial_points=initial_points, + n_initial_points=n_initial_points, + objective_function=self.objective_function, + message_db_path=message_db_path, + build=False, + *args, **kwargs + ) + FeatureTrackingTaskManager.__init__( + self, + llm_config=llm_config, + tools=[image_acquisition_tool], + build=True, + *args, **kwargs + ) + + def objective_function(self, x: torch.Tensor, *args, **kwargs): + """Calculate the objective function value. + + Parameters + ---------- + x : torch.Tensor + A (n_points, n_features) tensor of points to evaluate the + objective function at. + + Returns + ------- + torch.Tensor + A (n_points, 1) tensor of objective function values. + """ + if x.ndim != 2: + raise ValueError( + "`x` should be a 2D tensor of shape (n_points, n_features)." + ) + + objective_values = torch.zeros(x.shape[0], 1, device=x.device) + + for i, x_i in enumerate(x): + # Acquire an image with the current parameters. It will be used + # as the reference image for feature tracking. + acquired_image_path = self.image_acquisition_tool.acquire_image( + **self.image_acquisition_kwargs + ) + + # Apply parameters. + self.parameter_setting_tool.set_parameters(x_i) + + # Now the original feature will have drifted. Run feature tracking + # to bring it back. + self.run_feature_tracking( + **self.feature_tracking_kwargs + ) + + # Get a new image after feature tracking. + acquired_image_path = self.image_acquisition_tool.acquire_image( + **self.image_acquisition_kwargs + ) + image = Image.open(acquired_image_path) + image = np.array(image) + + objective_values[i, 0] = np.std(image) + return objective_values diff --git a/src/eaa/tools/bo.py b/src/eaa/tools/bo.py index d6a41aa..9fe1c70 100644 --- a/src/eaa/tools/bo.py +++ b/src/eaa/tools/bo.py @@ -1,4 +1,4 @@ -from typing import Annotated, Callable, Tuple, List, Type +from typing import Annotated, Callable, Tuple, List, Type, Dict, Any import logging import botorch.generation @@ -10,7 +10,7 @@ import gpytorch import torch -from eaa.tools.base import BaseTool +from eaa.tools.base import BaseTool, ToolReturnType from eaa.util import to_tensor logger = logging.getLogger(__name__) @@ -102,17 +102,30 @@ def __init__( ) # Untransformed data - self.xs_raw = torch.tensor([]) - self.ys_raw = torch.tensor([]) + self.xs_untransformed = torch.tensor([]) + self.ys_untransformed = torch.tensor([]) # Transformed data - self.xs = torch.tensor([]) - self.ys = torch.tensor([]) + self.xs_transformed = torch.tensor([]) + self.ys_transformed = torch.tensor([]) self.input_transform = None self.outcome_transform = None super().__init__(*args, build=False, **kwargs) + + self.exposed_tools: List[Dict[str, Any]] = [ + { + "name": "update", + "function": self.update, + "return_type": ToolReturnType.NUMBER + }, + { + "name": "suggest", + "function": self.suggest, + "return_type": ToolReturnType.NUMBER + } + ] def check_x_data(self, data: torch.Tensor): if not (data.ndim == 2 and data.shape[1] == self.n_dims_in): @@ -157,7 +170,7 @@ def build(self) -> None: """ self.initialize_transforms() self.train_transforms_and_transform_data() - self.initialize_model(self.xs, self.ys) + self.initialize_model(self.xs_transformed, self.ys_transformed) self.fit_kernel_hyperparameters() self.build_acquisition_function() @@ -219,8 +232,8 @@ def train_transforms_and_transform_data(self): The bounds of the input transformed are set when the transform is instantiated and are not trained here. """ - self.xs, self.ys = self.transform_data( - self.xs_raw, self.ys_raw, train_x=False, train_y=True + self.xs_transformed, self.ys_transformed = self.transform_data( + self.xs_untransformed, self.ys_untransformed, train_x=False, train_y=True ) def transform_data(self, x=None, y=None, train_x=False, train_y=False): @@ -408,26 +421,49 @@ def update( ---------- x : torch.Tensor | np.ndarray A tensor or numpy array of shape (n_samples, n_features) giving - the *un-transformed* input parameters. + the *un-transformed* input parameters (raw values before normalization). y : torch.Tensor | np.ndarray A tensor or numpy array of shape (n_samples, n_observations) giving the - *un-transformed* observations of the objective function. For multi-task - optimization problems, `n_observations` should be equal to the number of - tasks. + *un-transformed* observations (raw values before standardization) + of the objective function. For multi-task optimization problems, + `n_observations` should be equal to the number of tasks. """ x = to_tensor(x) y = to_tensor(y) - x, y = self.transform_data(x, y) - self.check_x_data(x) self.check_y_data(y) - self.xs_raw = torch.cat([self.xs_raw, x]) - self.ys_raw = torch.cat([self.ys_raw, y]) + self.xs_untransformed = torch.cat([self.xs_untransformed, x]) + self.ys_untransformed = torch.cat([self.ys_untransformed, y]) if self.input_transform is not None and self.outcome_transform is not None: - self.xs, self.ys = self.transform_data(x, y) + x, y = self.transform_data(x, y) + self.xs_transformed = torch.cat([self.xs_transformed, x]) + self.ys_transformed = torch.cat([self.ys_transformed, y]) + if self.model is not None: + self.model.condition_on_observations(x, y) + else: + logger.info( + "GP model is not updated because it is not built yet by calling " + "`build`." + ) + else: + logger.info( + "GP model and variable buffers are not updated because normalization " + "and standardization transforms are not built yet by calling `build`." + ) + + if ( + len(self.xs_untransformed) != len(self.xs_transformed) + or len(self.ys_untransformed) != len(self.ys_transformed) + ): + logger.debug( + "The number of untransformed and transformed data are not equal. " + "This is expected if normalization and standardization transforms " + "are not built yet by calling `build`. However, if you have " + "already done so, this is unexpected." + ) def suggest( self, n_suggestions: int = 1 @@ -443,6 +479,8 @@ def suggest( ------- torch.Tensor A (n_samples, n_features) tensor giving the suggested points to observe. + The values are in the untransformed space (raw observation before + normalization and standardization). """ if isinstance( self.acquisition_function, botorch.acquisition.AnalyticAcquisitionFunction @@ -456,8 +494,8 @@ def suggest( acq_function=self.acquisition_function, bounds=torch.stack( [ - torch.zeros(self.n_dims_in, dtype=self.xs.dtype), - torch.ones(self.n_dims_in, dtype=self.xs.dtype), + torch.zeros(self.n_dims_in, dtype=self.xs_transformed.dtype), + torch.ones(self.n_dims_in, dtype=self.xs_transformed.dtype), ] ), q=n_suggestions, diff --git a/tests/test_bo.py b/tests/test_bo.py index fbd7abe..6beaf87 100644 --- a/tests/test_bo.py +++ b/tests/test_bo.py @@ -8,6 +8,7 @@ import torch from eaa.tools.bo import BayesianOptimizationTool +from eaa.task_managers.tuning.bo import BayesianOptimizationTaskManager import test_utils as tutils @@ -34,12 +35,12 @@ def visualize_gp(self, bo_tool: BayesianOptimizationTool): import matplotlib.pyplot as plt fig, ax = plt.subplots(1, 2) ax[0].imshow(mu, extent=[x_ticks.min(), x_ticks.max(), y_ticks.min(), y_ticks.max()]) - ax[0].scatter(bo_tool.xs_raw[:, 0], bo_tool.xs_raw[:, 1], color='gray') - ax[0].scatter(bo_tool.xs_raw[-1:, 0], bo_tool.xs_raw[-1:, 1], color='red', marker='x') + ax[0].scatter(bo_tool.xs_untransformed[:, 0], bo_tool.xs_untransformed[:, 1], color='gray') + ax[0].scatter(bo_tool.xs_untransformed[-1:, 0], bo_tool.xs_untransformed[-1:, 1], color='red', marker='x') ax[0].set_title('mean') ax[1].imshow(sigma, extent=[x_ticks.min(), x_ticks.max(), y_ticks.min(), y_ticks.max()]) - ax[1].scatter(bo_tool.xs_raw[:, 0], bo_tool.xs_raw[:, 1], color='gray') - ax[1].scatter(bo_tool.xs_raw[-1:, 0], bo_tool.xs_raw[-1:, 1], color='red', marker='x') + ax[1].scatter(bo_tool.xs_untransformed[:, 0], bo_tool.xs_untransformed[:, 1], color='gray') + ax[1].scatter(bo_tool.xs_untransformed[-1:, 0], bo_tool.xs_untransformed[-1:, 1], color='red', marker='x') ax[1].set_title('std') plt.show() @@ -76,14 +77,50 @@ def objective_function(x: torch.Tensor) -> torch.Tensor: for i in range(50): candidates = tool.suggest(n_suggestions=1) - logging.info(f"Candidate suggested: {candidates[0]}") + logger.info(f"Candidate suggested: {candidates[0]}") y = objective_function(candidates) + logger.info(f"Objective function value: {y.reshape(-1)}") tool.update(candidates, y) # if self.debug: # self.visualize_gp(tool) final_suggestion = candidates[0] assert torch.allclose(final_suggestion.float(), torch.tensor([1.0, 2.0]), rtol=0.1) + + def test_bo_task_manager(self): + def objective_function(x: torch.Tensor) -> torch.Tensor: + # Expected input shape: (n_samples, n_features) + # Maximum: x = [1, 2] + y = torch.exp(-((x[:, 0] - 1) ** 2 + (x[:, 1] - 2) ** 2) / (2 * 10 ** 2)) + return y[:, None] + + tutils.set_seed(42) + + bo_tool = BayesianOptimizationTool( + bounds=([-10, -10], [10, 10]), + acquisition_function_class=botorch.acquisition.LogExpectedImprovement, + acquisition_function_kwargs={ + "best_f": -100 + }, + model_class=botorch.models.SingleTaskGP, + model_kwargs={ + "covar_module": gpytorch.kernels.MaternKernel( + nu=2.5, + ) + }, + optimization_function=botorch.optim.optimize_acqf, + ) + + task_manager = BayesianOptimizationTaskManager( + llm_config=None, + bayesian_optimization_tool=bo_tool, + n_initial_points=20, + objective_function=objective_function, + ) + task_manager.run(n_iterations=20) + + final_suggestion = task_manager.bayesian_optimization_tool.xs_untransformed[-1] + assert torch.allclose(final_suggestion.float(), torch.tensor([1.0, 2.0]), rtol=0.1) if __name__ == '__main__': @@ -93,3 +130,4 @@ def objective_function(x: torch.Tensor) -> torch.Tensor: tester = TestBayesianOptimization() tester.setup_method(name="", generate_data=False, generate_gold=False, debug=True) tester.test_bayesian_optimization() + tester.test_bo_task_manager()