Video Streams: Significantly reduce RAM of Video Output Intermediates#13396
rattus128 wants to merge 17 commits into Comfy-Org:master
Conversation
Commits include:
- Add some nodes getting into and out of stream mode.
- Split this up into start -> chunk -> finish so it can be saved piece by piece.
- So none can mean none.
- Consolidate these into a named tuple. This will expand with more content. Save it to the Decoder module itself for reusability.
- If it doesn't fit, stash it.
📝 Walkthrough: This pull request introduces pull-based image streaming support to ComfyUI.
🚥 Pre-merge checks: 2 passed, 1 failed (1 warning).
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
comfy_api/latest/_input_impl/video_types.py (2)
**419-445: ⚠️ Potential issue | 🟠 Major**

Add exception handling in `save_start()` to close the PyAV container if initialization fails. If metadata serialization or stream creation throws after `av.open()` succeeds, the container is never returned to the caller, so `save_to()` cannot close it. This leaks file handles and leaves partial files on disk.

Suggested fix:

```diff
         output = av.open(path, mode='w', options={'movflags': 'use_metadata_tags'}, **extra_kwargs)
-        if True:
+        try:
             # Add metadata before writing any streams
             if metadata is not None:
                 for key, value in metadata.items():
                     output.metadata[key] = json.dumps(value)
@@
-            self._frame_counter = 0
-            return output, video_stream, audio_stream, audio_sample_rate, frame_rate
+            self._frame_counter = 0
+            return output, video_stream, audio_stream, audio_sample_rate, frame_rate
+        except Exception:
+            output.close()
+            raise
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@comfy_api/latest/_input_impl/video_types.py` around lines 419 - 445, The initialization after av.open() (in save_start()) can throw while setting metadata or creating streams (e.g., when serializing metadata, creating video_stream/audio_stream or computing frame_rate), so wrap the block that sets output.metadata, frame_rate, video_stream, audio_stream and audio_sample_rate in a try/except; on any Exception call output.close() (or output.destroy/close method on the PyAV container) to release the file handle and then re-raise the exception so callers see the failure; ensure self._frame_counter is only set and the tuple (output, video_stream, audio_stream, audio_sample_rate, frame_rate) is returned on successful initialization.
**398-415: ⚠️ Potential issue | 🟡 Minor**

Update type annotations to include `io.BytesIO` for the `save_start()` and `save_to()` methods. The methods accept `io.BytesIO` at runtime (as evidenced by the `isinstance(path, io.BytesIO)` check at line 412 and the docstring at line 479 promising BytesIO support), but the type annotations restrict `path` to `str`. This creates a mismatch where the public API is narrower than the runtime implementation, causing type checker warnings for valid code. The correct pattern is already used elsewhere in the file (line 309: `path: str | io.BytesIO`).

Suggested fix:

```diff
-    path: str,
+    path: str | io.BytesIO,
```

Apply to both `save_start()` at line 398 and `save_to()` at line 477.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@comfy_api/latest/_input_impl/video_types.py` around lines 398 - 415, The path parameter type annotation for save_start and save_to is too narrow (str only) but both functions accept io.BytesIO at runtime; update both method signatures (save_start and save_to) to annotate path as str | io.BytesIO (or typing.Union[str, io.BytesIO]) so the type hints match the isinstance(path, io.BytesIO) checks and the docstring, and ensure io is imported/available in the module.

comfy_extras/nodes_upscale_model.py (1)
**87-122: ⚠️ Potential issue | 🟠 Major**

Normalize EOF chunks to the same output device/dtype as normal chunks. The empty-stream path returns `image.clone()`, but non-empty chunks come back on `intermediate_device()`/`intermediate_dtype()`. That makes the final EOF batch metadata-dependent, and downstream consumers that concatenate the last chunk can fail on exact-multiple stream lengths after an upscale step.

Suggested fix:

```diff
     def upscale_batch(cls, upscale_model, image: torch.Tensor) -> torch.Tensor:
+        output_device = model_management.intermediate_device()
+        output_dtype = model_management.intermediate_dtype()
         if image.shape[0] == 0:
-            return image.clone()
+            return image.to(device=output_device, dtype=output_dtype).clone()
         device = model_management.get_torch_device()
@@
-        output_device = model_management.intermediate_device()
-
         oom = True
         try:
@@
         finally:
             upscale_model.to("cpu")
-        return torch.clamp(s.movedim(-3,-1), min=0, max=1.0).to(model_management.intermediate_dtype())
+        return torch.clamp(s.movedim(-3, -1), min=0, max=1.0).to(device=output_device, dtype=output_dtype)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@comfy_extras/nodes_upscale_model.py` around lines 87 - 122, The early empty-stream return in upscale_batch (image.clone()) must produce the same device/dtype as non-empty outputs; move the output_device = model_management.intermediate_device() (and optionally capture intermediate_dtype = model_management.intermediate_dtype()) above the empty-check and change the empty return to return image.clone().to(device=output_device, dtype=intermediate_dtype) so EOF chunks match the tiled path (refer to upscale_batch, image.clone(), model_management.intermediate_device(), and model_management.intermediate_dtype()).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@comfy_api/latest/_input/image_stream_types.py`:
- Around line 22-30: The reset() implementation currently calls
get_progress_state().finish_progress(self._ctx.node_id) after invoking
do_reset(), which prematurely marks progress complete; move the finish_progress
call to run before entering the CurrentNodeContext/do_reset block (i.e., check
if self._ctx is not None and call
get_progress_state().finish_progress(self._ctx.node_id) prior to the with
CurrentNodeContext(...) and self.do_reset()), so stale progress is cleared
before streams initialize progress in do_reset() and not finished immediately
after.
In `@comfy_extras/nodes_video.py`:
- Around line 83-105: do_reset and do_pull can leave partially written files if
an exception occurs between _output_factory()/save_start and save_finalize; wrap
the critical sections so partial outputs are cleaned up: in do_reset, surround
the _output_factory()/open/_saver.save_start sequence with try/except/finally
and on any exception call self._discard_partial_output() and re-raise; in
do_pull, guard the pull/save_add/finalize sequence so if any exception occurs
you call self._discard_partial_output(), ensure any started save state
(self._save_state) is cleared or save_finalize is attempted safely, and then
re-raise the error; reference the symbols do_reset, do_pull, _output_factory,
_discard_partial_output, _saver.save_start, _saver.save_add,
_saver.save_finalize, and self._save_state when making the changes.
In `@comfy/ldm/lightricks/vae/causal_video_autoencoder.py`:
- Around line 1391-1395: decode_start currently injects fresh random noise every
call which breaks replayability; change it to generate and cache the noise once
per run and reuse it for subsequent decode_start calls. Specifically, add an
instance field (e.g., self._decode_noise or self._decode_generator +
self._decode_noise) and, in decode_start, if that field is unset create the
noise deterministically (use torch.randn_like with a torch.Generator seeded once
at run start) scaled by self.decode_noise_scale and reused on later calls; keep
clear_temporal_cache_state(self.decoder) and continue to pass
timestep=self.decode_timestep to self.decoder.forward_start. Ensure there is
also a clear/reset method that clears the cached noise when a new run is
intentionally started.
📒 Files selected for processing (14)
- comfy/comfy_types/node_typing.py
- comfy/ldm/lightricks/vae/causal_video_autoencoder.py
- comfy/sd.py
- comfy_api/input/__init__.py
- comfy_api/input/image_stream_types.py
- comfy_api/latest/__init__.py
- comfy_api/latest/_input/__init__.py
- comfy_api/latest/_input/image_stream_types.py
- comfy_api/latest/_input_impl/video_types.py
- comfy_api/latest/_io.py
- comfy_extras/nodes_image_stream.py
- comfy_extras/nodes_upscale_model.py
- comfy_extras/nodes_video.py
- nodes.py
```python
def reset(self) -> None:
    #This API is final. Subclasses must NOT override this for future core ComfyUI
    #change compatability. Override do_reset instead.
    with (nullcontext() if self._ctx is None else
          CurrentNodeContext(self._ctx.prompt_id, self._ctx.node_id, self._ctx.list_index)):
        self.do_reset()

    if self._ctx is not None:
        get_progress_state().finish_progress(self._ctx.node_id)
```
Don’t finish progress from reset().
do_reset() is where stream sources initialize progress state, but reset() immediately calls finish_progress() afterward. That marks the stream complete before the first pull(), so progress for sources like VAEDecodedImageStream can disappear or flicker instead of tracking the run. If you want to clear stale progress, it needs to happen before do_reset(), not after it.
Suggested ordering fix:

```diff
 def reset(self) -> None:
+    if self._ctx is not None:
+        get_progress_state().finish_progress(self._ctx.node_id)
     with (nullcontext() if self._ctx is None else
           CurrentNodeContext(self._ctx.prompt_id, self._ctx.node_id, self._ctx.list_index)):
         self.do_reset()
-
-    if self._ctx is not None:
-        get_progress_state().finish_progress(self._ctx.node_id)
```

📝 Committable suggestion (‼️ review carefully before committing; ensure it accurately replaces the highlighted code and test it thoroughly):

```python
def reset(self) -> None:
    #This API is final. Subclasses must NOT override this for future core ComfyUI
    #change compatability. Override do_reset instead.
    if self._ctx is not None:
        get_progress_state().finish_progress(self._ctx.node_id)
    with (nullcontext() if self._ctx is None else
          CurrentNodeContext(self._ctx.prompt_id, self._ctx.node_id, self._ctx.list_index)):
        self.do_reset()
```
```python
def do_reset(self) -> None:
    self._discard_partial_output()
    self._stream.reset()
    self._path, self._preview_ui = self._output_factory()
    assert self._path is not None
    open(self._path, "ab").close()
    self._save_state = self._saver.save_start(
        self._path,
        format=self._format,
        codec=self._codec,
        metadata=self._metadata,
    )

def do_pull(self, max_frames: int) -> Input.Image:
    assert self._save_state is not None
    chunk = self._stream.pull(max_frames)
    self._saver.save_add(self._save_state[0], self._save_state[1], chunk)
    if chunk.shape[0] < max_frames:
        self._saver.save_finalize(*self._save_state)
        self._save_state = None
        self._emit_preview()
        self._path = None
    return chunk
```
Clean up partially written outputs when a streamed save fails.
Any exception between _output_factory() and save_finalize() bypasses _discard_partial_output(). For SaveVideoStream, that means a mid-stream failure leaves a corrupt file in the output directory and can keep the container state alive longer than necessary.
Suggested cleanup path:

```diff
 def do_reset(self) -> None:
     self._discard_partial_output()
-    self._stream.reset()
-    self._path, self._preview_ui = self._output_factory()
-    assert self._path is not None
-    open(self._path, "ab").close()
-    self._save_state = self._saver.save_start(
-        self._path,
-        format=self._format,
-        codec=self._codec,
-        metadata=self._metadata,
-    )
+    try:
+        self._stream.reset()
+        self._path, self._preview_ui = self._output_factory()
+        assert self._path is not None
+        open(self._path, "ab").close()
+        self._save_state = self._saver.save_start(
+            self._path,
+            format=self._format,
+            codec=self._codec,
+            metadata=self._metadata,
+        )
+    except Exception:
+        self._discard_partial_output()
+        raise

 def do_pull(self, max_frames: int) -> Input.Image:
     assert self._save_state is not None
-    chunk = self._stream.pull(max_frames)
-    self._saver.save_add(self._save_state[0], self._save_state[1], chunk)
-    if chunk.shape[0] < max_frames:
-        self._saver.save_finalize(*self._save_state)
-        self._save_state = None
-        self._emit_preview()
-        self._path = None
+    try:
+        chunk = self._stream.pull(max_frames)
+        self._saver.save_add(self._save_state[0], self._save_state[1], chunk)
+        if chunk.shape[0] < max_frames:
+            self._saver.save_finalize(*self._save_state)
+            self._save_state = None
+            self._path = None
+    except Exception:
+        self._discard_partial_output()
+        raise
+    if chunk.shape[0] < max_frames:
+        self._emit_preview()
     return chunk
```
Verify each finding against the current code and only fix it if needed.
In `@comfy_extras/nodes_video.py` around lines 83 - 105, do_reset and do_pull can
leave partially written files if an exception occurs between
_output_factory()/save_start and save_finalize; wrap the critical sections so
partial outputs are cleaned up: in do_reset, surround the
_output_factory()/open/_saver.save_start sequence with try/except/finally and on
any exception call self._discard_partial_output() and re-raise; in do_pull,
guard the pull/save_add/finalize sequence so if any exception occurs you call
self._discard_partial_output(), ensure any started save state (self._save_state)
is cleared or save_finalize is attempted safely, and then re-raise the error;
reference the symbols do_reset, do_pull, _output_factory,
_discard_partial_output, _saver.save_start, _saver.save_add,
_saver.save_finalize, and self._save_state when making the changes.
```python
def decode_start(self, x):
    clear_temporal_cache_state(self.decoder)
    if self.timestep_conditioning: #TODO: seed
        x = torch.randn_like(x) * self.decode_noise_scale + (1.0 - self.decode_noise_scale) * x
    return self.decoder.forward_start(self.per_channel_statistics.un_normalize(x), timestep=self.decode_timestep)
```
Seed streamed decode once per run.
The #TODO: seed is observable here: every decode_start() reset injects fresh random noise, so the same latent is no longer replayable across two drains of the same stream. That breaks the new resettable-stream behavior and can make preview/save passes disagree if the source is re-executed. The noise/seed needs to be captured once for the run and reused for all resumed chunks.
guill left a comment:
I am strongly opposed to merging this PR. It fundamentally violates the guarantees made by the execution engine and will likely result in significant instability on the frontend and in all sorts of custom nodes.
If we want to support streaming, we should do it the right way and update the execution engine to support streaming (i.e. IS_LIST that execute depth-first instead of breadth-first).
```python
with (nullcontext() if self._ctx is None else
      CurrentNodeContext(self._ctx.prompt_id, self._ctx.node_id, self._ctx.list_index)):
    result = self.do_pull(max_frames)

if self._ctx is not None:
    registry = get_progress_state()
    entry = registry.nodes.get(self._ctx.node_id)
    if (int(result.shape[0]) < max_frames or
        (entry is not None and entry["max"] > 0 and entry["value"] >= entry["max"])):
        registry.finish_progress(self._ctx.node_id)
```
As mentioned in Slack, I am very strongly opposed to moving forward with this change as-is. Overriding the CurrentNodeContext like this from outside the execution system means that we will be receiving progress updates for nodes that have already finished executing. This fundamentally violates the lifecycle of nodes and is likely to cause many subtle bugs on both the default frontend (where we can no longer reliably 'clean up' when a node finishes executing) and custom nodes.
```python
server.send_sync(
    "executed",
    {
        "node": current.node_id,
        "display_node": current.node_id,
        "output": preview_output,
        "prompt_id": current.prompt_id,
    },
    server.client_id,
)
```
Doesn't this mean we can now receive executed messages multiple times for the same node?
```python
get_progress_state().update_progress(
    current.node_id,
    value=float(value),
    max_value=float(max(self._total_frames, 1)),
)
```
See concern from above about how this fundamentally violates the lifecycle of node execution.
Currently, ComfyUI uses uncompressed 3D tensors to represent video and requires that tensor to fully exist for eager execution of nodes.
This PR implements a deferred production contract and an IO type that allows video producer nodes to splice output to input, avoiding ever assembling the complete tensor in RAM at all. Here is the classic use case:
This will splice decoded frames straight into the saver as they are produced.
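As a sketch of that splice, the loop below drives a pull-based stream straight into a saver so only one chunk of decoded frames is resident at a time. This is illustrative, not the PR's actual node code: the `save_start`/`save_add`/`save_finalize` trio mirrors the streamed-save API visible in the review diffs, and the pull() EOF convention (a short or empty chunk) is assumed from the description.

```python
import torch

def save_video_stream(stream, saver, chunk_frames: int = 32) -> int:
    """Drain a pull-based image stream straight into a saver, one chunk at a
    time, so only chunk_frames decoded frames are ever resident in RAM."""
    stream.reset()                          # sources replay from retained input
    state = saver.save_start()
    total = 0
    while True:
        chunk = stream.pull(chunk_frames)   # [N, H, W, C] with N <= chunk_frames
        saver.save_add(state, chunk)        # must tolerate an empty EOF chunk:
        total += chunk.shape[0]             # exact-multiple lengths end with N == 0
        if chunk.shape[0] < chunk_frames:   # short chunk => stream exhausted
            saver.save_finalize(state)
            return total
```

Note the empty-chunk case: when the stream length is an exact multiple of the chunk size, the final pull returns zero frames, which is why downstream consumers need to handle EOF chunks (the "none can mean none" commit above).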
I have a corresponding ComfyUI-Frontend diff to get the IO color and to restrict stream outputs to one sink (as multi-sinking is poorly defined).
Nodes added come in 3 flavours:
Stream Sources:
Stream sources generate a stream from some sort of complete input.
VAE Decode Stream - Decode a video latent into a stream. Only LTXV support so far.
Image Batch to Stream - Start a stream from an image batch
Stream Sinks:
Stream sinks manage the flow control and initiate the end-to-end execution of the stream.
Save Video Stream - Execute a stream and save the result to file (note this is conceptually Stream Sink + Save Passthrough)
Image Stream to Batch - Execute a stream and assemble the result as a tensor. This does consume the RAM for the result. Added largely for completeness.
Stream Sink - Execute a stream and discard the result. Useful for workflow testing and flow control. Use this to execute a chain of interposers.
Stream Interposers:
Interposers are stream-in stream-out nodes that can consume or manipulate the stream as it passes through.
Save+Passthrough Video Stream - Save a stream intermediate to file as it passes through
Upscale Image (Using Model) - existing model upscale node extended for stream support
Preview Image Stream - preview frames of the stream as the stream progresses in real time
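Structurally, an interposer is just a stream wrapping another stream. The sketch below is illustrative (the class and method names are hypothetical; the real interposer nodes also manage devices, previews, and saving):

```python
import torch

class MapImageStream:
    """Stream-in / stream-out interposer: transforms each chunk in flight,
    without ever materializing the full video tensor."""

    def __init__(self, upstream, fn):
        self._upstream = upstream
        self._fn = fn

    def reset(self) -> None:
        # Resets propagate back through the chain to the source.
        self._upstream.reset()

    def pull(self, max_frames: int) -> torch.Tensor:
        # fn must handle empty EOF chunks too: an exact-multiple stream
        # length terminates with a zero-frame pull.
        return self._fn(self._upstream.pull(max_frames))
```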
This is an absolute MVP for the protocol, there is way more that can be done. Immediate follow up node ideas include the upcoming RIFE support, Load Video and VAE encode for symmetry and V2V functional completeness. WAN support for the VAE is a TBD.
General design:
The Stream IO type is a Python class that primarily implements the pull() API to get a chunk of output from the provider (or the immediately upstream interposer). When nodes are executed, no IO logic runs; instead, an ImageStream object is returned on which the downstream node can call pull(). A source can be chained into any number of interposers and then finally into a single sink.
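A minimal sketch of that contract, with a batch-backed source in the spirit of Image Batch to Stream (the class and method shapes here are inferred from the description; the PR's actual ImageStream API may differ):

```python
from abc import ABC, abstractmethod
import torch

class ImageStream(ABC):
    """Pull-based stream of image frames. A pull() returning fewer frames
    than requested (possibly zero) signals the end of the stream."""

    @abstractmethod
    def reset(self) -> None:
        """Rewind so the stream can be drained again from the start."""

    @abstractmethod
    def pull(self, max_frames: int) -> torch.Tensor:
        """Return up to max_frames frames as an [N, H, W, C] tensor."""

class BatchImageStream(ImageStream):
    """Source that serves an existing image batch chunk by chunk."""

    def __init__(self, images: torch.Tensor):
        self._images = images  # input retained, so the stream is resettable
        self._pos = 0

    def reset(self) -> None:
        self._pos = 0

    def pull(self, max_frames: int) -> torch.Tensor:
        chunk = self._images[self._pos:self._pos + max_frames]
        self._pos += chunk.shape[0]
        return chunk
```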
A sink-type node is required at the end of the chain to implement the top-level flow control for the stream. The sink sets the chunk size for the whole chain (this will change with RIFE interposer support, which creates an inline multiplier) and loops to execute the stream until it is exhausted. The source sets the stream length and signals termination of the stream.
Sources are responsible for attaching their input state to the stream object for the sake of repeatable execution on a cache hit of the source node itself. For example, VAE Decode Stream attaches the input latent to its ImageStream object so that the stream can be reset and re-executed; if, for example, a downstream saver changes frame rate, the stream source is able to re-execute correctly from the start of the chain.
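For instance, a decode-style source might look like this. The `vae.decode_start`/`vae.decode_chunk` helpers are hypothetical stand-ins for the streamed-decode entry points; the point is only that the source retains its input so reset() can replay the whole stream:

```python
class DecodedImageStream:
    """Source that retains its input latent so a downstream sink can reset()
    and re-run the whole decode, e.g. after a saver changes frame rate."""

    def __init__(self, vae, latent):
        self._vae = vae
        self._latent = latent   # attached input state => repeatable execution
        self._state = None

    def reset(self):
        # Restart decoding from the beginning of the retained latent.
        self._state = self._vae.decode_start(self._latent)

    def pull(self, max_frames):
        assert self._state is not None, "reset() must be called first"
        return self._vae.decode_chunk(self._state, max_frames)
```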
Stream data is uncachable by design (it is a zero-copy protocol, and there is no copy of intermediates at any time). Any input change to the stream chain therefore implies full re-execution of the whole stream chain. Because of this, fanout of a stream socket also implies re-execution of the stream per output connection in the current execution model. This is confusing for the user, so I have a frontend change that simply prohibits connecting a stream to multiple sinks. A node to split a stream, either via re-execution or buffering, is a probable future enhancement.
As streams cannot fan out, and saving or previewing an intermediate is a common task, save and preview interposer forms are added to handle this common case.
Example Test Case:
Linux, RTX5090, 96GB RAM
LTX2.3 1280x720x1001f -> RealESRGANx4
Before (Normal save - lower branch ^^):
After (stream save - upper branch ^^):
(executing from cached latent - no ksampler)
At 80% complete, stream execution used 22GB + 57GB cache RAM.
No surge in RAM on end of workflow ✅
Same but with ESRGAN2x
Before (~50GB RAM - final surge):
After:
(Performance is slightly lower because the stream flow re-executes the VAE, while the non-stream run used a cached VAE decode result.)
With no model upscaler (full workflow from a cleared cache):
Before (Peak RAM approx 20GB):
After (Peak RAM approx 15GB):
Note: save chunk size = 32 frames.
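For scale, here is a rough sketch of why chunking matters for the 1280x720x1001f case above, assuming fp32 RGB frames (the real intermediate dtype and layout may differ):

```python
def tensor_gib(frames: int, h: int, w: int, channels: int = 3, bytes_per_el: int = 4) -> float:
    """Size of an uncompressed [frames, h, w, channels] fp32 tensor in GiB."""
    return frames * h * w * channels * bytes_per_el / 2**30

full_video = tensor_gib(1001, 720, 1280)           # ~10.3 GiB resident at once
one_chunk = tensor_gib(32, 720, 1280)              # ~0.33 GiB per 32-frame chunk
upscaled_4x = tensor_gib(1001, 720 * 4, 1280 * 4)  # ~165 GiB if fully materialized
```

Under these assumptions, a RealESRGANx4 output held as a single tensor would dwarf the 32-frame chunks the streamed save actually keeps resident.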
Regression Tests:
Linux, 5090, LTX2.3 ✅
Linux, 5090, Wan 2.2 ✅
Linux, 5090, LTX2.0 ✅
Linux, 5090, LTX1 ✅
Linux, 5090, SDXL ✅