diff --git a/src/autogluon/cloud/__init__.py b/src/autogluon/cloud/__init__.py index 3cae57a..8bad79b 100644 --- a/src/autogluon/cloud/__init__.py +++ b/src/autogluon/cloud/__init__.py @@ -1,5 +1,14 @@ import logging +import sagemaker +from packaging.version import Version + +if Version(sagemaker.__version__) >= Version("3.0"): + raise ImportError( + f"SageMaker SDK >= 3.0 is currently not supported (found {sagemaker.__version__}). " + "Please downgrade: pip install -U 'sagemaker<3'" + ) + from autogluon.common.utils.log_utils import _add_stream_handler from .cloud_setup import bootstrap, register, status, teardown diff --git a/src/autogluon/cloud/backend/backend.py b/src/autogluon/cloud/backend/backend.py index 2fc2550..3817b70 100644 --- a/src/autogluon/cloud/backend/backend.py +++ b/src/autogluon/cloud/backend/backend.py @@ -19,7 +19,11 @@ def __init__(self, **kwargs) -> None: @property def cloud_output_path(self) -> str: if not self._cloud_output_path: - raise ValueError(f"cloud_output_path is needed. Please pass it during init of {self.__class__.__name__}") + raise ValueError( + "No `cloud_output_path` was provided and no bucket is configured in " + "~/.autogluon/cloud.yaml. Either pass `cloud_output_path=` explicitly, or run " + "`autogluon.cloud.bootstrap()` / `register(bucket=...)` once to persist a bucket." + ) return self._cloud_output_path def initialize( diff --git a/src/autogluon/cloud/backend/sagemaker_backend.py b/src/autogluon/cloud/backend/sagemaker_backend.py index a781f05..b3c8cb1 100644 --- a/src/autogluon/cloud/backend/sagemaker_backend.py +++ b/src/autogluon/cloud/backend/sagemaker_backend.py @@ -295,7 +295,14 @@ def fit( if extra_ag_args: ag_args.update(extra_ag_args) if ag_args.get("predict_after_fit"): - ag_args.setdefault("predictions_path", f"{self.cloud_output_path}/{job_name}/predictions.csv") + predictions_path = ag_args.get("predictions_path") + if predictions_path is None: + ag_args["predictions_path"] = f"{self.cloud_output_path}/{job_name}/predictions.csv" + elif not is_s3_url(predictions_path) or not predictions_path.endswith((".csv", ".parquet")): + raise ValueError( + f"`predictions_path` must be a full S3 URL ending in '.csv' or '.parquet' " + f"(e.g. 's3://bucket/key/predictions.parquet'), got {predictions_path!r}." + ) ag_args_path = os.path.join(self.local_output_path, "utils", "ag_args.pkl") self.prepare_args(path=ag_args_path, **ag_args) inputs = self._upload_fit_artifact( @@ -639,7 +646,7 @@ def parse_backend_predict_kwargs(self, kwargs: Dict) -> Dict[str, Any]: """Parse backend specific kwargs and get them ready to be sent to predict call""" download = kwargs.get("download", True) persist = kwargs.get("persist", True) - save_path = kwargs.get("persist", None) + save_path = kwargs.get("save_path", None) model_kwargs = kwargs.get("model_kwargs", None) transformer_kwargs = kwargs.get("transformer_kwargs", None) transform_kwargs = kwargs.get("transform_kwargs", None) @@ -954,15 +961,36 @@ def download_predict_results(self, job_name: Optional[str] = None, save_path: Op def get_fit_predict_results(self) -> pd.DataFrame: """Read predictions produced by a completed ``fit_predict`` job from S3.""" - ag_args_path = os.path.join(self.local_output_path, "utils", "ag_args.pkl") - with open(ag_args_path, "rb") as f: - ag_args = pickle.load(f) + ag_args = self._download_ag_args_from_job() predictions_path = ag_args.get("predictions_path") assert predictions_path is not None, "No fit_predict job found. Call `fit_predict()` first." bucket, key = s3_path_to_bucket_prefix(predictions_path) with tempfile.TemporaryDirectory(prefix="ag_fit_predict_") as tmpdir: - self.sagemaker_session.download_data(path=tmpdir, bucket=bucket, key_prefix=key) - return load_pd.load(os.path.join(tmpdir, os.path.basename(key))) + local_path = os.path.join(tmpdir, os.path.basename(key)) + self.sagemaker_session.boto_session.client("s3").download_file(bucket, key, local_path) + return load_pd.load(local_path) + + def _download_ag_args_from_job(self) -> Dict[str, Any]: + """Fetch and unpickle the ``ag_args.pkl`` that was uploaded as the ``ag_args`` channel. + + Each training job carries the exact pickle it was launched with as an input channel, + making this the authoritative source — independent of local-disk lifetime. + """ + job_name = self._fit_job.job_name + assert job_name is not None, "No fit job found. Call `fit()` / `fit_predict()` first." + desc = self.sagemaker_session.describe_training_job(job_name) + channels = {c["ChannelName"]: c["DataSource"]["S3DataSource"]["S3Uri"] for c in desc["InputDataConfig"]} + ag_args_uri = channels.get("ag_args") + assert ag_args_uri is not None, ( + f"Training job {job_name!r} has no `ag_args` input channel — cannot recover predictions_path." + ) + bucket, key = s3_path_to_bucket_prefix(ag_args_uri) + assert key.endswith(".pkl"), f"Expected ag_args channel to point to a .pkl file, got {ag_args_uri!r}" + with tempfile.TemporaryDirectory(prefix="ag_args_") as tmpdir: + local_path = os.path.join(tmpdir, os.path.basename(key)) + self.sagemaker_session.boto_session.client("s3").download_file(bucket, key, local_path) + with open(local_path, "rb") as f: + return pickle.load(f) def _construct_ag_args(self, predictor_init_args, predictor_fit_args, leaderboard, **kwargs): config = dict( diff --git a/src/autogluon/cloud/cloud_setup.py b/src/autogluon/cloud/cloud_setup.py index dd63417..5456940 100644 --- a/src/autogluon/cloud/cloud_setup.py +++ b/src/autogluon/cloud/cloud_setup.py @@ -132,6 +132,12 @@ def register( """ if backend not in SUPPORTED_BACKENDS: raise ValueError(f"Unsupported backend {backend!r}. Choose from {SUPPORTED_BACKENDS}.") + bucket = bucket.removeprefix("s3://").rstrip("/") + if "/" in bucket: + raise ValueError( + f"`bucket` must be a bare bucket name without prefixes (got {bucket!r}). " + "Pass prefixes via `cloud_output_path=` on the predictor/model instead." + ) config = load_config() or CloudConfig() config.backends[backend] = BackendConfig( region=region, diff --git a/src/autogluon/cloud/model/foundation_model.py b/src/autogluon/cloud/model/foundation_model.py index 6298dfa..2f384ff 100644 --- a/src/autogluon/cloud/model/foundation_model.py +++ b/src/autogluon/cloud/model/foundation_model.py @@ -13,6 +13,7 @@ from ..backend.constant import SAGEMAKER, TABULAR_SAGEMAKER, TIMESERIES_SAGEMAKER from ..endpoint.timeseries_endpoint import TimeSeriesEndpoint from ..scripts.script_manager import ScriptManager +from ..utils.aws_utils import resolve_cloud_output_path from .registry import get_model_config @@ -59,7 +60,14 @@ def __init__( backend Cloud backend to use. cloud_output_path - S3 path to store intermediate artifacts. + S3 location where intermediate artifacts are stored. Accepts: + + * ``s3://bucket`` — a unique timestamped subfolder ``ag-`` is appended. + * ``s3://bucket/prefix`` — used verbatim. Re-running with the same prefix + will overwrite previously written artifacts. + * ``None`` (default) — use the bucket saved in ``~/.autogluon/cloud.yaml`` (set + by :func:`autogluon.cloud.bootstrap` / :func:`autogluon.cloud.register`) and + append a timestamped subfolder. Raises if no bucket is configured. hyperparameters Default hyperparameters applied to inference and (when supported) training. role @@ -68,7 +76,7 @@ def __init__( :func:`autogluon.cloud.register`), and finally to ``sagemaker.get_execution_role()``. """ self.model_id = model_id - self.cloud_output_path = cloud_output_path + self.cloud_output_path = resolve_cloud_output_path(cloud_output_path, backend_name=backend) self._config = get_model_config(model_id) self._hyperparameter_overrides = hyperparameters or {} self._tmpdir = tempfile.TemporaryDirectory(prefix="ag_fm_") @@ -82,7 +90,7 @@ def __init__( self._backend = BackendFactory.get_backend( backend=backend_name, local_output_path=self._tmpdir.name, - cloud_output_path=cloud_output_path, + cloud_output_path=self.cloud_output_path, predictor_type=self._predictor_type, role=role, ) @@ -321,6 +329,7 @@ def predict( framework_version: str = "latest", custom_image_uri: Optional[str] = None, wait: bool = True, + predictions_path: Optional[str] = None, **backend_kwargs, ) -> Optional[pd.DataFrame]: """ @@ -358,6 +367,13 @@ def predict( Custom Docker image URI for the container. wait If True, block and return DataFrame. If False, return the job handle. + predictions_path + S3 URL where predictions will be written by the prediction job (e.g. + ``s3://my-bucket/runs/2024-05-01/predictions.csv``). The container's SageMaker execution + role must have ``s3:PutObject`` permission for this location. Defaults to + ``{cloud_output_path}/{job_name}/predictions.csv``. Predictions use AutoGluon's canonical + column names ``item_id`` and ``timestamp``, regardless of the ``id_column`` / + ``timestamp_column`` passed in. **backend_kwargs Additional backend-specific arguments (e.g., job_name, volume_size, autogluon_sagemaker_estimator_kwargs). @@ -382,6 +398,10 @@ def predict( "static_features": static_features, } + extra_ag_args: Dict[str, Any] = {"predict_after_fit": True} + if predictions_path is not None: + extra_ag_args["predictions_path"] = predictions_path + self._backend.fit( predictor_init_args=predictor_init_args, predictor_fit_args=predictor_fit_args, @@ -392,7 +412,7 @@ def predict( instance_type=instance_type, custom_image_uri=custom_image_uri, wait=wait, - extra_ag_args={"predict_after_fit": True}, + extra_ag_args=extra_ag_args, **backend_kwargs, ) diff --git a/src/autogluon/cloud/predictor/cloud_predictor.py b/src/autogluon/cloud/predictor/cloud_predictor.py index 8ab8871..cca4047 100644 --- a/src/autogluon/cloud/predictor/cloud_predictor.py +++ b/src/autogluon/cloud/predictor/cloud_predictor.py @@ -12,7 +12,6 @@ import boto3 import pandas as pd -import sagemaker from autogluon.common.loaders import load_pkl from autogluon.common.savers import save_pkl @@ -24,6 +23,7 @@ from ..backend.backend_factory import BackendFactory from ..backend.constant import SAGEMAKER from ..endpoint.endpoint import Endpoint +from ..utils.aws_utils import resolve_cloud_output_path from ..utils.utils import unzip_file logger = logging.getLogger(__name__) @@ -52,13 +52,15 @@ def __init__( you must specify different `local_output_path` locations or don't specify `local_output_path` at all. Otherwise files from first `fit()` will be overwritten by second `fit()`. cloud_output_path: Optional[str], default = None - Path to s3 location where intermediate artifacts will be uploaded and trained models should be saved. - This has to be provided because s3 buckets are unique globally, so it is hard to create one for you. - If you only provided the bucket but not the subfolder, a time-stamped folder called "YOUR_BUCKET/ag-[TIMESTAMP]" will be created. - If you provided both the bucket and the subfolder, then we will use that instead. - Note: To call `fit()` twice and save all results of each fit, - you must either specify different `cloud_output_path` locations or only provide the bucket but not the subfolder. - Otherwise files from first `fit()` will be overwritten by second `fit()`. + S3 location where intermediate artifacts and trained models are stored. Accepts: + + * ``s3://bucket`` — a unique timestamped subfolder ``ag-`` is appended, + so each call gets its own folder and repeated runs don't overwrite each other. + * ``s3://bucket/prefix`` — used verbatim. Re-running with the same prefix will + overwrite previously written artifacts. + * ``None`` (default) — use the bucket saved in ``~/.autogluon/cloud.yaml`` (set + by :func:`autogluon.cloud.bootstrap` / :func:`autogluon.cloud.register`) and + append a timestamped subfolder. Raises if no bucket is configured. backend: str, default = "sagemaker" The backend to use. Valid options are: "sagemaker" and "ray_aws". SageMaker backend supports training, deploying and batch inference on AWS SageMaker. Only single instance training is supported. @@ -77,7 +79,7 @@ def __init__( cloud_logger = logging.getLogger("autogluon.cloud") set_logger_verbosity(self.verbosity, logger=cloud_logger) self.local_output_path = self._setup_local_output_path(local_output_path) - self.cloud_output_path = self._setup_cloud_output_path(cloud_output_path) + self.cloud_output_path = resolve_cloud_output_path(cloud_output_path, backend_name=backend) self.backend: Backend = BackendFactory.get_backend( backend=self.backend_map[backend], local_output_path=self.local_output_path, @@ -157,24 +159,6 @@ def _setup_local_output_path(self, path): ) return os.path.abspath(path) - def _setup_cloud_output_path(self, path): - if not path: - return path - if path.endswith("/"): - path = path[:-1] - path_cleaned = path - try: - path_cleaned = path.split("://", 1)[1] - except Exception: - pass - path_split = path_cleaned.split("/", 1) - # If user only provided the bucket, we create a subfolder with timestamp for them - if len(path_split) == 1: - path = os.path.join(path, f"ag-{sagemaker.utils.sagemaker_timestamp()}") - if is_s3_url(path): - return path - return "s3://" + path - def fit( self, train_data: Optional[Union[str, Path, pd.DataFrame]] = None, diff --git a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py index 4a8591c..3afad3b 100644 --- a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py +++ b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py @@ -7,8 +7,6 @@ import pandas as pd -from autogluon.common.utils.s3_utils import is_s3_url - from ..backend.constant import SAGEMAKER, TIMESERIES_SAGEMAKER from .cloud_predictor import CloudPredictor @@ -391,12 +389,6 @@ def fit_predict( Optional[pd.DataFrame] Predictions as a DataFrame. Returns ``None`` when ``wait`` is False. """ - if predictions_path is not None: - if not is_s3_url(predictions_path) or not predictions_path.endswith((".csv", ".parquet")): - raise ValueError( - f"`predictions_path` must be a full S3 URL ending in '.csv' or '.parquet' " - f"(e.g. 's3://bucket/key/predictions.parquet'), got {predictions_path!r}." - ) if backend_kwargs is None: backend_kwargs = {} else: diff --git a/src/autogluon/cloud/utils/aws_utils.py b/src/autogluon/cloud/utils/aws_utils.py index 00a7471..8b9a276 100644 --- a/src/autogluon/cloud/utils/aws_utils.py +++ b/src/autogluon/cloud/utils/aws_utils.py @@ -5,6 +5,8 @@ import sagemaker from botocore.config import Config +from autogluon.common.utils.s3_utils import is_s3_url + from ..config import load_config logger = logging.getLogger(__name__) @@ -25,11 +27,71 @@ def resolve_execution_role(role: Optional[str], backend_name: str) -> str: if config is not None: entry = config.backends.get(backend_name) if entry is not None and entry.role_arn: - logger.log(20, f"Using execution role from ~/.autogluon/cloud.yaml: {entry.role_arn}") + logger.info(f"Using execution role from ~/.autogluon/cloud.yaml: {entry.role_arn}") return entry.role_arn return sagemaker.get_execution_role() +def resolve_cloud_output_path(path: Optional[str], backend_name: str) -> Optional[str]: + """Resolve the S3 location where AutoGluon-Cloud will read/write artifacts. + + Resolution order for the bucket: + + 1. ``path`` argument if provided (``s3://bucket`` or ``s3://bucket/prefix``). + 2. ``bucket`` from ``~/.autogluon/cloud.yaml`` under the matching backend slot. + + Prefix behavior: + + * Bucket only (no prefix) — a unique timestamped subfolder ``ag-`` is appended. + Each call gets its own folder, so repeated runs don't overwrite each other. + * Bucket and prefix — the path is used verbatim. Re-running with the same prefix + will overwrite previously written artifacts; pick a fresh prefix per run if you + want them kept side by side. + + Returns ``None`` if no path is given and no bucket is configured. Callers that + require a path (e.g. ``fit()``) should check and raise at the point of use. + """ + if path is None: + config = load_config() + entry = config.backends.get(backend_name) if config is not None else None + if entry is None or not entry.bucket: + return None + path = f"s3://{entry.bucket}" + logger.info(f"Using bucket from ~/.autogluon/cloud.yaml: {entry.bucket}") + + path = path.rstrip("/") + if not is_s3_url(path): + path = "s3://" + path + body = path[len("s3://") :] + bucket, _, prefix = body.partition("/") + if not prefix: + path = f"s3://{bucket}/ag-{sagemaker.utils.sagemaker_timestamp()}" + logger.info(f"cloud_output_path set to {path} (timestamped subfolder under bucket).") + else: + logger.info(f"cloud_output_path set to {path}.") + if _s3_prefix_has_objects(bucket, prefix): + logger.warning( + f"cloud_output_path {path} already contains objects. Running fit()/deploy() " + "will overwrite the existing artifacts. Pass a fresh prefix, or pass just " + "`s3://` to get a unique timestamped subfolder." + ) + return path + + +def _s3_prefix_has_objects(bucket: str, prefix: str) -> bool: + """Return True if any object exists under ``s3://bucket/prefix``. + + Swallows errors (missing credentials, AccessDenied, NoSuchBucket) and returns False — + we use this only for an advisory warning, so it must never break construction. + """ + try: + response = boto3.client("s3").list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1) + return response.get("KeyCount", 0) > 0 + except Exception as e: + logger.debug(f"Skipping cloud_output_path emptiness check ({type(e).__name__}: {e})") + return False + + def get_latest_amazon_linux_ami(region="us-east-1", version="al2023"): ec2_client = boto3.client("ec2", region_name=region) filters = [ diff --git a/tests/unittests/general/test_aws_utils.py b/tests/unittests/general/test_aws_utils.py index 4a3d508..db60ecc 100644 --- a/tests/unittests/general/test_aws_utils.py +++ b/tests/unittests/general/test_aws_utils.py @@ -9,7 +9,7 @@ CloudConfig, save_config, ) -from autogluon.cloud.utils.aws_utils import resolve_execution_role +from autogluon.cloud.utils.aws_utils import resolve_cloud_output_path, resolve_execution_role @pytest.fixture(autouse=True) @@ -81,3 +81,114 @@ def test_falls_back_to_env_when_backend_missing_in_config(): ) as mock_env: assert resolve_execution_role(None, backend_name="sagemaker") == env_role mock_env.assert_called_once() + + +def _save_bucket_in_config(backend_name: str, bucket: str) -> None: + save_config( + CloudConfig( + backends={ + backend_name: BackendConfig( + region="us-east-1", + role_arn="arn:aws:iam::111111111111:role/r", + bucket=bucket, + stack_name=None, + ) + } + ) + ) + + +@pytest.fixture +def no_s3_check(monkeypatch): + """Stub the S3 emptiness check so tests don't make AWS calls.""" + monkeypatch.setattr( + "autogluon.cloud.utils.aws_utils._s3_prefix_has_objects", + lambda bucket, prefix: False, + ) + + +def test_resolve_path_bucket_only_appends_timestamp(no_s3_check): + resolved = resolve_cloud_output_path("s3://my-bucket", backend_name="sagemaker") + assert resolved.startswith("s3://my-bucket/ag-") + assert resolved.count("/") == 3 # s3://my-bucket/ag- + + +def test_resolve_path_bucket_only_no_scheme_appends_timestamp(no_s3_check): + resolved = resolve_cloud_output_path("my-bucket", backend_name="sagemaker") + assert resolved.startswith("s3://my-bucket/ag-") + + +def test_resolve_path_bucket_with_prefix_used_verbatim(no_s3_check): + resolved = resolve_cloud_output_path("s3://my-bucket/exp-1", backend_name="sagemaker") + assert resolved == "s3://my-bucket/exp-1" + + +def test_resolve_path_strips_trailing_slash_then_appends_timestamp(no_s3_check): + resolved = resolve_cloud_output_path("s3://my-bucket/", backend_name="sagemaker") + assert resolved.startswith("s3://my-bucket/ag-") + + +def test_resolve_path_uses_bucket_from_config_when_path_is_none(no_s3_check): + _save_bucket_in_config("sagemaker", "config-bucket") + resolved = resolve_cloud_output_path(None, backend_name="sagemaker") + assert resolved.startswith("s3://config-bucket/ag-") + + +def test_resolve_path_returns_none_when_path_none_and_no_config(no_s3_check): + assert resolve_cloud_output_path(None, backend_name="sagemaker") is None + + +def test_resolve_path_returns_none_when_backend_not_in_config(no_s3_check): + _save_bucket_in_config("ray_aws", "config-bucket") + assert resolve_cloud_output_path(None, backend_name="sagemaker") is None + + +def _capture_aws_utils_logs(level: int) -> tuple[list[logging.LogRecord], logging.Handler]: + records: list[logging.LogRecord] = [] + handler = logging.Handler() + handler.setLevel(level) + handler.emit = records.append + aws_logger = logging.getLogger("autogluon.cloud.utils.aws_utils") + aws_logger.addHandler(handler) + aws_logger.setLevel(level) + return records, handler + + +def test_resolve_path_warns_when_prefix_already_has_objects(monkeypatch): + monkeypatch.setattr( + "autogluon.cloud.utils.aws_utils._s3_prefix_has_objects", + lambda bucket, prefix: True, + ) + records, handler = _capture_aws_utils_logs(logging.WARNING) + try: + resolved = resolve_cloud_output_path("s3://my-bucket/exp-1", backend_name="sagemaker") + finally: + logging.getLogger("autogluon.cloud.utils.aws_utils").removeHandler(handler) + assert resolved == "s3://my-bucket/exp-1" + assert any("already contains objects" in r.getMessage() for r in records) + + +def test_resolve_path_no_warning_when_prefix_empty(no_s3_check): + records, handler = _capture_aws_utils_logs(logging.WARNING) + try: + resolve_cloud_output_path("s3://my-bucket/exp-1", backend_name="sagemaker") + finally: + logging.getLogger("autogluon.cloud.utils.aws_utils").removeHandler(handler) + assert not any("already contains objects" in r.getMessage() for r in records) + + +def test_resolve_path_no_warning_for_bucket_only(monkeypatch): + # Even if list_objects_v2 returns truthy, bucket-only inputs build a fresh timestamped + # prefix that did not exist a moment ago, so we should not run the emptiness check on it. + called = [] + monkeypatch.setattr( + "autogluon.cloud.utils.aws_utils._s3_prefix_has_objects", + lambda bucket, prefix: called.append((bucket, prefix)) or True, + ) + records, handler = _capture_aws_utils_logs(logging.WARNING) + try: + resolve_cloud_output_path("s3://my-bucket", backend_name="sagemaker") + finally: + logging.getLogger("autogluon.cloud.utils.aws_utils").removeHandler(handler) + assert called == [] + assert not any("already contains objects" in r.getMessage() for r in records) diff --git a/tests/unittests/general/test_cloud_setup.py b/tests/unittests/general/test_cloud_setup.py index 23917a7..3d85465 100644 --- a/tests/unittests/general/test_cloud_setup.py +++ b/tests/unittests/general/test_cloud_setup.py @@ -92,6 +92,25 @@ def test_register_rejects_unknown_backend(): ) +def test_register_rejects_bucket_with_prefix(): + with pytest.raises(ValueError, match="bare bucket name"): + register( + role="arn:aws:iam::111122223333:role/x", + bucket="my-bucket/some-prefix", + region="us-east-1", + ) + + +@pytest.mark.parametrize("bucket", ["s3://my-bucket", "s3://my-bucket/", "my-bucket/"]) +def test_register_normalizes_bucket(bucket): + register( + role="arn:aws:iam::111122223333:role/x", + bucket=bucket, + region="us-east-1", + ) + assert load_config().backends["sagemaker"].bucket == "my-bucket" + + def test_register_records_stack_name_when_given(): register( role="arn:aws:iam::111122223333:role/x", diff --git a/tests/unittests/general/test_full_functionality.py b/tests/unittests/general/test_full_functionality.py index 3774921..6e5c3fd 100644 --- a/tests/unittests/general/test_full_functionality.py +++ b/tests/unittests/general/test_full_functionality.py @@ -23,7 +23,7 @@ def test_full_functionality(test_helper, framework_version): predictor_init_args = dict(label="class", eval_metric="roc_auc") predictor_fit_args = dict(time_limit=time_limit) dummy_cloud_predictor = TabularCloudPredictor() - with pytest.raises(ValueError): + with pytest.raises(ValueError, match="No `cloud_output_path` was provided"): dummy_cloud_predictor.fit( train_data=train_data, predictor_init_args=predictor_init_args, diff --git a/tests/unittests/timeseries/test_timeseries.py b/tests/unittests/timeseries/test_timeseries.py index e5361d3..026d6c3 100644 --- a/tests/unittests/timeseries/test_timeseries.py +++ b/tests/unittests/timeseries/test_timeseries.py @@ -175,16 +175,22 @@ def test_timeseries_fit_predict_chronos( def test_foundation_model_predict(test_helper, framework_version, retail_sales_dataset): """Test FoundationModel batch prediction via the fit_predict training job pattern.""" + import boto3 + ds = retail_sales_dataset timestamp = test_helper.get_utc_timestamp_now() + bucket = "autogluon-cloud-ci" + predictions_key = f"test-fm-predict/{framework_version}/{timestamp}/custom_predictions.parquet" + predictions_path = f"s3://{bucket}/{predictions_key}" + with tempfile.TemporaryDirectory() as temp_dir: os.chdir(temp_dir) expected_item_ids = sorted(ds["train_data"][ds["id_column"]].unique()) model = FoundationModel( "chronos-2", - cloud_output_path=f"s3://autogluon-cloud-ci/test-fm-predict/{framework_version}/{timestamp}", + cloud_output_path=f"s3://{bucket}/test-fm-predict/{framework_version}/{timestamp}", ) predictions = model.predict( @@ -195,10 +201,14 @@ def test_foundation_model_predict(test_helper, framework_version, retail_sales_d prediction_length=ds["prediction_length"], known_covariates=ds["known_covariates"], instance_type="ml.m5.2xlarge", + predictions_path=predictions_path, ) _assert_timeseries_predictions(predictions, expected_item_ids, ds["prediction_length"]) + head = boto3.client("s3").head_object(Bucket=bucket, Key=predictions_key) + assert head["ContentLength"] > 0, "predictions file on S3 should not be empty" + def test_foundation_model_deploy(test_helper, framework_version, retail_sales_dataset): """Test FoundationModel deploy to a real-time endpoint and predict."""