diff --git a/setup.py b/setup.py index 822bd25b..5f5b9549 100644 --- a/setup.py +++ b/setup.py @@ -104,7 +104,7 @@ def default_setup_args(*, version): return setup_args -version = "0.4.2" +version = "0.5.0" version = update_version(version, use_file_if_exists=False, create_file=True) install_requires = [ diff --git a/src/autogluon/cloud/backend/sagemaker_backend.py b/src/autogluon/cloud/backend/sagemaker_backend.py index 559427c8..e3cdf794 100644 --- a/src/autogluon/cloud/backend/sagemaker_backend.py +++ b/src/autogluon/cloud/backend/sagemaker_backend.py @@ -6,12 +6,13 @@ import shutil import tarfile import tempfile -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Literal, Optional, Tuple, Union import pandas as pd import sagemaker from botocore.exceptions import ClientError from sagemaker import Predictor +from sagemaker.serverless import ServerlessInferenceConfig from autogluon.common.loaders import load_pd from autogluon.common.utils.s3_utils import is_s3_url, s3_path_to_bucket_prefix @@ -349,7 +350,7 @@ def deploy( predictor_path: Optional[str] = None, endpoint_name: Optional[str] = None, framework_version: str = "latest", - instance_type: str = "ml.m5.2xlarge", + instance_type: Optional[str] = "ml.m5.2xlarge", initial_instance_count: int = 1, custom_image_uri: Optional[str] = None, volume_size: Optional[int] = None, @@ -357,6 +358,8 @@ def deploy( model_kwargs: Optional[Dict] = None, deploy_kwargs: Optional[Dict] = None, fm_serve_config: Optional[Dict[str, Any]] = None, + inference_mode: Literal["realtime", "serverless"] = "realtime", + inference_config: Optional[Dict[str, Any]] = None, repack: bool = True, ) -> None: """ @@ -400,6 +403,12 @@ def deploy( Please refer to https://sagemaker.readthedocs.io/en/stable/api/inference/model.html#sagemaker.model.Model.deploy for all options fm_serve_config: Optional[Dict[str, Any]], default = None Configuration dict passed to the FM serve script via the AG_FM_SERVE_CONFIG env var. + inference_mode: {"realtime", "serverless"}, default = "realtime" + Endpoint type. ``"serverless"`` provisions a SageMaker Serverless Inference endpoint + (no instance management, scales to zero). + inference_config: Optional[Dict[str, Any]], default = None + Mode-specific overrides forwarded to `sagemaker.serverless.ServerlessInferenceConfig` + (e.g. ``memory_size_in_mb``, ``max_concurrency``). repack: bool, default = True Whether the SageMaker SDK should download ``predictor_path``, inject the entry-point script, and re-upload it. Set to False when ``predictor_path`` already contains the serve script (e.g. an artifact bundled by @@ -409,6 +418,9 @@ def deploy( assert self.endpoint is None, ( "There is an endpoint already attached. Either detach it with `detach` or clean it up with `cleanup_deployment`" ) + if inference_mode == "serverless" and instance_type is None: + # Needed to infer the container image (CPU vs GPU) downstream — serverless is CPU-only. + instance_type = "ml.m5.2xlarge" if not endpoint_name: endpoint_name = sagemaker.utils.unique_name_from_base(CLOUD_RESOURCE_PREFIX) @@ -494,6 +506,12 @@ def deploy( if fm_serve_config is not None: model_kwargs_env["AG_FM_SERVE_CONFIG"] = json.dumps(fm_serve_config) + if inference_mode == "serverless": + # Serverless containers run with `/` as cwd and a read-only root, so TorchServe's + # default `logs/` path resolves to `/logs` and startup fails. Redirect to /tmp. + model_kwargs_env.setdefault("LOG_LOCATION", "/tmp") + model_kwargs_env.setdefault("METRICS_LOCATION", "/tmp") + model = model_cls( model_data=predictor_path, role=self.role_arn, @@ -510,17 +528,23 @@ def deploy( if deploy_kwargs is None: deploy_kwargs = {} - logger.log(20, "Deploying model to the endpoint") - self.endpoint = SagemakerEndpoint( - model.deploy( - endpoint_name=endpoint_name, - instance_type=instance_type, - initial_instance_count=initial_instance_count, - volume_size=volume_size, - wait=wait, - **deploy_kwargs, - ) - ) + instance_kwargs = { + "instance_type": instance_type, + "initial_instance_count": initial_instance_count, + "volume_size": volume_size, + } + user_config = inference_config or {} + if inference_mode == "realtime": + mode_kwargs = instance_kwargs + elif inference_mode == "serverless": + preset = {"memory_size_in_mb": 4096, "max_concurrency": 5} + mode_kwargs = {"serverless_inference_config": ServerlessInferenceConfig(**{**preset, **user_config})} + else: + raise ValueError(f"Unsupported inference_mode={inference_mode!r}") + + logger.log(20, f"Deploying model to the endpoint (inference_mode={inference_mode})") + predictor = model.deploy(endpoint_name=endpoint_name, wait=wait, **mode_kwargs, **deploy_kwargs) + self.endpoint = SagemakerEndpoint(predictor) def _create_serve_script_tarball(self, serve_script_path: str, endpoint_name: str) -> str: """Create a minimal model.tar.gz containing the serve script + serving_utils/ under code/.""" diff --git a/src/autogluon/cloud/endpoint/prediction_future.py b/src/autogluon/cloud/endpoint/prediction_future.py new file mode 100644 index 00000000..d73bdf4b --- /dev/null +++ b/src/autogluon/cloud/endpoint/prediction_future.py @@ -0,0 +1,52 @@ +"""Pending-prediction handle for job-backed inference (e.g. ``predict(wait=False)``).""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Callable, Literal + +import pandas as pd + +from ..utils.ag_sagemaker import AutoGluonSagemakerEstimator + +if TYPE_CHECKING: + from ..job.sagemaker_job import SageMakerFitJob + +PredictionStatus = Literal["InProgress", "Completed", "Failed"] + + +class JobPredictionFuture: + """Pending result from a SageMaker job (e.g. ``predict(wait=False)``). + + Wraps the underlying job and exposes a small future-like surface: ``output_path``, + ``status()``, and ``result()``. + """ + + def __init__(self, job: "SageMakerFitJob", result_loader: Callable[[], pd.DataFrame]) -> None: + self._job = job + self._result_loader = result_loader + + @property + def output_path(self) -> str: + return self._job.get_output_path() or "" + + @property + def job_name(self) -> str: + return self._job.job_name + + def status(self) -> PredictionStatus: + raw = self._job.get_job_status() + if raw == "Completed": + return "Completed" + if raw in ("Failed", "Stopped"): + return "Failed" + return "InProgress" + + def result(self) -> pd.DataFrame: + if not self._job.completed: + AutoGluonSagemakerEstimator.attach(self._job.job_name, sagemaker_session=self._job.session).logs() + if self.status() == "Failed": + raise RuntimeError( + f"Prediction job {self._job.job_name!r} did not complete successfully " + f"(status={self._job.get_job_status()!r}). Check the SageMaker console for details." + ) + return self._result_loader() diff --git a/src/autogluon/cloud/model/foundation_model.py b/src/autogluon/cloud/model/foundation_model.py index 4429240b..0e0cade3 100644 --- a/src/autogluon/cloud/model/foundation_model.py +++ b/src/autogluon/cloud/model/foundation_model.py @@ -16,6 +16,7 @@ from ..backend.backend_factory import BackendFactory from ..backend.constant import SAGEMAKER, TABULAR_SAGEMAKER, TIMESERIES_SAGEMAKER +from ..endpoint.prediction_future import JobPredictionFuture from ..endpoint.timeseries_endpoint import TimeSeriesEndpoint from ..scripts.script_manager import ScriptManager from ..utils.aws_utils import resolve_cloud_output_path @@ -181,10 +182,14 @@ def _deploy_backend( framework_version: str = "latest", custom_image_uri: Optional[str] = None, wait: bool = True, + inference_mode: Literal["realtime", "serverless"] = "realtime", + inference_config: Optional[Dict[str, Any]] = None, **backend_kwargs, ) -> None: """Shared deploy logic. Subclasses call this then wrap the endpoint.""" - if instance_type is None: + if inference_mode == "serverless" and instance_type is not None: + raise ValueError("`instance_type` must not be set when `inference_mode='serverless'`.") + if instance_type is None and inference_mode == "realtime": instance_type = self._config.deploy_instance_type merged_hp = self._get_hyperparameters("inference", hyperparameters) @@ -218,6 +223,8 @@ def _deploy_backend( wait=wait, model_kwargs=model_kwargs, fm_serve_config=fm_serve_config, + inference_mode=inference_mode, + inference_config=inference_config, repack=False, **backend_kwargs, ) @@ -383,19 +390,20 @@ def deploy( framework_version: str = "latest", custom_image_uri: Optional[str] = None, wait: bool = True, + inference_mode: Literal["realtime", "serverless"] = "realtime", + inference_config: Optional[Dict[str, Any]] = None, **backend_kwargs, ) -> TimeSeriesEndpoint: """ - Deploy model to a real-time endpoint. + Deploy model to an inference endpoint. Parameters ---------- instance_type - Instance type for the endpoint. - If None, will use the default from the model registry. + Instance type for the endpoint. Defaults to the model registry value. Must be ``None`` + when ``inference_mode="serverless"``. endpoint_name - Custom endpoint name. - If None, will auto-generate a unique name. + Custom endpoint name. If None, will auto-generate a unique name. hyperparameters Model hyperparameters for inference. Overrides values passed to the constructor. framework_version @@ -404,13 +412,15 @@ def deploy( Custom Docker image URI for the inference container. wait Whether to block until the endpoint is ready. + inference_mode + Endpoint type. ``"serverless"`` provisions a SageMaker Serverless Inference endpoint + (no instance management, scales to zero). + inference_config + Mode-specific overrides forwarded to ``sagemaker.serverless.ServerlessInferenceConfig`` + (e.g. ``memory_size_in_mb``, ``max_concurrency``). **backend_kwargs Backend-specific arguments (e.g., initial_instance_count, volume_size, model_kwargs, deploy_kwargs). - - Returns - ------- - TimeSeriesEndpoint """ self._deploy_backend( instance_type=instance_type, @@ -419,6 +429,8 @@ def deploy( framework_version=framework_version, custom_image_uri=custom_image_uri, wait=wait, + inference_mode=inference_mode, + inference_config=inference_config, **backend_kwargs, ) return TimeSeriesEndpoint(self._backend.endpoint) @@ -463,7 +475,7 @@ def predict( wait: bool = True, predictions_path: Optional[str] = None, **backend_kwargs, - ) -> Optional[pd.DataFrame]: + ) -> Union[pd.DataFrame, JobPredictionFuture]: """ Run batch prediction for time series. @@ -498,7 +510,9 @@ def predict( custom_image_uri Custom Docker image URI for the container. wait - If True, block and return DataFrame. If False, return the job handle. + If True, block and return a DataFrame. If False, return a + :class:`JobPredictionFuture` immediately — call ``.result()`` on it later to + retrieve the DataFrame, or ``.status()`` to check progress. predictions_path S3 URL where predictions will be written by the prediction job (e.g. ``s3://my-bucket/runs/2024-05-01/predictions.csv``). The container's SageMaker execution @@ -512,7 +526,8 @@ def predict( Returns ------- - Optional[pd.DataFrame] + pd.DataFrame or JobPredictionFuture + DataFrame if ``wait=True``; a :class:`JobPredictionFuture` otherwise. """ if instance_type is None: instance_type = self._config.predict_instance_type @@ -549,8 +564,10 @@ def predict( ) if not wait: - # TODO: return a handle that supports polling status and fetching results - return None + return JobPredictionFuture( + job=self._backend._fit_job, + result_loader=self._backend.get_fit_predict_results, + ) return self._backend.get_fit_predict_results() diff --git a/src/autogluon/cloud/predictor/cloud_predictor.py b/src/autogluon/cloud/predictor/cloud_predictor.py index cca40479..b7d5a28e 100644 --- a/src/autogluon/cloud/predictor/cloud_predictor.py +++ b/src/autogluon/cloud/predictor/cloud_predictor.py @@ -8,7 +8,7 @@ from abc import ABC, abstractmethod from datetime import datetime from pathlib import Path -from typing import Any, Dict, Optional, Tuple, Union +from typing import Any, Dict, Literal, Optional, Tuple, Union import boto3 import pandas as pd @@ -388,15 +388,17 @@ def deploy( predictor_path: Optional[str] = None, endpoint_name: Optional[str] = None, framework_version: str = "latest", - instance_type: str = "ml.m5.2xlarge", + instance_type: Optional[str] = None, initial_instance_count: int = 1, custom_image_uri: Optional[str] = None, volume_size: Optional[int] = None, wait: bool = True, + inference_mode: Literal["realtime", "serverless"] = "realtime", + inference_config: Optional[Dict[str, Any]] = None, backend_kwargs: Optional[Dict] = None, ) -> None: """ - Deploy a predictor to an endpoint, which can be used to do real-time inference later. + Deploy a predictor to an inference endpoint. Parameters ---------- @@ -412,10 +414,12 @@ def deploy( If `latest`, will use the latest available container version. If provided a specific version, will use this version. If `custom_image_uri` is set, this argument will be ignored. - instance_type: str, default = 'ml.m5.2xlarge' - Instance to be deployed for the endpoint + instance_type: Optional[str], default = None + Instance to be deployed for the endpoint. Defaults to ``ml.m5.2xlarge``. Must be ``None`` + when ``inference_mode="serverless"``. initial_instance_count: int, default = 1, - Initial number of instances to be deployed for the endpoint + Initial number of instances to be deployed for the endpoint. Ignored when + ``inference_mode="serverless"``. custom_image_uri: Optional[str], default = None, Custom image to use to deploy endpoint with. If not specified, with use official DLC image: @@ -426,6 +430,12 @@ def deploy( wait: Bool, default = True, Whether to wait for the endpoint to be deployed. To be noticed, the function won't return immediately because there are some preparations needed prior deployment. + inference_mode: {"realtime", "serverless"}, default = "realtime" + Endpoint type. ``"serverless"`` provisions a SageMaker Serverless Inference endpoint + (no instance management, scales to zero). + inference_config: Optional[Dict[str, Any]], default = None + Mode-specific overrides forwarded to ``sagemaker.serverless.ServerlessInferenceConfig`` + (e.g. ``memory_size_in_mb``, ``max_concurrency``). backend_kwargs: dict, default = None Any extra arguments needed to pass to the underneath backend. For SageMaker backend, valid keys are: @@ -436,6 +446,10 @@ def deploy( Any extra arguments needed to pass to deploy. Please refer to https://sagemaker.readthedocs.io/en/stable/api/inference/model.html#sagemaker.model.Model.deploy for all options """ + if inference_mode == "serverless" and instance_type is not None: + raise ValueError("`instance_type` must not be set when `inference_mode='serverless'`.") + if instance_type is None and inference_mode == "realtime": + instance_type = "ml.m5.2xlarge" if backend_kwargs is None: backend_kwargs = {} backend_kwargs = self.backend.parse_backend_deploy_kwargs(backend_kwargs) @@ -448,6 +462,8 @@ def deploy( custom_image_uri=custom_image_uri, volume_size=volume_size, wait=wait, + inference_mode=inference_mode, + inference_config=inference_config, **backend_kwargs, ) diff --git a/tests/unittests/general/test_inference_modes.py b/tests/unittests/general/test_inference_modes.py new file mode 100644 index 00000000..e6bea7ce --- /dev/null +++ b/tests/unittests/general/test_inference_modes.py @@ -0,0 +1,62 @@ +"""Verify that ``inference_mode`` translates to the right ``sagemaker.Model.deploy(...)`` kwargs.""" + +from unittest import mock + +import pytest +from sagemaker.serverless import ServerlessInferenceConfig + +from autogluon.cloud.backend.sagemaker_backend import SagemakerBackend + + +@pytest.fixture +def deploy_kwargs(): + """Run ``SagemakerBackend.deploy(...)`` with AWS calls and the SDK Model class mocked, + and return the kwargs that reached ``model.deploy(...)``.""" + sb = "autogluon.cloud.backend.sagemaker_backend" + with ( + mock.patch(f"{sb}.setup_sagemaker_session", return_value=mock.MagicMock(boto_region_name="us-east-1")), + mock.patch(f"{sb}.resolve_execution_role", return_value="arn:aws:iam::000000000000:role/test"), + mock.patch(f"{sb}.AutoGluonNonRepackInferenceModel") as model_cls, + mock.patch.object(SagemakerBackend, "_create_serve_script_tarball", return_value="s3://stub/m.tar.gz"), + ): + backend = SagemakerBackend( + local_output_path="/tmp/test", + cloud_output_path="s3://bucket/run", + predictor_type="timeseries", + ) + backend._fit_job = None # deploy a serve-script tarball, not a fit-job artifact + + def run(**kwargs): + backend.endpoint = None # allow re-deploy across cases + backend.deploy(endpoint_name="ep", model_kwargs={"entry_point": "stub.py"}, **kwargs) + return model_cls.return_value.deploy.call_args.kwargs + + yield run + + +def test_when_inference_mode_realtime_then_instance_kwargs_are_passed(deploy_kwargs): + captured = deploy_kwargs(instance_type="ml.m5.xlarge", initial_instance_count=2) + assert captured["instance_type"] == "ml.m5.xlarge" + assert captured["initial_instance_count"] == 2 + assert "serverless_inference_config" not in captured + + +def test_when_inference_mode_serverless_then_preset_serverless_config_is_used(deploy_kwargs): + captured = deploy_kwargs(inference_mode="serverless") + cfg = captured["serverless_inference_config"] + assert isinstance(cfg, ServerlessInferenceConfig) + assert cfg.memory_size_in_mb == 4096 + assert cfg.max_concurrency == 5 + assert "instance_type" not in captured + + +def test_when_inference_config_provided_then_user_values_override_preset(deploy_kwargs): + captured = deploy_kwargs(inference_mode="serverless", inference_config={"memory_size_in_mb": 8192}) + cfg = captured["serverless_inference_config"] + assert cfg.memory_size_in_mb == 8192 + assert cfg.max_concurrency == 5 # preset wins for keys the user didn't override + + +def test_when_inference_mode_is_unknown_then_value_error_is_raised(deploy_kwargs): + with pytest.raises(ValueError, match="Unsupported inference_mode"): + deploy_kwargs(inference_mode="batch") diff --git a/tests/unittests/timeseries/test_timeseries.py b/tests/unittests/timeseries/test_timeseries.py index 026d6c3b..67a608a8 100644 --- a/tests/unittests/timeseries/test_timeseries.py +++ b/tests/unittests/timeseries/test_timeseries.py @@ -210,6 +210,40 @@ def test_foundation_model_predict(test_helper, framework_version, retail_sales_d assert head["ContentLength"] > 0, "predictions file on S3 should not be empty" +def test_foundation_model_cache_artifact_then_deploy_serverless(test_helper, framework_version, retail_sales_dataset): + """Cache model artifact to S3, deploy to a serverless endpoint, and verify predictions.""" + ds = retail_sales_dataset + timestamp = test_helper.get_utc_timestamp_now() + + with tempfile.TemporaryDirectory() as temp_dir: + os.chdir(temp_dir) + inference_custom_image_uri = test_helper.get_custom_image_uri(framework_version, type="inference", gpu=False) + + cloud_output_path = f"s3://autogluon-cloud-ci/test-fm-cache-serverless/{framework_version}/{timestamp}" + model = FoundationModel("chronos-bolt-tiny", cloud_output_path=cloud_output_path) + cached_model = model.cache_model_artifact(f"{cloud_output_path}/cache") + assert cached_model.model_artifact_uri is not None + assert cached_model.model_artifact_uri.startswith("s3://") + + endpoint = cached_model.deploy( + custom_image_uri=inference_custom_image_uri, + inference_mode="serverless", + inference_config={"memory_size_in_mb": 6144}, + ) + try: + expected_item_ids = sorted(ds["train_data"][ds["id_column"]].unique()) + predictions = endpoint.predict( + data=ds["train_data"], + target=ds["target"], + id_column=ds["id_column"], + timestamp_column=ds["timestamp_column"], + prediction_length=ds["prediction_length"], + ) + _assert_timeseries_predictions(predictions, expected_item_ids, ds["prediction_length"]) + finally: + endpoint.delete_endpoint() + + def test_foundation_model_deploy(test_helper, framework_version, retail_sales_dataset): """Test FoundationModel deploy to a real-time endpoint and predict.""" ds = retail_sales_dataset