
Video Streams: Significantly reduce RAM of Video Output Intermediates#13396

Open
rattus128 wants to merge 17 commits into Comfy-Org:master from rattus128:prs/video-streams-1

Conversation

@rattus128
Contributor

Currently ComfyUI uses uncompressed 3D tensors to represent video, and eager execution of nodes requires the full tensor to exist in RAM.

This implements a deferred production contract and an IO type that lets video producer nodes splice output directly into input, avoiding ever assembling the complete tensor in RAM. Here is the classic use case:

image

This will splice decoded frames straight into the saver as they are produced.

I have a corresponding ComfyUI-Frontend diff to add the IO color and to restrict stream outputs to a single sink (as multi-sinking is poorly defined).

Nodes added come in 3 flavours:

Stream Sources:

Stream sources generate a stream from some sort of complete input.

VAE Decode Stream - Decode a video latent into a stream. Only LTXV support so far.
Image Batch to Stream - Start a stream from an image batch

Stream Sinks:

Stream sinks manage flow control and initiate end-to-end execution of the stream.

Save Video Stream - Execute a stream and save the result to file (note this is conceptually Stream Sink + Save Passthrough)
Image Stream to Batch - Execute a stream and assemble the result as a tensor. This does consume the RAM for the full result. Added largely for completeness.
Stream Sink - Execute a stream and discard the result. Useful for workflow testing and flow control. Use this to execute a chain of interposers.

Stream Interposers:

Interposers are stream-in stream-out nodes that can consume or manipulate the stream as it passes through.

Save+Passthrough Video Stream - Save a stream intermediate to file as it passes through
Upscale Image (Using Model) - existing model upscale node extended for stream support
Preview Image Stream - preview frames of the stream as the stream progresses in real time
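The interposer flavour above can be sketched in a few lines. This is an illustrative model only: plain Python lists stand in for frame tensors, and ListSource / MapInterposer are hypothetical names, not the PR's actual classes.

```python
# Illustrative sketch: lists stand in for frame tensors; class names are
# hypothetical, not the PR's real implementation.

class ListSource:
    """Source: wraps a complete batch (cf. Image Batch to Stream)."""
    def __init__(self, frames):
        self._frames = frames
        self._pos = 0

    def reset(self):
        self._pos = 0

    def pull(self, max_frames):
        chunk = self._frames[self._pos:self._pos + max_frames]
        self._pos += len(chunk)
        return chunk

class MapInterposer:
    """Interposer: transforms each chunk as it passes downstream."""
    def __init__(self, upstream, fn):
        self._upstream = upstream
        self._fn = fn

    def reset(self):
        self._upstream.reset()  # resets propagate up the chain

    def pull(self, max_frames):
        return [self._fn(f) for f in self._upstream.pull(max_frames)]
```

Chaining a source into an interposer this way never materializes the full batch; each chunk is transformed and handed straight downstream.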

This is an absolute MVP for the protocol; there is much more that can be done. Immediate follow-up node ideas include the upcoming RIFE support, plus Load Video and VAE Encode for symmetry and V2V functional completeness. WAN support for the VAE is TBD.

General design:

The Stream IO type is a Python class that primarily implements the pull() API to get a chunk of output from the provider (or the immediately upstream interposer). When a node is executed, no IO logic runs; instead an ImageStream object is returned, on which the downstream node can call pull(). A source can be chained into any number of interposers and then finally into a single sink.
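A minimal sketch of that pull()/reset() contract, assuming the simplified model where frames are just integers; ImageStream and CountingSource here are stand-ins, not the PR's real classes.

```python
# Hedged sketch of the pull-based contract; names are illustrative.
from abc import ABC, abstractmethod

class ImageStream(ABC):
    """Deferred producer: returned by a node instead of a full tensor."""

    @abstractmethod
    def reset(self) -> None:
        """Rewind so the whole chain can be re-executed from the start."""

    @abstractmethod
    def pull(self, max_frames: int) -> list:
        """Return up to max_frames frames; a short chunk means exhausted."""

class CountingSource(ImageStream):
    """Minimal source emitting n integer 'frames'."""
    def __init__(self, n: int):
        self._n = n
        self._pos = 0

    def reset(self) -> None:
        self._pos = 0

    def pull(self, max_frames: int) -> list:
        chunk = list(range(self._pos, min(self._pos + max_frames, self._n)))
        self._pos += len(chunk)
        return chunk
```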

A sink-type node is required at the end of the chain to implement the top-level flow control on the stream. The sink sets the chunk size for the whole chain (this will change with RIFE interposer support, which introduces an inline multiplier) and loops to execute the stream until it is exhausted. The source sets the stream length and signals termination of the stream.
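The sink's drive loop can be sketched under the same assumptions: the sink picks the chunk size and drains the chain until a short chunk signals termination. StubStream and drain are hypothetical names for illustration.

```python
# Hypothetical sink loop; a short chunk from pull() signals end of stream.
def drain(stream, chunk_size: int) -> int:
    """Sink: owns flow control; returns total frames consumed."""
    stream.reset()
    total = 0
    while True:
        chunk = stream.pull(chunk_size)
        total += len(chunk)
        if len(chunk) < chunk_size:  # source signalled termination
            return total

class StubStream:
    """Tiny stand-in stream for the sketch (not a real PR class)."""
    def __init__(self, n):
        self._n = n
        self._pos = 0
    def reset(self):
        self._pos = 0
    def pull(self, max_frames):
        chunk = [0] * min(max_frames, self._n - self._pos)
        self._pos += len(chunk)
        return chunk
```

Note the exact-multiple case: a stream of 8 frames drained in chunks of 4 yields two full chunks, then one empty chunk to signal termination.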

Sources are responsible for attaching their input state to the stream object for the sake of repeatable execution on a cache hit of the source node itself. For example, VAE Decode Stream attaches the input latent to its ImageStream object so the stream can be reset and re-executed; if, for example, a downstream saver changes frame rate, the stream source can correctly re-execute from the start of the chain.

Stream data is uncachable by design: it is a zero-copy protocol, and no copy of the intermediates exists at any point. Any input change to the stream chain therefore implies full re-execution of the entire chain. Because of this, fanout of a stream socket would also imply re-execution of the stream per output connection in the current execution model. This is confusing for the user, so I have a frontend change that simply prohibits connecting a stream to multiple sinks. A node to split a stream, either by re-execution or by buffering, is a probable future enhancement.

As streams cannot fan out, and saving or previewing an intermediate is a common task, save and preview interposer forms are added to handle this common case.
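The save-passthrough idea can be sketched under the same simplified model: write each chunk out while forwarding it unchanged, so an intermediate is captured without fanning the stream out. SaveTee, BatchSource, and the in-memory "file" are illustrative, not the PR's implementation.

```python
# Illustrative save-passthrough interposer; a list stands in for the file.
class SaveTee:
    def __init__(self, upstream):
        self._upstream = upstream
        self.saved = []          # stands in for the output file

    def reset(self):
        self._upstream.reset()
        self.saved = []          # discard any partial output on replay

    def pull(self, max_frames):
        chunk = self._upstream.pull(max_frames)
        self.saved.extend(chunk)  # "save" side effect
        return chunk              # forward the same chunk downstream

class BatchSource:
    """Minimal stand-in source wrapping a complete batch."""
    def __init__(self, frames):
        self._frames = frames
        self._pos = 0
    def reset(self):
        self._pos = 0
    def pull(self, max_frames):
        chunk = self._frames[self._pos:self._pos + max_frames]
        self._pos += len(chunk)
        return chunk
```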

Example Test Case:

Linux, RTX5090, 96GB RAM
LTX2.3 1280x720x1001f -> RealESRGANx4

image

Before (Normal save - lower branch ^^):

Upscale Image (using Model)

RuntimeError: [enforce fail at alloc_cpu.cpp:124] err == 0. DefaultCPUAllocator: can't allocate memory: you tried to allocate 103982039040 bytes. Error code 12 (Cannot allocate memory)

After (stream save - upper branch ^^):

(executing from cached latent - no ksampler)

got prompt
0 models unloaded.
Model VideoVAE prepared for dynamic VRAM loading. 1384MB Staged. 0 patches attached.
Prompt executed in 00:14:10
image image

At 80% complete, stream execution uses 22GB + 57GB cache RAM.

No surge in RAM on end of workflow ✅


Same but with ESRGAN2x

Before (~50GB RAM - final surge):

got prompt
Prompt executed in 206.38 seconds
Screenshot from 2026-04-14 13-06-13

After:

got prompt
0 models unloaded.
Model VideoVAE prepared for dynamic VRAM loading. 1384MB Staged. 0 patches attached.
Prompt executed in 229.62 seconds

(Performance is slightly lower because the stream flow re-executes the VAE, while the non-stream run used the cached VAE decode result.)

image image

With no model upscaler (full workflow from a clear cache):

Before (Peak RAM approx 20GB):

...
Requested to load LTXAV
Model LTXAV prepared for dynamic VRAM loading. 23838MB Staged. 1660 patches attached.
100%|██████████| 8/8 [00:47<00:00,  5.98s/it]                                   
0 models unloaded.
Model LTXAV prepared for dynamic VRAM loading. 23838MB Staged. 1660 patches attached.
100%|██████████| 3/3 [02:27<00:00, 49.19s/it]                                   
Requested to load VideoVAE
0 models unloaded.
Model VideoVAE prepared for dynamic VRAM loading. 1384MB Staged. 0 patches attached.
Prompt executed in 220.17 seconds
image

After (Peak RAM approx 15GB):

Note: Save chunk-size = 32f.

...
Requested to load LTXAV
Model LTXAV prepared for dynamic VRAM loading. 23838MB Staged. 1660 patches attached.
100%|██████████| 8/8 [00:47<00:00,  6.00s/it]                                   
0 models unloaded.
Model LTXAV prepared for dynamic VRAM loading. 23838MB Staged. 1660 patches attached.
100%|██████████| 3/3 [02:27<00:00, 49.31s/it]                                   
Requested to load AudioVAE
loaded completely; 14269.84 MB usable, 693.46 MB loaded, full load: True
Requested to load VideoVAE
0 models unloaded.
Model VideoVAE prepared for dynamic VRAM loading. 1384MB Staged. 0 patches attached.
Prompt executed in 223.03 seconds
image

Regression Tests:

Linux, 5090, LTX2.3 ✅
Linux, 5090, Wan 2.2 ✅
Linux, 5090, LTX2.0 ✅
Linux, 5090, LTX1 ✅
Linux, 5090, SDXL ✅

@rattus128 rattus128 changed the title Video Streams: Massively reduce RAM of Video Output Intermediates Video Streams: Significantly reduce RAM of Video Output Intermediates Apr 14, 2026

coderabbitai bot commented Apr 14, 2026

📝 Walkthrough

Walkthrough

This pull request introduces pull-based image streaming support to ComfyUI. A new IMAGE_STREAM type is added to the I/O enumeration. The VAE decoder is refactored to support pausable, resumable decoding with shared run state management. New ImageStreamInput abstract base class defines the pull-based streaming API with reset and pull methods. The VAE wrapper gains streaming-aware decode methods. New nodes expose streaming utilities: image batch-to-stream conversion, stream-to-batch collection, VAE-decoded streams, stream previewing, and stream consumption. Upscale and video save nodes are extended to handle both tensor and stream inputs. Video frame export is refactored into start/add/finalize phases to support incremental processing.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 6.82% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the main objective of the PR: implementing video streams to significantly reduce RAM usage of video output intermediates.
Description check ✅ Passed The description comprehensively explains the motivation, design, implementation, and performance benefits of the streaming protocol, with concrete test results demonstrating RAM reduction.


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
comfy_api/latest/_input_impl/video_types.py (2)

419-445: ⚠️ Potential issue | 🟠 Major

Add exception handling in save_start() to close the PyAV container if initialization fails.

If metadata serialization or stream creation throws after av.open() succeeds, the container is never returned to the caller, so save_to() cannot close it. This leaks file handles and leaves partial files on disk.

Suggested fix
         output = av.open(path, mode='w', options={'movflags': 'use_metadata_tags'}, **extra_kwargs)
-        if True:
+        try:
 
             # Add metadata before writing any streams
             if metadata is not None:
                 for key, value in metadata.items():
                     output.metadata[key] = json.dumps(value)
@@
-        self._frame_counter = 0
-        return output, video_stream, audio_stream, audio_sample_rate, frame_rate
+            self._frame_counter = 0
+            return output, video_stream, audio_stream, audio_sample_rate, frame_rate
+        except Exception:
+            output.close()
+            raise
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@comfy_api/latest/_input_impl/video_types.py` around lines 419 - 445, The
initialization after av.open() (in save_start()) can throw while setting
metadata or creating streams (e.g., when serializing metadata, creating
video_stream/audio_stream or computing frame_rate), so wrap the block that sets
output.metadata, frame_rate, video_stream, audio_stream and audio_sample_rate in
a try/except; on any Exception call output.close() (or output.destroy/close
method on the PyAV container) to release the file handle and then re-raise the
exception so callers see the failure; ensure self._frame_counter is only set and
the tuple (output, video_stream, audio_stream, audio_sample_rate, frame_rate) is
returned on successful initialization.

398-415: ⚠️ Potential issue | 🟡 Minor

Update type annotations to include io.BytesIO for save_start() and save_to() methods.

The methods accept io.BytesIO at runtime (as evidenced by the isinstance(path, io.BytesIO) check at line 412 and the docstring at line 479 promising BytesIO support), but the type annotations restrict path to str. This creates a mismatch where the public API is narrower than the runtime implementation, causing type checker warnings for valid code. The correct pattern is already used elsewhere in the file (line 309: path: str | io.BytesIO).

Suggested fix
-        path: str,
+        path: str | io.BytesIO,

Apply to both save_start() at line 398 and save_to() at line 477.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@comfy_api/latest/_input_impl/video_types.py` around lines 398 - 415, The path
parameter type annotation for save_start and save_to is too narrow (str only)
but both functions accept io.BytesIO at runtime; update both method signatures
(save_start and save_to) to annotate path as str | io.BytesIO (or
typing.Union[str, io.BytesIO]) so the type hints match the isinstance(path,
io.BytesIO) checks and the docstring, and ensure io is imported/available in the
module.
comfy_extras/nodes_upscale_model.py (1)

87-122: ⚠️ Potential issue | 🟠 Major

Normalize EOF chunks to the same output device/dtype as normal chunks.

The empty-stream path returns image.clone(), but non-empty chunks come back on intermediate_device() / intermediate_dtype(). That makes the final EOF batch metadata-dependent, and downstream consumers that concatenate the last chunk can fail on exact-multiple stream lengths after an upscale step.

Suggested fix
     def upscale_batch(cls, upscale_model, image: torch.Tensor) -> torch.Tensor:
+        output_device = model_management.intermediate_device()
+        output_dtype = model_management.intermediate_dtype()
         if image.shape[0] == 0:
-            return image.clone()
+            return image.to(device=output_device, dtype=output_dtype).clone()

         device = model_management.get_torch_device()
@@
-        output_device = model_management.intermediate_device()
-
         oom = True
         try:
@@
         finally:
             upscale_model.to("cpu")

-        return torch.clamp(s.movedim(-3,-1), min=0, max=1.0).to(model_management.intermediate_dtype())
+        return torch.clamp(s.movedim(-3, -1), min=0, max=1.0).to(device=output_device, dtype=output_dtype)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@comfy_extras/nodes_upscale_model.py` around lines 87 - 122, The early
empty-stream return in upscale_batch (image.clone()) must produce the same
device/dtype as non-empty outputs; move the output_device =
model_management.intermediate_device() (and optionally capture
intermediate_dtype = model_management.intermediate_dtype()) above the
empty-check and change the empty return to return
image.clone().to(device=output_device, dtype=intermediate_dtype) so EOF chunks
match the tiled path (refer to upscale_batch, image.clone(),
model_management.intermediate_device(), and
model_management.intermediate_dtype()).
ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: b44b228f-7965-4730-acc1-e9cbe87ec61e

📥 Commits

Reviewing files that changed from the base of the PR and between 722bc73 and 1c2d379.

📒 Files selected for processing (14)
  • comfy/comfy_types/node_typing.py
  • comfy/ldm/lightricks/vae/causal_video_autoencoder.py
  • comfy/sd.py
  • comfy_api/input/__init__.py
  • comfy_api/input/image_stream_types.py
  • comfy_api/latest/__init__.py
  • comfy_api/latest/_input/__init__.py
  • comfy_api/latest/_input/image_stream_types.py
  • comfy_api/latest/_input_impl/video_types.py
  • comfy_api/latest/_io.py
  • comfy_extras/nodes_image_stream.py
  • comfy_extras/nodes_upscale_model.py
  • comfy_extras/nodes_video.py
  • nodes.py

Comment on lines +22 to +30
    def reset(self) -> None:
        #This API is final. Subclasses must NOT override this for future core ComfyUI
        #change compatability. Override do_reset instead.
        with (nullcontext() if self._ctx is None else
              CurrentNodeContext(self._ctx.prompt_id, self._ctx.node_id, self._ctx.list_index)):
            self.do_reset()

        if self._ctx is not None:
            get_progress_state().finish_progress(self._ctx.node_id)

⚠️ Potential issue | 🟡 Minor

Don’t finish progress from reset().

do_reset() is where stream sources initialize progress state, but reset() immediately calls finish_progress() afterward. That marks the stream complete before the first pull(), so progress for sources like VAEDecodedImageStream can disappear or flicker instead of tracking the run. If you want to clear stale progress, it needs to happen before do_reset(), not after it.

Suggested ordering fix
     def reset(self) -> None:
+        if self._ctx is not None:
+            get_progress_state().finish_progress(self._ctx.node_id)
         with (nullcontext() if self._ctx is None else
               CurrentNodeContext(self._ctx.prompt_id, self._ctx.node_id, self._ctx.list_index)):
             self.do_reset()
-
-        if self._ctx is not None:
-            get_progress_state().finish_progress(self._ctx.node_id)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@comfy_api/latest/_input/image_stream_types.py` around lines 22 - 30, The
reset() implementation currently calls
get_progress_state().finish_progress(self._ctx.node_id) after invoking
do_reset(), which prematurely marks progress complete; move the finish_progress
call to run before entering the CurrentNodeContext/do_reset block (i.e., check
if self._ctx is not None and call
get_progress_state().finish_progress(self._ctx.node_id) prior to the with
CurrentNodeContext(...) and self.do_reset()), so stale progress is cleared
before streams initialize progress in do_reset() and not finished immediately
after.

Comment on lines +83 to +105
    def do_reset(self) -> None:
        self._discard_partial_output()
        self._stream.reset()
        self._path, self._preview_ui = self._output_factory()
        assert self._path is not None
        open(self._path, "ab").close()
        self._save_state = self._saver.save_start(
            self._path,
            format=self._format,
            codec=self._codec,
            metadata=self._metadata,
        )

    def do_pull(self, max_frames: int) -> Input.Image:
        assert self._save_state is not None
        chunk = self._stream.pull(max_frames)
        self._saver.save_add(self._save_state[0], self._save_state[1], chunk)
        if chunk.shape[0] < max_frames:
            self._saver.save_finalize(*self._save_state)
            self._save_state = None
            self._emit_preview()
            self._path = None
        return chunk

⚠️ Potential issue | 🟠 Major

Clean up partially written outputs when a streamed save fails.

Any exception between _output_factory() and save_finalize() bypasses _discard_partial_output(). For SaveVideoStream, that means a mid-stream failure leaves a corrupt file in the output directory and can keep the container state alive longer than necessary.

Suggested cleanup path
     def do_reset(self) -> None:
         self._discard_partial_output()
-        self._stream.reset()
-        self._path, self._preview_ui = self._output_factory()
-        assert self._path is not None
-        open(self._path, "ab").close()
-        self._save_state = self._saver.save_start(
-            self._path,
-            format=self._format,
-            codec=self._codec,
-            metadata=self._metadata,
-        )
+        try:
+            self._stream.reset()
+            self._path, self._preview_ui = self._output_factory()
+            assert self._path is not None
+            open(self._path, "ab").close()
+            self._save_state = self._saver.save_start(
+                self._path,
+                format=self._format,
+                codec=self._codec,
+                metadata=self._metadata,
+            )
+        except Exception:
+            self._discard_partial_output()
+            raise

     def do_pull(self, max_frames: int) -> Input.Image:
         assert self._save_state is not None
-        chunk = self._stream.pull(max_frames)
-        self._saver.save_add(self._save_state[0], self._save_state[1], chunk)
-        if chunk.shape[0] < max_frames:
-            self._saver.save_finalize(*self._save_state)
-            self._save_state = None
-            self._emit_preview()
-            self._path = None
+        try:
+            chunk = self._stream.pull(max_frames)
+            self._saver.save_add(self._save_state[0], self._save_state[1], chunk)
+            if chunk.shape[0] < max_frames:
+                self._saver.save_finalize(*self._save_state)
+                self._save_state = None
+                self._path = None
+        except Exception:
+            self._discard_partial_output()
+            raise
+        if chunk.shape[0] < max_frames:
+            self._emit_preview()
         return chunk
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@comfy_extras/nodes_video.py` around lines 83 - 105, do_reset and do_pull can
leave partially written files if an exception occurs between
_output_factory()/save_start and save_finalize; wrap the critical sections so
partial outputs are cleaned up: in do_reset, surround the
_output_factory()/open/_saver.save_start sequence with try/except/finally and on
any exception call self._discard_partial_output() and re-raise; in do_pull,
guard the pull/save_add/finalize sequence so if any exception occurs you call
self._discard_partial_output(), ensure any started save state (self._save_state)
is cleared or save_finalize is attempted safely, and then re-raise the error;
reference the symbols do_reset, do_pull, _output_factory,
_discard_partial_output, _saver.save_start, _saver.save_add,
_saver.save_finalize, and self._save_state when making the changes.

Comment on lines +1391 to +1395
    def decode_start(self, x):
        clear_temporal_cache_state(self.decoder)
        if self.timestep_conditioning: #TODO: seed
            x = torch.randn_like(x) * self.decode_noise_scale + (1.0 - self.decode_noise_scale) * x
        return self.decoder.forward_start(self.per_channel_statistics.un_normalize(x), timestep=self.decode_timestep)

⚠️ Potential issue | 🟠 Major

Seed streamed decode once per run.

The #TODO: seed is observable here: every decode_start() reset injects fresh random noise, so the same latent is no longer replayable across two drains of the same stream. That breaks the new resettable-stream behavior and can make preview/save passes disagree if the source is re-executed. The noise/seed needs to be captured once for the run and reused for all resumed chunks.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@comfy/ldm/lightricks/vae/causal_video_autoencoder.py` around lines 1391 -
1395, decode_start currently injects fresh random noise every call which breaks
replayability; change it to generate and cache the noise once per run and reuse
it for subsequent decode_start calls. Specifically, add an instance field (e.g.,
self._decode_noise or self._decode_generator + self._decode_noise) and, in
decode_start, if that field is unset create the noise deterministically (use
torch.randn_like with a torch.Generator seeded once at run start) scaled by
self.decode_noise_scale and reused on later calls; keep
clear_temporal_cache_state(self.decoder) and continue to pass
timestep=self.decode_timestep to self.decoder.forward_start. Ensure there is
also a clear/reset method that clears the cached noise when a new run is
intentionally started.

Member

@guill guill left a comment

I am strongly opposed to merging this PR. It fundamentally violates the guarantees made by the execution engine and will likely result in significant instability on the frontend and in all sorts of custom nodes.
If we want to support streaming, we should do it the right way and update the execution engine to support streaming (i.e. IS_LIST that execute depth-first instead of breadth-first).

Comment on lines +35 to +44
        with (nullcontext() if self._ctx is None else
              CurrentNodeContext(self._ctx.prompt_id, self._ctx.node_id, self._ctx.list_index)):
            result = self.do_pull(max_frames)

        if self._ctx is not None:
            registry = get_progress_state()
            entry = registry.nodes.get(self._ctx.node_id)
            if (int(result.shape[0]) < max_frames or
                    (entry is not None and entry["max"] > 0 and entry["value"] >= entry["max"])):
                registry.finish_progress(self._ctx.node_id)
Member

As mentioned in Slack, I am very strongly opposed to moving forward with this change as-is. Overriding the CurrentNodeContext like this from outside the execution system means that we will be receiving progress updates for nodes that have already finished executing. This fundamentally violates the lifecycle of nodes and is likely to cause many subtle bugs on both the default frontend (where we can no longer reliably 'clean up' when a node finishes executing) and custom nodes.

Comment on lines +103 to +112
        server.send_sync(
            "executed",
            {
                "node": current.node_id,
                "display_node": current.node_id,
                "output": preview_output,
                "prompt_id": current.prompt_id,
            },
            server.client_id,
        )
Member

Doesn't this mean we can now receive executed messages multiple times for the same node?

Comment on lines +149 to +153
        get_progress_state().update_progress(
            current.node_id,
            value=float(value),
            max_value=float(max(self._total_frames, 1)),
        )
Member

See concern from above about how this fundamentally violates the lifecycle of node execution.
