Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def default_setup_args(*, version):
return setup_args


version = "0.4.2"
version = "0.5.0"
version = update_version(version, use_file_if_exists=False, create_file=True)

install_requires = [
Expand Down
50 changes: 37 additions & 13 deletions src/autogluon/cloud/backend/sagemaker_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import shutil
import tarfile
import tempfile
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import Any, Dict, List, Literal, Optional, Tuple, Union

import pandas as pd
import sagemaker
from botocore.exceptions import ClientError
from sagemaker import Predictor
from sagemaker.serverless import ServerlessInferenceConfig

from autogluon.common.loaders import load_pd
from autogluon.common.utils.s3_utils import is_s3_url, s3_path_to_bucket_prefix
Expand Down Expand Up @@ -349,14 +350,16 @@ def deploy(
predictor_path: Optional[str] = None,
endpoint_name: Optional[str] = None,
framework_version: str = "latest",
instance_type: str = "ml.m5.2xlarge",
instance_type: Optional[str] = "ml.m5.2xlarge",
initial_instance_count: int = 1,
custom_image_uri: Optional[str] = None,
volume_size: Optional[int] = None,
wait: bool = True,
model_kwargs: Optional[Dict] = None,
deploy_kwargs: Optional[Dict] = None,
fm_serve_config: Optional[Dict[str, Any]] = None,
inference_mode: Literal["realtime", "serverless"] = "realtime",
inference_config: Optional[Dict[str, Any]] = None,
repack: bool = True,
) -> None:
"""
Expand Down Expand Up @@ -400,6 +403,12 @@ def deploy(
Please refer to https://sagemaker.readthedocs.io/en/stable/api/inference/model.html#sagemaker.model.Model.deploy for all options
fm_serve_config: Optional[Dict[str, Any]], default = None
Configuration dict passed to the FM serve script via the AG_FM_SERVE_CONFIG env var.
inference_mode: {"realtime", "serverless"}, default = "realtime"
Endpoint type. ``"serverless"`` provisions a SageMaker Serverless Inference endpoint
(no instance management, scales to zero).
inference_config: Optional[Dict[str, Any]], default = None
Mode-specific overrides forwarded to `sagemaker.serverless.ServerlessInferenceConfig`
(e.g. ``memory_size_in_mb``, ``max_concurrency``).
repack: bool, default = True
Whether the SageMaker SDK should download ``predictor_path``, inject the entry-point script, and re-upload
it. Set to False when ``predictor_path`` already contains the serve script (e.g. an artifact bundled by
Expand All @@ -409,6 +418,9 @@ def deploy(
assert self.endpoint is None, (
"There is an endpoint already attached. Either detach it with `detach` or clean it up with `cleanup_deployment`"
)
if inference_mode == "serverless" and instance_type is None:
# Needed to infer the container image (CPU vs GPU) downstream — serverless is CPU-only.
instance_type = "ml.m5.2xlarge"
if not endpoint_name:
endpoint_name = sagemaker.utils.unique_name_from_base(CLOUD_RESOURCE_PREFIX)

Expand Down Expand Up @@ -494,6 +506,12 @@ def deploy(
if fm_serve_config is not None:
model_kwargs_env["AG_FM_SERVE_CONFIG"] = json.dumps(fm_serve_config)

if inference_mode == "serverless":
# Serverless containers run with `/` as cwd and a read-only root, so TorchServe's
# default `logs/` path resolves to `/logs` and startup fails. Redirect to /tmp.
model_kwargs_env.setdefault("LOG_LOCATION", "/tmp")
model_kwargs_env.setdefault("METRICS_LOCATION", "/tmp")

model = model_cls(
model_data=predictor_path,
role=self.role_arn,
Expand All @@ -510,17 +528,23 @@ def deploy(
if deploy_kwargs is None:
deploy_kwargs = {}

logger.log(20, "Deploying model to the endpoint")
self.endpoint = SagemakerEndpoint(
model.deploy(
endpoint_name=endpoint_name,
instance_type=instance_type,
initial_instance_count=initial_instance_count,
volume_size=volume_size,
wait=wait,
**deploy_kwargs,
)
)
instance_kwargs = {
"instance_type": instance_type,
"initial_instance_count": initial_instance_count,
"volume_size": volume_size,
}
user_config = inference_config or {}
if inference_mode == "realtime":
mode_kwargs = instance_kwargs
elif inference_mode == "serverless":
preset = {"memory_size_in_mb": 4096, "max_concurrency": 5}
mode_kwargs = {"serverless_inference_config": ServerlessInferenceConfig(**{**preset, **user_config})}
else:
raise ValueError(f"Unsupported inference_mode={inference_mode!r}")

logger.log(20, f"Deploying model to the endpoint (inference_mode={inference_mode})")
predictor = model.deploy(endpoint_name=endpoint_name, wait=wait, **mode_kwargs, **deploy_kwargs)
self.endpoint = SagemakerEndpoint(predictor)

def _create_serve_script_tarball(self, serve_script_path: str, endpoint_name: str) -> str:
"""Create a minimal model.tar.gz containing the serve script + serving_utils/ under code/."""
Expand Down
52 changes: 52 additions & 0 deletions src/autogluon/cloud/endpoint/prediction_future.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Pending-prediction handle for job-backed inference (e.g. ``predict(wait=False)``)."""

from __future__ import annotations

from typing import TYPE_CHECKING, Callable, Literal

import pandas as pd

from ..utils.ag_sagemaker import AutoGluonSagemakerEstimator

if TYPE_CHECKING:
from ..job.sagemaker_job import SageMakerFitJob

# Coarse, caller-facing job states; raw backend states are folded into these three.
PredictionStatus = Literal["InProgress", "Completed", "Failed"]


class JobPredictionFuture:
    """Pending result from a SageMaker job (e.g. ``predict(wait=False)``).

    Wraps the underlying job and exposes a small future-like surface: ``output_path``,
    ``job_name``, ``status()``, and ``result()``.

    Parameters
    ----------
    job
        Job handle. This class reads ``job_name``, ``completed`` and ``session`` from it and
        calls ``get_job_status()`` and ``get_output_path()``.
    result_loader
        Zero-argument callable that loads and returns the predictions; invoked by ``result()``.
    """

    # Raw job states reported to callers as "Failed".
    _FAILED_RAW_STATUSES = ("Failed", "Stopped")

    def __init__(self, job: "SageMakerFitJob", result_loader: Callable[[], pd.DataFrame]) -> None:
        self._job = job
        self._result_loader = result_loader

    @classmethod
    def _normalize_status(cls, raw: object) -> PredictionStatus:
        """Map a raw job status to ``"Completed"``, ``"Failed"`` or ``"InProgress"``.

        Anything that is neither ``"Completed"`` nor a failed state is reported as
        ``"InProgress"``.
        """
        if raw == "Completed":
            return "Completed"
        if raw in cls._FAILED_RAW_STATUSES:
            return "Failed"
        return "InProgress"

    @property
    def output_path(self) -> str:
        """Output path reported by the job, or ``""`` when the job reports none."""
        return self._job.get_output_path() or ""

    @property
    def job_name(self) -> str:
        """Name of the underlying job."""
        return self._job.job_name

    def status(self) -> PredictionStatus:
        """Poll the job once and return its normalized status."""
        return self._normalize_status(self._job.get_job_status())

    def result(self) -> pd.DataFrame:
        """Return the predictions produced by the job.

        If the job is not completed yet, attaches to it and streams its logs first.
        NOTE(review): ``logs()`` presumably blocks until the job ends - confirm.

        Raises
        ------
        RuntimeError
            If the job status is ``"Failed"`` or ``"Stopped"``.
        """
        if not self._job.completed:
            AutoGluonSagemakerEstimator.attach(self._job.job_name, sagemaker_session=self._job.session).logs()
        # Poll once and reuse the value, so the status in the error message is the one
        # that triggered the failure and no second remote call is made.
        raw_status = self._job.get_job_status()
        if self._normalize_status(raw_status) == "Failed":
            raise RuntimeError(
                f"Prediction job {self._job.job_name!r} did not complete successfully "
                f"(status={raw_status!r}). Check the SageMaker console for details."
            )
        return self._result_loader()
47 changes: 32 additions & 15 deletions src/autogluon/cloud/model/foundation_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from ..backend.backend_factory import BackendFactory
from ..backend.constant import SAGEMAKER, TABULAR_SAGEMAKER, TIMESERIES_SAGEMAKER
from ..endpoint.prediction_future import JobPredictionFuture
from ..endpoint.timeseries_endpoint import TimeSeriesEndpoint
from ..scripts.script_manager import ScriptManager
from ..utils.aws_utils import resolve_cloud_output_path
Expand Down Expand Up @@ -181,10 +182,14 @@ def _deploy_backend(
framework_version: str = "latest",
custom_image_uri: Optional[str] = None,
wait: bool = True,
inference_mode: Literal["realtime", "serverless"] = "realtime",
inference_config: Optional[Dict[str, Any]] = None,
**backend_kwargs,
) -> None:
"""Shared deploy logic. Subclasses call this then wrap the endpoint."""
if instance_type is None:
if inference_mode == "serverless" and instance_type is not None:
raise ValueError("`instance_type` must not be set when `inference_mode='serverless'`.")
if instance_type is None and inference_mode == "realtime":
instance_type = self._config.deploy_instance_type

merged_hp = self._get_hyperparameters("inference", hyperparameters)
Expand Down Expand Up @@ -218,6 +223,8 @@ def _deploy_backend(
wait=wait,
model_kwargs=model_kwargs,
fm_serve_config=fm_serve_config,
inference_mode=inference_mode,
inference_config=inference_config,
repack=False,
**backend_kwargs,
)
Expand Down Expand Up @@ -383,19 +390,20 @@ def deploy(
framework_version: str = "latest",
custom_image_uri: Optional[str] = None,
wait: bool = True,
inference_mode: Literal["realtime", "serverless"] = "realtime",
inference_config: Optional[Dict[str, Any]] = None,
**backend_kwargs,
) -> TimeSeriesEndpoint:
"""
Deploy model to a real-time endpoint.
Deploy model to an inference endpoint.

Parameters
----------
instance_type
Instance type for the endpoint.
If None, will use the default from the model registry.
Instance type for the endpoint. Defaults to the model registry value. Must be ``None``
when ``inference_mode="serverless"``.
endpoint_name
Custom endpoint name.
If None, will auto-generate a unique name.
Custom endpoint name. If None, will auto-generate a unique name.
hyperparameters
Model hyperparameters for inference. Overrides values passed to the constructor.
framework_version
Expand All @@ -404,13 +412,15 @@ def deploy(
Custom Docker image URI for the inference container.
wait
Whether to block until the endpoint is ready.
inference_mode
Endpoint type. ``"serverless"`` provisions a SageMaker Serverless Inference endpoint
(no instance management, scales to zero).
inference_config
Mode-specific overrides forwarded to ``sagemaker.serverless.ServerlessInferenceConfig``
(e.g. ``memory_size_in_mb``, ``max_concurrency``).
**backend_kwargs
Backend-specific arguments (e.g., initial_instance_count, volume_size,
model_kwargs, deploy_kwargs).

Returns
-------
TimeSeriesEndpoint
"""
self._deploy_backend(
instance_type=instance_type,
Expand All @@ -419,6 +429,8 @@ def deploy(
framework_version=framework_version,
custom_image_uri=custom_image_uri,
wait=wait,
inference_mode=inference_mode,
inference_config=inference_config,
**backend_kwargs,
)
return TimeSeriesEndpoint(self._backend.endpoint)
Expand Down Expand Up @@ -463,7 +475,7 @@ def predict(
wait: bool = True,
predictions_path: Optional[str] = None,
**backend_kwargs,
) -> Optional[pd.DataFrame]:
) -> Union[pd.DataFrame, JobPredictionFuture]:
"""
Run batch prediction for time series.

Expand Down Expand Up @@ -498,7 +510,9 @@ def predict(
custom_image_uri
Custom Docker image URI for the container.
wait
If True, block and return DataFrame. If False, return the job handle.
If True, block and return a DataFrame. If False, return a
:class:`JobPredictionFuture` immediately — call ``.result()`` on it later to
retrieve the DataFrame, or ``.status()`` to check progress.
predictions_path
S3 URL where predictions will be written by the prediction job (e.g.
``s3://my-bucket/runs/2024-05-01/predictions.csv``). The container's SageMaker execution
Expand All @@ -512,7 +526,8 @@ def predict(

Returns
-------
Optional[pd.DataFrame]
pd.DataFrame or JobPredictionFuture
DataFrame if ``wait=True``; a :class:`JobPredictionFuture` otherwise.
"""
if instance_type is None:
instance_type = self._config.predict_instance_type
Expand Down Expand Up @@ -549,8 +564,10 @@ def predict(
)

if not wait:
# TODO: return a handle that supports polling status and fetching results
return None
return JobPredictionFuture(
job=self._backend._fit_job,
result_loader=self._backend.get_fit_predict_results,
)

return self._backend.get_fit_predict_results()

Expand Down
28 changes: 22 additions & 6 deletions src/autogluon/cloud/predictor/cloud_predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional, Tuple, Union
from typing import Any, Dict, Literal, Optional, Tuple, Union

import boto3
import pandas as pd
Expand Down Expand Up @@ -388,15 +388,17 @@ def deploy(
predictor_path: Optional[str] = None,
endpoint_name: Optional[str] = None,
framework_version: str = "latest",
instance_type: str = "ml.m5.2xlarge",
instance_type: Optional[str] = None,
initial_instance_count: int = 1,
custom_image_uri: Optional[str] = None,
volume_size: Optional[int] = None,
wait: bool = True,
inference_mode: Literal["realtime", "serverless"] = "realtime",
inference_config: Optional[Dict[str, Any]] = None,
backend_kwargs: Optional[Dict] = None,
) -> None:
"""
Deploy a predictor to an endpoint, which can be used to do real-time inference later.
Deploy a predictor to an inference endpoint.

Parameters
----------
Expand All @@ -412,10 +414,12 @@ def deploy(
If `latest`, will use the latest available container version.
If provided a specific version, will use this version.
If `custom_image_uri` is set, this argument will be ignored.
instance_type: str, default = 'ml.m5.2xlarge'
Instance to be deployed for the endpoint
instance_type: Optional[str], default = None
Instance to be deployed for the endpoint. Defaults to ``ml.m5.2xlarge``. Must be ``None``
when ``inference_mode="serverless"``.
initial_instance_count: int, default = 1,
Initial number of instances to be deployed for the endpoint
Initial number of instances to be deployed for the endpoint. Ignored when
``inference_mode="serverless"``.
custom_image_uri: Optional[str], default = None,
Custom image to use to deploy endpoint with.
If not specified, will use the official DLC image:
Expand All @@ -426,6 +430,12 @@ def deploy(
wait: Bool, default = True,
Whether to wait for the endpoint to be deployed.
Note that the function won't return immediately, because some preparation is needed prior to deployment.
inference_mode: {"realtime", "serverless"}, default = "realtime"
Endpoint type. ``"serverless"`` provisions a SageMaker Serverless Inference endpoint
(no instance management, scales to zero).
inference_config: Optional[Dict[str, Any]], default = None
Mode-specific overrides forwarded to ``sagemaker.serverless.ServerlessInferenceConfig``
(e.g. ``memory_size_in_mb``, ``max_concurrency``).
backend_kwargs: dict, default = None
Any extra arguments to pass to the underlying backend.
For SageMaker backend, valid keys are:
Expand All @@ -436,6 +446,10 @@ def deploy(
Any extra arguments needed to pass to deploy.
Please refer to https://sagemaker.readthedocs.io/en/stable/api/inference/model.html#sagemaker.model.Model.deploy for all options
"""
if inference_mode == "serverless" and instance_type is not None:
raise ValueError("`instance_type` must not be set when `inference_mode='serverless'`.")
if instance_type is None and inference_mode == "realtime":
instance_type = "ml.m5.2xlarge"
if backend_kwargs is None:
backend_kwargs = {}
backend_kwargs = self.backend.parse_backend_deploy_kwargs(backend_kwargs)
Expand All @@ -448,6 +462,8 @@ def deploy(
custom_image_uri=custom_image_uri,
volume_size=volume_size,
wait=wait,
inference_mode=inference_mode,
inference_config=inference_config,
**backend_kwargs,
)

Expand Down
Loading
Loading