Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/autogluon/cloud/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
import logging

import sagemaker
from packaging.version import Version

if Version(sagemaker.__version__) >= Version("3.0"):
raise ImportError(
f"SageMaker SDK >= 3.0 is currently not supported (found {sagemaker.__version__}). "
"Please downgrade: pip install -U 'sagemaker<3'"
)

from autogluon.common.utils.log_utils import _add_stream_handler

from .cloud_setup import bootstrap, register, status, teardown
Expand Down
6 changes: 5 additions & 1 deletion src/autogluon/cloud/backend/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ def __init__(self, **kwargs) -> None:
@property
def cloud_output_path(self) -> str:
if not self._cloud_output_path:
raise ValueError(f"cloud_output_path is needed. Please pass it during init of {self.__class__.__name__}")
raise ValueError(
"No `cloud_output_path` was provided and no bucket is configured in "
"~/.autogluon/cloud.yaml. Either pass `cloud_output_path=` explicitly, or run "
"`autogluon.cloud.bootstrap()` / `register(bucket=...)` once to persist a bucket."
)
return self._cloud_output_path

def initialize(
Expand Down
42 changes: 35 additions & 7 deletions src/autogluon/cloud/backend/sagemaker_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,14 @@ def fit(
if extra_ag_args:
ag_args.update(extra_ag_args)
if ag_args.get("predict_after_fit"):
ag_args.setdefault("predictions_path", f"{self.cloud_output_path}/{job_name}/predictions.csv")
predictions_path = ag_args.get("predictions_path")
if predictions_path is None:
ag_args["predictions_path"] = f"{self.cloud_output_path}/{job_name}/predictions.csv"
elif not is_s3_url(predictions_path) or not predictions_path.endswith((".csv", ".parquet")):
raise ValueError(
f"`predictions_path` must be a full S3 URL ending in '.csv' or '.parquet' "
f"(e.g. 's3://bucket/key/predictions.parquet'), got {predictions_path!r}."
)
ag_args_path = os.path.join(self.local_output_path, "utils", "ag_args.pkl")
self.prepare_args(path=ag_args_path, **ag_args)
inputs = self._upload_fit_artifact(
Expand Down Expand Up @@ -639,7 +646,7 @@ def parse_backend_predict_kwargs(self, kwargs: Dict) -> Dict[str, Any]:
"""Parse backend specific kwargs and get them ready to be sent to predict call"""
download = kwargs.get("download", True)
persist = kwargs.get("persist", True)
save_path = kwargs.get("persist", None)
save_path = kwargs.get("save_path", None)
model_kwargs = kwargs.get("model_kwargs", None)
transformer_kwargs = kwargs.get("transformer_kwargs", None)
transform_kwargs = kwargs.get("transform_kwargs", None)
Expand Down Expand Up @@ -954,15 +961,36 @@ def download_predict_results(self, job_name: Optional[str] = None, save_path: Op

def get_fit_predict_results(self) -> pd.DataFrame:
"""Read predictions produced by a completed ``fit_predict`` job from S3."""
ag_args_path = os.path.join(self.local_output_path, "utils", "ag_args.pkl")
with open(ag_args_path, "rb") as f:
ag_args = pickle.load(f)
ag_args = self._download_ag_args_from_job()
predictions_path = ag_args.get("predictions_path")
assert predictions_path is not None, "No fit_predict job found. Call `fit_predict()` first."
bucket, key = s3_path_to_bucket_prefix(predictions_path)
with tempfile.TemporaryDirectory(prefix="ag_fit_predict_") as tmpdir:
self.sagemaker_session.download_data(path=tmpdir, bucket=bucket, key_prefix=key)
return load_pd.load(os.path.join(tmpdir, os.path.basename(key)))
local_path = os.path.join(tmpdir, os.path.basename(key))
self.sagemaker_session.boto_session.client("s3").download_file(bucket, key, local_path)
return load_pd.load(local_path)

def _download_ag_args_from_job(self) -> Dict[str, Any]:
"""Fetch and unpickle the ``ag_args.pkl`` that was uploaded as the ``ag_args`` channel.

Each training job carries the exact pickle it was launched with as an input channel,
making this the authoritative source — independent of local-disk lifetime.
"""
job_name = self._fit_job.job_name
assert job_name is not None, "No fit job found. Call `fit()` / `fit_predict()` first."
desc = self.sagemaker_session.describe_training_job(job_name)
channels = {c["ChannelName"]: c["DataSource"]["S3DataSource"]["S3Uri"] for c in desc["InputDataConfig"]}
ag_args_uri = channels.get("ag_args")
assert ag_args_uri is not None, (
f"Training job {job_name!r} has no `ag_args` input channel — cannot recover predictions_path."
)
bucket, key = s3_path_to_bucket_prefix(ag_args_uri)
assert key.endswith(".pkl"), f"Expected ag_args channel to point to a .pkl file, got {ag_args_uri!r}"
with tempfile.TemporaryDirectory(prefix="ag_args_") as tmpdir:
local_path = os.path.join(tmpdir, os.path.basename(key))
self.sagemaker_session.boto_session.client("s3").download_file(bucket, key, local_path)
with open(local_path, "rb") as f:
return pickle.load(f)

def _construct_ag_args(self, predictor_init_args, predictor_fit_args, leaderboard, **kwargs):
config = dict(
Expand Down
6 changes: 6 additions & 0 deletions src/autogluon/cloud/cloud_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ def register(
"""
if backend not in SUPPORTED_BACKENDS:
raise ValueError(f"Unsupported backend {backend!r}. Choose from {SUPPORTED_BACKENDS}.")
bucket = bucket.removeprefix("s3://").rstrip("/")
if "/" in bucket:
raise ValueError(
f"`bucket` must be a bare bucket name without prefixes (got {bucket!r}). "
"Pass prefixes via `cloud_output_path=` on the predictor/model instead."
)
config = load_config() or CloudConfig()
config.backends[backend] = BackendConfig(
region=region,
Expand Down
28 changes: 24 additions & 4 deletions src/autogluon/cloud/model/foundation_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ..backend.constant import SAGEMAKER, TABULAR_SAGEMAKER, TIMESERIES_SAGEMAKER
from ..endpoint.timeseries_endpoint import TimeSeriesEndpoint
from ..scripts.script_manager import ScriptManager
from ..utils.aws_utils import resolve_cloud_output_path
from .registry import get_model_config


Expand Down Expand Up @@ -59,7 +60,14 @@ def __init__(
backend
Cloud backend to use.
cloud_output_path
S3 path to store intermediate artifacts.
S3 location where intermediate artifacts are stored. Accepts:

* ``s3://bucket`` — a unique timestamped subfolder ``ag-<timestamp>`` is appended.
* ``s3://bucket/prefix`` — used verbatim. Re-running with the same prefix
will overwrite previously written artifacts.
* ``None`` (default) — use the bucket saved in ``~/.autogluon/cloud.yaml`` (set
by :func:`autogluon.cloud.bootstrap` / :func:`autogluon.cloud.register`) and
append a timestamped subfolder. Raises if no bucket is configured.
hyperparameters
Default hyperparameters applied to inference and (when supported) training.
role
Expand All @@ -68,7 +76,7 @@ def __init__(
:func:`autogluon.cloud.register`), and finally to ``sagemaker.get_execution_role()``.
"""
self.model_id = model_id
self.cloud_output_path = cloud_output_path
self.cloud_output_path = resolve_cloud_output_path(cloud_output_path, backend_name=backend)
self._config = get_model_config(model_id)
self._hyperparameter_overrides = hyperparameters or {}
self._tmpdir = tempfile.TemporaryDirectory(prefix="ag_fm_")
Expand All @@ -82,7 +90,7 @@ def __init__(
self._backend = BackendFactory.get_backend(
backend=backend_name,
local_output_path=self._tmpdir.name,
cloud_output_path=cloud_output_path,
cloud_output_path=self.cloud_output_path,
predictor_type=self._predictor_type,
role=role,
)
Expand Down Expand Up @@ -321,6 +329,7 @@ def predict(
framework_version: str = "latest",
custom_image_uri: Optional[str] = None,
wait: bool = True,
predictions_path: Optional[str] = None,
**backend_kwargs,
) -> Optional[pd.DataFrame]:
"""
Expand Down Expand Up @@ -358,6 +367,13 @@ def predict(
Custom Docker image URI for the container.
wait
If True, block and return DataFrame. If False, return the job handle.
predictions_path
S3 URL where predictions will be written by the prediction job (e.g.
``s3://my-bucket/runs/2024-05-01/predictions.csv``). The container's SageMaker execution
role must have ``s3:PutObject`` permission for this location. Defaults to
``{cloud_output_path}/{job_name}/predictions.csv``. Predictions use AutoGluon's canonical
column names ``item_id`` and ``timestamp``, regardless of the ``id_column`` /
``timestamp_column`` passed in.
**backend_kwargs
Additional backend-specific arguments (e.g., job_name, volume_size,
autogluon_sagemaker_estimator_kwargs).
Expand All @@ -382,6 +398,10 @@ def predict(
"static_features": static_features,
}

extra_ag_args: Dict[str, Any] = {"predict_after_fit": True}
if predictions_path is not None:
extra_ag_args["predictions_path"] = predictions_path

self._backend.fit(
predictor_init_args=predictor_init_args,
predictor_fit_args=predictor_fit_args,
Expand All @@ -392,7 +412,7 @@ def predict(
instance_type=instance_type,
custom_image_uri=custom_image_uri,
wait=wait,
extra_ag_args={"predict_after_fit": True},
extra_ag_args=extra_ag_args,
**backend_kwargs,
)

Expand Down
38 changes: 11 additions & 27 deletions src/autogluon/cloud/predictor/cloud_predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

import boto3
import pandas as pd
import sagemaker

from autogluon.common.loaders import load_pkl
from autogluon.common.savers import save_pkl
Expand All @@ -24,6 +23,7 @@
from ..backend.backend_factory import BackendFactory
from ..backend.constant import SAGEMAKER
from ..endpoint.endpoint import Endpoint
from ..utils.aws_utils import resolve_cloud_output_path
from ..utils.utils import unzip_file

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -52,13 +52,15 @@ def __init__(
you must specify different `local_output_path` locations or don't specify `local_output_path` at all.
Otherwise files from first `fit()` will be overwritten by second `fit()`.
cloud_output_path: Optional[str], default = None
Path to s3 location where intermediate artifacts will be uploaded and trained models should be saved.
This has to be provided because s3 buckets are unique globally, so it is hard to create one for you.
If you only provided the bucket but not the subfolder, a time-stamped folder called "YOUR_BUCKET/ag-[TIMESTAMP]" will be created.
If you provided both the bucket and the subfolder, then we will use that instead.
Note: To call `fit()` twice and save all results of each fit,
you must either specify different `cloud_output_path` locations or only provide the bucket but not the subfolder.
Otherwise files from first `fit()` will be overwritten by second `fit()`.
S3 location where intermediate artifacts and trained models are stored. Accepts:

* ``s3://bucket`` — a unique timestamped subfolder ``ag-<timestamp>`` is appended,
so each call gets its own folder and repeated runs don't overwrite each other.
* ``s3://bucket/prefix`` — used verbatim. Re-running with the same prefix will
overwrite previously written artifacts.
* ``None`` (default) — use the bucket saved in ``~/.autogluon/cloud.yaml`` (set
by :func:`autogluon.cloud.bootstrap` / :func:`autogluon.cloud.register`) and
append a timestamped subfolder. Raises if no bucket is configured.
backend: str, default = "sagemaker"
The backend to use. Valid options are: "sagemaker" and "ray_aws".
SageMaker backend supports training, deploying and batch inference on AWS SageMaker. Only single instance training is supported.
Expand All @@ -77,7 +79,7 @@ def __init__(
cloud_logger = logging.getLogger("autogluon.cloud")
set_logger_verbosity(self.verbosity, logger=cloud_logger)
self.local_output_path = self._setup_local_output_path(local_output_path)
self.cloud_output_path = self._setup_cloud_output_path(cloud_output_path)
self.cloud_output_path = resolve_cloud_output_path(cloud_output_path, backend_name=backend)
self.backend: Backend = BackendFactory.get_backend(
backend=self.backend_map[backend],
local_output_path=self.local_output_path,
Expand Down Expand Up @@ -157,24 +159,6 @@ def _setup_local_output_path(self, path):
)
return os.path.abspath(path)

def _setup_cloud_output_path(self, path):
if not path:
return path
if path.endswith("/"):
path = path[:-1]
path_cleaned = path
try:
path_cleaned = path.split("://", 1)[1]
except Exception:
pass
path_split = path_cleaned.split("/", 1)
# If user only provided the bucket, we create a subfolder with timestamp for them
if len(path_split) == 1:
path = os.path.join(path, f"ag-{sagemaker.utils.sagemaker_timestamp()}")
if is_s3_url(path):
return path
return "s3://" + path

def fit(
self,
train_data: Optional[Union[str, Path, pd.DataFrame]] = None,
Expand Down
8 changes: 0 additions & 8 deletions src/autogluon/cloud/predictor/timeseries_cloud_predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

import pandas as pd

from autogluon.common.utils.s3_utils import is_s3_url

from ..backend.constant import SAGEMAKER, TIMESERIES_SAGEMAKER
from .cloud_predictor import CloudPredictor

Expand Down Expand Up @@ -391,12 +389,6 @@ def fit_predict(
Optional[pd.DataFrame]
Predictions as a DataFrame. Returns ``None`` when ``wait`` is False.
"""
if predictions_path is not None:
if not is_s3_url(predictions_path) or not predictions_path.endswith((".csv", ".parquet")):
raise ValueError(
f"`predictions_path` must be a full S3 URL ending in '.csv' or '.parquet' "
f"(e.g. 's3://bucket/key/predictions.parquet'), got {predictions_path!r}."
)
if backend_kwargs is None:
backend_kwargs = {}
else:
Expand Down
64 changes: 63 additions & 1 deletion src/autogluon/cloud/utils/aws_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import sagemaker
from botocore.config import Config

from autogluon.common.utils.s3_utils import is_s3_url

from ..config import load_config

logger = logging.getLogger(__name__)
Expand All @@ -25,11 +27,71 @@ def resolve_execution_role(role: Optional[str], backend_name: str) -> str:
if config is not None:
entry = config.backends.get(backend_name)
if entry is not None and entry.role_arn:
logger.log(20, f"Using execution role from ~/.autogluon/cloud.yaml: {entry.role_arn}")
logger.info(f"Using execution role from ~/.autogluon/cloud.yaml: {entry.role_arn}")
return entry.role_arn
return sagemaker.get_execution_role()


def resolve_cloud_output_path(path: Optional[str], backend_name: str) -> Optional[str]:
"""Resolve the S3 location where AutoGluon-Cloud will read/write artifacts.

Resolution order for the bucket:

1. ``path`` argument if provided (``s3://bucket`` or ``s3://bucket/prefix``).
2. ``bucket`` from ``~/.autogluon/cloud.yaml`` under the matching backend slot.

Prefix behavior:

* Bucket only (no prefix) — a unique timestamped subfolder ``ag-<timestamp>`` is appended.
Each call gets its own folder, so repeated runs don't overwrite each other.
* Bucket and prefix — the path is used verbatim. Re-running with the same prefix
will overwrite previously written artifacts; pick a fresh prefix per run if you
want them kept side by side.

Returns ``None`` if no path is given and no bucket is configured. Callers that
require a path (e.g. ``fit()``) should check and raise at the point of use.
"""
if path is None:
config = load_config()
entry = config.backends.get(backend_name) if config is not None else None
if entry is None or not entry.bucket:
return None
path = f"s3://{entry.bucket}"
logger.info(f"Using bucket from ~/.autogluon/cloud.yaml: {entry.bucket}")

path = path.rstrip("/")
if not is_s3_url(path):
path = "s3://" + path
body = path[len("s3://") :]
bucket, _, prefix = body.partition("/")
if not prefix:
path = f"s3://{bucket}/ag-{sagemaker.utils.sagemaker_timestamp()}"
logger.info(f"cloud_output_path set to {path} (timestamped subfolder under bucket).")
else:
logger.info(f"cloud_output_path set to {path}.")
if _s3_prefix_has_objects(bucket, prefix):
logger.warning(
f"cloud_output_path {path} already contains objects. Running fit()/deploy() "
"will overwrite the existing artifacts. Pass a fresh prefix, or pass just "
"`s3://<bucket>` to get a unique timestamped subfolder."
)
return path


def _s3_prefix_has_objects(bucket: str, prefix: str) -> bool:
"""Return True if any object exists under ``s3://bucket/prefix``.

Swallows errors (missing credentials, AccessDenied, NoSuchBucket) and returns False —
we use this only for an advisory warning, so it must never break construction.
"""
try:
response = boto3.client("s3").list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
return response.get("KeyCount", 0) > 0
except Exception as e:
logger.debug(f"Skipping cloud_output_path emptiness check ({type(e).__name__}: {e})")
return False


def get_latest_amazon_linux_ami(region="us-east-1", version="al2023"):
ec2_client = boto3.client("ec2", region_name=region)
filters = [
Expand Down
Loading
Loading