Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions src/autogluon/cloud/backend/sagemaker_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import os
import pickle
import shutil
import tarfile
import tempfile
from typing import Any, Dict, List, Optional, Tuple, Union
Expand Down Expand Up @@ -505,12 +506,13 @@ def deploy(
)

def _create_serve_script_tarball(self, serve_script_path: str, endpoint_name: str) -> str:
    """Create a minimal model.tar.gz containing the serve script + serving_utils/ under code/.

    The archive is built inside a temporary directory, handed to
    ``self._upload_predictor`` and the temporary directory is removed afterwards,
    whether or not the upload succeeded.

    Parameters
    ----------
    serve_script_path
        Local path of the serve script; stored in the archive as ``code/<basename>``.
    endpoint_name
        Used to build the upload key ``endpoints/<endpoint_name>/model/model.tar.gz``.

    Returns
    -------
    str
        Whatever ``self._upload_predictor`` returns for the uploaded archive.
    """
    tarball_dir = tempfile.mkdtemp(prefix="ag_serve_")
    try:
        tarball_path = os.path.join(tarball_dir, "model.tar.gz")
        with tarfile.open(tarball_path, "w:gz") as tar:
            tar.add(serve_script_path, arcname=f"code/{os.path.basename(serve_script_path)}")
            tar.add(ScriptManager.SAGEMAKER_SERVING_UTILS_DIR, arcname="code/serving_utils")
        s3_key = f"endpoints/{endpoint_name}/model/model.tar.gz"
        return self._upload_predictor(tarball_path, s3_key)
    finally:
        # mkdtemp() never cleans up after itself; the local archive is only a staging copy.
        shutil.rmtree(tarball_dir, ignore_errors=True)
