From 0b7566660bcbf7a66f14fb9195fb577c3f602787 Mon Sep 17 00:00:00 2001 From: livepeer-robot Date: Thu, 2 Apr 2026 06:25:18 +0000 Subject: [PATCH 1/2] fix(graph): resolve VACE port fan-in and missing-port errors - graph_executor.py: deduplicate fan-in stream edges before queue construction, preferring pipeline-node sources over source-node sources for the same input port; raise a clearer error (with edge details) when two pipeline nodes both target the same port - graph_executor.py: in _validate_edge_ports, include VACE ports for VACEEnabledPipeline instances regardless of static config_class.inputs; gracefully handle PipelineNotAvailableException (pipeline reloading) by logging a warning and skipping port checks for that node - frame_processor.py: in _setup_graph_from_pipeline_ids, only add the last VACEEnabledPipeline to vace_input_video_ids (not all of them), preventing fan-in when a preprocessor like yolo_mask is also a VACEEnabledPipeline Fixes #804 Signed-off-by: livepeer-robot --- src/scope/server/frame_processor.py | 11 ++- src/scope/server/graph_executor.py | 104 +++++++++++++++++++++++++--- 2 files changed, 105 insertions(+), 10 deletions(-) diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index 053a06b03..0551ac29c 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -1104,10 +1104,19 @@ def _setup_pipelines_sync(self): if self.parameters.get("vace_enabled") and self.parameters.get( "vace_use_input_video", True ): + # Only the *last* VACEEnabledPipeline in the chain should + # receive raw input video as vace_input_frames. Earlier + # VACEEnabledPipeline instances (e.g. yolo_mask preprocessors) + # produce their own VACE frames that downstream pipelines + # consume — they must NOT also be wired to receive raw video + # on vace_input_frames, as that would create a fan-in conflict. 
+ last_vace_pid: str | None = None for pid in self.pipeline_ids: pipeline = self.pipeline_manager.get_pipeline_by_id(pid) if isinstance(pipeline, VACEEnabledPipeline): - vace_input_video_ids.add(pid) + last_vace_pid = pid + if last_vace_pid is not None: + vace_input_video_ids.add(last_vace_pid) api_graph = build_linear_graph( self.pipeline_ids, diff --git a/src/scope/server/graph_executor.py b/src/scope/server/graph_executor.py index b0e1a044d..d1a1bf025 100644 --- a/src/scope/server/graph_executor.py +++ b/src/scope/server/graph_executor.py @@ -13,11 +13,15 @@ from typing import TYPE_CHECKING, Any from .graph_schema import GraphConfig +from .pipeline_manager import PipelineNotAvailableException from .pipeline_processor import PipelineProcessor if TYPE_CHECKING: from .pipeline_manager import PipelineManager +# VACE port names that VACEEnabledPipeline instances always accept +_VACE_INPUT_PORTS = frozenset({"vace_input_frames", "vace_input_masks"}) + logger = logging.getLogger(__name__) # Default queue sizes (match pipeline_processor) @@ -76,17 +80,78 @@ def build_graph( # Validate edge ports against pipeline input/output declarations _validate_edge_ports(graph, pipeline_manager) - # 1) Create one queue per edge (all edges are stream; frame-by-frame) + # 1a) Resolve fan-in conflicts before creating queues. + # + # When both a source-node edge and a pipeline-node edge target the same + # input port (e.g. "input:video → longlive:vace_input_frames" AND + # "yolo_mask:vace_input_frames → longlive:vace_input_frames"), prefer the + # pipeline-node edge — it carries preprocessed data and is the intended + # signal. Raise only if two pipeline-to-pipeline edges conflict, which + # is always a graph authoring error. 
+ source_node_ids = set(graph.get_source_node_ids()) + stream_edges = [e for e in graph.edges if e.kind == "stream"] + + # Group by destination (to_node, to_port) + port_edges: dict[tuple[str, str], list] = {} + for e in stream_edges: + port_edges.setdefault((e.to_node, e.to_port), []).append(e) + + resolved_edges = [] + for (to_node, to_port), edge_list in port_edges.items(): + if len(edge_list) == 1: + resolved_edges.append(edge_list[0]) + continue + + pipeline_edges = [e for e in edge_list if e.from_node not in source_node_ids] + source_edges = [e for e in edge_list if e.from_node in source_node_ids] + + if len(pipeline_edges) > 1: + # Two pipeline nodes both feed the same input — genuine conflict. + conflict_desc = ", ".join( + f"{e.from_node!r}:{e.from_port!r}" for e in pipeline_edges + ) + raise ValueError( + f"Duplicate stream edges to the same input port: " + f"node={to_node!r}, port={to_port!r}. " + f"Multiple pipeline sources conflict: [{conflict_desc}]. " + f"Fan-in to a single port is not supported." + ) + elif pipeline_edges: + # One pipeline edge wins over any source edges — expected pattern + # when a preprocessor forwards VACE frames while the Workflow + # Builder also has "use input video as VACE" toggled on. + discarded = [f"{e.from_node!r}:{e.from_port!r}" for e in source_edges] + logger.info( + "Resolved fan-in on %s:%s — keeping pipeline edge from %r, " + "discarding source edge(s): [%s]", + to_node, + to_port, + pipeline_edges[0].from_node, + ", ".join(discarded), + ) + resolved_edges.append(pipeline_edges[0]) + else: + # Multiple source edges — still a conflict. + conflict_desc = ", ".join( + f"{e.from_node!r}:{e.from_port!r}" for e in source_edges + ) + raise ValueError( + f"Duplicate stream edges to the same input port: " + f"node={to_node!r}, port={to_port!r}. " + f"Multiple source edges conflict: [{conflict_desc}]. " + f"Fan-in to a single port is not supported." 
+ ) + + # Rebuild graph with resolved (deduplicated) edges so downstream logic + # only sees the authoritative set. + non_stream_edges = [e for e in graph.edges if e.kind != "stream"] + graph = graph.model_copy(update={"edges": non_stream_edges + resolved_edges}) + + # 1b) Create one queue per resolved edge. stream_queues: dict[tuple[str, str], queue.Queue] = {} for e in graph.edges: if e.kind == "stream": key = (e.to_node, e.to_port) - if key in stream_queues: - raise ValueError( - f"Duplicate stream edges to the same input port: " - f"node={e.to_node!r}, port={e.to_port!r}. " - f"Fan-in to a single port is not supported." - ) stream_queues[key] = queue.Queue(maxsize=DEFAULT_INPUT_QUEUE_MAXSIZE) # 2) Create a processor per pipeline node and wire input_queues per port @@ -221,14 +286,35 @@ def _validate_edge_ports( Raises: ValueError: If any edge references an undeclared port. """ + from scope.core.pipelines.wan2_1.vace import VACEEnabledPipeline + # Build a map of node_id -> (declared_inputs, declared_outputs) port_map: dict[str, tuple[set[str], set[str]]] = {} for node in graph.nodes: if node.type != "pipeline" or node.pipeline_id is None: continue - pipeline = pipeline_manager.get_pipeline_by_id(node.id) + try: + pipeline = pipeline_manager.get_pipeline_by_id(node.id) + except PipelineNotAvailableException: + # Pipeline is being reloaded (e.g. vace_enabled toggled). + # Skip port validation for this node rather than crashing. + logger.warning( + "Pipeline %r unavailable during port validation (reloading?); " + "skipping port checks for this node.", + node.id, + ) + continue config_class = pipeline.get_config_class() - port_map[node.id] = (set(config_class.inputs), set(config_class.outputs)) + declared_inputs = set(config_class.inputs) + declared_outputs = set(config_class.outputs) + + # VACEEnabledPipeline instances always accept VACE input ports, + # even when the pipeline was loaded with vace_enabled=False and + # config_class.inputs only lists "video". 
+ if isinstance(pipeline, VACEEnabledPipeline): + declared_inputs |= _VACE_INPUT_PORTS + + port_map[node.id] = (declared_inputs, declared_outputs) errors: list[str] = [] for e in graph.edges: From 75aa2390893dd9ee1b1be016f3b8705365013ca0 Mon Sep 17 00:00:00 2001 From: livepeer-robot Date: Thu, 2 Apr 2026 06:48:15 +0000 Subject: [PATCH 2/2] Handle TransferEncodingError as graceful network disconnect in media input loop When an orchestrator truncates the trickle connection mid-stream, aiohttp raises ClientPayloadError (wrapping the underlying TransferEncodingError as its cause; the two are not related by inheritance). Previously this was caught by the broad 'except Exception' handler and logged at ERROR level, causing noisy logs and unclean teardown. - Catch aiohttp.ClientPayloadError before the generic handler; log at WARNING and let the finally block run the normal media_output.close() path - Suppress ClientConnectorError during media_output.close() (logged at DEBUG) when the orchestrator is already unreachable at teardown time The deeper fix (in livepeer-python-gateway channel_reader.py / trickle_publisher.py) ensures the control channel subscription also terminates cleanly without raising. See: livepeer/livepeer-python-gateway#2 Fixes: #805 Related: #771 Signed-off-by: livepeer-robot --- src/scope/cloud/livepeer_app.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/scope/cloud/livepeer_app.py b/src/scope/cloud/livepeer_app.py index 3ee0a27de..81d07dc14 100644 --- a/src/scope/cloud/livepeer_app.py +++ b/src/scope/cloud/livepeer_app.py @@ -24,6 +24,7 @@ from pathlib import Path from typing import Any +import aiohttp import click import httpx import uvicorn @@ -210,11 +211,19 @@ async def _media_input_loop( frame_processor.put(frame) except asyncio.CancelledError: raise + except aiohttp.ClientPayloadError as exc: + # Orchestrator truncated the transfer mid-stream (e.g. TransferEncodingError 400) + # or dropped the connection. 
This is a network-level disconnect — treat as a + # graceful stop rather than an application error. + logger.warning("Media input loop: orchestrator disconnected mid-stream: %s", exc) except Exception as exc: logger.error("Media input loop failed: %s", exc) finally: try: await media_output.close() + except aiohttp.ClientConnectorError as exc: + # Host already unreachable (orchestrator went down); suppress cleanup errors. + logger.debug("Media output close: orchestrator unreachable (suppressed): %s", exc) except Exception as exc: logger.warning("Media output close failed: %s", exc) if session.media_output is media_output: