
Lea/flink connect yolo #336

Open
LeahMalul wants to merge 5 commits into main from lea/flink-connect-yolo
Conversation

@LeahMalul
Collaborator

No description provided.

@greptile-apps greptile-apps Bot left a comment

Greptile Overview

Greptile Summary

This PR integrates YOLO-based fruit segmentation with Flink streaming and adds a fruit ripeness alerting system.

Major Changes:

  • Added new FruitSegmentationRunner using YOLOv8 to detect and segment fruits from camera images, uploading crops to MinIO
  • Created fruit_ripeness_alert service that queries weekly ripeness rollups and sends Kafka alerts when thresholds are exceeded
  • Added database tables ripeness_predictions and ripeness_weekly_rollups_ts to track fruit ripeness over time
  • Added new API endpoint /api/tables/ripeness_weekly_rollups_ts for querying aggregated ripeness data
  • Created camera-inference-http service and flink-dispatcher-camera to process camera imagery through the ML pipeline
  • Configured MinIO event notifications to trigger Kafka events for different image types (aerial, fruits, leaves, etc.)
  • Added image-linker Flink job infrastructure for linking related images

Critical Issues:

  • repo.py calls undefined parse_ts() function causing immediate runtime failure
  • router.py references undefined RipenessWeeklyRollupOut schema
  • Security concern: disabled torch weights_only check allowing arbitrary code execution from pickled models
  • Duplicate import statements in app.py

Confidence Score: 1/5

  • This PR has critical bugs that will cause runtime failures, so it cannot be merged safely
  • The score of 1 reflects multiple runtime-breaking bugs: the undefined parse_ts() function in repo.py raises a NameError when rollups are queried, and the undefined RipenessWeeklyRollupOut schema in router.py causes an immediate failure on import. Because the alert service reads rollups through these DB API endpoints, the alerting pipeline is broken as well. There is also a security vulnerability from disabling the torch safety check.
  • Critical attention is required for services/db_api_service/app/tables/ripeness_weekly_rollups_ts/repo.py and services/db_api_service/app/tables/ripeness_weekly_rollups_ts/router.py; both have blocking runtime errors. A security review is needed for services/inference_http/adapters/fruit_segmentation_runner.py

Important Files Changed

File Analysis

| Filename | Score | Overview |
|----------|-------|----------|
| services/fruit_ripeness_alert/app.py | 2/5 | New alerting service with duplicate imports; fetches ripeness rollups and sends Kafka alerts when the threshold is exceeded |
| services/inference_http/adapters/fruit_segmentation_runner.py | 1/5 | New YOLO-based fruit segmentation runner with a security risk (disabled torch weights_only check); detects and crops fruits from images |
| services/db_api_service/app/tables/ripeness_weekly_rollups_ts/repo.py | 0/5 | Database repository with a critical bug: calls the undefined parse_ts function, causing a NameError at runtime |
| services/db_api_service/app/tables/ripeness_weekly_rollups_ts/router.py | 1/5 | FastAPI router with an undefined schema reference, RipenessWeeklyRollupOut, causing a runtime error |
| docker-compose.yml | 3/5 | Major infrastructure changes: added camera-inference-http, flink-dispatcher-camera, MinIO event notifiers, and image-linker services |

Sequence Diagram

sequenceDiagram
    participant Camera as Camera/Device
    participant MinIO as MinIO Storage
    participant Kafka as Kafka
    participant Flink as Flink Dispatcher
    participant InferenceHTTP as Inference HTTP (YOLO)
    participant FlinkWriter as Flink Writer
    participant PostgreSQL as PostgreSQL
    participant DBAPI as DB API Service
    participant AlertService as Fruit Ripeness Alert
    participant AlertConsumer as Alert Consumer

    Camera->>MinIO: Upload image to imagery-hot bucket
    MinIO->>Kafka: Publish event to imagery.new.camera topic
    Kafka->>Flink: Consume image event
    Flink->>InferenceHTTP: HTTP POST /infer_json with bucket/key
    InferenceHTTP->>MinIO: Download image from bucket
    InferenceHTTP->>InferenceHTTP: Run YOLO model for fruit segmentation
    InferenceHTTP->>MinIO: Upload cropped fruit segments
    InferenceHTTP->>Flink: Return detection results (count, latency)
    Flink->>FlinkWriter: Forward inference results
    FlinkWriter->>PostgreSQL: Write to ripeness_predictions table
    FlinkWriter->>PostgreSQL: Aggregate to ripeness_weekly_rollups_ts
    
    Note over AlertService: Scheduled job runs periodically
    AlertService->>DBAPI: GET /api/tables/task_thresholds
    DBAPI->>PostgreSQL: Query thresholds
    PostgreSQL->>DBAPI: Return threshold (e.g., 0.8)
    DBAPI->>AlertService: Return threshold
    AlertService->>DBAPI: GET /api/tables/ripeness_weekly_rollups_ts
    DBAPI->>PostgreSQL: Query rollups for time window
    PostgreSQL->>DBAPI: Return rollup data
    DBAPI->>AlertService: Return rollups with pct_ripe
    AlertService->>AlertService: Check if pct_ripe >= threshold
    AlertService->>Kafka: Publish alert to alerts topic
    Kafka->>AlertConsumer: Forward alert to alertmanager
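
The threshold check in the last steps of the diagram could look roughly like the sketch below. It is not taken from the PR: the function name `rollups_over_threshold` and the `id` field are hypothetical, and only `pct_ripe` and the 0.8 default threshold come from the review above.

```python
def rollups_over_threshold(rollups, threshold=0.8):
    """Return the rollup rows whose pct_ripe meets or exceeds the threshold.

    `rollups` is assumed to be a list of dicts as returned by the DB API;
    rows with a missing pct_ripe are skipped rather than treated as 0.
    """
    over = []
    for row in rollups:
        pct = row.get("pct_ripe")
        if pct is None:
            continue
        if float(pct) >= threshold:
            over.append(row)
    return over


# Hypothetical sample data, for illustration only.
sample = [
    {"id": 1, "pct_ripe": 0.85},
    {"id": 2, "pct_ripe": 0.40},
    {"id": 3, "pct_ripe": None},
    {"id": 4, "pct_ripe": 0.80},
]
alerts = rollups_over_threshold(sample)
```

Each row in `alerts` would then be turned into a message for the alerts topic; keeping the comparison in a pure function makes it testable without Kafka or the DB API.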

35 files reviewed, 7 comments

from datetime import datetime, timedelta, timezone
from kafka import KafkaProducer
from token_bootstrap import get_service_token
from datetime import datetime, timezone

syntax: duplicate import statement

Suggested change
from datetime import datetime, timezone
import os, json, uuid, requests
from datetime import datetime, timedelta, timezone
from kafka import KafkaProducer
from token_bootstrap import get_service_token

    threshold = float(match.get("threshold", 0.8))
    print(f"[INFO] Task '{task_name}' threshold: {threshold*100:.1f}%")
    return threshold
from datetime import datetime, timezone

syntax: duplicate import statement (already imported on line 3)

Suggested change
from datetime import datetime, timezone
def get_rollups(window_start, window_end, headers=None):


if from_ts:
    q += " AND ts >= :from_ts"
    params["from_ts"] = parse_ts(from_ts)

logic: parse_ts function is called but not defined anywhere in this file, causing NameError at runtime

Suggested change
- params["from_ts"] = parse_ts(from_ts)
+ params["from_ts"] = from_ts

    params["from_ts"] = parse_ts(from_ts)
if to_ts:
    q += " AND ts <= :to_ts"
    params["to_ts"] = parse_ts(to_ts)

logic: parse_ts function is called but not defined anywhere in this file, causing NameError at runtime

Suggested change
- params["to_ts"] = parse_ts(to_ts)
+ params["to_ts"] = to_ts
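
An alternative to dropping the call is to define the missing helper. The sketch below is one possible `parse_ts`, written under the assumption that `from_ts` and `to_ts` arrive as ISO 8601 strings and that naive timestamps should be read as UTC; the PR does not confirm either, so treat both as assumptions.

```python
from datetime import datetime, timezone


def parse_ts(value):
    """Hypothetical helper: turn an ISO 8601 string into an aware UTC datetime.

    Accepts a trailing "Z" and treats naive timestamps as UTC (an assumption).
    Raises ValueError on input that is not valid ISO 8601.
    """
    if isinstance(value, datetime):
        dt = value
    else:
        text = str(value).strip()
        if text.endswith("Z"):
            # Older Python versions do not accept "Z" in fromisoformat.
            text = text[:-1] + "+00:00"
        dt = datetime.fromisoformat(text)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)
```

Because the helper raises `ValueError` on malformed input, the caller can map that to a 400 response instead of passing an unparsed string to the query.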

raise HTTPException(status_code=400, detail=str(e))


@router.get("/{id}", response_model=RipenessWeeklyRollupOut)

syntax: RipenessWeeklyRollupOut is not defined, should be schemas.RipenessWeeklyRollupRead

Suggested change
- @router.get("/{id}", response_model=RipenessWeeklyRollupOut)
+ @router.get("/{id}", response_model=schemas.RipenessWeeklyRollupRead)

Comment on lines +3 to +10
def allow_unrestricted_torch_load():
    _original_load = torch.load
    def patched_load(*args, **kwargs):
        kwargs["weights_only"] = False
        return _original_load(*args, **kwargs)
    torch.load = patched_load

allow_unrestricted_torch_load()

logic: patching torch.load to disable weights_only safety check is a security risk - allows arbitrary code execution when loading pickled model files
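
If the checkpoint truly needs weights_only=False, one way to narrow the exposure is to scope the override to the single load call instead of patching torch.load for the whole process. The sketch below shows that pattern with a stand-in object so it runs without PyTorch; `scoped_kwarg_override` and `fake_torch` are hypothetical names, not part of the PR or of torch. It does not make unpickling safe, so it only suits model files from a trusted source.

```python
import contextlib
import types


@contextlib.contextmanager
def scoped_kwarg_override(module, func_name, **forced_kwargs):
    """Temporarily force keyword arguments on module.func_name.

    The original function is restored on exit, even if the body raises.
    """
    original = getattr(module, func_name)

    def patched(*args, **kwargs):
        kwargs.update(forced_kwargs)
        return original(*args, **kwargs)

    setattr(module, func_name, patched)
    try:
        yield
    finally:
        setattr(module, func_name, original)


# Stand-in for the torch module so the sketch is self-contained (assumption).
fake_torch = types.SimpleNamespace(
    load=lambda path, weights_only=True: (path, weights_only)
)

with scoped_kwarg_override(fake_torch, "load", weights_only=False):
    inside = fake_torch.load("model.pt")   # override active here only

outside = fake_torch.load("model.pt")      # default behaviour restored
```

In the runner, the with block would wrap only the model construction, so any other torch.load call in the service keeps the safe default.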

obj.close()
obj.release_conn()

# Attempt to run the model with bytes input first

style: duplicate comment