Expand Down Expand Up @@ -1027,8 +1029,8 @@ def _upload_fit_artifact(
inputs["ag_args"] = self.sagemaker_session.upload_data(
path=ag_args, bucket=cloud_bucket, key_prefix=util_key_prefix
)
inputs["serving"] = self.sagemaker_session.upload_data(
path=serving_script, bucket=cloud_bucket, key_prefix=util_key_prefix
inputs["serving"] = self._upload_serving_files(
entry_point=serving_script, bucket=cloud_bucket, key_prefix=util_key_prefix
)

train_images_input = self._upload_fit_image_artifact(
Expand All @@ -1044,6 +1046,17 @@ def _upload_fit_artifact(

return inputs

def _upload_serving_files(self, entry_point: str, bucket: str, key_prefix: str) -> str:
    """Stage the serve entry point next to ``serving_utils/`` and upload the staged folder.

    A copy of ``entry_point`` and of ``ScriptManager.SAGEMAKER_SERVING_UTILS_DIR`` is placed in
    a temporary directory, which is uploaded with ``sagemaker_session.upload_data`` under
    ``<key_prefix>/serving``. The temporary directory is always deleted before returning.

    Returns the value produced by ``upload_data``.
    """
    workdir = tempfile.mkdtemp(prefix="ag_serving_")
    try:
        script_copy = os.path.join(workdir, os.path.basename(entry_point))
        shutil.copy(entry_point, script_copy)

        utils_copy = os.path.join(workdir, "serving_utils")
        shutil.copytree(ScriptManager.SAGEMAKER_SERVING_UTILS_DIR, utils_copy)

        upload_prefix = key_prefix + "/serving"
        return self.sagemaker_session.upload_data(path=workdir, bucket=bucket, key_prefix=upload_prefix)
    finally:
        # Best-effort cleanup: failures to delete the staging folder are ignored.
        shutil.rmtree(workdir, ignore_errors=True)

def _upload_fit_image_artifact(self, image_dir_path, bucket, key_prefix):
upload_image_path = None
if image_dir_path is not None:
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""Serde helpers used by the timeseries serve scripts."""

import base64
import json
from io import BytesIO
from typing import Any, Dict, Optional, Tuple

import pandas as pd

from autogluon.timeseries import TimeSeriesDataFrame
from autogluon.timeseries.utils.forecast import make_future_data_frame

# Return type shared by the payload parsers in this module:
# (past_data, known_covariates, inference_kwargs).
ParsedPayload = Tuple[TimeSeriesDataFrame, Optional[TimeSeriesDataFrame], Dict[str, Any]]


def parse_payload(
    request_body,
    content_type: str,
    *,
    id_column: str = "item_id",
    timestamp_column: str = "timestamp",
    target_column: str = "target",
) -> ParsedPayload:
    """Turn a raw request body into ``(past_data, known_covariates, inference_kwargs)``.

    ``application/x-autogluon`` and ``application/json`` bodies are delegated to their
    dedicated parsers. ``application/x-parquet``, ``text/csv`` and ``application/jsonl`` bodies
    are read as a flat table and converted with ``id_column`` / ``timestamp_column``; for those
    the known covariates are ``None`` and the inference kwargs are empty.

    Raises ``ValueError`` for any other content type.
    """
    # SageMaker passes ``str`` for text content types (csv/json) and ``bytes`` for binary ones.
    body = request_body.encode() if isinstance(request_body, str) else request_body

    # Structured payloads: handled end-to-end by their own parser.
    if content_type == "application/x-autogluon":
        return _parse_x_autogluon(body, id_column=id_column, timestamp_column=timestamp_column)
    if content_type == "application/json":
        return _parse_jumpstart(body, target_column=target_column)

    # Flat tabular payloads: pick the pandas reader matching the content type.
    tabular_readers = {
        "application/x-parquet": pd.read_parquet,
        "text/csv": pd.read_csv,
        "application/jsonl": lambda buffer: pd.read_json(buffer, orient="records", lines=True),
    }
    read_table = tabular_readers.get(content_type)
    if read_table is None:
        raise ValueError(f"{content_type} input content type not supported.")

    table = read_table(BytesIO(body))
    past_data = TimeSeriesDataFrame.from_data_frame(table, id_column=id_column, timestamp_column=timestamp_column)
    return past_data, None, {}


def _parse_x_autogluon(request_body: bytes, *, id_column: str, timestamp_column: str) -> ParsedPayload:
    """Parse a version-1 ``application/x-autogluon`` JSON payload.

    The payload's ``"data"`` entry (required) and its optional ``"static_features"`` /
    ``"known_covariates"`` entries are base64-encoded parquet tables. ``id_column`` and
    ``timestamp_column`` entries inside ``"inference_kwargs"`` override the keyword arguments
    and are removed from the kwargs that are returned.

    Raises ``ValueError`` when the payload's ``"version"`` is not ``1``.
    """
    payload = json.loads(request_body)
    version = payload.get("version")
    if version != 1:
        raise ValueError(f"Unsupported x-autogluon payload version: {version}. Expected 1.")

    inference_kwargs = payload.get("inference_kwargs") or {}
    # Per-request column names win over the ones supplied by the caller.
    column_names = {
        "id_column": inference_kwargs.pop("id_column", id_column),
        "timestamp_column": inference_kwargs.pop("timestamp_column", timestamp_column),
    }

    static_features_df = _decode_parquet(payload.get("static_features"))
    data_df = _decode_parquet(payload["data"])
    past_data = TimeSeriesDataFrame.from_data_frame(
        data_df,
        static_features_df=static_features_df,
        **column_names,
    )

    known_covariates = None
    covariates_df = _decode_parquet(payload.get("known_covariates"))
    if covariates_df is not None:
        known_covariates = TimeSeriesDataFrame.from_data_frame(covariates_df, **column_names)

    return past_data, known_covariates, inference_kwargs


def _parse_jumpstart(request_body: bytes, *, target_column: str = "target") -> ParsedPayload:
    """Parse a JumpStart-style JSON payload into ``(past_data, known_covariates, inference_kwargs)``.

    The payload is a JSON object with an ``"inputs"`` list and an optional ``"parameters"``
    object. Every input must contain ``"target"`` and may contain ``"item_id"`` (defaults to the
    series position as a string), ``"start"`` (defaults to ``"2020-01-01"``),
    ``"past_covariates"`` and ``"future_covariates"`` (mappings of column name to values).
    ``"parameters"`` is returned as ``inference_kwargs``; its ``"freq"`` (default ``"D"``) and
    ``"prediction_length"`` (default ``1``) entries are used to build the timestamps.

    Raises
    ------
    ValueError
        If ``"future_covariates"`` is present for some input series but not for all of them.
    KeyError
        If ``"inputs"`` or the ``"target"`` of a series is missing.
    """
    payload = json.loads(request_body)
    inputs = payload["inputs"]
    inference_kwargs = payload.get("parameters") or {}
    freq = inference_kwargs.get("freq", "D")
    prediction_length = inference_kwargs.get("prediction_length", 1)

    # Known covariates are all-or-nothing: the frame built below reads ``future_covariates``
    # from every series, so reject partially specified payloads with a clear message
    # instead of failing later with a bare KeyError.
    missing_future = [i for i, ts in enumerate(inputs) if "future_covariates" not in ts]
    has_future_covariates = len(missing_future) < len(inputs)
    if has_future_covariates and missing_future:
        raise ValueError(
            "`future_covariates` must be provided for either all or none of the input series; "
            f"missing for series at positions {missing_future}."
        )

    past_df = pd.concat(
        [
            pd.DataFrame(
                {
                    "item_id": ts.get("item_id", str(i)),
                    "timestamp": pd.date_range(
                        start=pd.Timestamp(ts.get("start", "2020-01-01")), periods=len(ts["target"]), freq=freq
                    ),
                    target_column: ts["target"],
                    **(ts.get("past_covariates") or {}),
                }
            )
            for i, ts in enumerate(inputs)
        ],
        ignore_index=True,
    )
    tsdf = TimeSeriesDataFrame.from_data_frame(past_df)

    if not has_future_covariates:
        return tsdf, None, inference_kwargs

    future_index_df = make_future_data_frame(tsdf, prediction_length=prediction_length, freq=freq)
    future_values = pd.concat([pd.DataFrame(ts["future_covariates"]) for ts in inputs], ignore_index=True)
    # NOTE(review): the two frames are paired by row position, which presumes that
    # make_future_data_frame yields the items in the same order as ``inputs`` and that each
    # ``future_covariates`` block has exactly ``prediction_length`` rows — TODO confirm.
    known_covariates = TimeSeriesDataFrame.from_data_frame(pd.concat([future_index_df, future_values], axis=1))
    return tsdf, known_covariates, inference_kwargs


def _decode_parquet(b64: Optional[str]) -> Optional[pd.DataFrame]:
    """Decode a base64-encoded parquet table into a DataFrame; ``None`` is passed through."""
    if b64 is None:
        return None
    raw_parquet = base64.b64decode(b64)
    return pd.read_parquet(BytesIO(raw_parquet))


def render_response(predictions: TimeSeriesDataFrame, accept: str) -> Tuple[Any, str]:
    """Serialize ``predictions`` in the first supported format found in ``accept``.

    ``accept`` is searched by substring, in this order: ``application/json`` (JumpStart-style
    JSON), ``application/x-parquet``, ``text/csv``. Returns ``(body, content_type)`` and raises
    ``ValueError`` when none of the three is present.
    """
    json_type = "application/json"
    parquet_type = "application/x-parquet"
    csv_type = "text/csv"

    if json_type in accept:
        return _render_jumpstart(predictions), json_type

    # Tabular outputs: move the index into regular columns and use string column names.
    table = pd.DataFrame(predictions).reset_index()
    table.columns = table.columns.astype(str)

    if parquet_type in accept:
        return table.to_parquet(), parquet_type
    if csv_type in accept:
        return table.to_csv(index=False), csv_type
    raise ValueError(f"{accept} content type not supported")


def _render_jumpstart(predictions: TimeSeriesDataFrame) -> bytes:
    """Encode predictions as UTF-8 JSON of the form ``{"predictions": [<one entry per item>]}``.

    Each entry maps every prediction column to its list of values and adds ``"item_id"``
    (as a string) and ``"start"`` (ISO format of the item's first timestamp). Items appear in
    the order in which they occur in ``predictions``.
    """

    def _as_entry(item_id, item_frame):
        entry = {column: item_frame[column].tolist() for column in item_frame.columns}
        entry["item_id"] = str(item_id)
        first_timestamp = item_frame.index.get_level_values("timestamp")[0]
        entry["start"] = first_timestamp.isoformat()
        return entry

    per_item = predictions.groupby("item_id", sort=False)
    entries = [_as_entry(item_id, item_frame) for item_id, item_frame in per_item]
    return json.dumps({"predictions": entries}).encode("utf-8")
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@
{"model_name": "Chronos", "hyperparameters": {"model_path": "amazon/chronos-bolt-base", ...}}
"""

import base64
import json
import os
from io import BytesIO

import numpy as np
import pandas as pd
from serving_utils.timeseries import parse_payload, render_response

from autogluon.timeseries import TimeSeriesDataFrame
from autogluon.timeseries.models import ModelRegistry

# Parsed JSON from the AG_SERVE_CONFIG environment variable (an empty dict when it is unset).
_SERVE_CONFIG = json.loads(os.environ.get("AG_SERVE_CONFIG", "{}"))
# Request content types accepted by this script's transform_fn; anything else raises ValueError.
_SUPPORTED_INPUT_CONTENT_TYPES = {"application/x-autogluon", "application/json"}


def model_fn(model_dir):
Expand Down Expand Up @@ -43,72 +43,20 @@ def model_fn(model_dir):
return model


def _parse_autogluon_payload(request_body):
"""Parse x-autogluon payload. Returns (data, known_covariates, inference_kwargs)."""
# Only payloads whose "version" field equals 1 are accepted.
payload = json.loads(request_body)
if payload.get("version") != 1:
raise ValueError(f"Unsupported x-autogluon payload version: {payload.get('version')}. Expected 1.")
inference_kwargs = payload.get("inference_kwargs") or {}

# Both column names are mandatory here. They are popped, so they are not part of the
# inference_kwargs returned to the caller; a missing one is reported as a ValueError.
try:
id_column = inference_kwargs.pop("id_column")
timestamp_column = inference_kwargs.pop("timestamp_column")
except KeyError as e:
raise ValueError(f"`application/x-autogluon` payload must include {e.args[0]!r} in inference_kwargs.") from e

# "data" (required) and "static_features" (optional) are base64-encoded parquet tables.
data = pd.read_parquet(BytesIO(base64.b64decode(payload["data"])))
static_features = payload.get("static_features")
if static_features is not None:
static_features = pd.read_parquet(BytesIO(base64.b64decode(static_features)))

tsdf = TimeSeriesDataFrame.from_data_frame(
data, id_column=id_column, timestamp_column=timestamp_column, static_features_df=static_features
)

# "known_covariates" is optional; when present it is decoded the same way as "data"
# and converted with the same column names. Otherwise None is returned for it.
known_covariates = payload.get("known_covariates")
if known_covariates is not None:
known_covariates = TimeSeriesDataFrame.from_data_frame(
pd.read_parquet(BytesIO(base64.b64decode(known_covariates))),
id_column=id_column,
timestamp_column=timestamp_column,
)

return tsdf, known_covariates, inference_kwargs


def transform_fn(model, request_body, input_content_type, output_content_type="application/x-parquet"):
def transform_fn(model, request_body, input_content_type, output_content_type="application/json"):
"""Run inference with per-request prediction_length, quantile_levels, etc."""
if input_content_type == "application/x-autogluon":
tsdf, known_covariates, inference_kwargs = _parse_autogluon_payload(request_body)
elif input_content_type == "application/json":
raise NotImplementedError("JumpStart JSON input schema is not yet supported")
else:
raise ValueError(f"{input_content_type} input content type not supported.")

target = inference_kwargs.pop("target", "target")
prediction_length = inference_kwargs.pop("prediction_length", 1)
quantile_levels = inference_kwargs.pop("quantile_levels", None)
if input_content_type not in _SUPPORTED_INPUT_CONTENT_TYPES:
raise ValueError(
f"{input_content_type} input content type not supported. "
f"Supported: {sorted(_SUPPORTED_INPUT_CONTENT_TYPES)}"
)
tsdf, known_covariates, inference_kwargs = parse_payload(request_body, input_content_type)

model.target = target
model.prediction_length = prediction_length
if quantile_levels is not None:
model.quantile_levels = sorted(quantile_levels)
model.target = inference_kwargs.get("target", "target")
model.freq = inference_kwargs.get("freq", "D")
model.prediction_length = inference_kwargs.get("prediction_length", 1)
if "quantile_levels" in inference_kwargs:
model.quantile_levels = sorted(inference_kwargs["quantile_levels"])

predictions = model.predict(tsdf, known_covariates=known_covariates)
predictions = predictions.to_data_frame().reset_index()

# Serialize response — output_content_type may be a comma-separated accept list
if "application/x-parquet" in output_content_type:
predictions.columns = predictions.columns.astype(str)
output = predictions.to_parquet()
output_content_type = "application/x-parquet"
elif "text/csv" in output_content_type:
output = predictions.to_csv()
output_content_type = "text/csv"
elif "application/json" in output_content_type:
output = predictions.to_json()
output_content_type = "application/json"
else:
raise ValueError(f"{output_content_type} content type not supported")

return output, output_content_type
return render_response(predictions, output_content_type)
89 changes: 13 additions & 76 deletions src/autogluon/cloud/scripts/sagemaker_scripts/timeseries_serve.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
# flake8: noqa
import base64
import json
import os
import shutil
from io import BytesIO, StringIO

import pandas as pd
from autogluon.timeseries import TimeSeriesPredictor

from autogluon.timeseries import TimeSeriesDataFrame, TimeSeriesPredictor
from serving_utils.timeseries import parse_payload, render_response


def model_fn(model_dir):
Expand Down Expand Up @@ -36,76 +34,15 @@ def model_fn(model_dir):
return model


def _parse_autogluon_payload(request_body, *, id_column, timestamp_column):
"""Parse x-autogluon payload. Returns (data, known_covariates, inference_kwargs)."""
# Only payloads whose "version" field equals 1 are accepted.
payload = json.loads(request_body)
if payload.get("version") != 1:
raise ValueError(f"Unsupported x-autogluon payload version: {payload.get('version')}. Expected 1.")
# A missing or empty "inference_kwargs" entry becomes an empty dict; it is returned as-is.
inference_kwargs = payload.get("inference_kwargs") or {}

# "data" (required) and "static_features" (optional) are base64-encoded parquet tables.
data = pd.read_parquet(BytesIO(base64.b64decode(payload["data"])))
static_features = payload.get("static_features")
if static_features is not None:
static_features = pd.read_parquet(BytesIO(base64.b64decode(static_features)))

# Column names come from the keyword arguments, not from the payload.
tsdf = TimeSeriesDataFrame.from_data_frame(
data, id_column=id_column, timestamp_column=timestamp_column, static_features_df=static_features
)

# "known_covariates" is optional; when present it is decoded the same way as "data".
known_covariates = payload.get("known_covariates")
if known_covariates is not None:
known_covariates = TimeSeriesDataFrame.from_data_frame(
pd.read_parquet(BytesIO(base64.b64decode(known_covariates))),
id_column=id_column,
timestamp_column=timestamp_column,
)

return tsdf, known_covariates, inference_kwargs


def _parse_simple_payload(request_body, content_type, *, id_column, timestamp_column):
"""Parse plain parquet/csv/json payloads using the column names recorded at fit time."""
# The parquet body is wrapped in BytesIO, while the csv/json/jsonl bodies are wrapped in
# StringIO, i.e. they are used as text here.
if content_type == "application/x-parquet":
data = pd.read_parquet(BytesIO(request_body))
elif content_type == "text/csv":
data = pd.read_csv(StringIO(request_body))
elif content_type == "application/json":
data = pd.read_json(StringIO(request_body))
elif content_type == "application/jsonl":
data = pd.read_json(StringIO(request_body), orient="records", lines=True)
else:
raise ValueError(f"{content_type} input content type not supported.")

# Plain payloads carry neither known covariates nor per-request kwargs: the second and
# third elements of the result are always None and an empty dict.
tsdf = TimeSeriesDataFrame.from_data_frame(data, id_column=id_column, timestamp_column=timestamp_column)
return tsdf, None, {}


def transform_fn(model, request_body, input_content_type, output_content_type="application/json"):
id_column = model._id_column
timestamp_column = model._timestamp_column
if input_content_type == "application/x-autogluon":
tsdf, known_covariates, inference_kwargs = _parse_autogluon_payload(
request_body, id_column=id_column, timestamp_column=timestamp_column
)
else:
tsdf, known_covariates, inference_kwargs = _parse_simple_payload(
request_body, input_content_type, id_column=id_column, timestamp_column=timestamp_column
)

prediction = model.predict(tsdf, known_covariates=known_covariates, **inference_kwargs)
prediction = pd.DataFrame(prediction)

if "application/x-parquet" in output_content_type:
prediction.columns = prediction.columns.astype(str)
output = prediction.to_parquet()
output_content_type = "application/x-parquet"
elif "application/json" in output_content_type:
output = prediction.to_json()
output_content_type = "application/json"
elif "text/csv" in output_content_type:
output = prediction.to_csv(index=None)
output_content_type = "text/csv"
else:
raise ValueError(f"{output_content_type} content type not supported")

return output, output_content_type
# prediction_length / quantile_levels are baked into the predictor at fit time, so
# any "parameters" block in a JumpStart payload is parsed but not applied.
tsdf, known_covariates, _ = parse_payload(
request_body,
input_content_type,
id_column=model._id_column,
timestamp_column=model._timestamp_column,
target_column=model.target,
)
predictions = model.predict(tsdf, known_covariates=known_covariates)
return render_response(predictions, output_content_type)
Loading
Loading