Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/livepeer_gateway/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from .lv2v import LiveVideoToVideo, StartJobRequest, start_lv2v
from .live_runner import (
LiveRunnerCallResult,
LiveRunnerCallStream,
LiveRunnerGPU,
LiveRunnerInstance,
LiveRunnerPriceInfo,
Expand Down Expand Up @@ -96,6 +97,7 @@
"get_orch_info",
"LiveVideoToVideo",
"LiveRunnerCallResult",
"LiveRunnerCallStream",
"LiveRunnerGPU",
"LiveRunnerInstance",
"LiveRunnerPriceInfo",
Expand Down
40 changes: 40 additions & 0 deletions src/livepeer_gateway/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,46 @@ async def request_json(
return data


async def open_stream(
url: str,
*,
method: Optional[str] = None,
payload: Optional[dict[str, Any]] = None,
headers: Optional[dict[str, str]] = None,
connect_timeout: float = 10.0,
) -> "tuple[aiohttp.ClientSession, aiohttp.ClientResponse]":
"""
Open an HTTP request and return the live (session, response) without reading the
body, for streaming responses (SSE, chunked). The caller owns both and must close
them.

No total timeout (streams run indefinitely) only connect/first-byte are bounded.
Raises LivepeerHTTPError on >= 400 (e.g. the 402 payment retry).
"""
resolved_method, req_headers, body = _json_request_parts(
url,
method=method,
payload=payload,
headers=headers,
)

timeout = aiohttp.ClientTimeout(total=None, sock_connect=connect_timeout, sock_read=None)
session = aiohttp.ClientSession(timeout=timeout, connector=aiohttp.TCPConnector(ssl=False))
try:
resp = await session.request(resolved_method, url, data=body, headers=req_headers)
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
await session.close()
raise LivepeerGatewayError(
f"HTTP stream error: failed to reach endpoint: {getattr(e, 'message', e)} (url={url})"
) from e
if resp.status >= 400:
raw = await resp.text()
resp.release()
await session.close()
_raise_http_json_error(resp.status, url, raw, dict(resp.headers.items()))
return session, resp


async def post_json(
url: str,
payload: dict[str, Any],
Expand Down
114 changes: 109 additions & 5 deletions src/livepeer_gateway/live_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,27 @@
import shutil
import subprocess
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Literal, NotRequired, Optional, Protocol, TypedDict, cast
from typing import (
Any,
AsyncIterator,
Awaitable,
Callable,
Literal,
Mapping,
NotRequired,
Optional,
Protocol,
TypedDict,
cast,
overload,
)
from urllib.parse import quote, urlparse, urlunparse

import aiohttp

from .channel_reader import ChannelReader
from .errors import LivepeerGatewayError, LivepeerHTTPError, SignerRefreshRequired
from .http import post_json, request_json
from .http import open_stream, post_json, request_json
from .remote_signer import (
GetPaymentResponse,
LivePaymentSession,
Expand Down Expand Up @@ -104,6 +117,46 @@ class LiveRunnerCallResult:
)


@dataclass
class LiveRunnerCallStream:
"""A streaming live-runner response (SSE / chunked).

Returned by ``call_runner(..., stream=True)``. It owns the underlying HTTP session
and response, so use it as an async context manager (or call ``aclose()``) to
release the connection.
"""

status: int
headers: Mapping[str, str]
runner_url: str
runner: Optional[LiveRunnerInstance]
payment_session: Optional[LivePaymentSession]
_session: aiohttp.ClientSession = field(repr=False, compare=False)
_response: aiohttp.ClientResponse = field(repr=False, compare=False)

@property
def content_type(self) -> str:
return self._response.content_type

async def aiter_bytes(self) -> AsyncIterator[bytes]:
async for chunk in self._response.content.iter_any():
yield chunk

async def aiter_lines(self) -> AsyncIterator[str]:
async for line in self._response.content:
yield line.decode(errors="replace").rstrip("\n")

async def aclose(self) -> None:
self._response.release()
await self._session.close()

async def __aenter__(self) -> LiveRunnerCallStream:
return self

async def __aexit__(self, *exc: object) -> None:
await self.aclose()


@dataclass(frozen=True)
class LiveRunnerGPU:
id: str = ""
Expand Down Expand Up @@ -185,7 +238,7 @@ def __init__(
self._task: Optional[asyncio.Task[None]] = None
self._o2r_task: Optional[asyncio.Task[None]] = None

async def start(self) -> "LiveRunnerRegistration":
async def start(self) -> LiveRunnerRegistration:
await self._send_heartbeat()
self._task = asyncio.create_task(self._heartbeat_loop())
return self
Expand Down Expand Up @@ -233,7 +286,7 @@ async def close(self) -> None:
except Exception:
_LOG.debug("Live runner unregister failed", exc_info=True)

async def __aenter__(self) -> "LiveRunnerRegistration":
async def __aenter__(self) -> LiveRunnerRegistration:
return self

async def __aexit__(self, exc_type: object, exc: object, tb: object) -> None:
Expand Down Expand Up @@ -554,6 +607,36 @@ async def remove_trickle_channels(
return deleted


@overload
async def call_runner(
runner_url: str = ...,
*,
runner: Optional[LiveRunnerInstance] = ...,
payload: Optional[dict[str, Any]] = ...,
method: str = ...,
signer_url: Optional[str] = ...,
signer_headers: Optional[dict[str, str]] = ...,
timeout: float = ...,
max_payment_challenge_retries: int = ...,
stream: Literal[False] = False,
) -> LiveRunnerCallResult: ...


@overload
async def call_runner(
runner_url: str = ...,
*,
runner: Optional[LiveRunnerInstance] = ...,
payload: Optional[dict[str, Any]] = ...,
method: str = ...,
signer_url: Optional[str] = ...,
signer_headers: Optional[dict[str, str]] = ...,
timeout: float = ...,
max_payment_challenge_retries: int = ...,
stream: Literal[True],
) -> LiveRunnerCallStream: ...


async def call_runner(
runner_url: str = "",
*,
Expand All @@ -564,7 +647,14 @@ async def call_runner(
signer_headers: Optional[dict[str, str]] = None,
timeout: float = 5.0,
max_payment_challenge_retries: int = 3,
) -> LiveRunnerCallResult:
stream: bool = False,
) -> LiveRunnerCallResult | LiveRunnerCallStream:
"""Call a runner once and return its result (or a live stream if ``stream=True``).

With ``signer_url`` set, payment is automatic and **per call**: a 402 challenge is
paid via the signer and retried (up to ``max_payment_challenge_retries``), one job,
one upfront payment. Raises ``LivepeerHTTPError`` on non-402 errors.
"""
runner_url = runner_url.strip() or (runner.url.strip() if runner is not None else "")
if not runner_url:
raise LivepeerGatewayError("Live runner call requires runner_url")
Expand Down Expand Up @@ -607,6 +697,20 @@ async def call_runner(
request_kwargs: dict[str, Any] = {"timeout": timeout}
if request_headers:
request_kwargs["headers"] = request_headers

if stream:
# Hand back the live response unbuffered. open_stream raises on a 402
# before any body, so the payment retry below still catches it.
session, resp = await open_stream(
runner_url,
method=method,
payload=request_payload,
headers=request_headers or None,
)
return LiveRunnerCallStream(
resp.status, resp.headers, runner_url, runner, payment_session, session, resp,
)

data = await request_json(
runner_url,
method=method,
Expand Down