diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml index 608a0edc4..68f374747 100644 --- a/.github/workflows/docker.yaml +++ b/.github/workflows/docker.yaml @@ -83,28 +83,6 @@ jobs: cache-from: type=registry,ref=livepeer/comfyui-base:build-cache cache-to: type=registry,mode=max,ref=livepeer/comfyui-base:build-cache - trigger: - name: Trigger ai-runner workflow - needs: base - if: ${{ github.repository == 'livepeer/comfystream' }} - runs-on: ubuntu-latest - steps: - - name: Send workflow dispatch event to ai-runner - uses: actions/github-script@v7 - with: - github-token: ${{ secrets.CI_GITHUB_TOKEN }} - script: | - await github.rest.actions.createWorkflowDispatch({ - owner: context.repo.owner, - repo: "ai-runner", - workflow_id: "comfyui-trigger.yaml", - ref: "main", - inputs: { - "comfyui-base-digest": "${{ needs.base.outputs.image-digest }}", - "triggering-branch": "${{ github.head_ref || github.ref_name }}", - }, - }); - comfystream: name: comfystream image needs: base diff --git a/.github/workflows/opencv-cuda-artifact.yml b/.github/workflows/opencv-cuda-artifact.yml new file mode 100644 index 000000000..c5deab076 --- /dev/null +++ b/.github/workflows/opencv-cuda-artifact.yml @@ -0,0 +1,184 @@ +name: Build OpenCV CUDA Artifact + +on: + workflow_dispatch: + inputs: + python_version: + description: 'Python version to build' + required: false + default: '3.12' + type: string + cuda_version: + description: 'CUDA version to build' + required: false + default: '12.8' + type: string + +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} + cancel-in-progress: true + +env: + PYTHON_VERSION: ${{ github.event.inputs.python_version || '3.12' }} + CUDA_VERSION: ${{ github.event.inputs.cuda_version || '12.8' }} + +jobs: + build-opencv-artifact: + name: Build OpenCV CUDA Artifact + runs-on: [self-hosted, linux, gpu] + + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + ref: ${{ github.event.pull_request.head.sha || github.sha }} + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Build OpenCV CUDA Docker image + uses: docker/build-push-action@v6 + with: + context: . + file: docker/Dockerfile.opencv + build-args: | + BASE_IMAGE=nvidia/cuda:${{ env.CUDA_VERSION }}.1-cudnn-devel-ubuntu22.04 + PYTHON_VERSION=${{ env.PYTHON_VERSION }} + CUDA_VERSION=${{ env.CUDA_VERSION }} + tags: opencv-cuda-artifact:latest + load: true + cache-from: type=gha + cache-to: type=gha,mode=max + + - name: Extract OpenCV libraries from Docker container + run: | + echo "Creating temporary container..." + docker create --name opencv-extract opencv-cuda-artifact:latest + + echo "Creating workspace directory..." + mkdir -p ./opencv-artifacts + + # Try to copy from system installation + docker cp opencv-extract:/usr/local/lib/python${{ env.PYTHON_VERSION }}/site-packages/cv2 ./opencv-artifacts/cv2 || echo "cv2 not found in system site-packages" + + echo "Copying OpenCV source directories..." + # Copy opencv and opencv_contrib source directories + docker cp opencv-extract:/workspace/opencv ./opencv-artifacts/ || echo "opencv source not found" + docker cp opencv-extract:/workspace/opencv_contrib ./opencv-artifacts/ || echo "opencv_contrib source not found" + + echo "Cleaning up container..." + docker rm opencv-extract + + echo "Contents of opencv-artifacts:" + ls -la ./opencv-artifacts/ + + - name: Create tarball artifact + run: | + echo "Creating opencv-cuda-release.tar.gz..." + cd ./opencv-artifacts + tar -czf ../opencv-cuda-release.tar.gz . || echo "Failed to create tarball" + cd .. + + echo "Generating checksums..." + sha256sum opencv-cuda-release.tar.gz > opencv-cuda-release.tar.gz.sha256 + md5sum opencv-cuda-release.tar.gz > opencv-cuda-release.tar.gz.md5 + + echo "Verifying archive contents..." + echo "Archive size: $(ls -lh opencv-cuda-release.tar.gz | awk '{print $5}')" + echo "First 20 files in archive:" + tar -tzf opencv-cuda-release.tar.gz | head -20 + + - name: Extract and verify tarball + run: | + echo "Testing tarball extraction..." + mkdir -p test-extract + cd test-extract + tar -xzf ../opencv-cuda-release.tar.gz + echo "Extracted contents:" + find . -maxdepth 2 -type d | sort + cd .. + rm -rf test-extract + + - name: Upload OpenCV CUDA Release Artifact + uses: actions/upload-artifact@v4 + with: + name: opencv-cuda-release-python${{ env.PYTHON_VERSION }}-cuda${{ env.CUDA_VERSION }}-${{ github.sha }} + path: | + opencv-cuda-release.tar.gz + opencv-cuda-release.tar.gz.sha256 + opencv-cuda-release.tar.gz.md5 + retention-days: 30 + + - name: Create Release Notes + run: | + cat > release-info.txt << EOF + OpenCV CUDA Release Artifact + + Build Details: + - Python Version: ${{ env.PYTHON_VERSION }} + - CUDA Version: ${{ env.CUDA_VERSION }} + - OpenCV Version: 4.11.0 + - Built on: $(date -u) + - Commit SHA: ${{ github.sha }} + + Contents: + - cv2: Python OpenCV module with CUDA support + - opencv: OpenCV source code + - opencv_contrib: OpenCV contrib modules source + - lib: Compiled OpenCV libraries + - include: OpenCV header files + + Installation: + 1. Download opencv-cuda-release.tar.gz + 2. Extract: tar -xzf opencv-cuda-release.tar.gz + 3. Copy cv2 to your Python environment's site-packages + 4. Ensure CUDA libraries are in your system PATH + + Checksums: + SHA256: $(cat opencv-cuda-release.tar.gz.sha256) + MD5: $(cat opencv-cuda-release.tar.gz.md5) + EOF + + - name: Upload Release Info + uses: actions/upload-artifact@v4 + with: + name: release-info-python${{ env.PYTHON_VERSION }}-cuda${{ env.CUDA_VERSION }}-${{ github.sha }} + path: release-info.txt + retention-days: 30 + + create-release-draft: + name: Create Release Draft + needs: build-opencv-artifact + runs-on: ubuntu-latest + if: github.event_name == 'push' && github.ref == 'refs/heads/main' + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Download artifacts + uses: actions/download-artifact@v4 + with: + name: opencv-cuda-release-python${{ env.PYTHON_VERSION }}-cuda${{ env.CUDA_VERSION }}-${{ github.sha }} + path: ./artifacts + + - name: Download release info + uses: actions/download-artifact@v4 + with: + name: release-info-python${{ env.PYTHON_VERSION }}-cuda${{ env.CUDA_VERSION }}-${{ github.sha }} + path: ./artifacts + + - name: Create Release Draft + uses: softprops/action-gh-release@v1 + with: + tag_name: opencv-cuda-v${{ env.PYTHON_VERSION }}-${{ env.CUDA_VERSION }}-${{ github.run_number }} + name: OpenCV CUDA Release - Python ${{ env.PYTHON_VERSION }} CUDA ${{ env.CUDA_VERSION }} + body_path: ./artifacts/release-info.txt + draft: true + files: | + ./artifacts/opencv-cuda-release.tar.gz + ./artifacts/opencv-cuda-release.tar.gz.sha256 + ./artifacts/opencv-cuda-release.tar.gz.md5 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 01900b71a..b46ff8ba2 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -14,7 +14,7 @@ jobs: - name: Checkout code uses: actions/checkout@v4 - - uses: actions/setup-node@v4 + - uses: actions/setup-node@v5 with: node-version: 18 cache: npm diff --git a/.vscode/launch.json b/.vscode/launch.json index 540197b14..f05e02f5e 100755 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -31,10 +31,40 @@ "--media-ports=5678", "--host=0.0.0.0", "--port=8889", - "--log-level=DEBUG", + "--log-level=INFO", + "--comfyui-inference-log-level=DEBUG", ], - "python": "/workspace/miniconda3/envs/comfystream/bin/python", - "justMyCode": true + "justMyCode": true, + "python": "${command:python.interpreterPath}" + }, + { + "name": "Run ComfyStream BYOC", + "type": "debugpy", + "request": "launch", + "cwd": "/workspace/ComfyUI", + "program": "/workspace/comfystream/server/byoc.py", + "console": "integratedTerminal", + "args": [ + "--workspace=/workspace/ComfyUI", + "--host=0.0.0.0", + "--port=8000", + "--log-level=INFO", + "--comfyui-inference-log-level=DEBUG", + "--width=512", + "--height=512" + ], + "env": { + "ORCH_URL": "https://172.17.0.1:9995", + "ORCH_SECRET": "orch-secret", + "CAPABILITY_NAME": "comfystream", + "CAPABILITY_DESCRIPTION": "ComfyUI streaming processor for BYOC mode", + "CAPABILITY_URL": "http://172.17.0.1:8000", + "CAPABILITY_PRICE_PER_UNIT": "0", + "CAPABILITY_PRICE_SCALING": "1", + "CAPABILITY_CAPACITY": "1" + }, + "justMyCode": true, + "python": "${command:python.interpreterPath}" }, { "name": "Run ComfyStream UI (Node.js)", diff --git a/README.md b/README.md index 52c864f13..a9edf4be5 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ This repo also includes a WebRTC server and UI that uses comfystream to support Refer to [.devcontainer/README.md](.devcontainer/README.md) to setup ComfyStream in a devcontainer using a pre-configured ComfyUI docker environment. -For other installation options, refer to [Install ComfyUI and ComfyStream](https://pipelines.livepeer.org/docs/technical/install/local-testing) in the Livepeer pipelines documentation. +For other installation options, refer to [Install ComfyUI and ComfyStream](https://docs.comfystream.org/technical/get-started/install) in the ComfyStream documentation. For additional information, refer to the remaining sections below. @@ -35,7 +35,7 @@ For additional information, refer to the remaining sections below. You can quickly deploy ComfyStream using the docker image `livepeer/comfystream` -Refer to the documentation at [https://pipelines.livepeer.org/docs/technical/getting-started/install-comfystream](https://pipelines.livepeer.org/docs/technical/getting-started/install-comfystream) for instructions to run locally or on a remote server. +Refer to the documentation at [https://docs.comfystream.org/technical/get-started/install](https://docs.comfystream.org/technical/get-started/install) for instructions to run locally or on a remote server. #### RunPod diff --git a/configs/models.yaml b/configs/models.yaml index 09149f954..bbcf0cdc7 100644 --- a/configs/models.yaml +++ b/configs/models.yaml @@ -29,19 +29,19 @@ models: # TAESD models taesd: name: "TAESD" - url: "https://raw.githubusercontent.com/madebyollin/taesd/main/taesd_decoder.pth" - path: "vae_approx/taesd_decoder.pth" + url: "https://huggingface.co/madebyollin/taesd/resolve/main/taesd_decoder.safetensors" + path: "vae_approx/taesd_decoder.safetensors" type: "vae_approx" extra_files: - - url: "https://raw.githubusercontent.com/madebyollin/taesd/main/taesd_encoder.pth" - path: "vae_approx/taesd_encoder.pth" + - url: "https://huggingface.co/madebyollin/taesd/resolve/main/taesd_encoder.safetensors" + path: "vae_approx/taesd_encoder.safetensors" # ControlNet models controlnet-depth: name: "ControlNet Depth" url: "https://huggingface.co/comfyanonymous/ControlNet-v1-1_fp16_safetensors/resolve/main/control_v11f1p_sd15_depth_fp16.safetensors" path: "controlnet/control_v11f1p_sd15_depth_fp16.safetensors" - type: "controlnet" + type: "controlnet" controlnet-mediapipe-face: name: "ControlNet MediaPipe Face" @@ -74,8 +74,82 @@ models: path: "text_encoders/CLIPText/model.fp16.safetensors" type: "text_encoder" + # JoyVASA models for ComfyUI-FasterLivePortrait + joyvasa_motion_generator: + name: "JoyVASA Motion Generator" + url: "https://huggingface.co/jdh-algo/JoyVASA/resolve/main/motion_generator/motion_generator_hubert_chinese.pt?download=true" + path: "liveportrait_onnx/joyvasa_models/motion_generator_hubert_chinese.pt" + type: "torch" + + joyvasa_audio_model: + name: "JoyVASA Hubert Chinese" + url: "https://huggingface.co/TencentGameMate/chinese-hubert-base/resolve/main/chinese-hubert-base-fairseq-ckpt.pt?download=true" + path: "liveportrait_onnx/joyvasa_models/chinese-hubert-base-fairseq-ckpt.pt" + type: "torch" + + joyvasa_motion_template: + name: "JoyVASA Motion Template" + url: "https://huggingface.co/jdh-algo/JoyVASA/resolve/main/motion_template/motion_template.pkl?download=true" + path: "liveportrait_onnx/joyvasa_models/motion_template.pkl" + type: "pickle" + + # LivePortrait ONNX models - only necessary to build TRT engines + warping_spade: + name: "WarpingSpadeModel" + url: "https://huggingface.co/warmshao/FasterLivePortrait/resolve/main/liveportrait_onnx/warping_spade-fix.onnx?download=true" + path: "liveportrait_onnx/warping_spade-fix.onnx" + type: "onnx" + + motion_extractor: + name: "MotionExtractorModel" + url: "https://huggingface.co/warmshao/FasterLivePortrait/resolve/main/liveportrait_onnx/motion_extractor.onnx?download=true" + path: "liveportrait_onnx/motion_extractor.onnx" + type: "onnx" + + landmark: + name: "LandmarkModel" + url: "https://huggingface.co/warmshao/FasterLivePortrait/resolve/main/liveportrait_onnx/landmark.onnx?download=true" + path: "liveportrait_onnx/landmark.onnx" + type: "onnx" + + face_analysis_retinaface: + name: "FaceAnalysisModel - RetinaFace" + url: "https://huggingface.co/warmshao/FasterLivePortrait/resolve/main/liveportrait_onnx/retinaface_det_static.onnx?download=true" + path: "liveportrait_onnx/retinaface_det_static.onnx" + type: "onnx" + + face_analysis_2dpose: + name: "FaceAnalysisModel - 2DPose" + url: "https://huggingface.co/warmshao/FasterLivePortrait/resolve/main/liveportrait_onnx/face_2dpose_106_static.onnx?download=true" + path: "liveportrait_onnx/face_2dpose_106_static.onnx" + type: "onnx" + + appearance_feature_extractor: + name: "AppearanceFeatureExtractorModel" + url: "https://huggingface.co/warmshao/FasterLivePortrait/resolve/main/liveportrait_onnx/appearance_feature_extractor.onnx?download=true" + path: "liveportrait_onnx/appearance_feature_extractor.onnx" + type: "onnx" + + stitching: + name: "StitchingModel" + url: "https://huggingface.co/warmshao/FasterLivePortrait/resolve/main/liveportrait_onnx/stitching.onnx?download=true" + path: "liveportrait_onnx/stitching.onnx" + type: "onnx" + + stitching_eye_retarget: + name: "StitchingModel (Eye Retargeting)" + url: "https://huggingface.co/warmshao/FasterLivePortrait/resolve/main/liveportrait_onnx/stitching_eye.onnx?download=true" + path: "liveportrait_onnx/stitching_eye.onnx" + type: "onnx" + + stitching_lip_retarget: + name: "StitchingModel (Lip Retargeting)" + url: "https://huggingface.co/warmshao/FasterLivePortrait/resolve/main/liveportrait_onnx/stitching_lip.onnx?download=true" + path: "liveportrait_onnx/stitching_lip.onnx" + type: "onnx" + sd-turbo: name: "SD-Turbo" url: "https://huggingface.co/stabilityai/sd-turbo/resolve/main/sd_turbo.safetensors" path: "checkpoints/SD1.5/sd_turbo.safetensors" - type: "checkpoint" \ No newline at end of file + type: "checkpoint" diff --git a/configs/nodes.yaml b/configs/nodes.yaml index 49d422a57..cf497e07c 100644 --- a/configs/nodes.yaml +++ b/configs/nodes.yaml @@ -19,6 +19,12 @@ nodes: branch: "main" type: "tensorrt" + comfyui-fasterliveportrait: + name: "ComfyUI FasterLivePortrait" + url: "https://github.com/pschroedl/ComfyUI-FasterLivePortrait.git" + branch: "main" + type: "tensorrt" + # Ryan's nodes comfyui-ryanontheinside: name: "ComfyUI RyanOnTheInside" diff --git a/docker/Dockerfile.base b/docker/Dockerfile.base index c8f6b7ff1..878f4c9bd 100644 --- a/docker/Dockerfile.base +++ b/docker/Dockerfile.base @@ -8,25 +8,20 @@ ARG CONDA_VERSION \ PYTHON_VERSION ENV DEBIAN_FRONTEND=noninteractive \ + TensorRT_ROOT=/opt/TensorRT-10.12.0.36 \ CONDA_VERSION="${CONDA_VERSION}" \ PATH="/workspace/miniconda3/bin:${PATH}" \ PYTHON_VERSION="${PYTHON_VERSION}" - + # System dependencies RUN apt update && apt install -yqq --no-install-recommends \ - git \ - wget \ - nano \ - socat \ - libsndfile1 \ - build-essential \ - llvm \ - tk-dev \ - libglvnd-dev \ - cmake \ - swig \ - libprotobuf-dev \ - protobuf-compiler \ + git wget nano socat \ + libsndfile1 build-essential llvm tk-dev \ + libglvnd-dev cmake swig libprotobuf-dev \ + protobuf-compiler libcairo2-dev libpango1.0-dev libgdk-pixbuf2.0-dev \ + libffi-dev libgirepository1.0-dev pkg-config libgflags-dev \ + libgoogle-glog-dev libjpeg-dev libavcodec-dev libavformat-dev \ + libavutil-dev libswscale-dev \ && rm -rf /var/lib/apt/lists/* #enable opengl support with nvidia gpu @@ -55,8 +50,24 @@ conda run -n comfystream --no-capture-output pip install wheel COPY ./src/comfystream/scripts /workspace/comfystream/src/comfystream/scripts COPY ./configs /workspace/comfystream/configs +# TensorRT SDK +WORKDIR /opt +RUN wget --progress=dot:giga \ + https://developer.nvidia.com/downloads/compute/machine-learning/tensorrt/10.12.0/tars/TensorRT-10.12.0.36.Linux.x86_64-gnu.cuda-12.9.tar.gz \ + && tar -xzf TensorRT-10.12.0.36.Linux.x86_64-gnu.cuda-12.9.tar.gz \ + && rm TensorRT-10.12.0.36.Linux.x86_64-gnu.cuda-12.9.tar.gz + +# Link libraries and update linker cache +RUN echo "${TensorRT_ROOT}/lib" > /etc/ld.so.conf.d/tensorrt.conf \ + && ldconfig + +# Install matching TensorRT Python bindings for CPython 3.12 +RUN conda run -n comfystream pip install --no-cache-dir \ + ${TensorRT_ROOT}/python/tensorrt-10.12.0.36-cp312-none-linux_x86_64.whl + # Clone ComfyUI -RUN git clone --branch v0.3.56 --depth 1 https://github.com/comfyanonymous/ComfyUI.git /workspace/ComfyUI +RUN git clone --branch v0.3.60 --depth 1 https://github.com/comfyanonymous/ComfyUI.git /workspace/ComfyUI +RUN git clone https://github.com/Comfy-Org/ComfyUI-Manager.git /workspace/ComfyUI/custom_nodes/ComfyUI-Manager # Copy ComfyStream files into ComfyUI COPY . /workspace/comfystream @@ -66,9 +77,14 @@ RUN conda run -n comfystream --cwd /workspace/comfystream --no-capture-output pi # Copy comfystream and example workflows to ComfyUI COPY ./workflows/comfyui/* /workspace/ComfyUI/user/default/workflows/ COPY ./test/example-512x512.png /workspace/ComfyUI/input +COPY ./docker/entrypoint.sh /workspace/comfystream/docker/entrypoint.sh + +# Install OpenCV CUDA +RUN conda run -n comfystream --no-capture-output --cwd /workspace/comfystream --no-capture-output docker/entrypoint.sh --opencv-cuda # Install ComfyUI requirements RUN conda run -n comfystream --no-capture-output --cwd /workspace/ComfyUI pip install -r requirements.txt --root-user-action=ignore +RUN cd /workspace/ComfyUI/custom_nodes/ComfyUI-Manager && pip install -r requirements.txt # Install ComfyStream requirements RUN ln -s /workspace/comfystream /workspace/ComfyUI/custom_nodes/comfystream diff --git a/docker/Dockerfile.opencv b/docker/Dockerfile.opencv new file mode 100644 index 000000000..848db1fe8 --- /dev/null +++ b/docker/Dockerfile.opencv @@ -0,0 +1,124 @@ +ARG BASE_IMAGE=nvidia/cuda:12.8.1-cudnn-devel-ubuntu22.04 \ + CONDA_VERSION=latest \ + PYTHON_VERSION=3.12 \ + CUDA_VERSION=12.8 + +FROM "${BASE_IMAGE}" + +ARG CONDA_VERSION \ + PYTHON_VERSION \ + CUDA_VERSION + +ENV DEBIAN_FRONTEND=noninteractive \ + CONDA_VERSION="${CONDA_VERSION}" \ + PATH="/workspace/miniconda3/bin:${PATH}" \ + PYTHON_VERSION="${PYTHON_VERSION}" \ + CUDA_VERSION="${CUDA_VERSION}" + +# System dependencies +RUN apt update && apt install -yqq \ + git \ + wget \ + nano \ + socat \ + libsndfile1 \ + build-essential \ + llvm \ + tk-dev \ + cmake \ + libgflags-dev \ + libgoogle-glog-dev \ + libjpeg-dev \ + libavcodec-dev \ + libavformat-dev \ + libavutil-dev \ + libswscale-dev && \ + rm -rf /var/lib/apt/lists/* + +RUN mkdir -p /workspace/comfystream && \ + wget "https://repo.anaconda.com/miniconda/Miniconda3-${CONDA_VERSION}-Linux-x86_64.sh" -O /tmp/miniconda.sh && \ + bash /tmp/miniconda.sh -b -p /workspace/miniconda3 && \ + eval "$(/workspace/miniconda3/bin/conda shell.bash hook)" && \ + conda tos accept --override-channels --channel https://repo.anaconda.com/pkgs/main && \ + conda tos accept --override-channels --channel https://repo.anaconda.com/pkgs/r && \ + conda create -n comfystream python="${PYTHON_VERSION}" -c conda-forge -y && \ + rm /tmp/miniconda.sh && \ + conda run -n comfystream --no-capture-output pip install numpy==1.26.4 aiortc aiohttp requests tqdm pyyaml --root-user-action=ignore + +# Clone ComfyUI +ADD --link https://github.com/comfyanonymous/ComfyUI.git /workspace/ComfyUI + +# OpenCV with CUDA support +WORKDIR /workspace + +# Clone OpenCV repositories +RUN git clone --depth 1 --branch 4.11.0 https://github.com/opencv/opencv.git && \ + git clone --depth 1 --branch 4.11.0 https://github.com/opencv/opencv_contrib.git + +# Create build directory +RUN mkdir -p /workspace/opencv/build + +# Create a toolchain file with absolute path +RUN echo '# Custom toolchain file to exclude Conda paths\n\ +\n\ +# Set system compilers\n\ +set(CMAKE_C_COMPILER "/usr/bin/gcc")\n\ +set(CMAKE_CXX_COMPILER "/usr/bin/g++")\n\ +\n\ +# Set system root directories\n\ +set(CMAKE_FIND_ROOT_PATH "/usr")\n\ +set(CMAKE_FIND_ROOT_PATH_MODE_PROGRAM NEVER)\n\ +set(CMAKE_FIND_ROOT_PATH_MODE_LIBRARY ONLY)\n\ +set(CMAKE_FIND_ROOT_PATH_MODE_INCLUDE ONLY)\n\ +set(CMAKE_FIND_ROOT_PATH_MODE_PACKAGE ONLY)\n\ +\n\ +# Explicitly exclude Conda paths\n\ +list(APPEND CMAKE_IGNORE_PATH \n\ + "/workspace/miniconda3"\n\ + "/workspace/miniconda3/envs"\n\ + "/workspace/miniconda3/envs/comfystream"\n\ + "/workspace/miniconda3/envs/comfystream/lib"\n\ +)\n\ +\n\ +# Set RPATH settings\n\ +set(CMAKE_SKIP_BUILD_RPATH FALSE)\n\ +set(CMAKE_BUILD_WITH_INSTALL_RPATH FALSE)\n\ +set(CMAKE_INSTALL_RPATH "/usr/local/lib:/usr/lib/x86_64-linux-gnu")\n\ +set(PYTHON_LIBRARY "/workspace/miniconda3/envs/comfystream/lib/")\n\ +set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)' > /workspace/custom_toolchain.cmake + +# Set environment variables for OpenCV +RUN echo 'export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH' >> /root/.bashrc + +# Build and install OpenCV with CUDA support +RUN cd /workspace/opencv/build && \ + # Build OpenCV + cmake \ + -D CMAKE_TOOLCHAIN_FILE=/workspace/custom_toolchain.cmake \ + -D CMAKE_BUILD_TYPE=RELEASE \ + -D CMAKE_INSTALL_PREFIX=/usr/local \ + -D WITH_CUDA=ON \ + -D WITH_CUDNN=ON \ + -D WITH_CUBLAS=ON \ + -D WITH_TBB=ON \ + -D CUDA_ARCH_LIST="8.0+PTX" \ + -D OPENCV_DNN_CUDA=ON \ + -D OPENCV_ENABLE_NONFREE=ON \ + -D CUDA_TOOLKIT_ROOT_DIR=/usr/local/cuda \ + -D OPENCV_EXTRA_MODULES_PATH=/workspace/opencv_contrib/modules \ + -D PYTHON3_EXECUTABLE=/workspace/miniconda3/envs/comfystream/bin/python3.12 \ + -D PYTHON_INCLUDE_DIR=/workspace/miniconda3/envs/comfystream/include/python3.12 \ + -D PYTHON_LIBRARY=/workspace/miniconda3/envs/comfystream/lib/libpython3.12.so \ + -D HAVE_opencv_python3=ON \ + -D WITH_NVCUVID=OFF \ + -D WITH_NVCUVENC=OFF \ + .. && \ + make -j$(nproc) && \ + make install && \ + ldconfig + +# Configure no environment activation by default +RUN conda config --set auto_activate_base false && \ + conda init bash + +WORKDIR /workspace/comfystream diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index ef55c77e2..e6d44463a 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -130,6 +130,14 @@ if [ "$1" = "--build-engines" ]; then echo "Engine for DepthAnything2 (large) already exists at ${DEPTH_ANYTHING_DIR}/${DEPTH_ANYTHING_ENGINE_LARGE}, skipping..." fi + # Build Engines for FasterLivePortrait + if [ ! -f "$FASTERLIVEPORTRAIT_DIR/warping_spade-fix.trt" ]; then + cd "$FASTERLIVEPORTRAIT_DIR" + bash /workspace/ComfyUI/custom_nodes/ComfyUI-FasterLivePortrait/scripts/build_fasterliveportrait_trt.sh "${FASTERLIVEPORTRAIT_DIR}" "${FASTERLIVEPORTRAIT_DIR}" "${FASTERLIVEPORTRAIT_DIR}" + else + echo "Engines for FasterLivePortrait already exists, skipping..." + fi + # Build Engine for StreamDiffusion if [ ! -f "$TENSORRT_DIR/StreamDiffusion-engines/stabilityai/sd-turbo--lcm_lora-True--tiny_vae-True--max_batch-3--min_batch-3--mode-img2img/unet.engine.opt.onnx" ]; then cd /workspace/ComfyUI/custom_nodes/ComfyUI-StreamDiffusion @@ -158,7 +166,7 @@ if [ "$1" = "--opencv-cuda" ]; then if [ ! -f "/workspace/comfystream/opencv-cuda-release.tar.gz" ]; then # Download and extract OpenCV CUDA build DOWNLOAD_NAME="opencv-cuda-release.tar.gz" - wget -q -O "$DOWNLOAD_NAME" https://github.com/JJassonn69/ComfyUI-Stream-Pack/releases/download/v2/opencv-cuda-release.tar.gz + wget -q -O "$DOWNLOAD_NAME" https://github.com/JJassonn69/ComfyUI-Stream-Pack/releases/download/v2.1/opencv-cuda-release.tar.gz tar -xzf "$DOWNLOAD_NAME" -C /workspace/comfystream/ rm "$DOWNLOAD_NAME" else @@ -166,15 +174,6 @@ if [ "$1" = "--opencv-cuda" ]; then fi # Install required libraries - apt-get update && apt-get install -y \ - libgflags-dev \ - libgoogle-glog-dev \ - libjpeg-dev \ - libavcodec-dev \ - libavformat-dev \ - libavutil-dev \ - libswscale-dev - # Remove existing cv2 package SITE_PACKAGES_DIR="/workspace/miniconda3/envs/comfystream/lib/python3.12/site-packages" rm -rf "${SITE_PACKAGES_DIR}/cv2"* diff --git a/nodes/audio_utils/load_audio_tensor.py b/nodes/audio_utils/load_audio_tensor.py index 52fa6fa37..eed09eea9 100644 --- a/nodes/audio_utils/load_audio_tensor.py +++ b/nodes/audio_utils/load_audio_tensor.py @@ -1,53 +1,88 @@ import numpy as np - +import torch +import queue from comfystream import tensor_cache +from comfystream.exceptions import ComfyStreamInputTimeoutError, ComfyStreamAudioBufferError + class LoadAudioTensor: - CATEGORY = "audio_utils" - RETURN_TYPES = ("WAVEFORM", "INT") - RETURN_NAMES = ("audio", "sample_rate") + CATEGORY = "ComfyStream/Loaders" + RETURN_TYPES = ("AUDIO",) + RETURN_NAMES = ("audio",) FUNCTION = "execute" + DESCRIPTION = "Load audio tensor from ComfyStream input with timeout." def __init__(self): self.audio_buffer = np.empty(0, dtype=np.int16) self.buffer_samples = None self.sample_rate = None + self.leftover = np.empty(0, dtype=np.int16) @classmethod - def INPUT_TYPES(s): + def INPUT_TYPES(cls): return { "required": { - "buffer_size": ("FLOAT", {"default": 500.0}), + "buffer_size": ("FLOAT", { + "default": 500.0, + "tooltip": "Audio buffer size in milliseconds" + }), + }, + "optional": { + "timeout_seconds": ("FLOAT", { + "default": 1.0, + "min": 0.1, + "max": 30.0, + "step": 0.1, + "tooltip": "Timeout in seconds" + }), } } @classmethod - def IS_CHANGED(): + def IS_CHANGED(cls, **kwargs): return float("nan") - - def execute(self, buffer_size): + + def execute(self, buffer_size: float, timeout_seconds: float = 1.0): + # Initialize if needed if self.sample_rate is None or self.buffer_samples is None: - frame = tensor_cache.audio_inputs.get(block=True) - self.sample_rate = frame.sample_rate - self.buffer_samples = int(self.sample_rate * buffer_size / 1000) - self.leftover = frame.side_data.input + try: + frame = tensor_cache.audio_inputs.get(block=True, timeout=timeout_seconds) + self.sample_rate = frame.sample_rate + self.buffer_samples = int(self.sample_rate * buffer_size / 1000) + self.leftover = frame.side_data.input + except queue.Empty: + raise ComfyStreamInputTimeoutError("audio", timeout_seconds) - if self.leftover.shape[0] < self.buffer_samples: + # Use leftover data if available + if self.leftover.shape[0] >= self.buffer_samples: + buffered_audio = self.leftover[:self.buffer_samples] + self.leftover = self.leftover[self.buffer_samples:] + else: + # Collect more audio chunks chunks = [self.leftover] if self.leftover.size > 0 else [] total_samples = self.leftover.shape[0] while total_samples < self.buffer_samples: - frame = tensor_cache.audio_inputs.get(block=True) - if frame.sample_rate != self.sample_rate: - raise ValueError("Sample rate mismatch") - chunks.append(frame.side_data.input) - total_samples += frame.side_data.input.shape[0] + try: + frame = tensor_cache.audio_inputs.get(block=True, timeout=timeout_seconds) + if frame.sample_rate != self.sample_rate: + raise ValueError(f"Sample rate mismatch: expected {self.sample_rate}Hz, got {frame.sample_rate}Hz") + chunks.append(frame.side_data.input) + total_samples += frame.side_data.input.shape[0] + except queue.Empty: + raise ComfyStreamAudioBufferError(timeout_seconds, self.buffer_samples, total_samples) merged_audio = np.concatenate(chunks, dtype=np.int16) buffered_audio = merged_audio[:self.buffer_samples] - self.leftover = merged_audio[self.buffer_samples:] - else: - buffered_audio = self.leftover[:self.buffer_samples] - self.leftover = self.leftover[self.buffer_samples:] + self.leftover = merged_audio[self.buffer_samples:] if merged_audio.shape[0] > self.buffer_samples else np.empty(0, dtype=np.int16) - return buffered_audio, self.sample_rate + # Convert to ComfyUI AUDIO format + waveform_tensor = torch.from_numpy(buffered_audio.astype(np.float32) / 32768.0) + + # Ensure proper tensor shape: (batch, channels, samples) + if waveform_tensor.dim() == 1: + waveform_tensor = waveform_tensor.unsqueeze(0).unsqueeze(0) + elif waveform_tensor.dim() == 2: + waveform_tensor = waveform_tensor.unsqueeze(0) + + return ({"waveform": waveform_tensor, "sample_rate": self.sample_rate},) \ No newline at end of file diff --git a/nodes/audio_utils/pitch_shift.py b/nodes/audio_utils/pitch_shift.py index ed2b2b383..2fba9ee59 100644 --- a/nodes/audio_utils/pitch_shift.py +++ b/nodes/audio_utils/pitch_shift.py @@ -1,17 +1,17 @@ import numpy as np import librosa +import torch class PitchShifter: CATEGORY = "audio_utils" - RETURN_TYPES = ("WAVEFORM", "INT") + RETURN_TYPES = ("AUDIO",) FUNCTION = "execute" @classmethod def INPUT_TYPES(cls): return { "required": { - "audio": ("WAVEFORM",), - "sample_rate": ("INT",), + "audio": ("AUDIO",), "pitch_shift": ("FLOAT", { "default": 4.0, "min": 0.0, @@ -25,8 +25,41 @@ def INPUT_TYPES(cls): def IS_CHANGED(cls): return float("nan") - def execute(self, audio, sample_rate, pitch_shift): - audio_float = audio.astype(np.float32) / 32768.0 - shifted_audio = librosa.effects.pitch_shift(y=audio_float, sr=sample_rate, n_steps=pitch_shift) - shifted_int16 = np.clip(shifted_audio * 32768.0, -32768, 32767).astype(np.int16) - return shifted_int16, sample_rate + def execute(self, audio, pitch_shift): + # Extract waveform and sample rate from AUDIO format + waveform = audio["waveform"] + sample_rate = audio["sample_rate"] + + # Convert tensor to numpy and ensure proper format for librosa + if isinstance(waveform, torch.Tensor): + audio_numpy = waveform.squeeze().cpu().numpy() + else: + audio_numpy = waveform.squeeze() + + # Ensure float32 format and proper normalization for librosa processing + if audio_numpy.dtype != np.float32: + audio_numpy = audio_numpy.astype(np.float32) + + # Check if data needs normalization (librosa expects [-1, 1] range) + max_abs_val = np.abs(audio_numpy).max() + if max_abs_val > 1.0: + # Data appears to be in int16 range, normalize it + audio_numpy = audio_numpy / 32768.0 + + # Apply pitch shift + shifted_audio = librosa.effects.pitch_shift(y=audio_numpy, sr=sample_rate, n_steps=pitch_shift) + + # Convert back to tensor and restore original shape + shifted_tensor = torch.from_numpy(shifted_audio).float() + if waveform.dim() == 3: # (batch, channels, samples) + shifted_tensor = shifted_tensor.unsqueeze(0).unsqueeze(0) + elif waveform.dim() == 2: # (channels, samples) + shifted_tensor = shifted_tensor.unsqueeze(0) + + # Return AUDIO format + result_audio = { + "waveform": shifted_tensor, + "sample_rate": sample_rate + } + + return (result_audio,) diff --git a/nodes/audio_utils/save_audio_tensor.py b/nodes/audio_utils/save_audio_tensor.py index 6f86b57c3..6b7b0281c 100644 --- a/nodes/audio_utils/save_audio_tensor.py +++ b/nodes/audio_utils/save_audio_tensor.py @@ -1,3 +1,4 @@ +import numpy as np from comfystream import tensor_cache class SaveAudioTensor: @@ -11,7 +12,7 @@ class SaveAudioTensor: def INPUT_TYPES(s): return { "required": { - "audio": ("WAVEFORM",) + "audio": ("AUDIO",) } } @@ -20,5 +21,26 @@ def IS_CHANGED(s): return float("nan") def execute(self, audio): - tensor_cache.audio_outputs.put_nowait(audio) + # Extract waveform tensor from AUDIO format + waveform = audio["waveform"] + + # Convert to numpy and flatten for pipeline compatibility + if hasattr(waveform, 'cpu'): + # PyTorch tensor + waveform_numpy = waveform.squeeze().cpu().numpy() + else: + # Already numpy + waveform_numpy = waveform.squeeze() + + # Ensure 1D array for pipeline buffer concatenation + if waveform_numpy.ndim > 1: + waveform_numpy = waveform_numpy.flatten() + + # Convert to int16 if needed (pipeline expects int16) + if waveform_numpy.dtype == np.float32: + waveform_numpy = (waveform_numpy * 32767).astype(np.int16) + elif waveform_numpy.dtype != np.int16: + waveform_numpy = waveform_numpy.astype(np.int16) + + tensor_cache.audio_outputs.put_nowait(waveform_numpy) return (audio,) diff --git a/nodes/tensor_utils/load_tensor.py b/nodes/tensor_utils/load_tensor.py index c39fe8a1d..9923a996a 100644 --- a/nodes/tensor_utils/load_tensor.py +++ b/nodes/tensor_utils/load_tensor.py @@ -1,20 +1,37 @@ +import torch +import queue from comfystream import tensor_cache +from comfystream.exceptions import ComfyStreamInputTimeoutError class LoadTensor: - CATEGORY = "tensor_utils" + CATEGORY = "ComfyStream/Loaders" RETURN_TYPES = ("IMAGE",) FUNCTION = "execute" + DESCRIPTION = "Load image tensor from ComfyStream input with timeout." @classmethod - def INPUT_TYPES(s): - return {} + def INPUT_TYPES(cls): + return { + "optional": { + "timeout_seconds": ("FLOAT", { + "default": 1.0, + "min": 0.1, + "max": 30.0, + "step": 0.1, + "tooltip": "Timeout in seconds" + }), + } + } @classmethod - def IS_CHANGED(): + def IS_CHANGED(cls, **kwargs): return float("nan") - def execute(self): - frame = tensor_cache.image_inputs.get(block=True) - frame.side_data.skipped = False - return (frame.side_data.input,) + def execute(self, timeout_seconds: float = 1.0): + try: + frame = tensor_cache.image_inputs.get(block=True, timeout=timeout_seconds) + frame.side_data.skipped = False + return (frame.side_data.input,) + except queue.Empty: + raise ComfyStreamInputTimeoutError("video", timeout_seconds) diff --git a/nodes/tensor_utils/save_text_tensor.py b/nodes/tensor_utils/save_text_tensor.py index 525f2a1b9..098887e07 100644 --- a/nodes/tensor_utils/save_text_tensor.py +++ b/nodes/tensor_utils/save_text_tensor.py @@ -18,7 +18,7 @@ def INPUT_TYPES(s): } @classmethod - def IS_CHANGED(s): + def IS_CHANGED(s, **kwargs): return float("nan") def execute(self, data, remove_linebreaks=True): diff --git a/pyproject.toml b/pyproject.toml index 8cc486456..ecf846f78 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,11 +5,12 @@ build-backend = "setuptools.build_meta" [project] name = "comfystream" description = "Build Live AI Video with ComfyUI" -version = "0.1.5" +version = "0.1.6" license = { file = "LICENSE" } dependencies = [ "asyncio", - "comfyui @ git+https://github.com/hiddenswitch/ComfyUI.git@58622c7e91cb5cc2bca985d713db55e5681ff316", + "pytrickle @ git+https://github.com/livepeer/pytrickle.git@v0.1.4", + "comfyui @ git+https://github.com/hiddenswitch/ComfyUI.git@e62df3a8811d8c652a195d4669f4fb27f6c9a9ba", "aiortc", "aiohttp", "aiohttp_cors", diff --git a/requirements.txt b/requirements.txt index 7ff3310b6..b35bb52cc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ asyncio -comfyui @ git+https://github.com/hiddenswitch/ComfyUI.git@58622c7e91cb5cc2bca985d713db55e5681ff316 +pytrickle @ git+https://github.com/livepeer/pytrickle.git@v0.1.4 +comfyui @ git+https://github.com/hiddenswitch/ComfyUI.git@e62df3a8811d8c652a195d4669f4fb27f6c9a9ba aiortc aiohttp aiohttp_cors diff --git a/scripts/requirements.txt b/scripts/requirements.txt index 6ea80ee2e..eb7cee7f7 100644 --- a/scripts/requirements.txt +++ b/scripts/requirements.txt @@ -8,4 +8,4 @@ bcrypt rich # Profiler psutil -pynvml +nvidia-ml-py diff --git a/server/app.py b/server/app.py index ecf5751f4..46f39ebb9 100644 --- a/server/app.py +++ b/server/app.py @@ -12,9 +12,6 @@ if torch.cuda.is_available(): torch.cuda.init() - -from aiohttp import web, MultipartWriter -from aiohttp_cors import setup as setup_cors, ResourceOptions from aiohttp import web from aiortc import ( MediaStreamTrack, @@ -24,12 +21,12 @@ RTCSessionDescription, ) # Import HTTP streaming modules -from http_streaming.routes import setup_routes from aiortc.codecs import h264 from aiortc.rtcrtpsender import RTCRtpSender from comfystream.pipeline import Pipeline from twilio.rest import Client from comfystream.server.utils import patch_loop_datagram, add_prefix_to_app_routes, FPSMeter +from comfystream.exceptions import ComfyStreamTimeoutFilter from comfystream.server.metrics import MetricsManager, StreamStatsManager import time @@ -40,6 +37,7 @@ MAX_BITRATE = 2000000 MIN_BITRATE = 2000000 +TEXT_POLL_INTERVAL = 0.25 # Interval in seconds to poll for text outputs class VideoStreamTrack(MediaStreamTrack): @@ -110,15 +108,6 @@ async def recv(self): """ processed_frame = await self.pipeline.get_processed_video_frame() - # Update the frame buffer with the processed frame - try: - from frame_buffer import FrameBuffer - frame_buffer = FrameBuffer.get_instance() - frame_buffer.update_frame(processed_frame) - except Exception as e: - # Don't let frame buffer errors affect the main pipeline - print(f"Error updating frame buffer: {e}") - # Increment the frame count to calculate FPS. await self.fps_meter.increment_frame_count() @@ -390,11 +379,11 @@ async def forward_text(): try: while channel.readyState == "open": try: - # Use timeout to prevent indefinite blocking - text = await asyncio.wait_for( - pipeline.get_text_output(), - timeout=1.0 # Check every second if channel is still open - ) + # Non-blocking poll; sleep if no text to avoid tight loop + text = await pipeline.get_text_output() + if text is None or text.strip() == "": + await asyncio.sleep(TEXT_POLL_INTERVAL) + continue if channel.readyState == "open": # Send as JSON string for extensibility try: @@ -402,9 +391,6 @@ async def forward_text(): except Exception as e: logger.debug(f"[TextChannel] Send failed, stopping forwarder: {e}") break - except asyncio.TimeoutError: - # No text available, continue checking - continue except asyncio.CancelledError: logger.debug("[TextChannel] Forward text task cancelled") break @@ -575,6 +561,7 @@ async def on_startup(app: web.Application): gpu_only=True, preview_method='none', comfyui_inference_log_level=app.get("comfui_inference_log_level", None), + blacklist_nodes=["ComfyUI-Manager"] ) app["pcs"] = set() app["video_tracks"] = {} @@ -638,16 +625,6 @@ async def on_shutdown(app: web.Application): app = web.Application() app["media_ports"] = args.media_ports.split(",") if args.media_ports else None app["workspace"] = args.workspace - - # Setup CORS - cors = setup_cors(app, defaults={ - "*": ResourceOptions( - allow_credentials=True, - expose_headers="*", - allow_headers="*", - allow_methods=["GET", "POST", "OPTIONS"] - ) - }) app.on_startup.append(on_startup) app.on_shutdown.append(on_shutdown) @@ -659,12 +636,6 @@ async def on_shutdown(app: web.Application): app.router.add_post("/offer", offer) app.router.add_post("/prompt", set_prompt) - # Setup HTTP streaming routes - setup_routes(app, cors) - - # Serve static files from the public directory - app.router.add_static("/", path=os.path.join(os.path.dirname(__file__), "public"), name="static") - # Add routes for getting stream statistics. stream_stats_manager = StreamStatsManager(app) app.router.add_get( @@ -696,6 +667,9 @@ def force_print(*args, **kwargs): if args.comfyui_log_level: log_level = logging._nameToLevel.get(args.comfyui_log_level.upper()) logging.getLogger("comfy").setLevel(log_level) + + # Add ComfyStream timeout filter to suppress verbose execution logging + logging.getLogger("comfy.cmd.execution").addFilter(ComfyStreamTimeoutFilter()) if args.comfyui_inference_log_level: app["comfui_inference_log_level"] = args.comfyui_inference_log_level diff --git a/server/byoc.py b/server/byoc.py new file mode 100644 index 000000000..895667482 --- /dev/null +++ b/server/byoc.py @@ -0,0 +1,215 @@ +import argparse +import asyncio +import logging +import os +import sys + +import torch +# Initialize CUDA before any other imports to prevent core dump. +if torch.cuda.is_available(): + torch.cuda.init() + +from aiohttp import web +from pytrickle.stream_processor import StreamProcessor +from pytrickle.utils.register import RegisterCapability +from pytrickle.frame_skipper import FrameSkipConfig +from frame_processor import ComfyStreamFrameProcessor +from comfystream.exceptions import ComfyStreamTimeoutFilter + +logger = logging.getLogger(__name__) + + +async def register_orchestrator(orch_url=None, orch_secret=None, capability_name=None, host="127.0.0.1", port=8889): + """Register capability with orchestrator if configured.""" + try: + orch_url = orch_url or os.getenv("ORCH_URL") + orch_secret = orch_secret or os.getenv("ORCH_SECRET") + + if orch_url and orch_secret: + os.environ.update({ + "CAPABILITY_NAME": capability_name or os.getenv("CAPABILITY_NAME") or "comfystream-processor", + "CAPABILITY_DESCRIPTION": "ComfyUI streaming processor", + "CAPABILITY_URL": f"http://{host}:{port}", + "CAPABILITY_CAPACITY": "1", + "ORCH_URL": orch_url, + "ORCH_SECRET": orch_secret + }) + + # Pass through explicit capability_name to ensure CLI/env override takes effect + result = await RegisterCapability.register( + logger=logger, + capability_name=capability_name + ) + if result: + logger.info(f"Registered capability: {result.geturl()}") + except Exception as e: + logger.error(f"Orchestrator registration failed: {e}") + + +def main(): + parser = argparse.ArgumentParser( + description="Run comfystream server in BYOC (Bring Your Own Compute) mode using pytrickle." + ) + parser.add_argument("--port", default=8889, help="Set the server port") + parser.add_argument("--host", default="127.0.0.1", help="Set the host") + parser.add_argument( + "--workspace", default=None, required=True, help="Set Comfy workspace" + ) + parser.add_argument( + "--log-level", + default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + help="Set the logging level", + ) + parser.add_argument( + "--comfyui-log-level", + default=None, + choices=logging._nameToLevel.keys(), + help="Set the global logging level for ComfyUI", + ) + parser.add_argument( + "--comfyui-inference-log-level", + default=None, + choices=logging._nameToLevel.keys(), + help="Set the logging level for ComfyUI inference", + ) + parser.add_argument( + "--orch-url", + default=None, + help="Orchestrator URL for capability registration", + ) + parser.add_argument( + "--orch-secret", + default=None, + help="Orchestrator secret for capability registration", + ) + parser.add_argument( + "--capability-name", + default=None, + help="Name for this capability (default: comfystream-processor)", + ) + parser.add_argument( + "--disable-frame-skip", + default=False, + action="store_true", + help="Disable adaptive frame skipping based on queue sizes (enabled by default)", + ) + parser.add_argument( + "--width", + default=512, + type=int, + help="Default video width for processing", + ) + parser.add_argument( + "--height", + default=512, + type=int, + help="Default video height for processing", + ) + args = parser.parse_args() + + logging.basicConfig( + level=args.log_level.upper(), + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%H:%M:%S", + ) + + # Allow overriding of ComfyUI log levels. + if args.comfyui_log_level: + log_level = logging._nameToLevel.get(args.comfyui_log_level.upper()) + logging.getLogger("comfy").setLevel(log_level) + + # Add ComfyStream timeout filter to suppress verbose execution logging + logging.getLogger("comfy.cmd.execution").addFilter(ComfyStreamTimeoutFilter()) + + def force_print(*args, **kwargs): + print(*args, **kwargs, flush=True) + sys.stdout.flush() + + logger.info("Starting ComfyStream BYOC server with pytrickle StreamProcessor...") + + # Create frame processor with configuration + frame_processor = ComfyStreamFrameProcessor( + width=args.width, + height=args.height, + workspace=args.workspace, + disable_cuda_malloc=True, + gpu_only=True, + preview_method='none', + comfyui_inference_log_level=args.comfyui_inference_log_level + ) + + # Create frame skip configuration only if enabled + frame_skip_config = None + if args.disable_frame_skip: + logger.info("Frame skipping disabled") + else: + frame_skip_config = FrameSkipConfig() + logger.info("Frame skipping enabled: adaptive skipping based on queue sizes") + + # Create StreamProcessor with frame processor + processor = StreamProcessor( + video_processor=frame_processor.process_video_async, + audio_processor=frame_processor.process_audio_async, + model_loader=frame_processor.load_model, + param_updater=frame_processor.update_params, + on_stream_stop=frame_processor.on_stream_stop, + # Align processor name with capability for consistent logs + name=(args.capability_name or os.getenv("CAPABILITY_NAME") or "comfystream-processor"), + port=int(args.port), + host=args.host, + frame_skip_config=frame_skip_config, + # Ensure server metadata reflects the desired capability name + capability_name=(args.capability_name or os.getenv("CAPABILITY_NAME") or "comfystream-processor") + ) + + # Set the stream processor reference for text data publishing + frame_processor.set_stream_processor(processor) + + # Create async startup function to load model + async def load_model_on_startup(app): + await processor._frame_processor.load_model() + + # Create async startup function for orchestrator registration + async def register_orchestrator_startup(app): + await register_orchestrator( + orch_url=args.orch_url, + orch_secret=args.orch_secret, + capability_name=args.capability_name, + host=args.host, + port=args.port + ) + + # Add model loading and registration to startup hooks + processor.server.app.on_startup.append(load_model_on_startup) + processor.server.app.on_startup.append(register_orchestrator_startup) + + # Add warmup endpoint: accepts same body as prompts update + async def warmup_handler(request): + try: + body = await request.json() + except Exception as e: + logger.error(f"Invalid JSON in warmup request: {e}") + return web.json_response({"error": "Invalid JSON"}, status=400) + try: + # Inject sentinel to trigger warmup inside update_params on the model thread + if isinstance(body, dict): + body["warmup"] = True + else: + body = {"warmup": True} + # Fire-and-forget: do not await warmup; update_params will schedule it + asyncio.get_running_loop().create_task(frame_processor.update_params(body)) + return web.json_response({"status": "accepted"}) + except Exception as e: + logger.error(f"Warmup failed: {e}") + return web.json_response({"error": str(e)}, status=500) + + # Mount at same API namespace as StreamProcessor defaults + processor.server.add_route("POST", "/api/stream/warmup", warmup_handler) + + # Run the processor + processor.run() + + +if __name__ == "__main__": + main() diff --git a/server/frame_buffer.py b/server/frame_buffer.py deleted file mode 100644 index 2a16407ae..000000000 --- a/server/frame_buffer.py +++ /dev/null @@ -1,42 +0,0 @@ -import threading -import time -import numpy as np -import cv2 -import av -from typing import Optional - -class FrameBuffer: - _instance = None - - @classmethod - def get_instance(cls): - if cls._instance is None: - cls._instance = FrameBuffer() - return cls._instance - - def __init__(self): - self.current_frame = None - self.frame_lock = threading.Lock() - self.last_update_time = 0 - self.quality = 70 # JPEG quality (0-100) - - def update_frame(self, frame): - """Update the current frame in the buffer""" - with self.frame_lock: - # Convert frame to numpy array if it's an av.VideoFrame - if isinstance(frame, av.VideoFrame): - frame_np = frame.to_ndarray(format="rgb24") - else: - frame_np = frame - - # Store the frame as a JPEG-encoded bytes object for efficient serving - _, jpeg_frame = cv2.imencode('.jpg', cv2.cvtColor(frame_np, cv2.COLOR_RGB2BGR), - [cv2.IMWRITE_JPEG_QUALITY, self.quality]) - - self.current_frame = jpeg_frame.tobytes() - self.last_update_time = time.time() - - def get_current_frame(self) -> Optional[bytes]: - """Get the current frame from the buffer""" - with self.frame_lock: - return self.current_frame diff --git a/server/frame_processor.py b/server/frame_processor.py new file mode 100644 index 000000000..39272313b --- /dev/null +++ b/server/frame_processor.py @@ -0,0 +1,282 @@ +import asyncio +import json +import logging +import os +from typing import List + +import numpy as np +from pytrickle.frame_processor import FrameProcessor +from pytrickle.frames import VideoFrame, AudioFrame +from comfystream.pipeline import Pipeline +from comfystream.utils import convert_prompt, ComfyStreamParamsUpdateRequest + +logger = logging.getLogger(__name__) + + +class ComfyStreamFrameProcessor(FrameProcessor): + """ + Integrated ComfyStream FrameProcessor for pytrickle. + + This class wraps the ComfyStream Pipeline to work with pytrickle's streaming architecture. + """ + + def __init__(self, text_poll_interval: float = 0.25, **load_params): + """Initialize with load parameters for pipeline creation. + + Args: + text_poll_interval: Interval in seconds to poll for text outputs (default: 0.25) + **load_params: Parameters for pipeline creation + """ + self.pipeline = None + self._load_params = load_params + self._text_poll_interval = text_poll_interval + self._stream_processor = None + self._warmup_task = None + self._text_forward_task = None + self._background_tasks = [] + self._stop_event = asyncio.Event() + super().__init__() + + def set_stream_processor(self, stream_processor): + """Set reference to StreamProcessor for data publishing.""" + self._stream_processor = stream_processor + logger.info("StreamProcessor reference set for text data publishing") + + def _setup_text_monitoring(self): + """Set up background text forwarding from the pipeline.""" + try: + if self.pipeline and self._stream_processor: + # Reset stop event for new stream + self._reset_stop_event() + # Start forwarder only if workflow has text outputs (best-effort) + should_start = True + try: + should_start = bool(self.pipeline.produces_text_output()) + except Exception: + # If capability check fails, default to starting forwarder + should_start = True + + if should_start: + # Start a background task that forwards text outputs via StreamProcessor + if self._text_forward_task and not self._text_forward_task.done(): + logger.debug("Text forwarder already running; not starting another") + return + + async def _forward_text_loop(): + try: + logger.info("Starting background text forwarder task") + while not self._stop_event.is_set(): + try: + # Non-blocking poll; sleep if no text to avoid tight loop + text = await self.pipeline.get_text_output() + if text is None or text.strip() == "": + await asyncio.sleep(self._text_poll_interval) + continue + if self._stream_processor: + success = await self._stream_processor.send_data(text) + if not success: + logger.debug("Text send failed; stopping text forwarder") + break + except asyncio.CancelledError: + logger.debug("Text forwarder task cancelled") + raise + except asyncio.CancelledError: + # Propagate to finally for cleanup + raise + except Exception as e: + logger.error(f"Error in text forwarder: {e}") + finally: + logger.info("Text forwarder task exiting") + + self._text_forward_task = asyncio.create_task(_forward_text_loop()) + self._background_tasks.append(self._text_forward_task) + except Exception: + logger.warning("Failed to set up text monitoring", exc_info=True) + + async def _stop_text_forwarder(self) -> None: + """Stop the background text forwarder task if running.""" + task = self._text_forward_task + if task and not task.done(): + try: + task.cancel() + await task + except asyncio.CancelledError: + pass + except Exception: + logger.debug("Error while awaiting text forwarder cancellation", exc_info=True) + self._text_forward_task = None + + async def on_stream_stop(self): + """Called when stream stops - cleanup background tasks.""" + logger.info("Stream stopped, cleaning up background tasks") + + # Set stop event to signal all background tasks to stop + self._stop_event.set() + + # Stop the ComfyStream client's prompt execution + if self.pipeline and self.pipeline.client: + logger.info("Stopping ComfyStream client prompt execution") + try: + await self.pipeline.client.cleanup() + except Exception as e: + logger.error(f"Error stopping ComfyStream client: {e}") + + # Stop text forwarder + await self._stop_text_forwarder() + + # Cancel any other background tasks started by this processor + for task in list(self._background_tasks): + try: + if task and not task.done(): + task.cancel() + except Exception: + continue + + # Await task cancellations + for task in list(self._background_tasks): + if task: + try: + await task + except asyncio.CancelledError: + pass + except Exception: + logger.debug("Background task raised during shutdown", exc_info=True) + + self._background_tasks.clear() + logger.info("All background tasks cleaned up") + + def _reset_stop_event(self): + """Reset the stop event for a new stream.""" + self._stop_event.clear() + + async def load_model(self, **kwargs): + """Load model and initialize the pipeline.""" + params = {**self._load_params, **kwargs} + + if self.pipeline is None: + self.pipeline = Pipeline( + width=int(params.get('width', 512)), + height=int(params.get('height', 512)), + cwd=params.get('workspace', os.getcwd()), + disable_cuda_malloc=params.get('disable_cuda_malloc', True), + gpu_only=params.get('gpu_only', True), + preview_method=params.get('preview_method', 'none'), + comfyui_inference_log_level=params.get('comfyui_inference_log_level'), + blacklist_nodes=["ComfyUI-Manager"] + ) + + async def warmup(self): + """Warm up the pipeline.""" + if not self.pipeline: + logger.warning("Warmup requested before pipeline initialization") + return + + logger.info("Running pipeline warmup...") + try: + capabilities = self.pipeline.get_workflow_io_capabilities() + logger.info(f"Detected I/O capabilities: {capabilities}") + + if capabilities.get("video", {}).get("input") or capabilities.get("video", {}).get("output"): + await self.pipeline.warm_video() + + if capabilities.get("audio", {}).get("input") or capabilities.get("audio", {}).get("output"): + await self.pipeline.warm_audio() + + except Exception as e: + logger.error(f"Warmup failed: {e}") + + def _schedule_warmup(self) -> None: + """Schedule warmup in background if not already running.""" + try: + if self._warmup_task and not self._warmup_task.done(): + logger.info("Warmup already in progress, skipping new warmup request") + return + + self._warmup_task = asyncio.create_task(self.warmup()) + logger.info("Warmup scheduled in background") + except Exception: + logger.warning("Failed to schedule warmup", exc_info=True) + + async def process_video_async(self, frame: VideoFrame) -> VideoFrame: + """Process video frame through ComfyStream Pipeline.""" + try: + + # Convert pytrickle VideoFrame to av.VideoFrame + av_frame = frame.to_av_frame(frame.tensor) + av_frame.pts = frame.timestamp + av_frame.time_base = frame.time_base + + # Process through pipeline + await self.pipeline.put_video_frame(av_frame) + processed_av_frame = await self.pipeline.get_processed_video_frame() + + # Convert back to pytrickle VideoFrame + processed_frame = VideoFrame.from_av_frame_with_timing(processed_av_frame, frame) + return processed_frame + + except Exception as e: + logger.error(f"Video processing failed: {e}") + return frame + + async def process_audio_async(self, frame: AudioFrame) -> List[AudioFrame]: + """Process audio frame through ComfyStream Pipeline or passthrough.""" + try: + if not self.pipeline: + return [frame] + + # Audio processing needed - use pipeline + av_frame = frame.to_av_frame() + await self.pipeline.put_audio_frame(av_frame) + processed_av_frame = await self.pipeline.get_processed_audio_frame() + processed_frame = AudioFrame.from_av_audio(processed_av_frame) + return [processed_frame] + + except Exception as e: + logger.error(f"Audio processing failed: {e}") + return [frame] + + async def update_params(self, params: dict): + """Update processing parameters.""" + if not self.pipeline: + return + + # Handle list input - take first element + if isinstance(params, list) and params: + params = params[0] + + # Validate parameters using the centralized validation + validated = ComfyStreamParamsUpdateRequest(**params).model_dump() + logger.info(f"Parameter validation successful, keys: {list(validated.keys())}") + + # Process prompts if provided + if "prompts" in validated and validated["prompts"]: + await self._process_prompts(validated["prompts"]) + + # Update pipeline dimensions + if "width" in validated: + self.pipeline.width = int(validated["width"]) + if "height" in validated: + self.pipeline.height = int(validated["height"]) + + # Schedule warmup if requested + if validated.get("warmup", False): + self._schedule_warmup() + + + async def _process_prompts(self, prompts): + """Process and set prompts in the pipeline.""" + try: + converted = convert_prompt(prompts, return_dict=True) + + # Set prompts in pipeline + await self.pipeline.set_prompts([converted]) + logger.info(f"Prompts set successfully: {list(prompts.keys())}") + + # Update text monitoring based on workflow capabilities + if self.pipeline.produces_text_output(): + self._setup_text_monitoring() + else: + await self._stop_text_forwarder() + + except Exception as e: + logger.error(f"Failed to process prompts: {e}") diff --git a/server/http_streaming/__init__.py b/server/http_streaming/__init__.py deleted file mode 100644 index 4fad17f79..000000000 --- a/server/http_streaming/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -""" -HTTP Streaming module for ComfyStream. - -This module contains components for token management and HTTP streaming routes. -""" diff --git a/server/http_streaming/routes.py b/server/http_streaming/routes.py deleted file mode 100644 index ac309bae5..000000000 --- a/server/http_streaming/routes.py +++ /dev/null @@ -1,69 +0,0 @@ -""" -HTTP streaming routes for ComfyStream. - -This module contains the routes for HTTP streaming. -""" -import asyncio -import logging -from aiohttp import web -from frame_buffer import FrameBuffer -from .tokens import cleanup_expired_sessions, validate_token, create_stream_token - -logger = logging.getLogger(__name__) - -async def stream_mjpeg(request): - """Serve an MJPEG stream with token validation""" - # Clean up expired sessions - cleanup_expired_sessions() - - stream_id = request.query.get("token") - - # Validate the stream token - is_valid, error_message = validate_token(stream_id) - if not is_valid: - return web.Response(status=403, text=error_message) - - frame_buffer = FrameBuffer.get_instance() - - # Use a fixed frame delay for 30 FPS - frame_delay = 1.0 / 30 - - response = web.StreamResponse( - status=200, - reason='OK', - headers={ - 'Content-Type': 'multipart/x-mixed-replace; boundary=frame', - 'Cache-Control': 'no-cache', - 'Connection': 'close', - } - ) - await response.prepare(request) - - try: - while True: - jpeg_frame = frame_buffer.get_current_frame() - if jpeg_frame is not None: - await response.write( - b'--frame\r\n' - b'Content-Type: image/jpeg\r\n\r\n' + jpeg_frame + b'\r\n' - ) - await asyncio.sleep(frame_delay) - except (ConnectionResetError, asyncio.CancelledError): - logger.info("MJPEG stream connection closed") - except Exception as e: - logger.error(f"Error in MJPEG stream: {e}") - finally: - return response - -def setup_routes(app, cors): - """Setup HTTP streaming routes - - Args: - app: The aiohttp web application - cors: The CORS setup object - """ - # Stream token endpoints - cors.add(app.router.add_post("/api/stream-token", create_stream_token)) - - # Stream endpoint with token validation - cors.add(app.router.add_get("/api/stream", stream_mjpeg)) diff --git a/server/http_streaming/tokens.py b/server/http_streaming/tokens.py deleted file mode 100644 index d424cf36d..000000000 --- a/server/http_streaming/tokens.py +++ /dev/null @@ -1,86 +0,0 @@ -""" -Token management system for ComfyStream HTTP streaming. - -This module handles the creation, validation, and management of stream tokens. -""" -import time -import secrets -import logging -from aiohttp import web - -logger = logging.getLogger(__name__) - -# Constants -SESSION_CLEANUP_INTERVAL = 60 # Clean up expired sessions every 60 seconds - -# Global token storage -active_stream_sessions = {} -last_cleanup_time = 0 - -def cleanup_expired_sessions(): - """Clean up expired stream sessions""" - global active_stream_sessions, last_cleanup_time - - current_time = time.time() - - # Only clean up if it's been at least SESSION_CLEANUP_INTERVAL since last cleanup - if current_time - last_cleanup_time < SESSION_CLEANUP_INTERVAL: - return - - # Update the last cleanup time - last_cleanup_time = current_time - - # Find expired sessions - expired_sessions = [sid for sid, expires in active_stream_sessions.items() if current_time > expires] - - # Remove expired sessions - for sid in expired_sessions: - logger.info(f"Removing expired session: {sid[:8]}...") - del active_stream_sessions[sid] - - if expired_sessions: - logger.info(f"Cleaned up {len(expired_sessions)} expired sessions. {len(active_stream_sessions)} active sessions remaining.") - -async def create_stream_token(request): - """Create a unique stream token for secure access to the stream""" - global active_stream_sessions - - # Clean up expired sessions - cleanup_expired_sessions() - - current_time = time.time() - - # Generate a new unique token - stream_id = secrets.token_urlsafe(32) - expires_at = current_time + 3600 # 1 hour from now - - # Store the new session - active_stream_sessions[stream_id] = expires_at - - logger.info(f"Generated new stream token: {stream_id[:8]}... ({len(active_stream_sessions)} active sessions)") - - return web.json_response({ - "stream_id": stream_id, - "expires_at": int(expires_at) - }) - -def validate_token(token): - """Validate a stream token and return whether it's valid - - Args: - token: The token to validate - - Returns: - tuple: (is_valid, error_message) - """ - if not token or token not in active_stream_sessions: - return False, "Invalid stream token" - - # Check if token is expired - current_time = time.time() - if current_time > active_stream_sessions[token]: - # Remove expired token - del active_stream_sessions[token] - return False, "Stream token expired" - - return True, None diff --git a/server/public/stream.html b/server/public/stream.html deleted file mode 100644 index 536781f97..000000000 --- a/server/public/stream.html +++ /dev/null @@ -1,60 +0,0 @@ - - -
- - -